~ps-jenkins/ubuntu-push/ubuntu-vivid-proposed

« back to all changes in this revision

Viewing changes to docs/example-server/node_modules/mongodb/node_modules/readable-stream/lib/_stream_readable.js

  • Committer: Roberto Alsina
  • Date: 2014-09-05 14:57:17 UTC
  • mto: (91.179.25 automatic)
  • mto: This revision was merged to the branch mainline in revision 129.
  • Revision ID: roberto.alsina@canonical.com-20140905145717-0ufcsv27w25i1nnu
added example app server

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright Joyent, Inc. and other Node contributors.
 
2
//
 
3
// Permission is hereby granted, free of charge, to any person obtaining a
 
4
// copy of this software and associated documentation files (the
 
5
// "Software"), to deal in the Software without restriction, including
 
6
// without limitation the rights to use, copy, modify, merge, publish,
 
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
 
8
// persons to whom the Software is furnished to do so, subject to the
 
9
// following conditions:
 
10
//
 
11
// The above copyright notice and this permission notice shall be included
 
12
// in all copies or substantial portions of the Software.
 
13
//
 
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
 
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
 
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
 
21
 
 
22
module.exports = Readable;
 
23
 
 
24
/*<replacement>*/
 
25
var isArray = require('isarray');
 
26
/*</replacement>*/
 
27
 
 
28
 
 
29
/*<replacement>*/
 
30
var Buffer = require('buffer').Buffer;
 
31
/*</replacement>*/
 
32
 
 
33
Readable.ReadableState = ReadableState;
 
34
 
 
35
var EE = require('events').EventEmitter;
 
36
 
 
37
/*<replacement>*/
 
38
if (!EE.listenerCount) EE.listenerCount = function(emitter, type) {
 
39
  return emitter.listeners(type).length;
 
40
};
 
41
/*</replacement>*/
 
42
 
 
43
var Stream = require('stream');
 
44
 
 
45
/*<replacement>*/
 
46
var util = require('core-util-is');
 
47
util.inherits = require('inherits');
 
48
/*</replacement>*/
 
49
 
 
50
var StringDecoder;
 
51
 
 
52
util.inherits(Readable, Stream);
 
53
 
 
54
function ReadableState(options, stream) {
 
55
  options = options || {};
 
56
 
 
57
  // the point at which it stops calling _read() to fill the buffer
 
58
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
 
59
  var hwm = options.highWaterMark;
 
60
  this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
 
61
 
 
62
  // cast to ints.
 
63
  this.highWaterMark = ~~this.highWaterMark;
 
64
 
 
65
  this.buffer = [];
 
66
  this.length = 0;
 
67
  this.pipes = null;
 
68
  this.pipesCount = 0;
 
69
  this.flowing = false;
 
70
  this.ended = false;
 
71
  this.endEmitted = false;
 
72
  this.reading = false;
 
73
 
 
74
  // In streams that never have any data, and do push(null) right away,
 
75
  // the consumer can miss the 'end' event if they do some I/O before
 
76
  // consuming the stream.  So, we don't emit('end') until some reading
 
77
  // happens.
 
78
  this.calledRead = false;
 
79
 
 
80
  // a flag to be able to tell if the onwrite cb is called immediately,
 
81
  // or on a later tick.  We set this to true at first, becuase any
 
82
  // actions that shouldn't happen until "later" should generally also
 
83
  // not happen before the first write call.
 
84
  this.sync = true;
 
85
 
 
86
  // whenever we return null, then we set a flag to say
 
87
  // that we're awaiting a 'readable' event emission.
 
88
  this.needReadable = false;
 
89
  this.emittedReadable = false;
 
90
  this.readableListening = false;
 
91
 
 
92
 
 
93
  // object stream flag. Used to make read(n) ignore n and to
 
94
  // make all the buffer merging and length checks go away
 
95
  this.objectMode = !!options.objectMode;
 
96
 
 
97
  // Crypto is kind of old and crusty.  Historically, its default string
 
98
  // encoding is 'binary' so we have to make this configurable.
 
99
  // Everything else in the universe uses 'utf8', though.
 
100
  this.defaultEncoding = options.defaultEncoding || 'utf8';
 
101
 
 
102
  // when piping, we only care about 'readable' events that happen
 
103
  // after read()ing all the bytes and not getting any pushback.
 
104
  this.ranOut = false;
 
105
 
 
106
  // the number of writers that are awaiting a drain event in .pipe()s
 
107
  this.awaitDrain = 0;
 
108
 
 
109
  // if true, a maybeReadMore has been scheduled
 
110
  this.readingMore = false;
 
111
 
 
112
  this.decoder = null;
 
113
  this.encoding = null;
 
114
  if (options.encoding) {
 
115
    if (!StringDecoder)
 
116
      StringDecoder = require('string_decoder/').StringDecoder;
 
117
    this.decoder = new StringDecoder(options.encoding);
 
118
    this.encoding = options.encoding;
 
119
  }
 
120
}
 
121
 
 
122
function Readable(options) {
 
123
  if (!(this instanceof Readable))
 
124
    return new Readable(options);
 
125
 
 
126
  this._readableState = new ReadableState(options, this);
 
127
 
 
128
  // legacy
 
129
  this.readable = true;
 
130
 
 
131
  Stream.call(this);
 
132
}
 
133
 
 
134
// Manually shove something into the read() buffer.
 
135
// This returns true if the highWaterMark has not been hit yet,
 
136
// similar to how Writable.write() returns true if you should
 
137
// write() some more.
 
138
Readable.prototype.push = function(chunk, encoding) {
 
139
  var state = this._readableState;
 
140
 
 
141
  if (typeof chunk === 'string' && !state.objectMode) {
 
142
    encoding = encoding || state.defaultEncoding;
 
143
    if (encoding !== state.encoding) {
 
144
      chunk = new Buffer(chunk, encoding);
 
145
      encoding = '';
 
146
    }
 
147
  }
 
148
 
 
149
  return readableAddChunk(this, state, chunk, encoding, false);
 
150
};
 
151
 
 
152
// Unshift should *always* be something directly out of read()
 
153
Readable.prototype.unshift = function(chunk) {
 
154
  var state = this._readableState;
 
155
  return readableAddChunk(this, state, chunk, '', true);
 
156
};
 
157
 
 
158
function readableAddChunk(stream, state, chunk, encoding, addToFront) {
 
159
  var er = chunkInvalid(state, chunk);
 
160
  if (er) {
 
161
    stream.emit('error', er);
 
162
  } else if (chunk === null || chunk === undefined) {
 
163
    state.reading = false;
 
164
    if (!state.ended)
 
165
      onEofChunk(stream, state);
 
166
  } else if (state.objectMode || chunk && chunk.length > 0) {
 
167
    if (state.ended && !addToFront) {
 
168
      var e = new Error('stream.push() after EOF');
 
169
      stream.emit('error', e);
 
170
    } else if (state.endEmitted && addToFront) {
 
171
      var e = new Error('stream.unshift() after end event');
 
172
      stream.emit('error', e);
 
173
    } else {
 
174
      if (state.decoder && !addToFront && !encoding)
 
175
        chunk = state.decoder.write(chunk);
 
176
 
 
177
      // update the buffer info.
 
178
      state.length += state.objectMode ? 1 : chunk.length;
 
179
      if (addToFront) {
 
180
        state.buffer.unshift(chunk);
 
181
      } else {
 
182
        state.reading = false;
 
183
        state.buffer.push(chunk);
 
184
      }
 
185
 
 
186
      if (state.needReadable)
 
187
        emitReadable(stream);
 
188
 
 
189
      maybeReadMore(stream, state);
 
190
    }
 
191
  } else if (!addToFront) {
 
192
    state.reading = false;
 
193
  }
 
194
 
 
195
  return needMoreData(state);
 
196
}
 
197
 
 
198
 
 
199
 
 
200
// if it's past the high water mark, we can push in some more.
 
201
// Also, if we have no data yet, we can stand some
 
202
// more bytes.  This is to work around cases where hwm=0,
 
203
// such as the repl.  Also, if the push() triggered a
 
204
// readable event, and the user called read(largeNumber) such that
 
205
// needReadable was set, then we ought to push more, so that another
 
206
// 'readable' event will be triggered.
 
207
function needMoreData(state) {
 
208
  return !state.ended &&
 
209
         (state.needReadable ||
 
210
          state.length < state.highWaterMark ||
 
211
          state.length === 0);
 
212
}
 
213
 
 
214
// backwards compatibility.
 
215
Readable.prototype.setEncoding = function(enc) {
 
216
  if (!StringDecoder)
 
217
    StringDecoder = require('string_decoder/').StringDecoder;
 
218
  this._readableState.decoder = new StringDecoder(enc);
 
219
  this._readableState.encoding = enc;
 
220
};
 
221
 
 
222
// Don't raise the hwm > 128MB
 
223
var MAX_HWM = 0x800000;
 
224
function roundUpToNextPowerOf2(n) {
 
225
  if (n >= MAX_HWM) {
 
226
    n = MAX_HWM;
 
227
  } else {
 
228
    // Get the next highest power of 2
 
229
    n--;
 
230
    for (var p = 1; p < 32; p <<= 1) n |= n >> p;
 
231
    n++;
 
232
  }
 
233
  return n;
 
234
}
 
235
 
 
236
function howMuchToRead(n, state) {
 
237
  if (state.length === 0 && state.ended)
 
238
    return 0;
 
239
 
 
240
  if (state.objectMode)
 
241
    return n === 0 ? 0 : 1;
 
242
 
 
243
  if (isNaN(n) || n === null) {
 
244
    // only flow one buffer at a time
 
245
    if (state.flowing && state.buffer.length)
 
246
      return state.buffer[0].length;
 
247
    else
 
248
      return state.length;
 
249
  }
 
250
 
 
251
  if (n <= 0)
 
252
    return 0;
 
253
 
 
254
  // If we're asking for more than the target buffer level,
 
255
  // then raise the water mark.  Bump up to the next highest
 
256
  // power of 2, to prevent increasing it excessively in tiny
 
257
  // amounts.
 
258
  if (n > state.highWaterMark)
 
259
    state.highWaterMark = roundUpToNextPowerOf2(n);
 
260
 
 
261
  // don't have that much.  return null, unless we've ended.
 
262
  if (n > state.length) {
 
263
    if (!state.ended) {
 
264
      state.needReadable = true;
 
265
      return 0;
 
266
    } else
 
267
      return state.length;
 
268
  }
 
269
 
 
270
  return n;
 
271
}
 
272
 
 
273
// you can override either this method, or the async _read(n) below.
 
274
Readable.prototype.read = function(n) {
 
275
  var state = this._readableState;
 
276
  state.calledRead = true;
 
277
  var nOrig = n;
 
278
 
 
279
  if (typeof n !== 'number' || n > 0)
 
280
    state.emittedReadable = false;
 
281
 
 
282
  // if we're doing read(0) to trigger a readable event, but we
 
283
  // already have a bunch of data in the buffer, then just trigger
 
284
  // the 'readable' event and move on.
 
285
  if (n === 0 &&
 
286
      state.needReadable &&
 
287
      (state.length >= state.highWaterMark || state.ended)) {
 
288
    emitReadable(this);
 
289
    return null;
 
290
  }
 
291
 
 
292
  n = howMuchToRead(n, state);
 
293
 
 
294
  // if we've ended, and we're now clear, then finish it up.
 
295
  if (n === 0 && state.ended) {
 
296
    if (state.length === 0)
 
297
      endReadable(this);
 
298
    return null;
 
299
  }
 
300
 
 
301
  // All the actual chunk generation logic needs to be
 
302
  // *below* the call to _read.  The reason is that in certain
 
303
  // synthetic stream cases, such as passthrough streams, _read
 
304
  // may be a completely synchronous operation which may change
 
305
  // the state of the read buffer, providing enough data when
 
306
  // before there was *not* enough.
 
307
  //
 
308
  // So, the steps are:
 
309
  // 1. Figure out what the state of things will be after we do
 
310
  // a read from the buffer.
 
311
  //
 
312
  // 2. If that resulting state will trigger a _read, then call _read.
 
313
  // Note that this may be asynchronous, or synchronous.  Yes, it is
 
314
  // deeply ugly to write APIs this way, but that still doesn't mean
 
315
  // that the Readable class should behave improperly, as streams are
 
316
  // designed to be sync/async agnostic.
 
317
  // Take note if the _read call is sync or async (ie, if the read call
 
318
  // has returned yet), so that we know whether or not it's safe to emit
 
319
  // 'readable' etc.
 
320
  //
 
321
  // 3. Actually pull the requested chunks out of the buffer and return.
 
322
 
 
323
  // if we need a readable event, then we need to do some reading.
 
324
  var doRead = state.needReadable;
 
325
 
 
326
  // if we currently have less than the highWaterMark, then also read some
 
327
  if (state.length - n <= state.highWaterMark)
 
328
    doRead = true;
 
329
 
 
330
  // however, if we've ended, then there's no point, and if we're already
 
331
  // reading, then it's unnecessary.
 
332
  if (state.ended || state.reading)
 
333
    doRead = false;
 
334
 
 
335
  if (doRead) {
 
336
    state.reading = true;
 
337
    state.sync = true;
 
338
    // if the length is currently zero, then we *need* a readable event.
 
339
    if (state.length === 0)
 
340
      state.needReadable = true;
 
341
    // call internal read method
 
342
    this._read(state.highWaterMark);
 
343
    state.sync = false;
 
344
  }
 
345
 
 
346
  // If _read called its callback synchronously, then `reading`
 
347
  // will be false, and we need to re-evaluate how much data we
 
348
  // can return to the user.
 
349
  if (doRead && !state.reading)
 
350
    n = howMuchToRead(nOrig, state);
 
351
 
 
352
  var ret;
 
353
  if (n > 0)
 
354
    ret = fromList(n, state);
 
355
  else
 
356
    ret = null;
 
357
 
 
358
  if (ret === null) {
 
359
    state.needReadable = true;
 
360
    n = 0;
 
361
  }
 
362
 
 
363
  state.length -= n;
 
364
 
 
365
  // If we have nothing in the buffer, then we want to know
 
366
  // as soon as we *do* get something into the buffer.
 
367
  if (state.length === 0 && !state.ended)
 
368
    state.needReadable = true;
 
369
 
 
370
  // If we happened to read() exactly the remaining amount in the
 
371
  // buffer, and the EOF has been seen at this point, then make sure
 
372
  // that we emit 'end' on the very next tick.
 
373
  if (state.ended && !state.endEmitted && state.length === 0)
 
374
    endReadable(this);
 
375
 
 
376
  return ret;
 
377
};
 
378
 
 
379
function chunkInvalid(state, chunk) {
 
380
  var er = null;
 
381
  if (!Buffer.isBuffer(chunk) &&
 
382
      'string' !== typeof chunk &&
 
383
      chunk !== null &&
 
384
      chunk !== undefined &&
 
385
      !state.objectMode &&
 
386
      !er) {
 
387
    er = new TypeError('Invalid non-string/buffer chunk');
 
388
  }
 
389
  return er;
 
390
}
 
391
 
 
392
 
 
393
function onEofChunk(stream, state) {
 
394
  if (state.decoder && !state.ended) {
 
395
    var chunk = state.decoder.end();
 
396
    if (chunk && chunk.length) {
 
397
      state.buffer.push(chunk);
 
398
      state.length += state.objectMode ? 1 : chunk.length;
 
399
    }
 
400
  }
 
401
  state.ended = true;
 
402
 
 
403
  // if we've ended and we have some data left, then emit
 
404
  // 'readable' now to make sure it gets picked up.
 
405
  if (state.length > 0)
 
406
    emitReadable(stream);
 
407
  else
 
408
    endReadable(stream);
 
409
}
 
410
 
 
411
// Don't emit readable right away in sync mode, because this can trigger
 
412
// another read() call => stack overflow.  This way, it might trigger
 
413
// a nextTick recursion warning, but that's not so bad.
 
414
function emitReadable(stream) {
 
415
  var state = stream._readableState;
 
416
  state.needReadable = false;
 
417
  if (state.emittedReadable)
 
418
    return;
 
419
 
 
420
  state.emittedReadable = true;
 
421
  if (state.sync)
 
422
    process.nextTick(function() {
 
423
      emitReadable_(stream);
 
424
    });
 
425
  else
 
426
    emitReadable_(stream);
 
427
}
 
428
 
 
429
function emitReadable_(stream) {
 
430
  stream.emit('readable');
 
431
}
 
432
 
 
433
 
 
434
// at this point, the user has presumably seen the 'readable' event,
 
435
// and called read() to consume some data.  that may have triggered
 
436
// in turn another _read(n) call, in which case reading = true if
 
437
// it's in progress.
 
438
// However, if we're not ended, or reading, and the length < hwm,
 
439
// then go ahead and try to read some more preemptively.
 
440
function maybeReadMore(stream, state) {
 
441
  if (!state.readingMore) {
 
442
    state.readingMore = true;
 
443
    process.nextTick(function() {
 
444
      maybeReadMore_(stream, state);
 
445
    });
 
446
  }
 
447
}
 
448
 
 
449
function maybeReadMore_(stream, state) {
 
450
  var len = state.length;
 
451
  while (!state.reading && !state.flowing && !state.ended &&
 
452
         state.length < state.highWaterMark) {
 
453
    stream.read(0);
 
454
    if (len === state.length)
 
455
      // didn't get any data, stop spinning.
 
456
      break;
 
457
    else
 
458
      len = state.length;
 
459
  }
 
460
  state.readingMore = false;
 
461
}
 
462
 
 
463
// abstract method.  to be overridden in specific implementation classes.
 
464
// call cb(er, data) where data is <= n in length.
 
465
// for virtual (non-string, non-buffer) streams, "length" is somewhat
 
466
// arbitrary, and perhaps not very meaningful.
 
467
Readable.prototype._read = function(n) {
 
468
  this.emit('error', new Error('not implemented'));
 
469
};
 
470
 
 
471
Readable.prototype.pipe = function(dest, pipeOpts) {
 
472
  var src = this;
 
473
  var state = this._readableState;
 
474
 
 
475
  switch (state.pipesCount) {
 
476
    case 0:
 
477
      state.pipes = dest;
 
478
      break;
 
479
    case 1:
 
480
      state.pipes = [state.pipes, dest];
 
481
      break;
 
482
    default:
 
483
      state.pipes.push(dest);
 
484
      break;
 
485
  }
 
486
  state.pipesCount += 1;
 
487
 
 
488
  var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
 
489
              dest !== process.stdout &&
 
490
              dest !== process.stderr;
 
491
 
 
492
  var endFn = doEnd ? onend : cleanup;
 
493
  if (state.endEmitted)
 
494
    process.nextTick(endFn);
 
495
  else
 
496
    src.once('end', endFn);
 
497
 
 
498
  dest.on('unpipe', onunpipe);
 
499
  function onunpipe(readable) {
 
500
    if (readable !== src) return;
 
501
    cleanup();
 
502
  }
 
503
 
 
504
  function onend() {
 
505
    dest.end();
 
506
  }
 
507
 
 
508
  // when the dest drains, it reduces the awaitDrain counter
 
509
  // on the source.  This would be more elegant with a .once()
 
510
  // handler in flow(), but adding and removing repeatedly is
 
511
  // too slow.
 
512
  var ondrain = pipeOnDrain(src);
 
513
  dest.on('drain', ondrain);
 
514
 
 
515
  function cleanup() {
 
516
    // cleanup event handlers once the pipe is broken
 
517
    dest.removeListener('close', onclose);
 
518
    dest.removeListener('finish', onfinish);
 
519
    dest.removeListener('drain', ondrain);
 
520
    dest.removeListener('error', onerror);
 
521
    dest.removeListener('unpipe', onunpipe);
 
522
    src.removeListener('end', onend);
 
523
    src.removeListener('end', cleanup);
 
524
 
 
525
    // if the reader is waiting for a drain event from this
 
526
    // specific writer, then it would cause it to never start
 
527
    // flowing again.
 
528
    // So, if this is awaiting a drain, then we just call it now.
 
529
    // If we don't know, then assume that we are waiting for one.
 
530
    if (!dest._writableState || dest._writableState.needDrain)
 
531
      ondrain();
 
532
  }
 
533
 
 
534
  // if the dest has an error, then stop piping into it.
 
535
  // however, don't suppress the throwing behavior for this.
 
536
  function onerror(er) {
 
537
    unpipe();
 
538
    dest.removeListener('error', onerror);
 
539
    if (EE.listenerCount(dest, 'error') === 0)
 
540
      dest.emit('error', er);
 
541
  }
 
542
  // This is a brutally ugly hack to make sure that our error handler
 
543
  // is attached before any userland ones.  NEVER DO THIS.
 
544
  if (!dest._events || !dest._events.error)
 
545
    dest.on('error', onerror);
 
546
  else if (isArray(dest._events.error))
 
547
    dest._events.error.unshift(onerror);
 
548
  else
 
549
    dest._events.error = [onerror, dest._events.error];
 
550
 
 
551
 
 
552
 
 
553
  // Both close and finish should trigger unpipe, but only once.
 
554
  function onclose() {
 
555
    dest.removeListener('finish', onfinish);
 
556
    unpipe();
 
557
  }
 
558
  dest.once('close', onclose);
 
559
  function onfinish() {
 
560
    dest.removeListener('close', onclose);
 
561
    unpipe();
 
562
  }
 
563
  dest.once('finish', onfinish);
 
564
 
 
565
  function unpipe() {
 
566
    src.unpipe(dest);
 
567
  }
 
568
 
 
569
  // tell the dest that it's being piped to
 
570
  dest.emit('pipe', src);
 
571
 
 
572
  // start the flow if it hasn't been started already.
 
573
  if (!state.flowing) {
 
574
    // the handler that waits for readable events after all
 
575
    // the data gets sucked out in flow.
 
576
    // This would be easier to follow with a .once() handler
 
577
    // in flow(), but that is too slow.
 
578
    this.on('readable', pipeOnReadable);
 
579
 
 
580
    state.flowing = true;
 
581
    process.nextTick(function() {
 
582
      flow(src);
 
583
    });
 
584
  }
 
585
 
 
586
  return dest;
 
587
};
 
588
 
 
589
function pipeOnDrain(src) {
 
590
  return function() {
 
591
    var dest = this;
 
592
    var state = src._readableState;
 
593
    state.awaitDrain--;
 
594
    if (state.awaitDrain === 0)
 
595
      flow(src);
 
596
  };
 
597
}
 
598
 
 
599
function flow(src) {
 
600
  var state = src._readableState;
 
601
  var chunk;
 
602
  state.awaitDrain = 0;
 
603
 
 
604
  function write(dest, i, list) {
 
605
    var written = dest.write(chunk);
 
606
    if (false === written) {
 
607
      state.awaitDrain++;
 
608
    }
 
609
  }
 
610
 
 
611
  while (state.pipesCount && null !== (chunk = src.read())) {
 
612
 
 
613
    if (state.pipesCount === 1)
 
614
      write(state.pipes, 0, null);
 
615
    else
 
616
      forEach(state.pipes, write);
 
617
 
 
618
    src.emit('data', chunk);
 
619
 
 
620
    // if anyone needs a drain, then we have to wait for that.
 
621
    if (state.awaitDrain > 0)
 
622
      return;
 
623
  }
 
624
 
 
625
  // if every destination was unpiped, either before entering this
 
626
  // function, or in the while loop, then stop flowing.
 
627
  //
 
628
  // NB: This is a pretty rare edge case.
 
629
  if (state.pipesCount === 0) {
 
630
    state.flowing = false;
 
631
 
 
632
    // if there were data event listeners added, then switch to old mode.
 
633
    if (EE.listenerCount(src, 'data') > 0)
 
634
      emitDataEvents(src);
 
635
    return;
 
636
  }
 
637
 
 
638
  // at this point, no one needed a drain, so we just ran out of data
 
639
  // on the next readable event, start it over again.
 
640
  state.ranOut = true;
 
641
}
 
642
 
 
643
function pipeOnReadable() {
 
644
  if (this._readableState.ranOut) {
 
645
    this._readableState.ranOut = false;
 
646
    flow(this);
 
647
  }
 
648
}
 
649
 
 
650
 
 
651
Readable.prototype.unpipe = function(dest) {
 
652
  var state = this._readableState;
 
653
 
 
654
  // if we're not piping anywhere, then do nothing.
 
655
  if (state.pipesCount === 0)
 
656
    return this;
 
657
 
 
658
  // just one destination.  most common case.
 
659
  if (state.pipesCount === 1) {
 
660
    // passed in one, but it's not the right one.
 
661
    if (dest && dest !== state.pipes)
 
662
      return this;
 
663
 
 
664
    if (!dest)
 
665
      dest = state.pipes;
 
666
 
 
667
    // got a match.
 
668
    state.pipes = null;
 
669
    state.pipesCount = 0;
 
670
    this.removeListener('readable', pipeOnReadable);
 
671
    state.flowing = false;
 
672
    if (dest)
 
673
      dest.emit('unpipe', this);
 
674
    return this;
 
675
  }
 
676
 
 
677
  // slow case. multiple pipe destinations.
 
678
 
 
679
  if (!dest) {
 
680
    // remove all.
 
681
    var dests = state.pipes;
 
682
    var len = state.pipesCount;
 
683
    state.pipes = null;
 
684
    state.pipesCount = 0;
 
685
    this.removeListener('readable', pipeOnReadable);
 
686
    state.flowing = false;
 
687
 
 
688
    for (var i = 0; i < len; i++)
 
689
      dests[i].emit('unpipe', this);
 
690
    return this;
 
691
  }
 
692
 
 
693
  // try to find the right one.
 
694
  var i = indexOf(state.pipes, dest);
 
695
  if (i === -1)
 
696
    return this;
 
697
 
 
698
  state.pipes.splice(i, 1);
 
699
  state.pipesCount -= 1;
 
700
  if (state.pipesCount === 1)
 
701
    state.pipes = state.pipes[0];
 
702
 
 
703
  dest.emit('unpipe', this);
 
704
 
 
705
  return this;
 
706
};
 
707
 
 
708
// set up data events if they are asked for
 
709
// Ensure readable listeners eventually get something
 
710
Readable.prototype.on = function(ev, fn) {
 
711
  var res = Stream.prototype.on.call(this, ev, fn);
 
712
 
 
713
  if (ev === 'data' && !this._readableState.flowing)
 
714
    emitDataEvents(this);
 
715
 
 
716
  if (ev === 'readable' && this.readable) {
 
717
    var state = this._readableState;
 
718
    if (!state.readableListening) {
 
719
      state.readableListening = true;
 
720
      state.emittedReadable = false;
 
721
      state.needReadable = true;
 
722
      if (!state.reading) {
 
723
        this.read(0);
 
724
      } else if (state.length) {
 
725
        emitReadable(this, state);
 
726
      }
 
727
    }
 
728
  }
 
729
 
 
730
  return res;
 
731
};
 
732
Readable.prototype.addListener = Readable.prototype.on;
 
733
 
 
734
// pause() and resume() are remnants of the legacy readable stream API
 
735
// If the user uses them, then switch into old mode.
 
736
Readable.prototype.resume = function() {
 
737
  emitDataEvents(this);
 
738
  this.read(0);
 
739
  this.emit('resume');
 
740
};
 
741
 
 
742
Readable.prototype.pause = function() {
 
743
  emitDataEvents(this, true);
 
744
  this.emit('pause');
 
745
};
 
746
 
 
747
function emitDataEvents(stream, startPaused) {
 
748
  var state = stream._readableState;
 
749
 
 
750
  if (state.flowing) {
 
751
    // https://github.com/isaacs/readable-stream/issues/16
 
752
    throw new Error('Cannot switch to old mode now.');
 
753
  }
 
754
 
 
755
  var paused = startPaused || false;
 
756
  var readable = false;
 
757
 
 
758
  // convert to an old-style stream.
 
759
  stream.readable = true;
 
760
  stream.pipe = Stream.prototype.pipe;
 
761
  stream.on = stream.addListener = Stream.prototype.on;
 
762
 
 
763
  stream.on('readable', function() {
 
764
    readable = true;
 
765
 
 
766
    var c;
 
767
    while (!paused && (null !== (c = stream.read())))
 
768
      stream.emit('data', c);
 
769
 
 
770
    if (c === null) {
 
771
      readable = false;
 
772
      stream._readableState.needReadable = true;
 
773
    }
 
774
  });
 
775
 
 
776
  stream.pause = function() {
 
777
    paused = true;
 
778
    this.emit('pause');
 
779
  };
 
780
 
 
781
  stream.resume = function() {
 
782
    paused = false;
 
783
    if (readable)
 
784
      process.nextTick(function() {
 
785
        stream.emit('readable');
 
786
      });
 
787
    else
 
788
      this.read(0);
 
789
    this.emit('resume');
 
790
  };
 
791
 
 
792
  // now make it start, just in case it hadn't already.
 
793
  stream.emit('readable');
 
794
}
 
795
 
 
796
// wrap an old-style stream as the async data source.
 
797
// This is *not* part of the readable stream interface.
 
798
// It is an ugly unfortunate mess of history.
 
799
Readable.prototype.wrap = function(stream) {
 
800
  var state = this._readableState;
 
801
  var paused = false;
 
802
 
 
803
  var self = this;
 
804
  stream.on('end', function() {
 
805
    if (state.decoder && !state.ended) {
 
806
      var chunk = state.decoder.end();
 
807
      if (chunk && chunk.length)
 
808
        self.push(chunk);
 
809
    }
 
810
 
 
811
    self.push(null);
 
812
  });
 
813
 
 
814
  stream.on('data', function(chunk) {
 
815
    if (state.decoder)
 
816
      chunk = state.decoder.write(chunk);
 
817
    if (!chunk || !state.objectMode && !chunk.length)
 
818
      return;
 
819
 
 
820
    var ret = self.push(chunk);
 
821
    if (!ret) {
 
822
      paused = true;
 
823
      stream.pause();
 
824
    }
 
825
  });
 
826
 
 
827
  // proxy all the other methods.
 
828
  // important when wrapping filters and duplexes.
 
829
  for (var i in stream) {
 
830
    if (typeof stream[i] === 'function' &&
 
831
        typeof this[i] === 'undefined') {
 
832
      this[i] = function(method) { return function() {
 
833
        return stream[method].apply(stream, arguments);
 
834
      }}(i);
 
835
    }
 
836
  }
 
837
 
 
838
  // proxy certain important events.
 
839
  var events = ['error', 'close', 'destroy', 'pause', 'resume'];
 
840
  forEach(events, function(ev) {
 
841
    stream.on(ev, self.emit.bind(self, ev));
 
842
  });
 
843
 
 
844
  // when we try to consume some more bytes, simply unpause the
 
845
  // underlying stream.
 
846
  self._read = function(n) {
 
847
    if (paused) {
 
848
      paused = false;
 
849
      stream.resume();
 
850
    }
 
851
  };
 
852
 
 
853
  return self;
 
854
};
 
855
 
 
856
 
 
857
 
 
858
// exposed for testing purposes only.
 
859
Readable._fromList = fromList;
 
860
 
 
861
// Pluck off n bytes from an array of buffers.
 
862
// Length is the combined lengths of all the buffers in the list.
 
863
function fromList(n, state) {
 
864
  var list = state.buffer;
 
865
  var length = state.length;
 
866
  var stringMode = !!state.decoder;
 
867
  var objectMode = !!state.objectMode;
 
868
  var ret;
 
869
 
 
870
  // nothing in the list, definitely empty.
 
871
  if (list.length === 0)
 
872
    return null;
 
873
 
 
874
  if (length === 0)
 
875
    ret = null;
 
876
  else if (objectMode)
 
877
    ret = list.shift();
 
878
  else if (!n || n >= length) {
 
879
    // read it all, truncate the array.
 
880
    if (stringMode)
 
881
      ret = list.join('');
 
882
    else
 
883
      ret = Buffer.concat(list, length);
 
884
    list.length = 0;
 
885
  } else {
 
886
    // read just some of it.
 
887
    if (n < list[0].length) {
 
888
      // just take a part of the first list item.
 
889
      // slice is the same for buffers and strings.
 
890
      var buf = list[0];
 
891
      ret = buf.slice(0, n);
 
892
      list[0] = buf.slice(n);
 
893
    } else if (n === list[0].length) {
 
894
      // first list is a perfect match
 
895
      ret = list.shift();
 
896
    } else {
 
897
      // complex case.
 
898
      // we have enough to cover it, but it spans past the first buffer.
 
899
      if (stringMode)
 
900
        ret = '';
 
901
      else
 
902
        ret = new Buffer(n);
 
903
 
 
904
      var c = 0;
 
905
      for (var i = 0, l = list.length; i < l && c < n; i++) {
 
906
        var buf = list[0];
 
907
        var cpy = Math.min(n - c, buf.length);
 
908
 
 
909
        if (stringMode)
 
910
          ret += buf.slice(0, cpy);
 
911
        else
 
912
          buf.copy(ret, c, 0, cpy);
 
913
 
 
914
        if (cpy < buf.length)
 
915
          list[0] = buf.slice(cpy);
 
916
        else
 
917
          list.shift();
 
918
 
 
919
        c += cpy;
 
920
      }
 
921
    }
 
922
  }
 
923
 
 
924
  return ret;
 
925
}
 
926
 
 
927
function endReadable(stream) {
 
928
  var state = stream._readableState;
 
929
 
 
930
  // If we get here before consuming all the bytes, then that is a
 
931
  // bug in node.  Should never happen.
 
932
  if (state.length > 0)
 
933
    throw new Error('endReadable called on non-empty stream');
 
934
 
 
935
  if (!state.endEmitted && state.calledRead) {
 
936
    state.ended = true;
 
937
    process.nextTick(function() {
 
938
      // Check that we didn't get one last unshift.
 
939
      if (!state.endEmitted && state.length === 0) {
 
940
        state.endEmitted = true;
 
941
        stream.readable = false;
 
942
        stream.emit('end');
 
943
      }
 
944
    });
 
945
  }
 
946
}
 
947
 
 
948
function forEach (xs, f) {
 
949
  for (var i = 0, l = xs.length; i < l; i++) {
 
950
    f(xs[i], i);
 
951
  }
 
952
}
 
953
 
 
954
function indexOf (xs, x) {
 
955
  for (var i = 0, l = xs.length; i < l; i++) {
 
956
    if (xs[i] === x) return i;
 
957
  }
 
958
  return -1;
 
959
}