// ~ubuntu-branches/ubuntu/trusty/node-block-stream/trusty
//
// 1 by Jérémy Lal
// Import upstream version 0.0.5
// Write data to it, and it'll emit that data in fixed-size blocks
// (512 bytes unless another size is given).
// If you .end() or .flush(), it'll emit whatever it's got, padded with
// nulls to a full block unless the `nopad` option is set.

// The constructor is a hoisted function declaration, so it can be exported
// before its definition appears further down.
module.exports = BlockStream

// Dependencies. All four names must be part of one `var` list: a stray
// statement between the lines would leave only `Stream` declared and turn
// the rest into implicit globals.
var Stream = require("stream").Stream
  , inherits = require("inherits")
  , assert = require("assert").ok
  // logs through console.error only when the DEBUG env var is set
  , debug = process.env.DEBUG ? console.error : function () {}
11
12
// Constructor for a stream that re-chunks written data into fixed-size
// blocks.
//
// size - block size in bytes; falls back to 512 when omitted or falsy.
// opt  - optional settings object. Only `nopad` is read here: when it is
//        truthy no padding buffer is created, so a final short block is
//        emitted as-is instead of being padded with zero bytes.
function BlockStream (size, opt) {
  this.writable = this.readable = true
  this._opt = opt || {}
  this._chunkSize = size || 512
  // read position inside the first queued buffer; bytes before it have
  // already been emitted
  this._offset = 0
  // queue of pending Buffers, and the number of un-emitted bytes they hold
  this._buffer = []
  this._bufferLength = 0
  if (this._opt.nopad) {
    this._zeroes = false
  } else {
    // one block's worth of zero bytes; padding is sliced out of it on flush
    this._zeroes = new Buffer(this._chunkSize)
    this._zeroes.fill(0)
  }
}
27
28
// Make BlockStream inherit from Stream. Methods that this file calls on
// `this` but never defines itself (e.g. emit) are expected to come from there.
inherits(BlockStream, Stream)
29
30
// Queue `c` for output and, if at least one full block is now available,
// try to emit.
//
// c - a Buffer, or any other truthy value, which is stringified and wrapped
//     in a Buffer. Falsy values (including null and undefined) and empty
//     buffers add nothing to the queue.
//
// Returns false when a full block is queued but the stream is paused; a
// "drain" event is emitted later, once emission can proceed. Returns true
// otherwise.
// Throws an Error when called after end().
BlockStream.prototype.write = function (c) {
  if (this._ended) throw new Error("BlockStream: write after end")
  if (c && !Buffer.isBuffer(c)) c = new Buffer(c + "")
  // Guard on `c` itself: reading .length of null/undefined would throw,
  // while every other falsy value is already treated as "nothing to write".
  if (c && c.length) {
    this._buffer.push(c)
    this._bufferLength += c.length
  }
  if (this._bufferLength >= this._chunkSize) {
    if (this._paused) {
      // remember that the writer was told to back off
      this._needDrain = true
      return false
    }
    this._emitChunk()
  }
  return true
}
49
50
// Mark the stream as paused. write() and _emitChunk() consult this flag
// and hold back "data" emission while it is set.
BlockStream.prototype.pause = function () {
  this._paused = true
}
54
55
// Clear the paused flag and immediately try to emit whatever is queued.
// Returns whatever _emitChunk() returns.
BlockStream.prototype.resume = function () {
  this._paused = false
  var emitted = this._emitChunk()
  return emitted
}
60
61
// Signal that no more data will be written, then flush what is queued.
//
// chunk - optional final piece of data; when truthy it is passed to
//         write() before the stream is marked as ended. A function in
//         this position is ignored: it is neither written nor invoked.
BlockStream.prototype.end = function (chunk) {
  // Discard a callback-style argument instead of stashing it in an
  // undeclared variable, which would leak a global (or throw in strict mode).
  if (typeof chunk === "function") chunk = null
  // must happen before _ended is set, because write() throws after end
  if (chunk) this.write(chunk)
  this._ended = true
  this.flush()
}
68
69
// Force out whatever is queued by running the emit routine in flush mode.
// Returns nothing.
BlockStream.prototype.flush = function () {
  var flushing = true
  this._emitChunk(flushing)
}
72
73
// Emit queued data as "data" events of exactly this._chunkSize bytes each,
// followed by "drain" and/or "end" when appropriate.
//
// flush - when truthy:
//         * with padding enabled, zero bytes are first appended to the queue
//           so that it ends on a block boundary;
//         * the emission loop keeps going even if a listener pauses the
//           stream part-way through;
//         * with padding disabled (nopad), the leftover partial block is
//           emitted as one short "data" event.
//
// The call is a no-op (apart from the padding step) while the stream is
// paused or while another _emitChunk call is already running.
// Returns nothing.
BlockStream.prototype._emitChunk = function (flush) {
  var out, outOffset, outHas, cur, curHas, i

  if (flush && this._zeroes) {
    // append just enough zeroes to reach the next block boundary
    var padBytes = this._bufferLength % this._chunkSize
    if (padBytes !== 0) padBytes = this._chunkSize - padBytes
    if (padBytes > 0) {
      this._buffer.push(this._zeroes.slice(0, padBytes))
      this._bufferLength += padBytes
    }
  }

  // _emitting guards against re-entry: a "data" listener may call write(),
  // resume() or end(), all of which lead back here.
  if (this._emitting || this._paused) return
  this._emitting = true

  // index of the first queued buffer that still holds un-emitted bytes
  var bufferIndex = 0
  while (this._bufferLength >= this._chunkSize &&
         (flush || !this._paused)) {
    out = null
    outOffset = 0
    // bytes still missing from the block being assembled
    outHas = this._chunkSize

    while (outHas > 0 && (flush || !this._paused)) {
      cur = this._buffer[bufferIndex]
      curHas = cur.length - this._offset
      // If cur is not big enough to fill the whole block, several buffers
      // have to be copied into one. If it is big enough, slice out the part
      // we want to avoid copying. Once copying has started it must continue,
      // because buffers cannot be joined any other way.
      if (out || curHas < outHas) {
        out = out || new Buffer(this._chunkSize)
        cur.copy(out, outOffset,
                 this._offset, this._offset + Math.min(curHas, outHas))
      } else if (cur.length === outHas && this._offset === 0) {
        // shortcut: cur is exactly one block and nothing of it was consumed
        out = cur
      } else {
        out = cur.slice(this._offset, this._offset + outHas)
      }

      if (curHas > outHas) {
        // cur was only partly consumed; remember how far we got
        this._offset += outHas
        outHas = 0
      } else {
        // cur was consumed completely; move on to the next buffer
        outHas -= curHas
        outOffset += curHas
        bufferIndex ++
        this._offset = 0
      }
    }

    this._bufferLength -= this._chunkSize
    assert(out.length === this._chunkSize)
    this.emit("data", out)
    out = null
  }

  // whatever is left is less than a full block, or we were paused
  this._buffer = this._buffer.slice(bufferIndex)
  if (this._paused) {
    // same flag that write() sets and the drain check below reads, so that
    // "drain" is emitted once emission resumes
    this._needDrain = true
    this._emitting = false
    return
  }

  // Without zero padding the leftover bytes never reach a block boundary,
  // so they have to be pushed out explicitly. Besides an explicit flush,
  // this also has to happen once the stream has ended: end() may have been
  // called while paused or from inside a "data" listener, in which case its
  // own flush call returned early and this is the call that finishes the job.
  var l = this._buffer.length
  if ((flush || this._ended) && !this._zeroes && l) {
    if (l === 1) {
      if (this._offset) {
        this.emit("data", this._buffer[0].slice(this._offset))
      } else {
        this.emit("data", this._buffer[0])
      }
    } else {
      outHas = this._bufferLength
      out = new Buffer(outHas)
      outOffset = 0
      for (i = 0; i < l; i ++) {
        cur = this._buffer[i]
        curHas = cur.length - this._offset
        cur.copy(out, outOffset, this._offset)
        // only the first buffer can have a non-zero offset
        this._offset = 0
        outOffset += curHas
        this._bufferLength -= curHas
      }
      this.emit("data", out)
    }
    // truncate
    this._buffer.length = 0
    this._bufferLength = 0
    this._offset = 0
  }

  // everything that can be emitted so far has been emitted
  if (this._needDrain) {
    this._needDrain = false
    this.emit("drain")
  }

  if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
    this._endEmitted = true
    this.emit("end")
  }

  this._emitting = false
}