1
by Jérémy Lal
Import upstream version 0.0.5 |
1 |
// write data to it, and it'll emit data in 512 byte blocks.
|
2 |
// if you .end() or .flush(), it'll emit whatever it's got,
|
|
3 |
// padded with nulls to 512 bytes.
|
|
4 |
||
5 |
module.exports = BlockStream |
|
6 |
||
7 |
var Stream = require("stream").Stream |
|
8 |
, inherits = require("inherits") |
|
9 |
, assert = require("assert").ok |
|
10 |
, debug = process.env.DEBUG ? console.error : function () {} |
|
11 |
||
12 |
function BlockStream (size, opt) { |
|
13 |
this.writable = this.readable = true |
|
14 |
this._opt = opt || {} |
|
15 |
this._chunkSize = size || 512 |
|
16 |
this._offset = 0 |
|
17 |
this._buffer = [] |
|
18 |
this._bufferLength = 0 |
|
19 |
if (this._opt.nopad) this._zeroes = false |
|
20 |
else { |
|
21 |
this._zeroes = new Buffer(this._chunkSize) |
|
22 |
for (var i = 0; i < this._chunkSize; i ++) { |
|
23 |
this._zeroes[i] = 0 |
|
24 |
}
|
|
25 |
}
|
|
26 |
}
|
|
27 |
||
28 |
inherits(BlockStream, Stream) |
|
29 |
||
30 |
BlockStream.prototype.write = function (c) { |
|
31 |
// debug(" BS write", c)
|
|
32 |
if (this._ended) throw new Error("BlockStream: write after end") |
|
33 |
if (c && !Buffer.isBuffer(c)) c = new Buffer(c + "") |
|
34 |
if (c.length) { |
|
35 |
this._buffer.push(c) |
|
36 |
this._bufferLength += c.length |
|
37 |
}
|
|
38 |
// debug("pushed onto buffer", this._bufferLength)
|
|
39 |
if (this._bufferLength >= this._chunkSize) { |
|
40 |
if (this._paused) { |
|
41 |
// debug(" BS paused, return false, need drain")
|
|
42 |
this._needDrain = true |
|
43 |
return false |
|
44 |
}
|
|
45 |
this._emitChunk() |
|
46 |
}
|
|
47 |
return true |
|
48 |
}
|
|
49 |
||
50 |
BlockStream.prototype.pause = function () { |
|
51 |
// debug(" BS pausing")
|
|
52 |
this._paused = true |
|
53 |
}
|
|
54 |
||
55 |
BlockStream.prototype.resume = function () { |
|
56 |
// debug(" BS resume")
|
|
57 |
this._paused = false |
|
58 |
return this._emitChunk() |
|
59 |
}
|
|
60 |
||
61 |
BlockStream.prototype.end = function (chunk) { |
|
62 |
// debug("end", chunk)
|
|
63 |
if (typeof chunk === "function") cb = chunk, chunk = null |
|
64 |
if (chunk) this.write(chunk) |
|
65 |
this._ended = true |
|
66 |
this.flush() |
|
67 |
}
|
|
68 |
||
69 |
BlockStream.prototype.flush = function () { |
|
70 |
this._emitChunk(true) |
|
71 |
}
|
|
72 |
||
73 |
BlockStream.prototype._emitChunk = function (flush) { |
|
74 |
// debug("emitChunk flush=%j emitting=%j paused=%j", flush, this._emitting, this._paused)
|
|
75 |
||
76 |
// emit a <chunkSize> chunk
|
|
77 |
if (flush && this._zeroes) { |
|
78 |
// debug(" BS push zeroes", this._bufferLength)
|
|
79 |
// push a chunk of zeroes
|
|
80 |
var padBytes = (this._bufferLength % this._chunkSize) |
|
81 |
if (padBytes !== 0) padBytes = this._chunkSize - padBytes |
|
82 |
if (padBytes > 0) { |
|
83 |
// debug("padBytes", padBytes, this._zeroes.slice(0, padBytes))
|
|
84 |
this._buffer.push(this._zeroes.slice(0, padBytes)) |
|
85 |
this._bufferLength += padBytes |
|
86 |
// debug(this._buffer[this._buffer.length - 1].length, this._bufferLength)
|
|
87 |
}
|
|
88 |
}
|
|
89 |
||
90 |
if (this._emitting || this._paused) return |
|
91 |
this._emitting = true |
|
92 |
||
93 |
// debug(" BS entering loops")
|
|
94 |
var bufferIndex = 0 |
|
95 |
while (this._bufferLength >= this._chunkSize && |
|
96 |
(flush || !this._paused)) { |
|
97 |
// debug(" BS data emission loop", this._bufferLength)
|
|
98 |
||
99 |
var out |
|
100 |
, outOffset = 0 |
|
101 |
, outHas = this._chunkSize |
|
102 |
||
103 |
while (outHas > 0 && (flush || !this._paused) ) { |
|
104 |
// debug(" BS data inner emit loop", this._bufferLength)
|
|
105 |
var cur = this._buffer[bufferIndex] |
|
106 |
, curHas = cur.length - this._offset |
|
107 |
// debug("cur=", cur)
|
|
108 |
// debug("curHas=%j", curHas)
|
|
109 |
// If it's not big enough to fill the whole thing, then we'll need
|
|
110 |
// to copy multiple buffers into one. However, if it is big enough,
|
|
111 |
// then just slice out the part we want, to save unnecessary copying.
|
|
112 |
// Also, need to copy if we've already done some copying, since buffers
|
|
113 |
// can't be joined like cons strings.
|
|
114 |
if (out || curHas < outHas) { |
|
115 |
out = out || new Buffer(this._chunkSize) |
|
116 |
cur.copy(out, outOffset, |
|
117 |
this._offset, this._offset + Math.min(curHas, outHas)) |
|
118 |
} else if (cur.length === outHas && this._offset === 0) { |
|
119 |
// shortcut -- cur is exactly long enough, and no offset.
|
|
120 |
out = cur |
|
121 |
} else { |
|
122 |
// slice out the piece of cur that we need.
|
|
123 |
out = cur.slice(this._offset, this._offset + outHas) |
|
124 |
}
|
|
125 |
||
126 |
if (curHas > outHas) { |
|
127 |
// means that the current buffer couldn't be completely output
|
|
128 |
// update this._offset to reflect how much WAS written
|
|
129 |
this._offset += outHas |
|
130 |
outHas = 0 |
|
131 |
} else { |
|
132 |
// output the entire current chunk.
|
|
133 |
// toss it away
|
|
134 |
outHas -= curHas |
|
135 |
outOffset += curHas |
|
136 |
bufferIndex ++ |
|
137 |
this._offset = 0 |
|
138 |
}
|
|
139 |
}
|
|
140 |
||
141 |
this._bufferLength -= this._chunkSize |
|
142 |
assert(out.length === this._chunkSize) |
|
143 |
// debug("emitting data", out)
|
|
144 |
// debug(" BS emitting, paused=%j", this._paused, this._bufferLength)
|
|
145 |
this.emit("data", out) |
|
146 |
out = null |
|
147 |
}
|
|
148 |
// debug(" BS out of loops", this._bufferLength)
|
|
149 |
||
150 |
// whatever is left, it's not enough to fill up a block, or we're paused
|
|
151 |
this._buffer = this._buffer.slice(bufferIndex) |
|
152 |
if (this._paused) { |
|
153 |
// debug(" BS paused, leaving", this._bufferLength)
|
|
154 |
this._needsDrain = true |
|
155 |
this._emitting = false |
|
156 |
return
|
|
157 |
}
|
|
158 |
||
159 |
// if flushing, and not using null-padding, then need to emit the last
|
|
160 |
// chunk(s) sitting in the queue. We know that it's not enough to
|
|
161 |
// fill up a whole block, because otherwise it would have been emitted
|
|
162 |
// above, but there may be some offset.
|
|
163 |
var l = this._buffer.length |
|
164 |
if (flush && !this._zeroes && l) { |
|
165 |
if (l === 1) { |
|
166 |
if (this._offset) { |
|
167 |
this.emit("data", this._buffer[0].slice(this._offset)) |
|
168 |
} else { |
|
169 |
this.emit("data", this._buffer[0]) |
|
170 |
}
|
|
171 |
} else { |
|
172 |
var outHas = this._bufferLength |
|
173 |
, out = new Buffer(outHas) |
|
174 |
, outOffset = 0 |
|
175 |
for (var i = 0; i < l; i ++) { |
|
176 |
var cur = this._buffer[i] |
|
177 |
, curHas = cur.length - this._offset |
|
178 |
cur.copy(out, outOffset, this._offset) |
|
179 |
this._offset = 0 |
|
180 |
outOffset += curHas |
|
181 |
this._bufferLength -= curHas |
|
182 |
}
|
|
183 |
this.emit("data", out) |
|
184 |
}
|
|
185 |
// truncate
|
|
186 |
this._buffer.length = 0 |
|
187 |
this._bufferLength = 0 |
|
188 |
this._offset = 0 |
|
189 |
}
|
|
190 |
||
191 |
// now either drained or ended
|
|
192 |
// debug("either draining, or ended", this._bufferLength, this._ended)
|
|
193 |
// means that we've flushed out all that we can so far.
|
|
194 |
if (this._needDrain) { |
|
195 |
// debug("emitting drain", this._bufferLength)
|
|
196 |
this._needDrain = false |
|
197 |
this.emit("drain") |
|
198 |
}
|
|
199 |
||
200 |
if ((this._bufferLength === 0) && this._ended && !this._endEmitted) { |
|
201 |
// debug("emitting end", this._bufferLength)
|
|
202 |
this._endEmitted = true |
|
203 |
this.emit("end") |
|
204 |
}
|
|
205 |
||
206 |
this._emitting = false |
|
207 |
||
208 |
// debug(" BS no longer emitting", flush, this._paused, this._emitting, this._bufferLength, this._chunkSize)
|
|
209 |
}
|