// Copyright Joyent, Inc. and other Node contributors. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the // “Software”), to deal in the Software without restriction, including // without limitation the rights to use, copy, modify, merge, publish, // distribute, sublicense, and/or sell copies of the Software, and to permit // persons to whom the Software is furnished to do so, subject to the // following conditions: // // The above copyright notice and this permission notice shall be included // in all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE.

// A bit simpler than readable streams. // Implement an async ._write(chunk, cb), and it’ll handle all // the drain event emission and buffering.

module.exports = Writable;

/*<replacement>*/ var Buffer = require(‘buffer’).Buffer; /*</replacement>*/

Writable.WritableState = WritableState;

/*<replacement>*/ var util = require(‘core-util-is’); util.inherits = require(‘inherits’); /*</replacement>*/

var Stream = require(‘stream’);

util.inherits(Writable, Stream);

function WriteReq(chunk, encoding, cb) {

this.chunk = chunk;
this.encoding = encoding;
this.callback = cb;

}

function WritableState(options, stream) {

options = options || {};

// the point at which write() starts returning false
// Note: 0 is a valid value, means that we always return false if
// the entire buffer is not flushed immediately on write()
var hwm = options.highWaterMark;
this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;

// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;

// cast to ints.
this.highWaterMark = ~~this.highWaterMark;

this.needDrain = false;
// at the start of calling end()
this.ending = false;
// when end() has been called, and returned
this.ended = false;
// when 'finish' is emitted
this.finished = false;

// should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
// handling at a lower level.
var noDecode = options.decodeStrings === false;
this.decodeStrings = !noDecode;

// Crypto is kind of old and crusty.  Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
this.defaultEncoding = options.defaultEncoding || 'utf8';

// not an actual buffer we keep track of, but a measurement
// of how much we're waiting to get pushed to some underlying
// socket or file.
this.length = 0;

// a flag to see when we're in the middle of a write.
this.writing = false;

// a flag to be able to tell if the onwrite cb is called immediately,
// or on a later tick.  We set this to true at first, becuase any
// actions that shouldn't happen until "later" should generally also
// not happen before the first write call.
this.sync = true;

// a flag to know if we're processing previously buffered items, which
// may call the _write() callback in the same tick, so that we don't
// end up in an overlapped onwrite situation.
this.bufferProcessing = false;

// the callback that's passed to _write(chunk,cb)
this.onwrite = function(er) {
  onwrite(stream, er);
};

// the callback that the user supplies to write(chunk,encoding,cb)
this.writecb = null;

// the amount that is being written when _write is called.
this.writelen = 0;

this.buffer = [];

// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;

}

function Writable(options) {

var Duplex = require('./_stream_duplex');

// Writable ctor is applied to Duplexes, though they're not
// instanceof Writable, they're instanceof Readable.
if (!(this instanceof Writable) && !(this instanceof Duplex))
  return new Writable(options);

this._writableState = new WritableState(options, this);

// legacy.
this.writable = true;

Stream.call(this);

}

// Otherwise people can pipe Writable streams, which is just wrong. Writable.prototype.pipe = function() {

this.emit('error', new Error('Cannot pipe. Not readable.'));

};

function writeAfterEnd(stream, state, cb) {

var er = new Error('write after end');
// TODO: defer error events consistently everywhere, not just the cb
stream.emit('error', er);
process.nextTick(function() {
  cb(er);
});

}

// If we get something that is not a buffer, string, null, or undefined, // and we’re not in objectMode, then that’s an error. // Otherwise stream chunks are all considered to be of length=1, and the // watermarks determine how many objects to keep in the buffer, rather than // how many bytes or characters. function validChunk(stream, state, chunk, cb) {

var valid = true;
if (!Buffer.isBuffer(chunk) &&
    'string' !== typeof chunk &&
    chunk !== null &&
    chunk !== undefined &&
    !state.objectMode) {
  var er = new TypeError('Invalid non-string/buffer chunk');
  stream.emit('error', er);
  process.nextTick(function() {
    cb(er);
  });
  valid = false;
}
return valid;

}

Writable.prototype.write = function(chunk, encoding, cb) {

var state = this._writableState;
var ret = false;

if (typeof encoding === 'function') {
  cb = encoding;
  encoding = null;
}

if (Buffer.isBuffer(chunk))
  encoding = 'buffer';
else if (!encoding)
  encoding = state.defaultEncoding;

if (typeof cb !== 'function')
  cb = function() {};

if (state.ended)
  writeAfterEnd(this, state, cb);
else if (validChunk(this, state, chunk, cb))
  ret = writeOrBuffer(this, state, chunk, encoding, cb);

return ret;

};

function decodeChunk(state, chunk, encoding) {

if (!state.objectMode &&
    state.decodeStrings !== false &&
    typeof chunk === 'string') {
  chunk = new Buffer(chunk, encoding);
}
return chunk;

}

// if we’re already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, chunk, encoding, cb) {

chunk = decodeChunk(state, chunk, encoding);
if (Buffer.isBuffer(chunk))
  encoding = 'buffer';
var len = state.objectMode ? 1 : chunk.length;

state.length += len;

var ret = state.length < state.highWaterMark;
// we must ensure that previous needDrain will not be reset to false.
if (!ret)
  state.needDrain = true;

if (state.writing)
  state.buffer.push(new WriteReq(chunk, encoding, cb));
else
  doWrite(stream, state, len, chunk, encoding, cb);

return ret;

}

function doWrite(stream, state, len, chunk, encoding, cb) {

state.writelen = len;
state.writecb = cb;
state.writing = true;
state.sync = true;
stream._write(chunk, encoding, state.onwrite);
state.sync = false;

}

function onwriteError(stream, state, sync, er, cb) {

if (sync)
  process.nextTick(function() {
    cb(er);
  });
else
  cb(er);

stream._writableState.errorEmitted = true;
stream.emit('error', er);

}

function onwriteStateUpdate(state) {

state.writing = false;
state.writecb = null;
state.length -= state.writelen;
state.writelen = 0;

}

function onwrite(stream, er) {

var state = stream._writableState;
var sync = state.sync;
var cb = state.writecb;

onwriteStateUpdate(state);

if (er)
  onwriteError(stream, state, sync, er, cb);
else {
  // Check if we're actually ready to finish, but don't emit yet
  var finished = needFinish(stream, state);

  if (!finished && !state.bufferProcessing && state.buffer.length)
    clearBuffer(stream, state);

  if (sync) {
    process.nextTick(function() {
      afterWrite(stream, state, finished, cb);
    });
  } else {
    afterWrite(stream, state, finished, cb);
  }
}

}

function afterWrite(stream, state, finished, cb) {

if (!finished)
  onwriteDrain(stream, state);
cb();
if (finished)
  finishMaybe(stream, state);

}

// Must force callback to be called on nextTick, so that we don’t // emit ‘drain’ before the write() consumer gets the ‘false’ return // value, and has a chance to attach a ‘drain’ listener. function onwriteDrain(stream, state) {

if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}

}

// if there’s something in the buffer waiting, then process it function clearBuffer(stream, state) {

state.bufferProcessing = true;

for (var c = 0; c < state.buffer.length; c++) {
  var entry = state.buffer[c];
  var chunk = entry.chunk;
  var encoding = entry.encoding;
  var cb = entry.callback;
  var len = state.objectMode ? 1 : chunk.length;

  doWrite(stream, state, len, chunk, encoding, cb);

  // if we didn't call the onwrite immediately, then
  // it means that we need to wait until it does.
  // also, that means that the chunk and cb are currently
  // being processed, so move the buffer counter past them.
  if (state.writing) {
    c++;
    break;
  }
}

state.bufferProcessing = false;
if (c < state.buffer.length)
  state.buffer = state.buffer.slice(c);
else
  state.buffer.length = 0;

}

Writable.prototype._write = function(chunk, encoding, cb) {

cb(new Error('not implemented'));

};

Writable.prototype.end = function(chunk, encoding, cb) {

var state = this._writableState;

if (typeof chunk === 'function') {
  cb = chunk;
  chunk = null;
  encoding = null;
} else if (typeof encoding === 'function') {
  cb = encoding;
  encoding = null;
}

if (typeof chunk !== 'undefined' && chunk !== null)
  this.write(chunk, encoding);

// ignore unnecessary end() calls.
if (!state.ending && !state.finished)
  endWritable(this, state, cb);

};

function needFinish(stream, state) {

return (state.ending &&
        state.length === 0 &&
        !state.finished &&
        !state.writing);

}

function finishMaybe(stream, state) {

var need = needFinish(stream, state);
if (need) {
  state.finished = true;
  stream.emit('finish');
}
return need;

}

function endWritable(stream, state, cb) {

state.ending = true;
finishMaybe(stream, state);
if (cb) {
  if (state.finished)
    process.nextTick(cb);
  else
    stream.once('finish', cb);
}
state.ended = true;

}