With Node.js streams, we can process the data as soon as it arrives from the resource.
There are four types of streams in Node.js. In this post, we'll look at only two of them: `Readable` and `Writable`.
Streams have two modes of operation:
- Binary mode, which lets us stream data in chunks
- Object mode, which lets us stream data as a sequence of objects
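The difference between the two modes shows up directly on a stream instance. Here's a minimal sketch (the stream names are just illustrative) using the `objectMode` option:

```javascript
import { Readable } from 'node:stream';

// Binary mode (the default): chunks are Buffers or strings
const binaryStream = new Readable({ read() {} });

// Object mode: chunks can be arbitrary JavaScript values
const objectStream = new Readable({ objectMode: true, read() {} });

console.log(binaryStream.readableObjectMode); // false
console.log(objectStream.readableObjectMode); // true
```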
## Readable streams
> Readable streams are an abstraction for a source from which data is consumed.
>
> — Node.js docs
A Readable stream operates in either flowing mode or paused mode.
One example of a Readable stream is `process.stdin`. Let's see how it works in both modes.
### Paused mode
Paused mode is the default mode for reading from a Readable stream.
We can use the `read()` method to read chunks of data "on demand." For example:
```js
process.stdin
  .on('readable', () => {
    let chunk;
    while ((chunk = process.stdin.read()) !== null) {
      console.log(`Chunk read: "${chunk.toString()}" in ${chunk.length} bytes`);
    }
  })
  .on('end', () => {
    console.log('End of stream');
  });
```
The `read()` method here reads data from the internal buffer and, if no encoding is specified, returns a `Buffer` object by default. If no data is available, it returns `null`.
In the code above, we read from `process.stdin` and log a message as long as data is available.
Note that we can fire an `end` event with an EOF signal by pressing Ctrl + D (or Ctrl + Z on Windows).
### Flowing mode
If we listen for the `data` event, the Readable stream switches to flowing mode. For example:
```js
process.stdin
  .on('data', (chunk) => {
    console.log(`Chunk read: ${chunk.toString()} in ${chunk.length} bytes`);
  })
  .on('end', () => {
    console.log('End of stream');
  });
```
To stop the flowing mode, we can call the `pause()` method to go back to paused mode. To switch to flowing mode again, we can call `resume()`:
```js
process.stdin
  .on('data', (chunk) => {
    console.log(`Chunk read: ${chunk.toString()} in ${chunk.length} bytes`);
    process.stdin.pause();
    console.log('Switched to paused mode');
    setTimeout(() => {
      console.log('Data will start flowing again.');
      process.stdin.resume();
    }, 1000);
  })
  .on('end', () => {
    console.log('End of stream');
  });
```
The Node.js documentation advises against using multiple methods to consume data from a single stream, as combining different modes can lead to unexpected behavior.
### Creating a simple Readable stream
We can create a Readable stream like this:
```js
import { Readable } from 'node:stream';

const strings = ['one', 'two', 'three'];

const readableStream = new Readable({
  read() {
    for (const s of strings) {
      this.push(s, 'utf8');
    }
    this.push(null);
  }
});

readableStream
  .on('data', (chunk) => {
    console.log(`Chunk: ${chunk}`);
  })
  .on('end', () => console.log('End of stream'));
```
This is also called the simplified constructor approach. It's a simple and rather impractical example, but bear with it.
The `Readable` constructor takes an `options` argument, in which we only include the `read()` method here. It simply pushes the three strings from the `strings` array to the internal buffer. We also push `null`, which signals EOF and marks the end of the stream.
So, when we run it, the output will be:

```
Chunk: one
Chunk: two
Chunk: three
End of stream
```
Since we're using an array, we can create a Readable stream even more easily with a very handy method called `from()`, which takes an iterable as its argument:
```js
import { Readable } from 'node:stream';

const strings = ['one', 'two', 'three'];

const readableStream = Readable.from(strings);

readableStream
  .on('data', (chunk) => {
    console.log(`Chunk: ${chunk}`);
  })
  .on('end', () => console.log('End of stream'));
```
We can also iterate over it using `for await...of`, since a readable stream is also an async iterator:
```js
async function readChunks(readableChunks) {
  for await (const chunk of readableChunks) {
    console.log(`Chunk: ${chunk.toString()}`);
  }
  console.log('End of stream');
}

readChunks(readableStream);
```
| Note |
| :-- |
| Remember that we mentioned there are two different modes for a stream: binary and object mode? `Readable.from()` sets the `objectMode` option to `true` by default, since it takes an iterable object. |
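We can see this with a minimal sketch: streaming plain objects through `Readable.from()` without setting any options ourselves.

```javascript
import { Readable } from 'node:stream';

// Readable.from() enables object mode, so non-buffer values pass through as-is
const objectStream = Readable.from([{ id: 1 }, { id: 2 }]);

console.log(objectStream.readableObjectMode); // true

objectStream.on('data', (chunk) => {
  // each chunk is a plain object, not a Buffer
  console.log(chunk.id);
});
```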
## Writable streams
> Writable streams are an abstraction for a destination to which data is written.
>
> — Node.js docs
There are two common methods for working with a Writable stream: `write()` and `end()`:
```js
writableStream.write('some data');
writableStream.write('some more data');
writableStream.end('done writing data');
```
### Creating a simple Writable stream
We can create a Writable stream simply by passing a `write()` method to the `Writable` constructor:
```js
import { Writable } from 'node:stream';

const writableStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});
```
Note that we don't use the `encoding` argument, and we call `toString()` in this example for simplicity. The default encoding is `'utf8'`.
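To see what actually arrives in `write()`, here's a small sketch (the `inspector` name is just illustrative): with the default options, strings we write are decoded into `Buffer`s before they reach our `write()` implementation, which is why `toString()` is useful.

```javascript
import { Writable } from 'node:stream';

let lastChunkWasBuffer;

const inspector = new Writable({
  write(chunk, encoding, callback) {
    // with the default options, string input arrives here as a Buffer
    lastChunkWasBuffer = Buffer.isBuffer(chunk);
    console.log(lastChunkWasBuffer); // true
    callback();
  }
});

inspector.write('hello');
```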
And now we can write to `stdout` like this (example adapted from the Node.js docs):
```js
function writeAThousandTimes(writer, data, encoding, callback) {
  let i = 1000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // last write: pass the callback
        writer.write(data, encoding, callback);
      } else {
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // stopped early: wait for the buffer to drain before writing more
      writer.once('drain', write);
    }
  }
}

writeAThousandTimes(writableStream, 'hey', 'utf8', () => {
  console.log('Done!');
});
```
We write `hey` to `stdout` a thousand times, and that's pretty much it.
But the `drain` event looks a bit unfamiliar, so let's look at two more concepts.
## highWaterMark
The `highWaterMark` option specifies a threshold for a stream's internal buffer: a total number of bytes (or a total number of objects in object mode). It's not a strict limit, so the stream isn't blocked when the `highWaterMark` is reached, but it dictates the amount of data a stream buffers before it stops asking for more.
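Here's a sketch of the threshold in action, with a deliberately tiny `highWaterMark` of 16 bytes (the name and sizes are just for demonstration):

```javascript
import { Writable } from 'node:stream';

const slowWriter = new Writable({
  highWaterMark: 16, // tiny threshold, just for demonstration
  write(chunk, encoding, callback) {
    // delay the callback so written data stays buffered for a while
    setTimeout(callback, 100);
  }
});

// 32 bytes at once is more than the highWaterMark, so write() returns false
const ok = slowWriter.write(Buffer.alloc(32));
console.log(ok); // false

slowWriter.on('drain', () => console.log('Safe to write again'));
```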
## Backpressure
When data streams in faster than we can consume it, it might result in unwanted memory usage. In that case, an important mechanism called backpressure is triggered.
For example, with a Writable stream, the `write()` method returns `false` if the internal buffer exceeds the `highWaterMark` limit.
Once the internal buffer is emptied, the `drain` event is emitted, meaning that we can safely continue writing.
Backpressure is also triggered with a Readable stream, when the `push()` method (inside `read()`) returns `false`.
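We can sketch that Readable side too, with an artificially small `highWaterMark` so the internal buffer fills after a few bytes (the stream name and values are made up for the example):

```javascript
import { Readable } from 'node:stream';

const pressured = new Readable({
  highWaterMark: 4, // 4-byte buffer, just for demonstration
  read() {}
});

const first = pressured.push('abc');   // buffer still has room
const second = pressured.push('defg'); // buffer is now over the threshold

console.log(first);  // true: keep pushing
console.log(second); // false: stop until the consumer catches up

// consuming the buffered data relieves the pressure
console.log(pressured.read().toString());
```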
## Conclusion
Streams might be a bit difficult to grasp, but they are fundamental to the Node.js ecosystem. There is a lot more to streams than the two types we have just looked at, but being familiar with these two can help us understand even more advanced concepts.
There are other common methods you can take a look at, like `createReadStream` and `createWriteStream`; be sure to check out the official docs.
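As a parting sketch of how those two fit together, here's a small file copy that pipes a Readable file stream into a Writable one (the temp file names are made up for this example):

```javascript
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';

// hypothetical temp files, just for this example
const src = path.join(os.tmpdir(), 'streams-demo-src.txt');
const dest = path.join(os.tmpdir(), 'streams-demo-dest.txt');
fs.writeFileSync(src, 'hello, streams');

// pipe() reads from src and writes to dest, handling backpressure for us
fs.createReadStream(src)
  .pipe(fs.createWriteStream(dest))
  .on('finish', () => {
    console.log(fs.readFileSync(dest, 'utf8')); // hello, streams
  });
```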