1 // write data to it, and it'll emit data in 512 byte blocks.
2 // if you .end() or .flush(), it'll emit whatever it's got,
3 // padded with nulls to 512 bytes.
5 module.exports = BlockStream
7 var Stream = require("stream").Stream
8 , inherits = require("inherits")
9 , assert = require("assert").ok
10 , debug = process.env.DEBUG ? console.error : function () {}
12 function BlockStream (size, opt) {
13 this.writable = this.readable = true
15 this._chunkSize = size || 512
18 this._bufferLength = 0
19 if (this._opt.nopad) this._zeroes = false
21 this._zeroes = new Buffer(this._chunkSize)
22 for (var i = 0; i < this._chunkSize; i ++) {
28 inherits(BlockStream, Stream)
30 BlockStream.prototype.write = function (c) {
31 // debug(" BS write", c)
32 if (this._ended) throw new Error("BlockStream: write after end")
33 if (c && !Buffer.isBuffer(c)) c = new Buffer(c + "")
36 this._bufferLength += c.length
38 // debug("pushed onto buffer", this._bufferLength)
39 if (this._bufferLength >= this._chunkSize) {
41 // debug(" BS paused, return false, need drain")
42 this._needDrain = true
50 BlockStream.prototype.pause = function () {
51 // debug(" BS pausing")
55 BlockStream.prototype.resume = function () {
56 // debug(" BS resume")
58 return this._emitChunk()
61 BlockStream.prototype.end = function (chunk) {
62 // debug("end", chunk)
63 if (typeof chunk === "function") cb = chunk, chunk = null
64 if (chunk) this.write(chunk)
69 BlockStream.prototype.flush = function () {
73 BlockStream.prototype._emitChunk = function (flush) {
74 // debug("emitChunk flush=%j emitting=%j paused=%j", flush, this._emitting, this._paused)
76 // emit a <chunkSize> chunk
77 if (flush && this._zeroes) {
78 // debug(" BS push zeroes", this._bufferLength)
79 // push a chunk of zeroes
80 var padBytes = (this._bufferLength % this._chunkSize)
81 if (padBytes !== 0) padBytes = this._chunkSize - padBytes
83 // debug("padBytes", padBytes, this._zeroes.slice(0, padBytes))
84 this._buffer.push(this._zeroes.slice(0, padBytes))
85 this._bufferLength += padBytes
86 // debug(this._buffer[this._buffer.length - 1].length, this._bufferLength)
90 if (this._emitting || this._paused) return
93 // debug(" BS entering loops")
95 while (this._bufferLength >= this._chunkSize &&
96 (flush || !this._paused)) {
97 // debug(" BS data emission loop", this._bufferLength)
101 , outHas = this._chunkSize
103 while (outHas > 0 && (flush || !this._paused) ) {
104 // debug(" BS data inner emit loop", this._bufferLength)
105 var cur = this._buffer[bufferIndex]
106 , curHas = cur.length - this._offset
107 // debug("cur=", cur)
108 // debug("curHas=%j", curHas)
109 // If it's not big enough to fill the whole thing, then we'll need
110 // to copy multiple buffers into one. However, if it is big enough,
111 // then just slice out the part we want, to save unnecessary copying.
112 // Also, need to copy if we've already done some copying, since buffers
113 // can't be joined like cons strings.
114 if (out || curHas < outHas) {
115 out = out || new Buffer(this._chunkSize)
116 cur.copy(out, outOffset,
117 this._offset, this._offset + Math.min(curHas, outHas))
118 } else if (cur.length === outHas && this._offset === 0) {
119 // shortcut -- cur is exactly long enough, and no offset.
122 // slice out the piece of cur that we need.
123 out = cur.slice(this._offset, this._offset + outHas)
126 if (curHas > outHas) {
127 // means that the current buffer couldn't be completely output
128 // update this._offset to reflect how much WAS written
129 this._offset += outHas
132 // output the entire current chunk.
141 this._bufferLength -= this._chunkSize
142 assert(out.length === this._chunkSize)
143 // debug("emitting data", out)
144 // debug(" BS emitting, paused=%j", this._paused, this._bufferLength)
145 this.emit("data", out)
148 // debug(" BS out of loops", this._bufferLength)
150 // whatever is left, it's not enough to fill up a block, or we're paused
151 this._buffer = this._buffer.slice(bufferIndex)
153 // debug(" BS paused, leaving", this._bufferLength)
154 this._needsDrain = true
155 this._emitting = false
159 // if flushing, and not using null-padding, then need to emit the last
160 // chunk(s) sitting in the queue. We know that it's not enough to
161 // fill up a whole block, because otherwise it would have been emitted
162 // above, but there may be some offset.
163 var l = this._buffer.length
164 if (flush && !this._zeroes && l) {
167 this.emit("data", this._buffer[0].slice(this._offset))
169 this.emit("data", this._buffer[0])
172 var outHas = this._bufferLength
173 , out = new Buffer(outHas)
175 for (var i = 0; i < l; i ++) {
176 var cur = this._buffer[i]
177 , curHas = cur.length - this._offset
178 cur.copy(out, outOffset, this._offset)
181 this._bufferLength -= curHas
183 this.emit("data", out)
186 this._buffer.length = 0
187 this._bufferLength = 0
191 // now either drained or ended
192 // debug("either draining, or ended", this._bufferLength, this._ended)
193 // means that we've flushed out all that we can so far.
194 if (this._needDrain) {
195 // debug("emitting drain", this._bufferLength)
196 this._needDrain = false
200 if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
201 // debug("emitting end", this._bufferLength)
202 this._endEmitted = true
206 this._emitting = false
208 // debug(" BS no longer emitting", flush, this._paused, this._emitting, this._bufferLength, this._chunkSize)