Getting familiar with Readable and Writable streams in Node.js

With Node.js streams, we can process data as soon as it arrives from the source, instead of waiting for all of it to be read into memory.

There are four types of streams in Node.js: Readable, Writable, Duplex, and Transform. In this post, we'll look at only two of them: Readable and Writable.

Streams have two modes of operation:

  • Binary mode, which lets us stream data in chunks
  • Object mode, which lets us stream data as a sequence of objects
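The difference is easy to see with a tiny sketch (the stream names here are illustrative, not from any library): a binary-mode stream emits Buffers, while an object-mode stream emits plain JavaScript values.

```javascript
import { Readable } from 'node:stream';

// Binary mode (the default): chunks are Buffers.
const binary = new Readable({
  read() {
    this.push(Buffer.from('hello'));
    this.push(null);
  }
});

// Object mode: chunks can be arbitrary JavaScript objects.
const objects = new Readable({
  objectMode: true,
  read() {
    this.push({ id: 1 });
    this.push(null);
  }
});

binary.on('data', (chunk) => console.log(Buffer.isBuffer(chunk)));  // true
objects.on('data', (chunk) => console.log(typeof chunk));           // object
```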

Readable streams

Readable streams are an abstraction for a source from which data is consumed.
Node.js docs

A Readable stream operates in either the flowing mode or the paused mode.

One example of a Readable stream is process.stdin. Let's see how it works in two modes.

Paused mode

Paused mode is the default mode for reading from a Readable stream.

We can use the read() method to read chunks of data "on demand."

For example:

process.stdin
  .on('readable', () => {
    let chunk;
    while ((chunk = process.stdin.read()) !== null) {
      console.log(`Chunk read: "${chunk.toString()}" in ${chunk.length} bytes`);
    }
  })
  .on('end', () => {
    console.log('End of stream');
  });

The read() method here reads data from the internal buffer and returns a Buffer object by default (if no encoding is specified). If no data is available, it returns null.

In the code above, we read from process.stdin and log a message as long as data is available.
Note that we can fire an end event with an EOF signal by pressing Ctrl + D (or Ctrl + Z on Windows).

Flowing mode

If we listen for the data event, the Readable stream will switch to flowing mode.

For example:

process.stdin
  .on('data', (chunk) => {
    console.log(`Chunk read: ${chunk.toString()} in ${chunk.length} bytes`);
  })
  .on('end', () => {
    console.log('End of stream');
  });

To stop the flowing mode, we can call the pause() method to go back to paused mode. To switch to flowing mode again, we can call resume():

process.stdin
  .on('data', (chunk) => {
    console.log(`Chunk read: ${chunk.toString()} in ${chunk.length} bytes`);
    process.stdin.pause();
    console.log('Switched to paused mode');
    setTimeout(() => {
      console.log('Data will start flowing again.');
      process.stdin.resume();
    }, 1000);
  })
  .on('end', () => {
    console.log('End of stream');
  });

The Node.js documentation advises against using multiple methods to read data from a single stream, as combining different modes can lead to unexpected behavior.

Creating a simple Readable stream

We can create a Readable stream like this:

import { Readable } from 'node:stream';

const strings = ['one', 'two', 'three'];

const readableStream = new Readable({
  read() {
    for (const s of strings) {
      this.push(s, 'utf8');
    }
    this.push(null);
  }
});

readableStream
  .on('data', (chunk) => {
    console.log(`Chunk: ${chunk}`);
  })
  .on('end', () => console.log('End of stream'));

This is also called the simplified constructor approach.

It's a very simple and somewhat impractical example, but it's enough to show the mechanics.

The Readable constructor takes an options argument; here we only include the read() method. It pushes the three strings from the strings array to the internal buffer, then pushes null to signal EOF and mark the end of the stream.

So, when we run it, the output will be:

Chunk: one
Chunk: two
Chunk: three
End of stream

Since we're using an array, we can create a Readable stream even more easily with the very handy from() method, which takes an iterable as its argument:

import { Readable } from 'node:stream';

const strings = ['one', 'two', 'three'];

const readableStream = Readable.from(strings);

readableStream
  .on('data', (chunk) => {
    console.log(`Chunk: ${chunk}`);
  })
  .on('end', () => console.log('End of stream'));

We can also iterate over it using for await...of, since a Readable stream is also an async iterator:

async function readChunks(readableChunks) {
  for await (const chunk of readableChunks) {
    console.log(`Chunk: ${chunk.toString()}`);
  }
  console.log('End of stream');
}

readChunks(readableStream);

| Note |
| :-- |
| Remember the two different modes for a stream we mentioned earlier, binary and object mode? Readable.from() sets the objectMode option to true by default, since it takes an iterable object. |
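A quick way to confirm this: a stream's readableObjectMode property reflects the mode it was created with. This snippet is just a sketch to illustrate.

```javascript
import { Readable } from 'node:stream';

const fromArray = Readable.from(['one', 'two']);

// Readable.from() enables object mode, so each array element
// arrives as-is instead of being converted to a Buffer.
console.log(fromArray.readableObjectMode);  // true

fromArray.on('data', (chunk) => {
  console.log(typeof chunk);  // string
});
```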

Writable streams

Writable streams are an abstraction for a destination to which data is written.
Node.js docs

There are two common methods to work with a Writable stream: write() and end():

writableStream.write('some data');
writableStream.write('some more data');
writableStream.end('done writing data');

Creating a simple Writable stream

We can create a Writable stream by passing a write() method to the Writable constructor:

import { Writable } from 'node:stream';

const writableStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

Note that for simplicity we ignore the encoding argument here and call toString() instead. The default encoding is 'utf8'.

And, now we can write to stdout like this (example adapted from Node.js docs):

function writeAThousandTimes(writer, data, encoding, callback) {
  let i = 1000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time!
        writer.write(data, encoding, callback);
      } else {
        // See if we should continue, or wait.
        // Don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // Had to stop early!
      // Write some more once it drains.
      writer.once('drain', write);
    }
  }
}

writeAThousandTimes(writableStream, 'hey', 'utf8', () => {
  console.log('Done!');
});

We write hey to our Writable stream a thousand times, pausing whenever write() tells us to; that's pretty much it.

But, the drain event looks a bit unfamiliar, so let's look at two more concepts.

highWaterMark

The highWaterMark option in a stream specifies a threshold in total bytes (or in total objects, if we're using object mode). It's not a strict limit: when the highWaterMark is reached, the stream won't be blocked, but it dictates how much data a stream buffers before it stops asking for more.

Backpressure

When data arrives faster than we can consume it, it piles up in buffers, which can result in unwanted memory usage.

Backpressure is the mechanism that guards against this.
For example, with a Writable stream, the write() method returns false if the internal buffer exceeds the highWaterMark limit.
Once the internal buffer is emptied, the drain event is emitted, meaning that we can safely continue writing.

Backpressure also applies to a Readable stream: the push() method (called inside read()) returns false when the internal buffer is full.
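On the producing side, respecting backpressure means checking push()'s return value and stopping until Node calls read() again. Here is a minimal sketch; the counter stream is made up for illustration.

```javascript
import { Readable } from 'node:stream';

let n = 0;

// Push numbered chunks, but stop as soon as push() returns false;
// Node will call read() again once the internal buffer has room.
const counter = new Readable({
  highWaterMark: 16,
  read() {
    let ok = true;
    while (ok && n < 1000) {
      n++;
      ok = this.push(`${n}\n`);
    }
    if (n >= 1000) {
      this.push(null);  // signal EOF after the last chunk
    }
  }
});

counter.on('data', () => {});
counter.on('end', () => console.log(`Produced ${n} chunks`));
```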


Conclusion

Streams might be a bit difficult to grasp, but they are fundamental to the Node.js ecosystem. There is a lot more to streams than the two types we have just looked at, but being familiar with these two can help us understand even more advanced concepts.
There are other common methods worth a look, such as fs.createReadStream and fs.createWriteStream; be sure to check out the official docs.
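As a taste of those methods, here's a minimal file copy; the file names are just examples, and pipe() takes care of the backpressure handling we did by hand earlier.

```javascript
import { createReadStream, createWriteStream, writeFileSync } from 'node:fs';

// Set up a small example file so the copy below has something to read.
writeFileSync('input.txt', 'hello streams\n');

const source = createReadStream('input.txt');        // a Readable stream
const destination = createWriteStream('output.txt'); // a Writable stream

// pipe() connects the two and handles backpressure for us.
source.pipe(destination);

destination.on('finish', () => console.log('Copy complete'));
```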
