1 // A bit simpler than readable streams.
2 // Implement an async ._write(chunk, encoding, cb), and it'll handle all
3 // the drain event emission and buffering.
// Module setup: export the constructor and pull in dependencies.
7 module.exports = Writable;
10 var processNextTick = require('process-nextick-args');
15 var Buffer = require('buffer').Buffer;
// Expose the state class as a static property of the constructor.
18 Writable.WritableState = WritableState;
22 var util = require('core-util-is');
23 util.inherits = require('inherits');
// NOTE(review): Stream is assigned twice below (the 'stream' module, then
// events.EventEmitter); presumably the second is a fallback — confirm the
// guarding control flow. The reason for the 'st' + 'ream' concatenation is
// not evident from these lines.
31 Stream = require('st' + 'ream');
34 Stream = require('events').EventEmitter;
// NOTE(review): identical to the earlier `var Buffer` require; looks redundant.
38 var Buffer = require('buffer').Buffer;
// Writable inherits from whichever Stream base was selected above.
40 util.inherits(Writable, Stream);
// Constructor for one queued write request. writeOrBuffer() creates these
// and chains them through `.next`; clearBuffer() later reads back
// `.chunk`, `.encoding` and `.callback` from each entry.
44 function WriteReq(chunk, encoding, cb) {
46 this.encoding = encoding;
// Per-stream bookkeeping object for a Writable.
//   options - user options (objectMode, writableObjectMode, highWaterMark,
//             decodeStrings, defaultEncoding are read below); defaults to {}.
//   stream  - the owning stream; only used here for the Duplex check.
51 function WritableState(options, stream) {
// NOTE(review): required inside the function rather than at module top,
// presumably to sidestep a circular dependency with _stream_duplex — confirm.
52 var Duplex = require('./_stream_duplex');
54 options = options || {};
56 // object stream flag to indicate whether or not this stream
57 // contains buffers or objects.
58 this.objectMode = !!options.objectMode;
// For Duplex streams, writableObjectMode can also switch on object mode.
60 if (stream instanceof Duplex)
61 this.objectMode = this.objectMode || !!options.writableObjectMode;
63 // the point at which write() starts returning false
64 // Note: 0 is a valid value, means that we always return false if
65 // the entire buffer is not flushed immediately on write()
66 var hwm = options.highWaterMark;
// Default threshold: 16 in objectMode (each chunk counts as 1 in
// writeOrBuffer), otherwise 16 * 1024.
67 var defaultHwm = this.objectMode ? 16 : 16 * 1024;
// The explicit `hwm === 0` test keeps a user-supplied 0 from falling back
// to the default.
68 this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
// Double bitwise NOT truncates the value to a 32-bit integer.
71 this.highWaterMark = ~~this.highWaterMark;
// Set by writeOrBuffer(); cleared by onwriteDrain() when it emits 'drain'.
73 this.needDrain = false;
74 // at the start of calling end()
76 // when end() has been called, and returned
78 // when 'finish' is emitted
79 this.finished = false;
81 // should we decode strings into buffers before passing to _write?
82 // this is here so that some node-core streams can optimize string
83 // handling at a lower level.
// Only an explicit `decodeStrings: false` disables decoding; any other
// value (including undefined) leaves it enabled.
84 var noDecode = options.decodeStrings === false;
85 this.decodeStrings = !noDecode;
87 // Crypto is kind of old and crusty. Historically, its default string
88 // encoding is 'binary' so we have to make this configurable.
89 // Everything else in the universe uses 'utf8', though.
90 this.defaultEncoding = options.defaultEncoding || 'utf8';
92 // not an actual buffer we keep track of, but a measurement
93 // of how much we're waiting to get pushed to some underlying
97 // a flag to see when we're in the middle of a write.
100 // when true all writes will be buffered until .uncork() call
103 // a flag to be able to tell if the onwrite cb is called immediately,
104 // or on a later tick. We set this to true at first, because any
105 // actions that shouldn't happen until "later" should generally also
106 // not happen before the first write call.
109 // a flag to know if we're processing previously buffered items, which
110 // may call the _write() callback in the same tick, so that we don't
111 // end up in an overlapped onwrite situation.
112 this.bufferProcessing = false;
114 // the callback that's passed to _write(chunk, encoding, cb) and _writev(chunk, cb)
115 this.onwrite = function(er) {
119 // the callback that the user supplies to write(chunk,encoding,cb)
122 // the amount that is being written when _write is called.
// Head and tail of the singly linked list of queued WriteReq entries.
125 this.bufferedRequest = null;
126 this.lastBufferedRequest = null;
128 // number of pending user-supplied write callbacks
129 // this must be 0 before 'finish' can be emitted
132 // emit prefinish if the only thing we're waiting for is _write cbs
133 // This is relevant for synchronous Transform streams
134 this.prefinished = false;
136 // True if the error was already emitted and should not be thrown again
137 this.errorEmitted = false;
// Traverses the queued write requests, starting at this.bufferedRequest and
// advancing through each entry's `.next` link.
140 WritableState.prototype.getBuffer = function writableStateGetBuffer() {
141 var current = this.bufferedRequest;
145 current = current.next;
// Deprecated accessor: reading `state.buffer` forwards to getBuffer(). The
// getter is wrapped with util-deprecate, which receives the message below.
151 Object.defineProperty(WritableState.prototype, 'buffer', {
152 get: require('util-deprecate')(function() {
153 return this.getBuffer();
154 }, '_writableState.buffer is deprecated. Use ' +
155 '_writableState.getBuffer() instead.')
// Writable stream constructor.
//   options - forwarded to WritableState; `options.write` and
//             `options.writev`, when functions, replace this._write and
//             this._writev on the instance.
// Calling it without `new` (and not on a Duplex) returns a fresh instance.
160 function Writable(options) {
161 var Duplex = require('./_stream_duplex');
163 // Writable ctor is applied to Duplexes, though they're not
164 // instanceof Writable, they're instanceof Readable.
165 if (!(this instanceof Writable) && !(this instanceof Duplex))
166 return new Writable(options);
168 this._writableState = new WritableState(options, this);
171 this.writable = true;
// Per-instance implementations supplied through options.
174 if (typeof options.write === 'function')
175 this._write = options.write;
177 if (typeof options.writev === 'function')
178 this._writev = options.writev;
184 // Otherwise people can pipe Writable streams, which is just wrong.
// Overrides pipe() so that calling it emits an 'error' event on this stream.
185 Writable.prototype.pipe = function() {
186 this.emit('error', new Error('Cannot pipe. Not readable.'));
// Reports a 'write after end' error for `stream`.
// The 'error' event is emitted synchronously, while `cb` receives the same
// error object on a later tick via processNextTick.
190 function writeAfterEnd(stream, cb) {
191 var er = new Error('write after end');
192 // TODO: defer error events consistently everywhere, not just the cb
193 stream.emit('error', er);
194 processNextTick(cb, er);
197 // If we get something that is not a buffer, string, null, or undefined,
198 // and we're not in objectMode, then that's an error.
199 // Otherwise stream chunks are all considered to be of length=1, and the
200 // watermarks determine how many objects to keep in the buffer, rather than
201 // how many bytes or characters.
// On rejection: emits a TypeError as 'error' synchronously and passes the
// same error to `cb` on a later tick. write() uses the return value as its
// go/no-go condition before calling writeOrBuffer().
202 function validChunk(stream, state, chunk, cb) {
205 if (!(Buffer.isBuffer(chunk)) &&
206 typeof chunk !== 'string' &&
208 chunk !== undefined &&
210 var er = new TypeError('Invalid non-string/buffer chunk');
211 stream.emit('error', er);
212 processNextTick(cb, er);
// Public entry point for writing a chunk.
//   chunk    - data to write
//   encoding - optional; a function in this position is detected below
//              (presumably treated as the callback — confirm)
//   cb       - optional completion callback
// Routes to writeAfterEnd() or, when validChunk() accepts the chunk, to
// writeOrBuffer(), whose result is stored in `ret`.
218 Writable.prototype.write = function(chunk, encoding, cb) {
219 var state = this._writableState;
222 if (typeof encoding === 'function') {
227 if (Buffer.isBuffer(chunk))
// Fallback encoding taken from the stream state.
230 encoding = state.defaultEncoding;
232 if (typeof cb !== 'function')
236 writeAfterEnd(this, cb);
237 else if (validChunk(this, state, chunk, cb)) {
239 ret = writeOrBuffer(this, state, chunk, encoding, cb);
// NOTE(review): presumably marks the state as corked so that writeOrBuffer()
// queues writes (it checks state.corked before calling doWrite) — confirm.
245 Writable.prototype.cork = function() {
246 var state = this._writableState;
// Flushes queued writes through clearBuffer() when, among the conditions
// checked, no write is in flight, no buffer drain is already running, and
// at least one request is queued.
251 Writable.prototype.uncork = function() {
252 var state = this._writableState;
257 if (!state.writing &&
260 !state.bufferProcessing &&
261 state.bufferedRequest)
262 clearBuffer(this, state);
// Sets the encoding used by write() when none is supplied.
//   encoding - encoding name; string values are lower-cased before storing.
// Throws TypeError('Unknown encoding: ...') when the name is not in the
// accepted list below.
266 Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
267 // node::ParseEncoding() requires lower case.
268 if (typeof encoding === 'string')
269 encoding = encoding.toLowerCase();
// (encoding + '') coerces non-string input to a string for the lookup; for
// strings the second toLowerCase() repeats the one already applied above.
270 if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64',
271 'ucs2', 'ucs-2','utf16le', 'utf-16le', 'raw']
272 .indexOf((encoding + '').toLowerCase()) > -1))
273 throw new TypeError('Unknown encoding: ' + encoding);
274 this._writableState.defaultEncoding = encoding;
// Converts a string chunk into a Buffer using `encoding`, but only when the
// stream is not in objectMode and decodeStrings has not been set to false.
// NOTE(review): `new Buffer(string, encoding)` is deprecated in current
// Node.js in favor of Buffer.from(string, encoding).
277 function decodeChunk(state, chunk, encoding) {
278 if (!state.objectMode &&
279 state.decodeStrings !== false &&
280 typeof chunk === 'string') {
281 chunk = new Buffer(chunk, encoding);
286 // if we're already writing something, then just put this
287 // in the queue, and wait our turn. Otherwise, call _write
288 // If we return false, then we need a drain event, so set that flag.
//   stream, state - the Writable and its WritableState
//   chunk, encoding, cb - the write() arguments; chunk goes through
//                         decodeChunk() first
289 function writeOrBuffer(stream, state, chunk, encoding, cb) {
290 chunk = decodeChunk(state, chunk, encoding);
292 if (Buffer.isBuffer(chunk))
// Object-mode chunks each count as 1; otherwise the chunk's own length.
294 var len = state.objectMode ? 1 : chunk.length;
// `ret` is true while the pending length is still below the high-water mark.
298 var ret = state.length < state.highWaterMark;
299 // we must ensure that previous needDrain will not be reset to false.
301 state.needDrain = true;
// A write in flight or a corked stream means the request is appended to
// the linked list (tail pointer: lastBufferedRequest) instead of written.
303 if (state.writing || state.corked) {
304 var last = state.lastBufferedRequest;
305 state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
307 last.next = state.lastBufferedRequest;
309 state.bufferedRequest = state.lastBufferedRequest;
312 doWrite(stream, state, false, len, chunk, encoding, cb);
// Hands a chunk (or batch) to the stream implementation.
// Records the length being written, marks the state as writing, and calls
// stream._writev(chunk, cb) or stream._write(chunk, encoding, cb) with
// state.onwrite as the completion callback.
// NOTE(review): the `writev` flag presumably selects between the two calls.
318 function doWrite(stream, state, writev, len, chunk, encoding, cb) {
319 state.writelen = len;
321 state.writing = true;
324 stream._writev(chunk, state.onwrite);
326 stream._write(chunk, encoding, state.onwrite);
// Error path for a completed write: passes `er` to the user callback via
// processNextTick, flags the error as emitted on the stream state, and
// emits 'error' on the stream.
330 function onwriteError(stream, state, sync, er, cb) {
333 processNextTick(cb, er);
337 stream._writableState.errorEmitted = true;
338 stream.emit('error', er);
// Resets per-write bookkeeping once a write has completed: clears the
// writing flag and the stored callback, and subtracts the just-written
// length from the pending total.
341 function onwriteStateUpdate(state) {
342 state.writing = false;
343 state.writecb = null;
344 state.length -= state.writelen;
// Completion handler for a write on `stream`; `er` is the error, if any.
348 function onwrite(stream, er) {
349 var state = stream._writableState;
// Capture sync and writecb first: onwriteStateUpdate() nulls state.writecb.
350 var sync = state.sync;
351 var cb = state.writecb;
353 onwriteStateUpdate(state);
356 onwriteError(stream, state, sync, er, cb);
358 // Check if we're actually ready to finish, but don't emit yet
359 var finished = needFinish(state);
// Continue draining queued requests unless a drain is already running.
363 !state.bufferProcessing &&
364 state.bufferedRequest) {
365 clearBuffer(stream, state);
// afterWrite runs either on a later tick or directly.
// NOTE(review): presumably chosen by `sync` — confirm.
369 processNextTick(afterWrite, stream, state, finished, cb);
371 afterWrite(stream, state, finished, cb);
// Post-write follow-up: may emit 'drain' via onwriteDrain() and checks for
// completion via finishMaybe().
376 function afterWrite(stream, state, finished, cb) {
378 onwriteDrain(stream, state);
381 finishMaybe(stream, state);
384 // Must force callback to be called on nextTick, so that we don't
385 // emit 'drain' before the write() consumer gets the 'false' return
386 // value, and has a chance to attach a 'drain' listener.
// Emits 'drain' only when nothing is pending (length 0) and a drain was
// requested; the flag is cleared before emitting.
387 function onwriteDrain(stream, state) {
388 if (state.length === 0 && state.needDrain) {
389 state.needDrain = false;
390 stream.emit('drain');
395 // if there's something in the buffer waiting, then process it
// Sets state.bufferProcessing for the duration of the drain and clears it
// at the end.
396 function clearBuffer(stream, state) {
397 state.bufferProcessing = true;
398 var entry = state.bufferedRequest;
// Batch path requires a _writev implementation and at least two entries.
400 if (stream._writev && entry && entry.next) {
401 // Fast case, write everything using _writev()
405 cbs.push(entry.callback);
410 // count the one we are adding, as well.
411 // TODO(isaacs) clean this up
413 state.lastBufferedRequest = null;
// One doWrite for the whole batch (len = state.length, empty encoding);
// the wrapper callback iterates over the collected per-entry callbacks.
414 doWrite(stream, state, true, state.length, buffer, '', function(err) {
415 for (var i = 0; i < cbs.length; i++) {
423 // Slow case, write chunks one-by-one
425 var chunk = entry.chunk;
426 var encoding = entry.encoding;
427 var cb = entry.callback;
428 var len = state.objectMode ? 1 : chunk.length;
430 doWrite(stream, state, false, len, chunk, encoding, cb);
432 // if we didn't call the onwrite immediately, then
433 // it means that we need to wait until it does.
434 // also, that means that the chunk and cb are currently
435 // being processed, so move the buffer counter past them.
442 state.lastBufferedRequest = null;
// Whatever entry remains becomes the new head of the queue.
444 state.bufferedRequest = entry;
445 state.bufferProcessing = false;
// Default implementation: reports an Error('not implemented') through the
// callback. Can be replaced per instance via `options.write` in the
// Writable constructor.
448 Writable.prototype._write = function(chunk, encoding, cb) {
449 cb(new Error('not implemented'));
// No batch writer by default; clearBuffer() only takes its _writev() path
// when this is truthy (e.g. set via `options.writev`).
452 Writable.prototype._writev = null;
// Ends the stream, optionally writing one last chunk.
//   chunk, encoding, cb - all optional; a function in the `chunk` or
//   `encoding` position is detected below (presumably treated as the
//   callback — confirm).
454 Writable.prototype.end = function(chunk, encoding, cb) {
455 var state = this._writableState;
457 if (typeof chunk === 'function') {
461 } else if (typeof encoding === 'function') {
// Any chunk other than null/undefined is written before ending.
466 if (chunk !== null && chunk !== undefined)
467 this.write(chunk, encoding);
469 // .end() fully uncorks
475 // ignore unnecessary end() calls.
476 if (!state.ending && !state.finished)
477 endWritable(this, state, cb);
// Predicate used by onwrite() and finishMaybe(): among the conditions
// checked, the stream must be ending, have no pending length, and have an
// empty request queue.
481 function needFinish(state) {
482 return (state.ending &&
483 state.length === 0 &&
484 state.bufferedRequest === null &&
// Emits 'prefinish' at most once per stream; state.prefinished is set
// before emitting and guards against repeats.
489 function prefinish(stream, state) {
490 if (!state.prefinished) {
491 state.prefinished = true;
492 stream.emit('prefinish');
// Evaluates needFinish(state) and, when no user callbacks are pending
// (pendingcb === 0), emits 'prefinish', marks the state finished, and emits
// 'finish'. Otherwise only prefinish() is called.
496 function finishMaybe(stream, state) {
497 var need = needFinish(state);
499 if (state.pendingcb === 0) {
500 prefinish(stream, state);
501 state.finished = true;
502 stream.emit('finish');
504 prefinish(stream, state);
510 function endWritable(stream, state, cb) {
512 finishMaybe(stream, state);
517 stream.once('finish', cb);