]> gerrit.simantics Code Review - simantics/district.git/blob - org.simantics.maps.server/node/node-v4.8.0-win-x64/node_modules/npm/node_modules/readable-stream/lib/_stream_writable.js
Adding integrated tile server
[simantics/district.git] / org.simantics.maps.server / node / node-v4.8.0-win-x64 / node_modules / npm / node_modules / readable-stream / lib / _stream_writable.js
1 // A bit simpler than readable streams.
2 // Implement an async ._write(chunk, encoding, cb), and it'll handle all
3 // the drain event emission and buffering.
4
5 'use strict';
6
7 module.exports = Writable;
8
9 /*<replacement>*/
10 var processNextTick = require('process-nextick-args');
11 /*</replacement>*/
12
13 /*<replacement>*/
14 var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
15 /*</replacement>*/
16
17 Writable.WritableState = WritableState;
18
19 /*<replacement>*/
20 var util = require('core-util-is');
21 util.inherits = require('inherits');
22 /*</replacement>*/
23
24 /*<replacement>*/
25 var internalUtil = {
26   deprecate: require('util-deprecate')
27 };
28 /*</replacement>*/
29
30 /*<replacement>*/
31 var Stream;
32 (function () {
33   try {
34     Stream = require('st' + 'ream');
35   } catch (_) {} finally {
36     if (!Stream) Stream = require('events').EventEmitter;
37   }
38 })();
39 /*</replacement>*/
40
41 var Buffer = require('buffer').Buffer;
42 /*<replacement>*/
43 var bufferShim = require('buffer-shims');
44 /*</replacement>*/
45
46 util.inherits(Writable, Stream);
47
48 function nop() {}
49
50 function WriteReq(chunk, encoding, cb) {
51   this.chunk = chunk;
52   this.encoding = encoding;
53   this.callback = cb;
54   this.next = null;
55 }
56
57 var Duplex;
58 function WritableState(options, stream) {
59   Duplex = Duplex || require('./_stream_duplex');
60
61   options = options || {};
62
63   // object stream flag to indicate whether or not this stream
64   // contains buffers or objects.
65   this.objectMode = !!options.objectMode;
66
67   if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
68
69   // the point at which write() starts returning false
70   // Note: 0 is a valid value, means that we always return false if
71   // the entire buffer is not flushed immediately on write()
72   var hwm = options.highWaterMark;
73   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
74   this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
75
76   // cast to ints.
77   this.highWaterMark = ~ ~this.highWaterMark;
78
79   this.needDrain = false;
80   // at the start of calling end()
81   this.ending = false;
82   // when end() has been called, and returned
83   this.ended = false;
84   // when 'finish' is emitted
85   this.finished = false;
86
87   // should we decode strings into buffers before passing to _write?
88   // this is here so that some node-core streams can optimize string
89   // handling at a lower level.
90   var noDecode = options.decodeStrings === false;
91   this.decodeStrings = !noDecode;
92
93   // Crypto is kind of old and crusty.  Historically, its default string
94   // encoding is 'binary' so we have to make this configurable.
95   // Everything else in the universe uses 'utf8', though.
96   this.defaultEncoding = options.defaultEncoding || 'utf8';
97
98   // not an actual buffer we keep track of, but a measurement
99   // of how much we're waiting to get pushed to some underlying
100   // socket or file.
101   this.length = 0;
102
103   // a flag to see when we're in the middle of a write.
104   this.writing = false;
105
106   // when true all writes will be buffered until .uncork() call
107   this.corked = 0;
108
109   // a flag to be able to tell if the onwrite cb is called immediately,
110   // or on a later tick.  We set this to true at first, because any
111   // actions that shouldn't happen until "later" should generally also
112   // not happen before the first write call.
113   this.sync = true;
114
115   // a flag to know if we're processing previously buffered items, which
116   // may call the _write() callback in the same tick, so that we don't
117   // end up in an overlapped onwrite situation.
118   this.bufferProcessing = false;
119
120   // the callback that's passed to _write(chunk,cb)
121   this.onwrite = function (er) {
122     onwrite(stream, er);
123   };
124
125   // the callback that the user supplies to write(chunk,encoding,cb)
126   this.writecb = null;
127
128   // the amount that is being written when _write is called.
129   this.writelen = 0;
130
131   this.bufferedRequest = null;
132   this.lastBufferedRequest = null;
133
134   // number of pending user-supplied write callbacks
135   // this must be 0 before 'finish' can be emitted
136   this.pendingcb = 0;
137
138   // emit prefinish if the only thing we're waiting for is _write cbs
139   // This is relevant for synchronous Transform streams
140   this.prefinished = false;
141
142   // True if the error was already emitted and should not be thrown again
143   this.errorEmitted = false;
144
145   // count buffered requests
146   this.bufferedRequestCount = 0;
147
148   // allocate the first CorkedRequest, there is always
149   // one allocated and free to use, and we maintain at most two
150   this.corkedRequestsFree = new CorkedRequest(this);
151 }
152
153 WritableState.prototype.getBuffer = function writableStateGetBuffer() {
154   var current = this.bufferedRequest;
155   var out = [];
156   while (current) {
157     out.push(current);
158     current = current.next;
159   }
160   return out;
161 };
162
163 (function () {
164   try {
165     Object.defineProperty(WritableState.prototype, 'buffer', {
166       get: internalUtil.deprecate(function () {
167         return this.getBuffer();
168       }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.')
169     });
170   } catch (_) {}
171 })();
172
173 var Duplex;
174 function Writable(options) {
175   Duplex = Duplex || require('./_stream_duplex');
176
177   // Writable ctor is applied to Duplexes, though they're not
178   // instanceof Writable, they're instanceof Readable.
179   if (!(this instanceof Writable) && !(this instanceof Duplex)) return new Writable(options);
180
181   this._writableState = new WritableState(options, this);
182
183   // legacy.
184   this.writable = true;
185
186   if (options) {
187     if (typeof options.write === 'function') this._write = options.write;
188
189     if (typeof options.writev === 'function') this._writev = options.writev;
190   }
191
192   Stream.call(this);
193 }
194
195 // Otherwise people can pipe Writable streams, which is just wrong.
196 Writable.prototype.pipe = function () {
197   this.emit('error', new Error('Cannot pipe, not readable'));
198 };
199
200 function writeAfterEnd(stream, cb) {
201   var er = new Error('write after end');
202   // TODO: defer error events consistently everywhere, not just the cb
203   stream.emit('error', er);
204   processNextTick(cb, er);
205 }
206
207 // If we get something that is not a buffer, string, null, or undefined,
208 // and we're not in objectMode, then that's an error.
209 // Otherwise stream chunks are all considered to be of length=1, and the
210 // watermarks determine how many objects to keep in the buffer, rather than
211 // how many bytes or characters.
212 function validChunk(stream, state, chunk, cb) {
213   var valid = true;
214   var er = false;
215   // Always throw error if a null is written
216   // if we are not in object mode then throw
217   // if it is not a buffer, string, or undefined.
218   if (chunk === null) {
219     er = new TypeError('May not write null values to stream');
220   } else if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
221     er = new TypeError('Invalid non-string/buffer chunk');
222   }
223   if (er) {
224     stream.emit('error', er);
225     processNextTick(cb, er);
226     valid = false;
227   }
228   return valid;
229 }
230
231 Writable.prototype.write = function (chunk, encoding, cb) {
232   var state = this._writableState;
233   var ret = false;
234
235   if (typeof encoding === 'function') {
236     cb = encoding;
237     encoding = null;
238   }
239
240   if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
241
242   if (typeof cb !== 'function') cb = nop;
243
244   if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
245     state.pendingcb++;
246     ret = writeOrBuffer(this, state, chunk, encoding, cb);
247   }
248
249   return ret;
250 };
251
252 Writable.prototype.cork = function () {
253   var state = this._writableState;
254
255   state.corked++;
256 };
257
258 Writable.prototype.uncork = function () {
259   var state = this._writableState;
260
261   if (state.corked) {
262     state.corked--;
263
264     if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
265   }
266 };
267
268 Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
269   // node::ParseEncoding() requires lower case.
270   if (typeof encoding === 'string') encoding = encoding.toLowerCase();
271   if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
272   this._writableState.defaultEncoding = encoding;
273   return this;
274 };
275
276 function decodeChunk(state, chunk, encoding) {
277   if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
278     chunk = bufferShim.from(chunk, encoding);
279   }
280   return chunk;
281 }
282
283 // if we're already writing something, then just put this
284 // in the queue, and wait our turn.  Otherwise, call _write
285 // If we return false, then we need a drain event, so set that flag.
286 function writeOrBuffer(stream, state, chunk, encoding, cb) {
287   chunk = decodeChunk(state, chunk, encoding);
288
289   if (Buffer.isBuffer(chunk)) encoding = 'buffer';
290   var len = state.objectMode ? 1 : chunk.length;
291
292   state.length += len;
293
294   var ret = state.length < state.highWaterMark;
295   // we must ensure that previous needDrain will not be reset to false.
296   if (!ret) state.needDrain = true;
297
298   if (state.writing || state.corked) {
299     var last = state.lastBufferedRequest;
300     state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
301     if (last) {
302       last.next = state.lastBufferedRequest;
303     } else {
304       state.bufferedRequest = state.lastBufferedRequest;
305     }
306     state.bufferedRequestCount += 1;
307   } else {
308     doWrite(stream, state, false, len, chunk, encoding, cb);
309   }
310
311   return ret;
312 }
313
314 function doWrite(stream, state, writev, len, chunk, encoding, cb) {
315   state.writelen = len;
316   state.writecb = cb;
317   state.writing = true;
318   state.sync = true;
319   if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
320   state.sync = false;
321 }
322
323 function onwriteError(stream, state, sync, er, cb) {
324   --state.pendingcb;
325   if (sync) processNextTick(cb, er);else cb(er);
326
327   stream._writableState.errorEmitted = true;
328   stream.emit('error', er);
329 }
330
331 function onwriteStateUpdate(state) {
332   state.writing = false;
333   state.writecb = null;
334   state.length -= state.writelen;
335   state.writelen = 0;
336 }
337
338 function onwrite(stream, er) {
339   var state = stream._writableState;
340   var sync = state.sync;
341   var cb = state.writecb;
342
343   onwriteStateUpdate(state);
344
345   if (er) onwriteError(stream, state, sync, er, cb);else {
346     // Check if we're actually ready to finish, but don't emit yet
347     var finished = needFinish(state);
348
349     if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
350       clearBuffer(stream, state);
351     }
352
353     if (sync) {
354       /*<replacement>*/
355       asyncWrite(afterWrite, stream, state, finished, cb);
356       /*</replacement>*/
357     } else {
358         afterWrite(stream, state, finished, cb);
359       }
360   }
361 }
362
363 function afterWrite(stream, state, finished, cb) {
364   if (!finished) onwriteDrain(stream, state);
365   state.pendingcb--;
366   cb();
367   finishMaybe(stream, state);
368 }
369
370 // Must force callback to be called on nextTick, so that we don't
371 // emit 'drain' before the write() consumer gets the 'false' return
372 // value, and has a chance to attach a 'drain' listener.
373 function onwriteDrain(stream, state) {
374   if (state.length === 0 && state.needDrain) {
375     state.needDrain = false;
376     stream.emit('drain');
377   }
378 }
379
380 // if there's something in the buffer waiting, then process it
381 function clearBuffer(stream, state) {
382   state.bufferProcessing = true;
383   var entry = state.bufferedRequest;
384
385   if (stream._writev && entry && entry.next) {
386     // Fast case, write everything using _writev()
387     var l = state.bufferedRequestCount;
388     var buffer = new Array(l);
389     var holder = state.corkedRequestsFree;
390     holder.entry = entry;
391
392     var count = 0;
393     while (entry) {
394       buffer[count] = entry;
395       entry = entry.next;
396       count += 1;
397     }
398
399     doWrite(stream, state, true, state.length, buffer, '', holder.finish);
400
401     // doWrite is almost always async, defer these to save a bit of time
402     // as the hot path ends with doWrite
403     state.pendingcb++;
404     state.lastBufferedRequest = null;
405     if (holder.next) {
406       state.corkedRequestsFree = holder.next;
407       holder.next = null;
408     } else {
409       state.corkedRequestsFree = new CorkedRequest(state);
410     }
411   } else {
412     // Slow case, write chunks one-by-one
413     while (entry) {
414       var chunk = entry.chunk;
415       var encoding = entry.encoding;
416       var cb = entry.callback;
417       var len = state.objectMode ? 1 : chunk.length;
418
419       doWrite(stream, state, false, len, chunk, encoding, cb);
420       entry = entry.next;
421       // if we didn't call the onwrite immediately, then
422       // it means that we need to wait until it does.
423       // also, that means that the chunk and cb are currently
424       // being processed, so move the buffer counter past them.
425       if (state.writing) {
426         break;
427       }
428     }
429
430     if (entry === null) state.lastBufferedRequest = null;
431   }
432
433   state.bufferedRequestCount = 0;
434   state.bufferedRequest = entry;
435   state.bufferProcessing = false;
436 }
437
438 Writable.prototype._write = function (chunk, encoding, cb) {
439   cb(new Error('not implemented'));
440 };
441
442 Writable.prototype._writev = null;
443
444 Writable.prototype.end = function (chunk, encoding, cb) {
445   var state = this._writableState;
446
447   if (typeof chunk === 'function') {
448     cb = chunk;
449     chunk = null;
450     encoding = null;
451   } else if (typeof encoding === 'function') {
452     cb = encoding;
453     encoding = null;
454   }
455
456   if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
457
458   // .end() fully uncorks
459   if (state.corked) {
460     state.corked = 1;
461     this.uncork();
462   }
463
464   // ignore unnecessary end() calls.
465   if (!state.ending && !state.finished) endWritable(this, state, cb);
466 };
467
468 function needFinish(state) {
469   return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
470 }
471
472 function prefinish(stream, state) {
473   if (!state.prefinished) {
474     state.prefinished = true;
475     stream.emit('prefinish');
476   }
477 }
478
479 function finishMaybe(stream, state) {
480   var need = needFinish(state);
481   if (need) {
482     if (state.pendingcb === 0) {
483       prefinish(stream, state);
484       state.finished = true;
485       stream.emit('finish');
486     } else {
487       prefinish(stream, state);
488     }
489   }
490   return need;
491 }
492
493 function endWritable(stream, state, cb) {
494   state.ending = true;
495   finishMaybe(stream, state);
496   if (cb) {
497     if (state.finished) processNextTick(cb);else stream.once('finish', cb);
498   }
499   state.ended = true;
500   stream.writable = false;
501 }
502
503 // It seems a linked list but it is not
504 // there will be only 2 of these for each stream
505 function CorkedRequest(state) {
506   var _this = this;
507
508   this.next = null;
509   this.entry = null;
510
511   this.finish = function (err) {
512     var entry = _this.entry;
513     _this.entry = null;
514     while (entry) {
515       var cb = entry.callback;
516       state.pendingcb--;
517       cb(err);
518       entry = entry.next;
519     }
520     if (state.corkedRequestsFree) {
521       state.corkedRequestsFree.next = _this;
522     } else {
523       state.corkedRequestsFree = _this;
524     }
525   };
526 }