var Stream = require('stream')

// through // // a stream that does nothing but re-emit the input. // useful for aggregating a series of changing but not ending streams into one stream)

exports = module.exports = through through.through = through

//create a readable writable stream.

function through (write, end, opts) {

write = write || function (data) { this.queue(data) }
end = end || function () { this.queue(null) }

var ended = false, destroyed = false, buffer = [], _ended = false
var stream = new Stream()
stream.readable = stream.writable = true
stream.paused = false

// stream.autoPause = !(opts && opts.autoPause === false)

stream.autoDestroy = !(opts && opts.autoDestroy === false)

stream.write = function (data) {
  write.call(this, data)
  return !stream.paused
}

function drain() {
  while(buffer.length && !stream.paused) {
    var data = buffer.shift()
    if(null === data)
      return stream.emit('end')
    else
      stream.emit('data', data)
  }
}

stream.queue = stream.push = function (data) {

// console.error(ended)

  if(_ended) return stream
  if(data === null) _ended = true
  buffer.push(data)
  drain()
  return stream
}

//this will be registered as the first 'end' listener
//must call destroy next tick, to make sure we're after any
//stream piped from here.
//this is only a problem if end is not emitted synchronously.
//a nicer way to do this is to make sure this is the last listener for 'end'

stream.on('end', function () {
  stream.readable = false
  if(!stream.writable && stream.autoDestroy)
    process.nextTick(function () {
      stream.destroy()
    })
})

function _end () {
  stream.writable = false
  end.call(stream)
  if(!stream.readable && stream.autoDestroy)
    stream.destroy()
}

stream.end = function (data) {
  if(ended) return
  ended = true
  if(arguments.length) stream.write(data)
  _end() // will emit or queue
  return stream
}

stream.destroy = function () {
  if(destroyed) return
  destroyed = true
  ended = true
  buffer.length = 0
  stream.writable = stream.readable = false
  stream.emit('close')
  return stream
}

stream.pause = function () {
  if(stream.paused) return
  stream.paused = true
  return stream
}

stream.resume = function () {
  if(stream.paused) {
    stream.paused = false
    stream.emit('resume')
  }
  drain()
  //may have become paused again,
  //as drain emits 'data'.
  if(!stream.paused)
    stream.emit('drain')
  return stream
}
return stream

}