~yolanda.robla/ubuntu/trusty/nodejs/add_distribution

« back to all changes in this revision

Viewing changes to doc/blog/feature/streams2.md

  • Committer: Package Import Robot
  • Author(s): Jérémy Lal
  • Date: 2013-08-14 00:16:46 UTC
  • mfrom: (7.1.40 sid)
  • Revision ID: package-import@ubuntu.com-20130814001646-bzlysfh8sd6mukbo
Tags: 0.10.15~dfsg1-4
* Update 2005 patch, adding a handful of tests that can fail on
  slow platforms.
* Add 1004 patch to fix test failures when writing NaN to buffer
  on mipsel.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
title: A New Streaming API for Node v0.10
 
2
author: Isaac Z. Schlueter
 
3
date: Fri Dec 21 00:45:13 UTC 2012
 
4
slug: streams2
 
5
category: feature
 
6
 
 
7
**tl;dr**
 
8
 
 
9
* Node streams are great, except for all the ways in which they're
 
10
  terrible.
 
11
* A new Stream implementation is coming in 0.10, that has gotten the
 
12
  nickname "streams2".
 
13
* Readable streams have a `read()` method that returns a buffer or
 
14
  null.  (More documentation included below.)
 
15
* `'data'` events, `pause()`, and `resume()` will still work as before
 
16
  (except that they'll actully work how you'd expect).
 
17
* Old programs will **almost always** work without modification, but
 
18
  streams start out in a paused state, and need to be read from to be
 
19
  consumed.
 
20
* **WARNING**: If you never add a `'data'` event handler, or call
 
21
  `resume()`, then it'll sit in a paused state forever and never
 
22
  emit `'end'`.
 
23
 
 
24
-------
 
25
 
 
26
Throughout the life of Node, we've been gradually iterating on the
 
27
ideal event-based API for handling data.  Over time, this developed
 
28
into the "Stream" interface that you see throughout Node's core
 
29
modules and many of the modules in npm.
 
30
 
 
31
Consistent interfaces increase the portability and reliability of our
 
32
programs and libraries.  Overall, the move from domain-specific events
 
33
and methods towards a unified stream interface was a huge win.
 
34
However, there are still several problems with Node's streams as of
 
35
v0.8.  In a nutshell:
 
36
 
 
37
1. The `pause()` method doesn't pause.  It is advisory-only.  In
 
38
   Node's implementation, this makes things much simpler, but it's
 
39
   confusing to users, and doesn't do what it looks like it does.
 
40
2. `'data'` events come right away (whether you're ready or not).
 
41
   This makes it unreasonably difficult to do common tasks like load a
 
42
   user's session before deciding how to handle their request.
 
43
3. There is no way to consume a specific number of bytes, and then
 
44
   leave the rest for some other part of the program to deal with.
 
45
4. It's unreasonably difficult to implement streams and get all the
 
46
   intricacies of pause, resume, write-buffering, and data events
 
47
   correct.  The lack of shared classes mean that we all have to solve
 
48
   the same problems repeatedly, making similar mistakes and similar
 
49
   bugs.
 
50
 
 
51
Common simple tasks should be easy, or we aren't doing our job.
 
52
People often say that Node is better than most other platforms at this
 
53
stuff, but in my opinion, that is less of a compliment and more of an
 
54
indictment of the current state of software.  Being better than the
 
55
next guy isn't enough; we have to be the best imaginable.  While they
 
56
were a big step in the right direction, the Streams in Node up until
 
57
now leave a lot wanting.
 
58
 
 
59
So, just fix it, right?
 
60
 
 
61
Well, we are sitting on the results of several years of explosive
 
62
growth in the Node community, so any changes have to be made very
 
63
carefully.  If we break all the Node programs in 0.10, then no one
 
64
will ever want to upgrade to 0.10, and it's all pointless.  We had
 
65
this conversation around 0.4, then again around 0.6, then again around
 
66
0.8.  Every time, the conclusion has been "Too much work, too hard to
 
67
make backwards-compatible", and we always had more pressing problems
 
68
to solve.
 
69
 
 
70
In 0.10, we cannot put it off any longer.  We've bitten the bullet and
 
71
are making a significant change to the Stream implementation.  You may
 
72
have seen conversations on twitter or IRC or the mailing list about
 
73
"streams2".  I also gave [a talk in
 
74
November](https://dl.dropbox.com/u/3685/presentations/streams2/streams2-ko.pdf)
 
75
about this subject.  A lot of node module authors have been involved
 
76
with the development of streams2 (and of course the node core team).
 
77
 
 
78
## streams2
 
79
 
 
80
The feature is described pretty thoroughly in the documentation, so
 
81
I'm including it below.  Please read it, especially the section on
 
82
"compatibility".  There's a caveat there that is unfortunately
 
83
unavoidable, but hopefully enough of an edge case that it's easily
 
84
worked around.
 
85
 
 
86
The first preview release with this change will be 0.9.4.  I highly
 
87
recommend trying this release and providing feedback before it lands
 
88
in a stable version.
 
89
 
 
90
As of writing this post, there are some known performance regressions,
 
91
especially in the http module.  We are fanatical about maintaining
 
92
performance in Node.js, so of course this will have to be fixed before
 
93
the v0.10 stable release.  (Watch for a future blog post on the tools
 
94
and techniques that have been useful in tracking down these issues.)
 
95
 
 
96
There may be minor changes as necessary to fix bugs and improve
 
97
performance, but the API at this point should be considered feature
 
98
complete.  It correctly does all the things we need it to do, it just
 
99
doesn't do them quite well enough yet.  As always, be wary of running
 
100
unstable releases in production, of course, but I encourage you to try
 
101
it out and see what you think.  Especially, if you have tests that you
 
102
can run on your modules and libraries, that would be extremely useful
 
103
feedback.
 
104
 
 
105
--------
 
106
 
 
107
# Stream
 
108
 
 
109
    Stability: 2 - Unstable
 
110
 
 
111
A stream is an abstract interface implemented by various objects in
 
112
Node.  For example a request to an HTTP server is a stream, as is
 
113
stdout. Streams are readable, writable, or both. All streams are
 
114
instances of [EventEmitter][]
 
115
 
 
116
You can load the Stream base classes by doing `require('stream')`.
 
117
There are base classes provided for Readable streams, Writable
 
118
streams, Duplex streams, and Transform streams.
 
119
 
 
120
## Compatibility
 
121
 
 
122
In earlier versions of Node, the Readable stream interface was
 
123
simpler, but also less powerful and less useful.
 
124
 
 
125
* Rather than waiting for you to call the `read()` method, `'data'`
 
126
  events would start emitting immediately.  If you needed to do some
 
127
  I/O to decide how to handle data, then you had to store the chunks
 
128
  in some kind of buffer so that they would not be lost.
 
129
* The `pause()` method was advisory, rather than guaranteed.  This
 
130
  meant that you still had to be prepared to receive `'data'` events
 
131
  even when the stream was in a paused state.
 
132
 
 
133
In Node v0.10, the Readable class described below was added.  For
 
134
backwards compatibility with older Node programs, Readable streams
 
135
switch into "old mode" when a `'data'` event handler is added, or when
 
136
the `pause()` or `resume()` methods are called.  The effect is that,
 
137
even if you are not using the new `read()` method and `'readable'`
 
138
event, you no longer have to worry about losing `'data'` chunks.
 
139
 
 
140
Most programs will continue to function normally.  However, this
 
141
introduces an edge case in the following conditions:
 
142
 
 
143
* No `'data'` event handler is added.
 
144
* The `pause()` and `resume()` methods are never called.
 
145
 
 
146
For example, consider the following code:
 
147
 
 
148
```javascript
 
149
// WARNING!  BROKEN!
 
150
net.createServer(function(socket) {
 
151
 
 
152
  // we add an 'end' method, but never consume the data
 
153
  socket.on('end', function() {
 
154
    // It will never get here.
 
155
    socket.end('I got your message (but didnt read it)\n');
 
156
  });
 
157
 
 
158
}).listen(1337);
 
159
```
 
160
 
 
161
In versions of node prior to v0.10, the incoming message data would be
 
162
simply discarded.  However, in Node v0.10 and beyond, the socket will
 
163
remain paused forever.
 
164
 
 
165
The workaround in this situation is to call the `resume()` method to
 
166
trigger "old mode" behavior:
 
167
 
 
168
```javascript
 
169
// Workaround
 
170
net.createServer(function(socket) {
 
171
 
 
172
  socket.on('end', function() {
 
173
    socket.end('I got your message (but didnt read it)\n');
 
174
  });
 
175
 
 
176
  // start the flow of data, discarding it.
 
177
  socket.resume();
 
178
 
 
179
}).listen(1337);
 
180
```
 
181
 
 
182
In addition to new Readable streams switching into old-mode, pre-v0.10
 
183
style streams can be wrapped in a Readable class using the `wrap()`
 
184
method.
 
185
 
 
186
## Class: stream.Readable
 
187
 
 
188
<!--type=class-->
 
189
 
 
190
A `Readable Stream` has the following methods, members, and events.
 
191
 
 
192
Note that `stream.Readable` is an abstract class designed to be
 
193
extended with an underlying implementation of the `_read(size)`
 
194
method. (See below.)
 
195
 
 
196
### new stream.Readable([options])
 
197
 
 
198
* `options` {Object}
 
199
  * `highWaterMark` {Number} The maximum number of bytes to store in
 
200
    the internal buffer before ceasing to read from the underlying
 
201
    resource.  Default=16kb
 
202
  * `encoding` {String} If specified, then buffers will be decoded to
 
203
    strings using the specified encoding.  Default=null
 
204
  * `objectMode` {Boolean} Whether this stream should behave
 
205
    as a stream of objects. Meaning that stream.read(n) returns
 
206
    a single value instead of a Buffer of size n
 
207
 
 
208
In classes that extend the Readable class, make sure to call the
 
209
constructor so that the buffering settings can be properly
 
210
initialized.
 
211
 
 
212
### readable.\_read(size)
 
213
 
 
214
* `size` {Number} Number of bytes to read asynchronously
 
215
 
 
216
Note: **This function should NOT be called directly.**  It should be
 
217
implemented by child classes, and called by the internal Readable
 
218
class methods only.
 
219
 
 
220
All Readable stream implementations must provide a `_read` method
 
221
to fetch data from the underlying resource.
 
222
 
 
223
This method is prefixed with an underscore because it is internal to
 
224
the class that defines it, and should not be called directly by user
 
225
programs.  However, you **are** expected to override this method in
 
226
your own extension classes.
 
227
 
 
228
When data is available, put it into the read queue by calling
 
229
`readable.push(chunk)`.  If `push` returns false, then you should stop
 
230
reading.  When `_read` is called again, you should start pushing more
 
231
data.
 
232
 
 
233
The `size` argument is advisory.  Implementations where a "read" is a
 
234
single call that returns data can use this to know how much data to
 
235
fetch.  Implementations where that is not relevant, such as TCP or
 
236
TLS, may ignore this argument, and simply provide data whenever it
 
237
becomes available.  There is no need, for example to "wait" until
 
238
`size` bytes are available before calling `stream.push(chunk)`.
 
239
 
 
240
### readable.push(chunk)
 
241
 
 
242
* `chunk` {Buffer | null | String} Chunk of data to push into the read queue
 
243
* return {Boolean} Whether or not more pushes should be performed
 
244
 
 
245
Note: **This function should be called by Readable implementors, NOT
 
246
by consumers of Readable subclasses.**  The `_read()` function will not
 
247
be called again until at least one `push(chunk)` call is made.  If no
 
248
data is available, then you MAY call `push('')` (an empty string) to
 
249
allow a future `_read` call, without adding any data to the queue.
 
250
 
 
251
The `Readable` class works by putting data into a read queue to be
 
252
pulled out later by calling the `read()` method when the `'readable'`
 
253
event fires.
 
254
 
 
255
The `push()` method will explicitly insert some data into the read
 
256
queue.  If it is called with `null` then it will signal the end of the
 
257
data.
 
258
 
 
259
In some cases, you may be wrapping a lower-level source which has some
 
260
sort of pause/resume mechanism, and a data callback.  In those cases,
 
261
you could wrap the low-level source object by doing something like
 
262
this:
 
263
 
 
264
```javascript
 
265
// source is an object with readStop() and readStart() methods,
 
266
// and an `ondata` member that gets called when it has data, and
 
267
// an `onend` member that gets called when the data is over.
 
268
 
 
269
var stream = new Readable();
 
270
 
 
271
source.ondata = function(chunk) {
 
272
  // if push() returns false, then we need to stop reading from source
 
273
  if (!stream.push(chunk))
 
274
    source.readStop();
 
275
};
 
276
 
 
277
source.onend = function() {
 
278
  stream.push(null);
 
279
};
 
280
 
 
281
// _read will be called when the stream wants to pull more data in
 
282
// the advisory size argument is ignored in this case.
 
283
stream._read = function(n) {
 
284
  source.readStart();
 
285
};
 
286
```
 
287
 
 
288
### readable.unshift(chunk)
 
289
 
 
290
* `chunk` {Buffer | null | String} Chunk of data to unshift onto the read queue
 
291
* return {Boolean} Whether or not more pushes should be performed
 
292
 
 
293
This is the corollary of `readable.push(chunk)`.  Rather than putting
 
294
the data at the *end* of the read queue, it puts it at the *front* of
 
295
the read queue.
 
296
 
 
297
This is useful in certain use-cases where a stream is being consumed
 
298
by a parser, which needs to "un-consume" some data that it has
 
299
optimistically pulled out of the source.
 
300
 
 
301
```javascript
 
302
// A parser for a simple data protocol.
 
303
// The "header" is a JSON object, followed by 2 \n characters, and
 
304
// then a message body.
 
305
//
 
306
// Note: This can be done more simply as a Transform stream.  See below.
 
307
 
 
308
function SimpleProtocol(source, options) {
 
309
  if (!(this instanceof SimpleProtocol))
 
310
    return new SimpleProtocol(options);
 
311
 
 
312
  Readable.call(this, options);
 
313
  this._inBody = false;
 
314
  this._sawFirstCr = false;
 
315
 
 
316
  // source is a readable stream, such as a socket or file
 
317
  this._source = source;
 
318
 
 
319
  var self = this;
 
320
  source.on('end', function() {
 
321
    self.push(null);
 
322
  });
 
323
 
 
324
  // give it a kick whenever the source is readable
 
325
  // read(0) will not consume any bytes
 
326
  source.on('readable', function() {
 
327
    self.read(0);
 
328
  });
 
329
 
 
330
  this._rawHeader = [];
 
331
  this.header = null;
 
332
}
 
333
 
 
334
SimpleProtocol.prototype = Object.create(
 
335
  Readable.prototype, { constructor: { value: SimpleProtocol }});
 
336
 
 
337
SimpleProtocol.prototype._read = function(n) {
 
338
  if (!this._inBody) {
 
339
    var chunk = this._source.read();
 
340
 
 
341
    // if the source doesn't have data, we don't have data yet.
 
342
    if (chunk === null)
 
343
      return this.push('');
 
344
 
 
345
    // check if the chunk has a \n\n
 
346
    var split = -1;
 
347
    for (var i = 0; i < chunk.length; i++) {
 
348
      if (chunk[i] === 10) { // '\n'
 
349
        if (this._sawFirstCr) {
 
350
          split = i;
 
351
          break;
 
352
        } else {
 
353
          this._sawFirstCr = true;
 
354
        }
 
355
      } else {
 
356
        this._sawFirstCr = false;
 
357
      }
 
358
    }
 
359
 
 
360
    if (split === -1) {
 
361
      // still waiting for the \n\n
 
362
      // stash the chunk, and try again.
 
363
      this._rawHeader.push(chunk);
 
364
      this.push('');
 
365
    } else {
 
366
      this._inBody = true;
 
367
      var h = chunk.slice(0, split);
 
368
      this._rawHeader.push(h);
 
369
      var header = Buffer.concat(this._rawHeader).toString();
 
370
      try {
 
371
        this.header = JSON.parse(header);
 
372
      } catch (er) {
 
373
        this.emit('error', new Error('invalid simple protocol data'));
 
374
        return;
 
375
      }
 
376
      // now, because we got some extra data, unshift the rest
 
377
      // back into the read queue so that our consumer will see it.
 
378
      var b = chunk.slice(split);
 
379
      this.unshift(b);
 
380
 
 
381
      // and let them know that we are done parsing the header.
 
382
      this.emit('header', this.header);
 
383
    }
 
384
  } else {
 
385
    // from there on, just provide the data to our consumer.
 
386
    // careful not to push(null), since that would indicate EOF.
 
387
    var chunk = this._source.read();
 
388
    if (chunk) this.push(chunk);
 
389
  }
 
390
};
 
391
 
 
392
// Usage:
 
393
var parser = new SimpleProtocol(source);
 
394
// Now parser is a readable stream that will emit 'header'
 
395
// with the parsed header data.
 
396
```
 
397
 
 
398
### readable.wrap(stream)
 
399
 
 
400
* `stream` {Stream} An "old style" readable stream
 
401
 
 
402
If you are using an older Node library that emits `'data'` events and
 
403
has a `pause()` method that is advisory only, then you can use the
 
404
`wrap()` method to create a Readable stream that uses the old stream
 
405
as its data source.
 
406
 
 
407
For example:
 
408
 
 
409
```javascript
 
410
var OldReader = require('./old-api-module.js').OldReader;
 
411
var oreader = new OldReader;
 
412
var Readable = require('stream').Readable;
 
413
var myReader = new Readable().wrap(oreader);
 
414
 
 
415
myReader.on('readable', function() {
 
416
  myReader.read(); // etc.
 
417
});
 
418
```
 
419
 
 
420
### Event: 'readable'
 
421
 
 
422
When there is data ready to be consumed, this event will fire.
 
423
 
 
424
When this event emits, call the `read()` method to consume the data.
 
425
 
 
426
### Event: 'end'
 
427
 
 
428
Emitted when the stream has received an EOF (FIN in TCP terminology).
 
429
Indicates that no more `'data'` events will happen. If the stream is
 
430
also writable, it may be possible to continue writing.
 
431
 
 
432
### Event: 'data'
 
433
 
 
434
The `'data'` event emits either a `Buffer` (by default) or a string if
 
435
`setEncoding()` was used.
 
436
 
 
437
Note that adding a `'data'` event listener will switch the Readable
 
438
stream into "old mode", where data is emitted as soon as it is
 
439
available, rather than waiting for you to call `read()` to consume it.
 
440
 
 
441
### Event: 'error'
 
442
 
 
443
Emitted if there was an error receiving data.
 
444
 
 
445
### Event: 'close'
 
446
 
 
447
Emitted when the underlying resource (for example, the backing file
 
448
descriptor) has been closed. Not all streams will emit this.
 
449
 
 
450
### readable.setEncoding(encoding)
 
451
 
 
452
Makes the `'data'` event emit a string instead of a `Buffer`. `encoding`
 
453
can be `'utf8'`, `'utf16le'` (`'ucs2'`), `'ascii'`, or `'hex'`.
 
454
 
 
455
The encoding can also be set by specifying an `encoding` field to the
 
456
constructor.
 
457
 
 
458
### readable.read([size])
 
459
 
 
460
* `size` {Number | null} Optional number of bytes to read.
 
461
* Return: {Buffer | String | null}
 
462
 
 
463
Note: **This function SHOULD be called by Readable stream users.**
 
464
 
 
465
Call this method to consume data once the `'readable'` event is
 
466
emitted.
 
467
 
 
468
The `size` argument will set a minimum number of bytes that you are
 
469
interested in.  If not set, then the entire content of the internal
 
470
buffer is returned.
 
471
 
 
472
If there is no data to consume, or if there are fewer bytes in the
 
473
internal buffer than the `size` argument, then `null` is returned, and
 
474
a future `'readable'` event will be emitted when more is available.
 
475
 
 
476
Calling `stream.read(0)` will always return `null`, and will trigger a
 
477
refresh of the internal buffer, but otherwise be a no-op.
 
478
 
 
479
### readable.pipe(destination, [options])
 
480
 
 
481
* `destination` {Writable Stream}
 
482
* `options` {Object} Optional
 
483
  * `end` {Boolean} Default=true
 
484
 
 
485
Connects this readable stream to `destination` WriteStream. Incoming
 
486
data on this stream gets written to `destination`.  Properly manages
 
487
back-pressure so that a slow destination will not be overwhelmed by a
 
488
fast readable stream.
 
489
 
 
490
This function returns the `destination` stream.
 
491
 
 
492
For example, emulating the Unix `cat` command:
 
493
 
 
494
    process.stdin.pipe(process.stdout);
 
495
 
 
496
By default `end()` is called on the destination when the source stream
 
497
emits `end`, so that `destination` is no longer writable. Pass `{ end:
 
498
false }` as `options` to keep the destination stream open.
 
499
 
 
500
This keeps `writer` open so that "Goodbye" can be written at the
 
501
end.
 
502
 
 
503
    reader.pipe(writer, { end: false });
 
504
    reader.on("end", function() {
 
505
      writer.end("Goodbye\n");
 
506
    });
 
507
 
 
508
Note that `process.stderr` and `process.stdout` are never closed until
 
509
the process exits, regardless of the specified options.
 
510
 
 
511
### readable.unpipe([destination])
 
512
 
 
513
* `destination` {Writable Stream} Optional
 
514
 
 
515
Undo a previously established `pipe()`.  If no destination is
 
516
provided, then all previously established pipes are removed.
 
517
 
 
518
### readable.pause()
 
519
 
 
520
Switches the readable stream into "old mode", where data is emitted
 
521
using a `'data'` event rather than being buffered for consumption via
 
522
the `read()` method.
 
523
 
 
524
Ceases the flow of data.  No `'data'` events are emitted while the
 
525
stream is in a paused state.
 
526
 
 
527
### readable.resume()
 
528
 
 
529
Switches the readable stream into "old mode", where data is emitted
 
530
using a `'data'` event rather than being buffered for consumption via
 
531
the `read()` method.
 
532
 
 
533
Resumes the incoming `'data'` events after a `pause()`.
 
534
 
 
535
 
 
536
## Class: stream.Writable
 
537
 
 
538
<!--type=class-->
 
539
 
 
540
A `Writable` Stream has the following methods, members, and events.
 
541
 
 
542
Note that `stream.Writable` is an abstract class designed to be
 
543
extended with an underlying implementation of the
 
544
`_write(chunk, encoding, cb)` method. (See below.)
 
545
 
 
546
### new stream.Writable([options])
 
547
 
 
548
* `options` {Object}
 
549
  * `highWaterMark` {Number} Buffer level when `write()` starts
 
550
    returning false. Default=16kb
 
551
  * `decodeStrings` {Boolean} Whether or not to decode strings into
 
552
    Buffers before passing them to `_write()`.  Default=true
 
553
 
 
554
In classes that extend the Writable class, make sure to call the
 
555
constructor so that the buffering settings can be properly
 
556
initialized.
 
557
 
 
558
### writable.\_write(chunk, encoding, callback)
 
559
 
 
560
* `chunk` {Buffer | String} The chunk to be written.  Will always
 
561
  be a buffer unless the `decodeStrings` option was set to `false`.
 
562
* `encoding` {String} If the chunk is a string, then this is the
 
563
  encoding type.  Ignore chunk is a buffer.  Note that chunk will
 
564
  **always** be a buffer unless the `decodeStrings` option is
 
565
  explicitly set to `false`.
 
566
* `callback` {Function} Call this function (optionally with an error
 
567
  argument) when you are done processing the supplied chunk.
 
568
 
 
569
All Writable stream implementations must provide a `_write` method to
 
570
send data to the underlying resource.
 
571
 
 
572
Note: **This function MUST NOT be called directly.**  It should be
 
573
implemented by child classes, and called by the internal Writable
 
574
class methods only.
 
575
 
 
576
Call the callback using the standard `callback(error)` pattern to
 
577
signal that the write completed successfully or with an error.
 
578
 
 
579
If the `decodeStrings` flag is set in the constructor options, then
 
580
`chunk` may be a string rather than a Buffer, and `encoding` will
 
581
indicate the sort of string that it is.  This is to support
 
582
implementations that have an optimized handling for certain string
 
583
data encodings.  If you do not explicitly set the `decodeStrings`
 
584
option to `false`, then you can safely ignore the `encoding` argument,
 
585
and assume that `chunk` will always be a Buffer.
 
586
 
 
587
This method is prefixed with an underscore because it is internal to
 
588
the class that defines it, and should not be called directly by user
 
589
programs.  However, you **are** expected to override this method in
 
590
your own extension classes.
 
591
 
 
592
 
 
593
### writable.write(chunk, [encoding], [callback])
 
594
 
 
595
* `chunk` {Buffer | String} Data to be written
 
596
* `encoding` {String} Optional.  If `chunk` is a string, then encoding
 
597
  defaults to `'utf8'`
 
598
* `callback` {Function} Optional.  Called when this chunk is
 
599
  successfully written.
 
600
* Returns {Boolean}
 
601
 
 
602
Writes `chunk` to the stream.  Returns `true` if the data has been
 
603
flushed to the underlying resource.  Returns `false` to indicate that
 
604
the buffer is full, and the data will be sent out in the future. The
 
605
`'drain'` event will indicate when the buffer is empty again.
 
606
 
 
607
The specifics of when `write()` will return false, is determined by
 
608
the `highWaterMark` option provided to the constructor.
 
609
 
 
610
### writable.end([chunk], [encoding], [callback])
 
611
 
 
612
* `chunk` {Buffer | String} Optional final data to be written
 
613
* `encoding` {String} Optional.  If `chunk` is a string, then encoding
 
614
  defaults to `'utf8'`
 
615
* `callback` {Function} Optional.  Called when the final chunk is
 
616
  successfully written.
 
617
 
 
618
Call this method to signal the end of the data being written to the
 
619
stream.
 
620
 
 
621
### Event: 'drain'
 
622
 
 
623
Emitted when the stream's write queue empties and it's safe to write
 
624
without buffering again. Listen for it when `stream.write()` returns
 
625
`false`.
 
626
 
 
627
### Event: 'close'
 
628
 
 
629
Emitted when the underlying resource (for example, the backing file
 
630
descriptor) has been closed. Not all streams will emit this.
 
631
 
 
632
### Event: 'finish'
 
633
 
 
634
When `end()` is called and there are no more chunks to write, this
 
635
event is emitted.
 
636
 
 
637
### Event: 'pipe'
 
638
 
 
639
* `source` {Readable Stream}
 
640
 
 
641
Emitted when the stream is passed to a readable stream's pipe method.
 
642
 
 
643
### Event 'unpipe'
 
644
 
 
645
* `source` {Readable Stream}
 
646
 
 
647
Emitted when a previously established `pipe()` is removed using the
 
648
source Readable stream's `unpipe()` method.
 
649
 
 
650
## Class: stream.Duplex
 
651
 
 
652
<!--type=class-->
 
653
 
 
654
A "duplex" stream is one that is both Readable and Writable, such as a
 
655
TCP socket connection.
 
656
 
 
657
Note that `stream.Duplex` is an abstract class designed to be
 
658
extended with an underlying implementation of the `_read(size)`
 
659
and `_write(chunk, encoding, callback)` methods as you would with a Readable or
 
660
Writable stream class.
 
661
 
 
662
Since JavaScript doesn't have multiple prototypal inheritance, this
 
663
class prototypally inherits from Readable, and then parasitically from
 
664
Writable.  It is thus up to the user to implement both the lowlevel
 
665
`_read(n)` method as well as the lowlevel `_write(chunk, encoding, cb)` method
 
666
on extension duplex classes.
 
667
 
 
668
### new stream.Duplex(options)
 
669
 
 
670
* `options` {Object} Passed to both Writable and Readable
 
671
  constructors. Also has the following fields:
 
672
  * `allowHalfOpen` {Boolean} Default=true.  If set to `false`, then
 
673
    the stream will automatically end the readable side when the
 
674
    writable side ends and vice versa.
 
675
 
 
676
In classes that extend the Duplex class, make sure to call the
 
677
constructor so that the buffering settings can be properly
 
678
initialized.
 
679
 
 
680
## Class: stream.Transform
 
681
 
 
682
A "transform" stream is a duplex stream where the output is causally
 
683
connected in some way to the input, such as a zlib stream or a crypto
 
684
stream.
 
685
 
 
686
There is no requirement that the output be the same size as the input,
 
687
the same number of chunks, or arrive at the same time.  For example, a
 
688
Hash stream will only ever have a single chunk of output which is
 
689
provided when the input is ended.  A zlib stream will either produce
 
690
much smaller or much larger than its input.
 
691
 
 
692
Rather than implement the `_read()` and `_write()` methods, Transform
 
693
classes must implement the `_transform()` method, and may optionally
 
694
also implement the `_flush()` method.  (See below.)
 
695
 
 
696
### new stream.Transform([options])
 
697
 
 
698
* `options` {Object} Passed to both Writable and Readable
 
699
  constructors.
 
700
 
 
701
In classes that extend the Transform class, make sure to call the
 
702
constructor so that the buffering settings can be properly
 
703
initialized.
 
704
 
 
705
### transform.\_transform(chunk, encoding, callback)
 
706
 
 
707
* `chunk` {Buffer | String} The chunk to be transformed.  Will always
 
708
  be a buffer unless the `decodeStrings` option was set to `false`.
 
709
* `encoding` {String} If the chunk is a string, then this is the
 
710
  encoding type.  (Ignore if `decodeStrings` chunk is a buffer.)
 
711
* `callback` {Function} Call this function (optionally with an error
 
712
  argument) when you are done processing the supplied chunk.
 
713
 
 
714
Note: **This function MUST NOT be called directly.**  It should be
 
715
implemented by child classes, and called by the internal Transform
 
716
class methods only.
 
717
 
 
718
All Transform stream implementations must provide a `_transform`
 
719
method to accept input and produce output.
 
720
 
 
721
`_transform` should do whatever has to be done in this specific
 
722
Transform class, to handle the bytes being written, and pass them off
 
723
to the readable portion of the interface.  Do asynchronous I/O,
 
724
process things, and so on.
 
725
 
 
726
Call `transform.push(outputChunk)` 0 or more times to generate output
 
727
from this input chunk, depending on how much data you want to output
 
728
as a result of this chunk.
 
729
 
 
730
Call the callback function only when the current chunk is completely
 
731
consumed.  Note that there may or may not be output as a result of any
 
732
particular input chunk.
 
733
 
 
734
This method is prefixed with an underscore because it is internal to
 
735
the class that defines it, and should not be called directly by user
 
736
programs.  However, you **are** expected to override this method in
 
737
your own extension classes.
 
738
 
 
739
### transform.\_flush(callback)
 
740
 
 
741
* `callback` {Function} Call this function (optionally with an error
 
742
  argument) when you are done flushing any remaining data.
 
743
 
 
744
Note: **This function MUST NOT be called directly.**  It MAY be implemented
 
745
by child classes, and if so, will be called by the internal Transform
 
746
class methods only.
 
747
 
 
748
In some cases, your transform operation may need to emit a bit more
 
749
data at the end of the stream.  For example, a `Zlib` compression
 
750
stream will store up some internal state so that it can optimally
 
751
compress the output.  At the end, however, it needs to do the best it
 
752
can with what is left, so that the data will be complete.
 
753
 
 
754
In those cases, you can implement a `_flush` method, which will be
 
755
called at the very end, after all the written data is consumed, but
 
756
before emitting `end` to signal the end of the readable side.  Just
 
757
like with `_transform`, call `transform.push(chunk)` zero or more
 
758
times, as appropriate, and call `callback` when the flush operation is
 
759
complete.
 
760
 
 
761
This method is prefixed with an underscore because it is internal to
 
762
the class that defines it, and should not be called directly by user
 
763
programs.  However, you **are** expected to override this method in
 
764
your own extension classes.
 
765
 
 
766
### Example: `SimpleProtocol` parser
 
767
 
 
768
The example above of a simple protocol parser can be implemented much
 
769
more simply by using the higher level `Transform` stream class.
 
770
 
 
771
In this example, rather than providing the input as an argument, it
 
772
would be piped into the parser, which is a more idiomatic Node stream
 
773
approach.
 
774
 
 
775
```javascript
 
776
function SimpleProtocol(options) {
 
777
  if (!(this instanceof SimpleProtocol))
 
778
    return new SimpleProtocol(options);
 
779
 
 
780
  Transform.call(this, options);
 
781
  this._inBody = false;
 
782
  this._sawFirstCr = false;
 
783
  this._rawHeader = [];
 
784
  this.header = null;
 
785
}
 
786
 
 
787
SimpleProtocol.prototype = Object.create(
 
788
  Transform.prototype, { constructor: { value: SimpleProtocol }});
 
789
 
 
790
SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
 
791
  if (!this._inBody) {
 
792
    // check if the chunk has a \n\n
 
793
    var split = -1;
 
794
    for (var i = 0; i < chunk.length; i++) {
 
795
      if (chunk[i] === 10) { // '\n'
 
796
        if (this._sawFirstCr) {
 
797
          split = i;
 
798
          break;
 
799
        } else {
 
800
          this._sawFirstCr = true;
 
801
        }
 
802
      } else {
 
803
        this._sawFirstCr = false;
 
804
      }
 
805
    }
 
806
 
 
807
    if (split === -1) {
 
808
      // still waiting for the \n\n
 
809
      // stash the chunk, and try again.
 
810
      this._rawHeader.push(chunk);
 
811
    } else {
 
812
      this._inBody = true;
 
813
      var h = chunk.slice(0, split);
 
814
      this._rawHeader.push(h);
 
815
      var header = Buffer.concat(this._rawHeader).toString();
 
816
      try {
 
817
        this.header = JSON.parse(header);
 
818
      } catch (er) {
 
819
        this.emit('error', new Error('invalid simple protocol data'));
 
820
        return;
 
821
      }
 
822
      // and let them know that we are done parsing the header.
 
823
      this.emit('header', this.header);
 
824
 
 
825
      // now, because we got some extra data, emit this first.
 
826
      this.push(b);
 
827
    }
 
828
  } else {
 
829
    // from there on, just provide the data to our consumer as-is.
 
830
    this.push(b);
 
831
  }
 
832
  done();
 
833
};
 
834
 
 
835
var parser = new SimpleProtocol();
 
836
source.pipe(parser)
 
837
 
 
838
// Now parser is a readable stream that will emit 'header'
 
839
// with the parsed header data.
 
840
```
 
841
 
 
842
 
 
843
## Class: stream.PassThrough
 
844
 
 
845
This is a trivial implementation of a `Transform` stream that simply
 
846
passes the input bytes across to the output.  Its purpose is mainly
 
847
for examples and testing, but there are occasionally use cases where
 
848
it can come in handy.
 
849
 
 
850
 
 
851
[EventEmitter]: http://nodejs.org/api/events.html#events_class_events_eventemitter