title: A New Streaming API for Node v0.10
author: Isaac Z. Schlueter
date: Fri Dec 21 00:45:13 UTC 2012

**tl;dr**

* Node streams are great, except for all the ways in which they're
  terrible.
* A new Stream implementation is coming in 0.10, that has gotten the
  nickname "streams2".
* Readable streams have a `read()` method that returns a buffer or
  null. (More documentation included below.)
* `'data'` events, `pause()`, and `resume()` will still work as before
  (except that they'll actually work how you'd expect).
* Old programs will **almost always** work without modification, but
  streams start out in a paused state, and need to be read from to be
  consumed.
* **WARNING**: If you never add a `'data'` event handler, or call
  `resume()`, then it'll sit in a paused state forever and never
  emit `'end'`.

## Background
Throughout the life of Node, we've been gradually iterating on the
ideal event-based API for handling data. Over time, this developed
into the "Stream" interface that you see throughout Node's core
modules and many of the modules in npm.

Consistent interfaces increase the portability and reliability of our
programs and libraries. Overall, the move from domain-specific events
and methods towards a unified stream interface was a huge win.
However, there are still several problems with Node's streams as of
v0.8:

1. The `pause()` method doesn't pause. It is advisory-only. In
   Node's implementation, this makes things much simpler, but it's
   confusing to users, and doesn't do what it looks like it does.
2. `'data'` events come right away (whether you're ready or not).
   This makes it unreasonably difficult to do common tasks like load a
   user's session before deciding how to handle their request.
3. There is no way to consume a specific number of bytes, and then
   leave the rest for some other part of the program to deal with.
4. It's unreasonably difficult to implement streams and get all the
   intricacies of pause, resume, write-buffering, and data events
   correct. The lack of shared classes means that we all have to
   solve the same problems repeatedly, making similar mistakes and
   similar bugs.
Common simple tasks should be easy, or we aren't doing our job.
People often say that Node is better than most other platforms at this
stuff, but in my opinion, that is less of a compliment and more of an
indictment of the current state of software. Being better than the
next guy isn't enough; we have to be the best imaginable. While they
were a big step in the right direction, the Streams in Node up until
now leave a lot wanting.
So, just fix it, right?

Well, we are sitting on the results of several years of explosive
growth in the Node community, so any changes have to be made very
carefully. If we break all the Node programs in 0.10, then no one
will ever want to upgrade to 0.10, and it's all pointless. We had
this conversation around 0.4, then again around 0.6, then again around
0.8. Every time, the conclusion has been "Too much work, too hard to
make backwards-compatible", and we always had more pressing problems
to attend to.
In 0.10, we cannot put it off any longer. We've bitten the bullet and
are making a significant change to the Stream implementation. You may
have seen conversations on twitter or IRC or the mailing list about
"streams2". I also gave [a talk in
November](https://dl.dropbox.com/u/3685/presentations/streams2/streams2-ko.pdf)
about this subject. A lot of node module authors have been involved
with the development of streams2 (and of course the node core team).

The feature is described pretty thoroughly in the documentation, so
I'm including it below. Please read it, especially the section on
"compatibility". There's a caveat there that is unfortunately
unavoidable, but hopefully enough of an edge case that it's easily
mitigated.
The first preview release with this change will be 0.9.4. I highly
recommend trying this release and providing feedback before it lands
in a stable version.

As of writing this post, there are some known performance regressions,
especially in the http module. We are fanatical about maintaining
performance in Node.js, so of course this will have to be fixed before
the v0.10 stable release. (Watch for a future blog post on the tools
and techniques that have been useful in tracking down these issues.)

There may be minor changes as necessary to fix bugs and improve
performance, but the API at this point should be considered feature
complete. It correctly does all the things we need it to do, it just
doesn't do them quite well enough yet. As always, be wary of running
unstable releases in production, of course, but I encourage you to try
it out and see what you think. Especially, if you have tests that you
can run on your modules and libraries, that would be extremely useful
feedback.
# Stream

    Stability: 2 - Unstable

A stream is an abstract interface implemented by various objects in
Node. For example a request to an HTTP server is a stream, as is
stdout. Streams are readable, writable, or both. All streams are
instances of [EventEmitter][].

You can load the Stream base classes by doing `require('stream')`.
There are base classes provided for Readable streams, Writable
streams, Duplex streams, and Transform streams.
## Compatibility

In earlier versions of Node, the Readable stream interface was
simpler, but also less powerful and less useful.

* Rather than waiting for you to call the `read()` method, `'data'`
  events would start emitting immediately. If you needed to do some
  I/O to decide how to handle data, then you had to store the chunks
  in some kind of buffer so that they would not be lost.
* The `pause()` method was advisory, rather than guaranteed. This
  meant that you still had to be prepared to receive `'data'` events
  even when the stream was in a paused state.

In Node v0.10, the Readable class described below was added. For
backwards compatibility with older Node programs, Readable streams
switch into "old mode" when a `'data'` event handler is added, or when
the `pause()` or `resume()` methods are called. The effect is that,
even if you are not using the new `read()` method and `'readable'`
event, you no longer have to worry about losing `'data'` chunks.

Most programs will continue to function normally. However, this
introduces an edge case in the following conditions:

* No `'data'` event handler is added.
* The `pause()` and `resume()` methods are never called.
For example, consider the following code:

    // WARNING!  BROKEN!
    net.createServer(function(socket) {

      // we add an 'end' listener, but never consume the data
      socket.on('end', function() {
        // It will never get here.
        socket.end('I got your message (but didnt read it)\n');
      });

    }).listen(1337);

In versions of node prior to v0.10, the incoming message data would be
simply discarded. However, in Node v0.10 and beyond, the socket will
remain paused forever.

The workaround in this situation is to call the `resume()` method to
trigger "old mode" behavior:

    // Workaround
    net.createServer(function(socket) {

      socket.on('end', function() {
        socket.end('I got your message (but didnt read it)\n');
      });

      // start the flow of data, discarding it.
      socket.resume();

    }).listen(1337);
In addition to new Readable streams switching into old-mode, pre-v0.10
style streams can be wrapped in a Readable class using the `wrap()`
method.

## Class: stream.Readable

A `Readable Stream` has the following methods, members, and events.

Note that `stream.Readable` is an abstract class designed to be
extended with an underlying implementation of the `_read(size)`
method.
### new stream.Readable([options])

* `options` {Object}
  * `highWaterMark` {Number} The maximum number of bytes to store in
    the internal buffer before ceasing to read from the underlying
    resource. Default=16kb
  * `encoding` {String} If specified, then buffers will be decoded to
    strings using the specified encoding. Default=null
  * `objectMode` {Boolean} Whether this stream should behave as a
    stream of objects, meaning that `stream.read(n)` returns a single
    value instead of a Buffer of size `n`.

In classes that extend the Readable class, make sure to call the
constructor so that the buffering settings can be properly
initialized.
### readable.\_read(size)

* `size` {Number} Number of bytes to read asynchronously

Note: **This function should NOT be called directly.** It should be
implemented by child classes, and called by the internal Readable
class methods only.

All Readable stream implementations must provide a `_read` method
to fetch data from the underlying resource.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.

When data is available, put it into the read queue by calling
`readable.push(chunk)`. If `push` returns false, then you should stop
reading. When `_read` is called again, you should start pushing more
data.

The `size` argument is advisory. Implementations where a "read" is a
single call that returns data can use this to know how much data to
fetch. Implementations where that is not relevant, such as TCP or
TLS, may ignore this argument, and simply provide data whenever it
becomes available. There is no need, for example, to "wait" until
`size` bytes are available before calling `stream.push(chunk)`.
### readable.push(chunk)

* `chunk` {Buffer | null | String} Chunk of data to push into the read queue
* return {Boolean} Whether or not more pushes should be performed

Note: **This function should be called by Readable implementors, NOT
by consumers of Readable subclasses.** The `_read()` function will not
be called again until at least one `push(chunk)` call is made. If no
data is available, then you MAY call `push('')` (an empty string) to
allow a future `_read` call, without adding any data to the queue.

The `Readable` class works by putting data into a read queue to be
pulled out later by calling the `read()` method when the `'readable'`
event fires.

The `push()` method will explicitly insert some data into the read
queue. If it is called with `null` then it will signal the end of the
data (EOF).

In some cases, you may be wrapping a lower-level source which has some
sort of pause/resume mechanism, and a data callback. In those cases,
you could wrap the low-level source object by doing something like
this:
    // source is an object with readStop() and readStart() methods,
    // and an `ondata` member that gets called when it has data, and
    // an `onend` member that gets called when the data is over.

    var stream = new Readable();

    source.ondata = function(chunk) {
      // if push() returns false, then we need to stop reading from source
      if (!stream.push(chunk))
        source.readStop();
    };

    source.onend = function() {
      stream.push(null);
    };

    // _read will be called when the stream wants to pull more data in
    // the advisory size argument is ignored in this case.
    stream._read = function(n) {
      source.readStart();
    };
### readable.unshift(chunk)

* `chunk` {Buffer | null | String} Chunk of data to unshift onto the read queue
* return {Boolean} Whether or not more pushes should be performed

This is the corollary of `readable.push(chunk)`. Rather than putting
the data at the *end* of the read queue, it puts it at the *front* of
the read queue.

This is useful in certain use-cases where a stream is being consumed
by a parser, which needs to "un-consume" some data that it has
optimistically pulled out of the source.
    // A parser for a simple data protocol.
    // The "header" is a JSON object, followed by 2 \n characters, and
    // then a message body.
    //
    // Note: This can be done more simply as a Transform stream.  See below.

    function SimpleProtocol(source, options) {
      if (!(this instanceof SimpleProtocol))
        return new SimpleProtocol(source, options);

      Readable.call(this, options);
      this._inBody = false;
      this._sawFirstCr = false;

      // source is a readable stream, such as a socket or file
      this._source = source;

      var self = this;
      source.on('end', function() {
        self.push(null);
      });

      // give it a kick whenever the source is readable
      // read(0) will not consume any bytes
      source.on('readable', function() {
        self.read(0);
      });

      this._rawHeader = [];
      this.header = null;
    }

    SimpleProtocol.prototype = Object.create(
      Readable.prototype, { constructor: { value: SimpleProtocol }});

    SimpleProtocol.prototype._read = function(n) {
      if (!this._inBody) {
        var chunk = this._source.read();

        // if the source doesn't have data, we don't have data yet.
        if (chunk === null)
          return this.push('');

        // check if the chunk has a \n\n
        var split = -1;
        for (var i = 0; i < chunk.length; i++) {
          if (chunk[i] === 10) { // '\n'
            if (this._sawFirstCr) {
              split = i;
              break;
            } else {
              this._sawFirstCr = true;
            }
          } else {
            this._sawFirstCr = false;
          }
        }

        if (split === -1) {
          // still waiting for the \n\n
          // stash the chunk, and try again.
          this._rawHeader.push(chunk);
          this.push('');
        } else {
          this._inBody = true;
          var h = chunk.slice(0, split);
          this._rawHeader.push(h);
          var header = Buffer.concat(this._rawHeader).toString();
          try {
            this.header = JSON.parse(header);
          } catch (er) {
            this.emit('error', new Error('invalid simple protocol data'));
            return;
          }
          // now, because we got some extra data, unshift the rest
          // back into the read queue so that our consumer will see it.
          var b = chunk.slice(split);
          this.unshift(b);

          // and let them know that we are done parsing the header.
          this.emit('header', this.header);
        }
      } else {
        // from there on, just provide the data to our consumer.
        // careful not to push(null), since that would indicate EOF.
        var chunk = this._source.read();
        if (chunk) this.push(chunk);
      }
    };

    // Usage:
    var parser = new SimpleProtocol(source);
    // Now parser is a readable stream that will emit 'header'
    // with the parsed header data.
### readable.wrap(stream)

* `stream` {Stream} An "old style" readable stream

If you are using an older Node library that emits `'data'` events and
has a `pause()` method that is advisory only, then you can use the
`wrap()` method to create a Readable stream that uses the old stream
as its data source.

For example:

    var OldReader = require('./old-api-module.js').OldReader;
    var oreader = new OldReader;
    var Readable = require('stream').Readable;
    var myReader = new Readable().wrap(oreader);

    myReader.on('readable', function() {
      myReader.read(); // etc.
    });
### Event: 'readable'

When there is data ready to be consumed, this event will fire.

When this event emits, call the `read()` method to consume the data.

### Event: 'end'

Emitted when the stream has received an EOF (FIN in TCP terminology).
Indicates that no more `'data'` events will happen. If the stream is
also writable, it may be possible to continue writing.

### Event: 'data'

The `'data'` event emits either a `Buffer` (by default) or a string if
`setEncoding()` was used.

Note that adding a `'data'` event listener will switch the Readable
stream into "old mode", where data is emitted as soon as it is
available, rather than waiting for you to call `read()` to consume it.

### Event: 'error'

Emitted if there was an error receiving data.

### Event: 'close'

Emitted when the underlying resource (for example, the backing file
descriptor) has been closed. Not all streams will emit this.
### readable.setEncoding(encoding)

Makes the `'data'` event emit a string instead of a `Buffer`. `encoding`
can be `'utf8'`, `'utf16le'` (`'ucs2'`), `'ascii'`, or `'hex'`.

The encoding can also be set by specifying an `encoding` field to the
constructor.
### readable.read([size])

* `size` {Number | null} Optional number of bytes to read.
* Return: {Buffer | String | null}

Note: **This function SHOULD be called by Readable stream users.**

Call this method to consume data once the `'readable'` event is
emitted.

The `size` argument will set a minimum number of bytes that you are
interested in. If not set, then the entire content of the internal
buffer is returned.

If there is no data to consume, or if there are fewer bytes in the
internal buffer than the `size` argument, then `null` is returned, and
a future `'readable'` event will be emitted when more is available.

Calling `stream.read(0)` will always return `null`, and will trigger a
refresh of the internal buffer, but otherwise be a no-op.
### readable.pipe(destination, [options])

* `destination` {Writable Stream}
* `options` {Object} Optional
  * `end` {Boolean} Default=true

Connects this readable stream to `destination` WriteStream. Incoming
data on this stream gets written to `destination`. Properly manages
back-pressure so that a slow destination will not be overwhelmed by a
fast readable stream.

This function returns the `destination` stream.

For example, emulating the Unix `cat` command:

    process.stdin.pipe(process.stdout);

By default `end()` is called on the destination when the source stream
emits `end`, so that `destination` is no longer writable. Pass `{ end:
false }` as `options` to keep the destination stream open.

This keeps `writer` open so that "Goodbye" can be written at the
end:

    reader.pipe(writer, { end: false });
    reader.on("end", function() {
      writer.end("Goodbye\n");
    });

Note that `process.stderr` and `process.stdout` are never closed until
the process exits, regardless of the specified options.
### readable.unpipe([destination])

* `destination` {Writable Stream} Optional

Undo a previously established `pipe()`. If no destination is
provided, then all previously established pipes are removed.

### readable.pause()

Switches the readable stream into "old mode", where data is emitted
using a `'data'` event rather than being buffered for consumption via
the `read()` method.

Ceases the flow of data. No `'data'` events are emitted while the
stream is in a paused state.

### readable.resume()

Switches the readable stream into "old mode", where data is emitted
using a `'data'` event rather than being buffered for consumption via
the `read()` method.

Resumes the incoming `'data'` events after a `pause()`.
## Class: stream.Writable

A `Writable` Stream has the following methods, members, and events.

Note that `stream.Writable` is an abstract class designed to be
extended with an underlying implementation of the
`_write(chunk, encoding, cb)` method. (See below.)

### new stream.Writable([options])

* `options` {Object}
  * `highWaterMark` {Number} Buffer level when `write()` starts
    returning false. Default=16kb
  * `decodeStrings` {Boolean} Whether or not to decode strings into
    Buffers before passing them to `_write()`. Default=true

In classes that extend the Writable class, make sure to call the
constructor so that the buffering settings can be properly
initialized.
### writable.\_write(chunk, encoding, callback)

* `chunk` {Buffer | String} The chunk to be written. Will always
  be a buffer unless the `decodeStrings` option was set to `false`.
* `encoding` {String} If the chunk is a string, then this is the
  encoding type. Ignore if chunk is a buffer. Note that chunk will
  **always** be a buffer unless the `decodeStrings` option is
  explicitly set to `false`.
* `callback` {Function} Call this function (optionally with an error
  argument) when you are done processing the supplied chunk.

All Writable stream implementations must provide a `_write` method to
send data to the underlying resource.

Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Writable
class methods only.

Call the callback using the standard `callback(error)` pattern to
signal that the write completed successfully or with an error.

If the `decodeStrings` flag is set to `false` in the constructor
options, then `chunk` may be a string rather than a Buffer, and
`encoding` will indicate the sort of string that it is. This is to
support implementations that have an optimized handling for certain
string data encodings. If you do not explicitly set the
`decodeStrings` option to `false`, then you can safely ignore the
`encoding` argument, and assume that `chunk` will always be a Buffer.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
### writable.write(chunk, [encoding], [callback])

* `chunk` {Buffer | String} Data to be written
* `encoding` {String} Optional. If `chunk` is a string, then encoding
  defaults to `'utf8'`.
* `callback` {Function} Optional. Called when this chunk is
  successfully written.

Writes `chunk` to the stream. Returns `true` if the data has been
flushed to the underlying resource. Returns `false` to indicate that
the buffer is full, and the data will be sent out in the future. The
`'drain'` event will indicate when the buffer is empty again.

The specifics of when `write()` will return false are determined by
the `highWaterMark` option provided to the constructor.
### writable.end([chunk], [encoding], [callback])

* `chunk` {Buffer | String} Optional final data to be written
* `encoding` {String} Optional. If `chunk` is a string, then encoding
  defaults to `'utf8'`.
* `callback` {Function} Optional. Called when the final chunk is
  successfully written.

Call this method to signal the end of the data being written to the
stream.

### Event: 'drain'

Emitted when the stream's write queue empties and it's safe to write
without buffering again. Listen for it when `stream.write()` returns
`false`.

### Event: 'close'

Emitted when the underlying resource (for example, the backing file
descriptor) has been closed. Not all streams will emit this.

### Event: 'finish'

When `end()` is called and there are no more chunks to write, this
event is emitted.

### Event: 'pipe'

* `source` {Readable Stream}

Emitted when the stream is passed to a readable stream's pipe method.

### Event: 'unpipe'

* `source` {Readable Stream}

Emitted when a previously established `pipe()` is removed using the
source Readable stream's `unpipe()` method.
## Class: stream.Duplex

A "duplex" stream is one that is both Readable and Writable, such as a
TCP socket connection.

Note that `stream.Duplex` is an abstract class designed to be
extended with an underlying implementation of the `_read(size)`
and `_write(chunk, encoding, callback)` methods as you would with a Readable or
Writable stream class.

Since JavaScript doesn't have multiple prototypal inheritance, this
class prototypally inherits from Readable, and then parasitically from
Writable. It is thus up to the user to implement both the low-level
`_read(n)` method as well as the low-level `_write(chunk, encoding, cb)` method
on extension duplex classes.

### new stream.Duplex(options)

* `options` {Object} Passed to both Writable and Readable
  constructors. Also has the following fields:
  * `allowHalfOpen` {Boolean} Default=true. If set to `false`, then
    the stream will automatically end the readable side when the
    writable side ends and vice versa.

In classes that extend the Duplex class, make sure to call the
constructor so that the buffering settings can be properly
initialized.
## Class: stream.Transform

A "transform" stream is a duplex stream where the output is causally
connected in some way to the input, such as a zlib stream or a crypto
stream.

There is no requirement that the output be the same size as the input,
the same number of chunks, or arrive at the same time. For example, a
Hash stream will only ever have a single chunk of output which is
provided when the input is ended. A zlib stream will produce output
that is either much smaller or much larger than its input.

Rather than implement the `_read()` and `_write()` methods, Transform
classes must implement the `_transform()` method, and may optionally
also implement the `_flush()` method. (See below.)

### new stream.Transform([options])

* `options` {Object} Passed to both Writable and Readable
  constructors.

In classes that extend the Transform class, make sure to call the
constructor so that the buffering settings can be properly
initialized.
### transform.\_transform(chunk, encoding, callback)

* `chunk` {Buffer | String} The chunk to be transformed. Will always
  be a buffer unless the `decodeStrings` option was set to `false`.
* `encoding` {String} If the chunk is a string, then this is the
  encoding type. (Ignore if chunk is a buffer.)
* `callback` {Function} Call this function (optionally with an error
  argument) when you are done processing the supplied chunk.

Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Transform
class methods only.

All Transform stream implementations must provide a `_transform`
method to accept input and produce output.

`_transform` should do whatever has to be done in this specific
Transform class, to handle the bytes being written, and pass them off
to the readable portion of the interface. Do asynchronous I/O,
process things, and so on.

Call `transform.push(outputChunk)` 0 or more times to generate output
from this input chunk, depending on how much data you want to output
as a result of this chunk.

Call the callback function only when the current chunk is completely
consumed. Note that there may or may not be output as a result of any
particular input chunk.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
### transform.\_flush(callback)

* `callback` {Function} Call this function (optionally with an error
  argument) when you are done flushing any remaining data.

Note: **This function MUST NOT be called directly.** It MAY be implemented
by child classes, and if so, will be called by the internal Transform
class methods only.

In some cases, your transform operation may need to emit a bit more
data at the end of the stream. For example, a `Zlib` compression
stream will store up some internal state so that it can optimally
compress the output. At the end, however, it needs to do the best it
can with what is left, so that the data will be complete.

In those cases, you can implement a `_flush` method, which will be
called at the very end, after all the written data is consumed, but
before emitting `end` to signal the end of the readable side. Just
like with `_transform`, call `transform.push(chunk)` zero or more
times, as appropriate, and call `callback` when the flush operation is
complete.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
### Example: `SimpleProtocol` parser

The example above of a simple protocol parser can be implemented much
more simply by using the higher level `Transform` stream class.

In this example, rather than providing the input as an argument, it
would be piped into the parser, which is a more idiomatic Node stream
approach.

    function SimpleProtocol(options) {
      if (!(this instanceof SimpleProtocol))
        return new SimpleProtocol(options);

      Transform.call(this, options);
      this._inBody = false;
      this._sawFirstCr = false;
      this._rawHeader = [];
      this.header = null;
    }

    SimpleProtocol.prototype = Object.create(
      Transform.prototype, { constructor: { value: SimpleProtocol }});

    SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
      if (!this._inBody) {
        // check if the chunk has a \n\n
        var split = -1;
        for (var i = 0; i < chunk.length; i++) {
          if (chunk[i] === 10) { // '\n'
            if (this._sawFirstCr) {
              split = i;
              break;
            } else {
              this._sawFirstCr = true;
            }
          } else {
            this._sawFirstCr = false;
          }
        }

        if (split === -1) {
          // still waiting for the \n\n
          // stash the chunk, and try again.
          this._rawHeader.push(chunk);
        } else {
          this._inBody = true;
          var h = chunk.slice(0, split);
          this._rawHeader.push(h);
          var header = Buffer.concat(this._rawHeader).toString();
          try {
            this.header = JSON.parse(header);
          } catch (er) {
            this.emit('error', new Error('invalid simple protocol data'));
            return;
          }
          // and let them know that we are done parsing the header.
          this.emit('header', this.header);

          // now, because we got some extra data, emit this first.
          this.push(chunk.slice(split));
        }
      } else {
        // from there on, just provide the data to our consumer as-is.
        this.push(chunk);
      }
      done();
    };

    // Usage:
    var parser = new SimpleProtocol();
    source.pipe(parser);
    // Now parser is a readable stream that will emit 'header'
    // with the parsed header data.
## Class: stream.PassThrough

This is a trivial implementation of a `Transform` stream that simply
passes the input bytes across to the output. Its purpose is mainly
for examples and testing, but there are occasionally use cases where
it can come in handy.
[EventEmitter]: http://nodejs.org/api/events.html#events_class_events_eventemitter