1
// Copyright Joyent, Inc. and other Node contributors.
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
23
var common = require('../common.js');
24
var R = require('_stream_readable');
25
var assert = require('assert');
27
var util = require('util');
28
var EE = require('events').EventEmitter;
30
function TestReader(n) {
32
this._buffer = new Buffer(n || 100);
33
this._buffer.fill('x');
38
util.inherits(TestReader, R);
40
TestReader.prototype.read = function(n) {
41
if (n === 0) return null;
42
var max = this._buffer.length - this._pos;
45
var toRead = Math.min(n, max);
47
// simulate the read buffer filling up with some more bytes some time
49
setTimeout(function() {
52
if (this._bufs <= 0) {
59
this.emit('readable');
65
var ret = this._buffer.slice(this._pos, this._pos + toRead);
72
function TestWriter() {
78
util.inherits(TestWriter, EE);
80
TestWriter.prototype.write = function(c) {
81
this.received.push(c.toString());
82
this.emit('write', c);
86
TestWriter.prototype.end = function(c) {
88
this.emit('end', this.received);
93
// tiny node-tap lookalike.
97
function test(name, fn) {
99
tests.push([name, fn]);
103
var next = tests.shift();
105
return console.error('ok');
109
console.log('# %s', name);
111
same: assert.deepEqual,
121
// ensure all tests have run
122
process.on("exit", function () {
123
assert.equal(count, 0);
126
process.nextTick(run);
129
test('a most basic test', function(t) {
130
var r = new TestReader(20);
146
'xxxxxxxxxxxxxxxxxx',
148
'xxxxxxxxxxxxxxxxxxxx',
149
'xxxxxxxxxxxxxxxxxxxx',
150
'xxxxxxxxxxxxxxxxxxxx',
151
'xxxxxxxxxxxxxxxxxxxx',
152
'xxxxxxxxxxxxxxxxxxxx' ];
154
r.on('end', function() {
155
t.same(reads, expect);
162
while (null !== (res = r.read(readSize++))) {
163
reads.push(res.toString());
165
r.once('readable', flow);
171
test('pipe', function(t) {
172
var r = new TestReader(5);
174
var expect = [ 'xxxxx',
185
var w = new TestWriter;
188
w.on('end', function(received) {
189
t.same(received, expect);
198
[1,2,3,4,5,6,7,8,9].forEach(function(SPLIT) {
199
test('unpipe', function(t) {
200
var r = new TestReader(5);
202
// unpipe after 3 writes, then write to another stream instead.
203
var expect = [ 'xxxxx',
213
expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
215
var w = [ new TestWriter(), new TestWriter() ];
218
w[0].on('write', function() {
219
if (--writes === 0) {
221
t.equal(r._readableState.pipes, null);
224
t.equal(r._readableState.pipes, w[1]);
232
w[0].on('end', function(results) {
233
t.equal(ended0, false);
236
t.same(results, expect[0]);
239
w[1].on('end', function(results) {
240
t.equal(ended1, false);
244
t.same(results, expect[1]);
253
// both writers should get the same exact data.
254
test('multipipe', function(t) {
255
var r = new TestReader(5);
256
var w = [ new TestWriter, new TestWriter ];
258
var expect = [ 'xxxxx',
270
w[0].on('end', function(received) {
271
t.same(received, expect, 'first');
272
if (--c === 0) t.end();
274
w[1].on('end', function(received) {
275
t.same(received, expect, 'second');
276
if (--c === 0) t.end();
284
[1,2,3,4,5,6,7,8,9].forEach(function(SPLIT) {
285
test('multi-unpipe', function(t) {
286
var r = new TestReader(5);
288
// unpipe after 3 writes, then write to another stream instead.
289
var expect = [ 'xxxxx',
299
expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
301
var w = [ new TestWriter(), new TestWriter(), new TestWriter() ];
304
w[0].on('write', function() {
305
if (--writes === 0) {
314
w[0].on('end', function(results) {
316
t.same(results, expect[0]);
319
w[1].on('end', function(results) {
322
t.same(results, expect[1]);
331
test('back pressure respected', function (t) {
334
var r = new R({ objectMode: true });
344
w1.write = function (chunk) {
345
assert.equal(chunk[0], "one");
347
process.nextTick(function () {
356
var expected = ["two", "two", "three", "three", "four", "four"];
359
w2.write = function (chunk) {
360
assert.equal(chunk[0], expected.shift());
361
assert.equal(counter, 0);
365
if (chunk[0] === "four") {
369
setTimeout(function () {
379
w3.write = function (chunk) {
380
assert.equal(chunk[0], expected.shift());
381
assert.equal(counter, 1);
385
if (chunk[0] === "four") {
389
setTimeout(function () {
396
w3.end = function () {
397
assert.equal(counter, 2);
398
assert.equal(expected.length, 0);
403
test('read(0) for ended streams', function (t) {
407
r._read = function (n) {};
409
r.push(new Buffer("foo"));
414
assert.equal(v, null);
418
w.write = function (buffer) {
420
assert.equal(ended, false);
421
assert.equal(buffer.toString(), "foo")
424
w.end = function () {
426
assert.equal(written, true);
433
test('sync _read ending', function (t) {
436
r._read = function (n) {
440
r.once('end', function () {
446
process.nextTick(function () {
447
assert.equal(called, true);
452
test('adding readable triggers data flow', function(t) {
453
var r = new R({ highWaterMark: 5 });
454
var onReadable = false;
457
r._read = function(n) {
458
if (readCalled++ === 2)
461
r.push(new Buffer('asdf'));
465
r.on('readable', function() {
470
r.on('end', function() {
471
t.equal(readCalled, 3);