1
// Copyright Joyent, Inc. and other Node contributors.
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
22
module.exports = Readable;
25
var isArray = require('isarray');
30
var Buffer = require('buffer').Buffer;
33
Readable.ReadableState = ReadableState;
35
var EE = require('events').EventEmitter;
38
if (!EE.listenerCount) EE.listenerCount = function(emitter, type) {
39
return emitter.listeners(type).length;
43
var Stream = require('stream');
46
var util = require('core-util-is');
47
util.inherits = require('inherits');
52
util.inherits(Readable, Stream);
54
function ReadableState(options, stream) {
55
options = options || {};
57
// the point at which it stops calling _read() to fill the buffer
58
// Note: 0 is a valid value, means "don't call _read preemptively ever"
59
var hwm = options.highWaterMark;
60
this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
63
this.highWaterMark = ~~this.highWaterMark;
71
this.endEmitted = false;
74
// In streams that never have any data, and do push(null) right away,
75
// the consumer can miss the 'end' event if they do some I/O before
76
// consuming the stream. So, we don't emit('end') until some reading
78
this.calledRead = false;
80
// a flag to be able to tell if the onwrite cb is called immediately,
81
// or on a later tick. We set this to true at first, becuase any
82
// actions that shouldn't happen until "later" should generally also
83
// not happen before the first write call.
86
// whenever we return null, then we set a flag to say
87
// that we're awaiting a 'readable' event emission.
88
this.needReadable = false;
89
this.emittedReadable = false;
90
this.readableListening = false;
93
// object stream flag. Used to make read(n) ignore n and to
94
// make all the buffer merging and length checks go away
95
this.objectMode = !!options.objectMode;
97
// Crypto is kind of old and crusty. Historically, its default string
98
// encoding is 'binary' so we have to make this configurable.
99
// Everything else in the universe uses 'utf8', though.
100
this.defaultEncoding = options.defaultEncoding || 'utf8';
102
// when piping, we only care about 'readable' events that happen
103
// after read()ing all the bytes and not getting any pushback.
106
// the number of writers that are awaiting a drain event in .pipe()s
109
// if true, a maybeReadMore has been scheduled
110
this.readingMore = false;
113
this.encoding = null;
114
if (options.encoding) {
116
StringDecoder = require('string_decoder/').StringDecoder;
117
this.decoder = new StringDecoder(options.encoding);
118
this.encoding = options.encoding;
122
function Readable(options) {
123
if (!(this instanceof Readable))
124
return new Readable(options);
126
this._readableState = new ReadableState(options, this);
129
this.readable = true;
134
// Manually shove something into the read() buffer.
135
// This returns true if the highWaterMark has not been hit yet,
136
// similar to how Writable.write() returns true if you should
137
// write() some more.
138
Readable.prototype.push = function(chunk, encoding) {
139
var state = this._readableState;
141
if (typeof chunk === 'string' && !state.objectMode) {
142
encoding = encoding || state.defaultEncoding;
143
if (encoding !== state.encoding) {
144
chunk = new Buffer(chunk, encoding);
149
return readableAddChunk(this, state, chunk, encoding, false);
152
// Unshift should *always* be something directly out of read()
153
Readable.prototype.unshift = function(chunk) {
154
var state = this._readableState;
155
return readableAddChunk(this, state, chunk, '', true);
158
function readableAddChunk(stream, state, chunk, encoding, addToFront) {
159
var er = chunkInvalid(state, chunk);
161
stream.emit('error', er);
162
} else if (chunk === null || chunk === undefined) {
163
state.reading = false;
165
onEofChunk(stream, state);
166
} else if (state.objectMode || chunk && chunk.length > 0) {
167
if (state.ended && !addToFront) {
168
var e = new Error('stream.push() after EOF');
169
stream.emit('error', e);
170
} else if (state.endEmitted && addToFront) {
171
var e = new Error('stream.unshift() after end event');
172
stream.emit('error', e);
174
if (state.decoder && !addToFront && !encoding)
175
chunk = state.decoder.write(chunk);
177
// update the buffer info.
178
state.length += state.objectMode ? 1 : chunk.length;
180
state.buffer.unshift(chunk);
182
state.reading = false;
183
state.buffer.push(chunk);
186
if (state.needReadable)
187
emitReadable(stream);
189
maybeReadMore(stream, state);
191
} else if (!addToFront) {
192
state.reading = false;
195
return needMoreData(state);
200
// if it's past the high water mark, we can push in some more.
201
// Also, if we have no data yet, we can stand some
202
// more bytes. This is to work around cases where hwm=0,
203
// such as the repl. Also, if the push() triggered a
204
// readable event, and the user called read(largeNumber) such that
205
// needReadable was set, then we ought to push more, so that another
206
// 'readable' event will be triggered.
207
function needMoreData(state) {
208
return !state.ended &&
209
(state.needReadable ||
210
state.length < state.highWaterMark ||
214
// backwards compatibility.
215
Readable.prototype.setEncoding = function(enc) {
217
StringDecoder = require('string_decoder/').StringDecoder;
218
this._readableState.decoder = new StringDecoder(enc);
219
this._readableState.encoding = enc;
222
// Don't raise the hwm > 128MB
223
var MAX_HWM = 0x800000;
224
function roundUpToNextPowerOf2(n) {
228
// Get the next highest power of 2
230
for (var p = 1; p < 32; p <<= 1) n |= n >> p;
236
function howMuchToRead(n, state) {
237
if (state.length === 0 && state.ended)
240
if (state.objectMode)
241
return n === 0 ? 0 : 1;
243
if (isNaN(n) || n === null) {
244
// only flow one buffer at a time
245
if (state.flowing && state.buffer.length)
246
return state.buffer[0].length;
254
// If we're asking for more than the target buffer level,
255
// then raise the water mark. Bump up to the next highest
256
// power of 2, to prevent increasing it excessively in tiny
258
if (n > state.highWaterMark)
259
state.highWaterMark = roundUpToNextPowerOf2(n);
261
// don't have that much. return null, unless we've ended.
262
if (n > state.length) {
264
state.needReadable = true;
273
// you can override either this method, or the async _read(n) below.
274
Readable.prototype.read = function(n) {
275
var state = this._readableState;
276
state.calledRead = true;
279
if (typeof n !== 'number' || n > 0)
280
state.emittedReadable = false;
282
// if we're doing read(0) to trigger a readable event, but we
283
// already have a bunch of data in the buffer, then just trigger
284
// the 'readable' event and move on.
286
state.needReadable &&
287
(state.length >= state.highWaterMark || state.ended)) {
292
n = howMuchToRead(n, state);
294
// if we've ended, and we're now clear, then finish it up.
295
if (n === 0 && state.ended) {
296
if (state.length === 0)
301
// All the actual chunk generation logic needs to be
302
// *below* the call to _read. The reason is that in certain
303
// synthetic stream cases, such as passthrough streams, _read
304
// may be a completely synchronous operation which may change
305
// the state of the read buffer, providing enough data when
306
// before there was *not* enough.
308
// So, the steps are:
309
// 1. Figure out what the state of things will be after we do
310
// a read from the buffer.
312
// 2. If that resulting state will trigger a _read, then call _read.
313
// Note that this may be asynchronous, or synchronous. Yes, it is
314
// deeply ugly to write APIs this way, but that still doesn't mean
315
// that the Readable class should behave improperly, as streams are
316
// designed to be sync/async agnostic.
317
// Take note if the _read call is sync or async (ie, if the read call
318
// has returned yet), so that we know whether or not it's safe to emit
321
// 3. Actually pull the requested chunks out of the buffer and return.
323
// if we need a readable event, then we need to do some reading.
324
var doRead = state.needReadable;
326
// if we currently have less than the highWaterMark, then also read some
327
if (state.length - n <= state.highWaterMark)
330
// however, if we've ended, then there's no point, and if we're already
331
// reading, then it's unnecessary.
332
if (state.ended || state.reading)
336
state.reading = true;
338
// if the length is currently zero, then we *need* a readable event.
339
if (state.length === 0)
340
state.needReadable = true;
341
// call internal read method
342
this._read(state.highWaterMark);
346
// If _read called its callback synchronously, then `reading`
347
// will be false, and we need to re-evaluate how much data we
348
// can return to the user.
349
if (doRead && !state.reading)
350
n = howMuchToRead(nOrig, state);
354
ret = fromList(n, state);
359
state.needReadable = true;
365
// If we have nothing in the buffer, then we want to know
366
// as soon as we *do* get something into the buffer.
367
if (state.length === 0 && !state.ended)
368
state.needReadable = true;
370
// If we happened to read() exactly the remaining amount in the
371
// buffer, and the EOF has been seen at this point, then make sure
372
// that we emit 'end' on the very next tick.
373
if (state.ended && !state.endEmitted && state.length === 0)
379
function chunkInvalid(state, chunk) {
381
if (!Buffer.isBuffer(chunk) &&
382
'string' !== typeof chunk &&
384
chunk !== undefined &&
387
er = new TypeError('Invalid non-string/buffer chunk');
393
function onEofChunk(stream, state) {
394
if (state.decoder && !state.ended) {
395
var chunk = state.decoder.end();
396
if (chunk && chunk.length) {
397
state.buffer.push(chunk);
398
state.length += state.objectMode ? 1 : chunk.length;
403
// if we've ended and we have some data left, then emit
404
// 'readable' now to make sure it gets picked up.
405
if (state.length > 0)
406
emitReadable(stream);
411
// Don't emit readable right away in sync mode, because this can trigger
412
// another read() call => stack overflow. This way, it might trigger
413
// a nextTick recursion warning, but that's not so bad.
414
function emitReadable(stream) {
415
var state = stream._readableState;
416
state.needReadable = false;
417
if (state.emittedReadable)
420
state.emittedReadable = true;
422
process.nextTick(function() {
423
emitReadable_(stream);
426
emitReadable_(stream);
429
function emitReadable_(stream) {
430
stream.emit('readable');
434
// at this point, the user has presumably seen the 'readable' event,
435
// and called read() to consume some data. that may have triggered
436
// in turn another _read(n) call, in which case reading = true if
438
// However, if we're not ended, or reading, and the length < hwm,
439
// then go ahead and try to read some more preemptively.
440
function maybeReadMore(stream, state) {
441
if (!state.readingMore) {
442
state.readingMore = true;
443
process.nextTick(function() {
444
maybeReadMore_(stream, state);
449
function maybeReadMore_(stream, state) {
450
var len = state.length;
451
while (!state.reading && !state.flowing && !state.ended &&
452
state.length < state.highWaterMark) {
454
if (len === state.length)
455
// didn't get any data, stop spinning.
460
state.readingMore = false;
463
// abstract method. to be overridden in specific implementation classes.
464
// call cb(er, data) where data is <= n in length.
465
// for virtual (non-string, non-buffer) streams, "length" is somewhat
466
// arbitrary, and perhaps not very meaningful.
467
Readable.prototype._read = function(n) {
468
this.emit('error', new Error('not implemented'));
471
Readable.prototype.pipe = function(dest, pipeOpts) {
473
var state = this._readableState;
475
switch (state.pipesCount) {
480
state.pipes = [state.pipes, dest];
483
state.pipes.push(dest);
486
state.pipesCount += 1;
488
var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
489
dest !== process.stdout &&
490
dest !== process.stderr;
492
var endFn = doEnd ? onend : cleanup;
493
if (state.endEmitted)
494
process.nextTick(endFn);
496
src.once('end', endFn);
498
dest.on('unpipe', onunpipe);
499
function onunpipe(readable) {
500
if (readable !== src) return;
508
// when the dest drains, it reduces the awaitDrain counter
509
// on the source. This would be more elegant with a .once()
510
// handler in flow(), but adding and removing repeatedly is
512
var ondrain = pipeOnDrain(src);
513
dest.on('drain', ondrain);
516
// cleanup event handlers once the pipe is broken
517
dest.removeListener('close', onclose);
518
dest.removeListener('finish', onfinish);
519
dest.removeListener('drain', ondrain);
520
dest.removeListener('error', onerror);
521
dest.removeListener('unpipe', onunpipe);
522
src.removeListener('end', onend);
523
src.removeListener('end', cleanup);
525
// if the reader is waiting for a drain event from this
526
// specific writer, then it would cause it to never start
528
// So, if this is awaiting a drain, then we just call it now.
529
// If we don't know, then assume that we are waiting for one.
530
if (!dest._writableState || dest._writableState.needDrain)
534
// if the dest has an error, then stop piping into it.
535
// however, don't suppress the throwing behavior for this.
536
function onerror(er) {
538
dest.removeListener('error', onerror);
539
if (EE.listenerCount(dest, 'error') === 0)
540
dest.emit('error', er);
542
// This is a brutally ugly hack to make sure that our error handler
543
// is attached before any userland ones. NEVER DO THIS.
544
if (!dest._events || !dest._events.error)
545
dest.on('error', onerror);
546
else if (isArray(dest._events.error))
547
dest._events.error.unshift(onerror);
549
dest._events.error = [onerror, dest._events.error];
553
// Both close and finish should trigger unpipe, but only once.
555
dest.removeListener('finish', onfinish);
558
dest.once('close', onclose);
559
function onfinish() {
560
dest.removeListener('close', onclose);
563
dest.once('finish', onfinish);
569
// tell the dest that it's being piped to
570
dest.emit('pipe', src);
572
// start the flow if it hasn't been started already.
573
if (!state.flowing) {
574
// the handler that waits for readable events after all
575
// the data gets sucked out in flow.
576
// This would be easier to follow with a .once() handler
577
// in flow(), but that is too slow.
578
this.on('readable', pipeOnReadable);
580
state.flowing = true;
581
process.nextTick(function() {
589
function pipeOnDrain(src) {
592
var state = src._readableState;
594
if (state.awaitDrain === 0)
600
var state = src._readableState;
602
state.awaitDrain = 0;
604
function write(dest, i, list) {
605
var written = dest.write(chunk);
606
if (false === written) {
611
while (state.pipesCount && null !== (chunk = src.read())) {
613
if (state.pipesCount === 1)
614
write(state.pipes, 0, null);
616
forEach(state.pipes, write);
618
src.emit('data', chunk);
620
// if anyone needs a drain, then we have to wait for that.
621
if (state.awaitDrain > 0)
625
// if every destination was unpiped, either before entering this
626
// function, or in the while loop, then stop flowing.
628
// NB: This is a pretty rare edge case.
629
if (state.pipesCount === 0) {
630
state.flowing = false;
632
// if there were data event listeners added, then switch to old mode.
633
if (EE.listenerCount(src, 'data') > 0)
638
// at this point, no one needed a drain, so we just ran out of data
639
// on the next readable event, start it over again.
643
function pipeOnReadable() {
644
if (this._readableState.ranOut) {
645
this._readableState.ranOut = false;
651
Readable.prototype.unpipe = function(dest) {
652
var state = this._readableState;
654
// if we're not piping anywhere, then do nothing.
655
if (state.pipesCount === 0)
658
// just one destination. most common case.
659
if (state.pipesCount === 1) {
660
// passed in one, but it's not the right one.
661
if (dest && dest !== state.pipes)
669
state.pipesCount = 0;
670
this.removeListener('readable', pipeOnReadable);
671
state.flowing = false;
673
dest.emit('unpipe', this);
677
// slow case. multiple pipe destinations.
681
var dests = state.pipes;
682
var len = state.pipesCount;
684
state.pipesCount = 0;
685
this.removeListener('readable', pipeOnReadable);
686
state.flowing = false;
688
for (var i = 0; i < len; i++)
689
dests[i].emit('unpipe', this);
693
// try to find the right one.
694
var i = indexOf(state.pipes, dest);
698
state.pipes.splice(i, 1);
699
state.pipesCount -= 1;
700
if (state.pipesCount === 1)
701
state.pipes = state.pipes[0];
703
dest.emit('unpipe', this);
708
// set up data events if they are asked for
709
// Ensure readable listeners eventually get something
710
Readable.prototype.on = function(ev, fn) {
711
var res = Stream.prototype.on.call(this, ev, fn);
713
if (ev === 'data' && !this._readableState.flowing)
714
emitDataEvents(this);
716
if (ev === 'readable' && this.readable) {
717
var state = this._readableState;
718
if (!state.readableListening) {
719
state.readableListening = true;
720
state.emittedReadable = false;
721
state.needReadable = true;
722
if (!state.reading) {
724
} else if (state.length) {
725
emitReadable(this, state);
732
Readable.prototype.addListener = Readable.prototype.on;
734
// pause() and resume() are remnants of the legacy readable stream API
735
// If the user uses them, then switch into old mode.
736
Readable.prototype.resume = function() {
737
emitDataEvents(this);
742
Readable.prototype.pause = function() {
743
emitDataEvents(this, true);
747
function emitDataEvents(stream, startPaused) {
748
var state = stream._readableState;
751
// https://github.com/isaacs/readable-stream/issues/16
752
throw new Error('Cannot switch to old mode now.');
755
var paused = startPaused || false;
756
var readable = false;
758
// convert to an old-style stream.
759
stream.readable = true;
760
stream.pipe = Stream.prototype.pipe;
761
stream.on = stream.addListener = Stream.prototype.on;
763
stream.on('readable', function() {
767
while (!paused && (null !== (c = stream.read())))
768
stream.emit('data', c);
772
stream._readableState.needReadable = true;
776
stream.pause = function() {
781
stream.resume = function() {
784
process.nextTick(function() {
785
stream.emit('readable');
792
// now make it start, just in case it hadn't already.
793
stream.emit('readable');
796
// wrap an old-style stream as the async data source.
797
// This is *not* part of the readable stream interface.
798
// It is an ugly unfortunate mess of history.
799
Readable.prototype.wrap = function(stream) {
800
var state = this._readableState;
804
stream.on('end', function() {
805
if (state.decoder && !state.ended) {
806
var chunk = state.decoder.end();
807
if (chunk && chunk.length)
814
stream.on('data', function(chunk) {
816
chunk = state.decoder.write(chunk);
817
if (!chunk || !state.objectMode && !chunk.length)
820
var ret = self.push(chunk);
827
// proxy all the other methods.
828
// important when wrapping filters and duplexes.
829
for (var i in stream) {
830
if (typeof stream[i] === 'function' &&
831
typeof this[i] === 'undefined') {
832
this[i] = function(method) { return function() {
833
return stream[method].apply(stream, arguments);
838
// proxy certain important events.
839
var events = ['error', 'close', 'destroy', 'pause', 'resume'];
840
forEach(events, function(ev) {
841
stream.on(ev, self.emit.bind(self, ev));
844
// when we try to consume some more bytes, simply unpause the
845
// underlying stream.
846
self._read = function(n) {
858
// exposed for testing purposes only.
859
Readable._fromList = fromList;
861
// Pluck off n bytes from an array of buffers.
862
// Length is the combined lengths of all the buffers in the list.
863
function fromList(n, state) {
864
var list = state.buffer;
865
var length = state.length;
866
var stringMode = !!state.decoder;
867
var objectMode = !!state.objectMode;
870
// nothing in the list, definitely empty.
871
if (list.length === 0)
878
else if (!n || n >= length) {
879
// read it all, truncate the array.
883
ret = Buffer.concat(list, length);
886
// read just some of it.
887
if (n < list[0].length) {
888
// just take a part of the first list item.
889
// slice is the same for buffers and strings.
891
ret = buf.slice(0, n);
892
list[0] = buf.slice(n);
893
} else if (n === list[0].length) {
894
// first list is a perfect match
898
// we have enough to cover it, but it spans past the first buffer.
905
for (var i = 0, l = list.length; i < l && c < n; i++) {
907
var cpy = Math.min(n - c, buf.length);
910
ret += buf.slice(0, cpy);
912
buf.copy(ret, c, 0, cpy);
914
if (cpy < buf.length)
915
list[0] = buf.slice(cpy);
927
function endReadable(stream) {
928
var state = stream._readableState;
930
// If we get here before consuming all the bytes, then that is a
931
// bug in node. Should never happen.
932
if (state.length > 0)
933
throw new Error('endReadable called on non-empty stream');
935
if (!state.endEmitted && state.calledRead) {
937
process.nextTick(function() {
938
// Check that we didn't get one last unshift.
939
if (!state.endEmitted && state.length === 0) {
940
state.endEmitted = true;
941
stream.readable = false;
948
function forEach (xs, f) {
949
for (var i = 0, l = xs.length; i < l; i++) {
954
function indexOf (xs, x) {
955
for (var i = 0, l = xs.length; i < l; i++) {
956
if (xs[i] === x) return i;