1 var Stream = require('stream').Stream;
2 var util = require('util');
4 module.exports = DelayedStream;
/**
 * A stream wrapper that records every event emitted by a source stream and
 * replays them, in order, once the consumer releases the wrapper.
 *
 * Public fields:
 *   source       - the wrapped stream (set by DelayedStream.create).
 *   dataSize     - number of units (`chunk.length`) buffered so far.
 *   maxDataSize  - buffering limit; exceeding it emits an 'error' event.
 *   pauseStream  - whether `create` should pause the source immediately.
 *
 * Instances are normally obtained through `DelayedStream.create(source)`.
 */
function DelayedStream() {
  this.source = null;
  // Must start at 0: it is accumulated with `+=` in _handleEmit, and an
  // undefined start value would turn every size comparison into NaN.
  this.dataSize = 0;
  this.maxDataSize = 1024 * 1024;
  this.pauseStream = true;

  this._maxDataSizeExceeded = false;
  this._released = false;
  this._bufferedEvents = [];
}
util.inherits(DelayedStream, Stream);

/**
 * Builds a DelayedStream around `source` and starts capturing its events.
 *
 * @param {Object} source - Event emitter exposing `emit`, `on` and, when
 *   `pauseStream` is true, `pause`.
 * @param {Object} [options] - Properties copied verbatim onto the new
 *   instance (e.g. `maxDataSize`, `pauseStream`).
 * @returns {DelayedStream} The wrapper that buffers `source`'s events.
 */
DelayedStream.create = function(source, options) {
  var delayedStream = new this();

  options = options || {};
  for (var option in options) {
    delayedStream[option] = options[option];
  }

  delayedStream.source = source;

  // Intercept emit so the wrapper sees every event before regular listeners.
  var realEmit = source.emit;
  source.emit = function() {
    delayedStream._handleEmit(arguments);
    return realEmit.apply(source, arguments);
  };

  // A no-op listener keeps an 'error' emitted by the source from throwing
  // while nobody else is listening; the event is still captured above.
  source.on('error', function() {});
  if (delayedStream.pauseStream) {
    source.pause();
  }

  return delayedStream;
};

// `readable` always mirrors the wrapped source instead of holding its own state.
Object.defineProperty(DelayedStream.prototype, 'readable', {
  configurable: true,
  enumerable: true,
  get: function() {
    return this.source.readable;
  }
});

/**
 * Forwards to the source's `setEncoding` with the same arguments.
 *
 * @returns {*} Whatever the source's `setEncoding` returns.
 */
DelayedStream.prototype.setEncoding = function() {
  return this.source.setEncoding.apply(this.source, arguments);
};

/**
 * Replays any buffered events (first call only) and resumes the source.
 */
DelayedStream.prototype.resume = function() {
  if (!this._released) {
    this.release();
  }

  this.source.resume();
};

/**
 * Pauses the wrapped source.
 */
DelayedStream.prototype.pause = function() {
  this.source.pause();
};

/**
 * Marks the stream as released and re-emits every buffered event on this
 * wrapper in the order it was captured, then clears the buffer.
 */
DelayedStream.prototype.release = function() {
  this._released = true;

  // Pass `this` as forEach's thisArg so the callback emits on the wrapper.
  this._bufferedEvents.forEach(function(args) {
    this.emit.apply(this, args);
  }, this);
  this._bufferedEvents = [];
};

/**
 * Pipes this wrapper into a destination and starts the flow of events.
 *
 * @returns {*} The value returned by `Stream.prototype.pipe` (the destination).
 */
DelayedStream.prototype.pipe = function() {
  var r = Stream.prototype.pipe.apply(this, arguments);
  this.resume();
  return r;
};

/**
 * Receives the arguments of every `emit` call made on the source.
 * After release they are forwarded immediately; before release they are
 * buffered, and 'data' payloads count towards `maxDataSize`.
 *
 * @param {Arguments|Array} args - Event name followed by its payload.
 */
DelayedStream.prototype._handleEmit = function(args) {
  if (this._released) {
    this.emit.apply(this, args);
    return;
  }

  if (args[0] === 'data') {
    this.dataSize += args[1].length;
    this._checkIfMaxDataSizeExceeded();
  }

  this._bufferedEvents.push(args);
};

/**
 * Emits a single 'error' event the first time the buffered data grows
 * beyond `maxDataSize`; later calls are no-ops.
 */
DelayedStream.prototype._checkIfMaxDataSizeExceeded = function() {
  if (this._maxDataSizeExceeded) {
    return;
  }

  if (this.dataSize <= this.maxDataSize) {
    return;
  }

  this._maxDataSizeExceeded = true;
  var message =
    'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.';
  this.emit('error', new Error(message));
};