class Down::ChunkedIO

Wraps an enumerator that yields chunks of content into an IO-like object. It implements some essential IO methods, including read, readpartial, gets, pos, eof?, rewind, seek and close.

By default, Down::ChunkedIO caches all read content in a tempfile, which makes it rewindable. If you don't need to rewind, you can disable caching by setting `:rewindable` to false, which eliminates any disk I/O.

Any cleanup code (i.e. an ensure block) that the given enumerator carries is guaranteed to be executed, either when all content has been retrieved or when the Down::ChunkedIO is closed. You can also specify an `:on_close` callback, which is executed in the same situations.
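
As a minimal sketch of the kind of enumerator described here, the following stand-alone snippet builds an Enumerator whose block carries an ensure clause and shows that the cleanup runs once all chunks have been consumed. It uses only core Ruby and does not load the library; the names `events`, `chunks` and `received` are illustrative assumptions.

```ruby
events = []

# An enumerator that yields chunks and carries cleanup code in an ensure block.
# In real use the cleanup might close a connection; here it only records an event.
chunks = Enumerator.new do |yielder|
  begin
    yielder << "first chunk"
    yielder << "second chunk"
  ensure
    events << :cleanup
  end
end

# Consuming every chunk lets the enumerator block finish, so the ensure clause runs.
received = []
chunks.each { |chunk| received << chunk }
```

An enumerator of this shape is what the `chunks:` argument is expected to be.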

Attributes

data[RW]
encoding[RW]
size[RW]

Public Class Methods

new(chunks:, size: nil, on_close: nil, data: {}, rewindable: true, encoding: nil)
# File lib/down/chunked_io.rb, line 30
def initialize(chunks:, size: nil, on_close: nil, data: {}, rewindable: true, encoding: nil)
  @chunks     = chunks
  @size       = size
  @on_close   = on_close
  @data       = data
  @encoding   = find_encoding(encoding || "binary")
  @rewindable = rewindable
  @buffer     = nil
  @position   = 0

  retrieve_chunk # fetch first chunk so that we know whether the file is empty
end

Public Instance Methods

close()

Implements IO#close semantics. Closes the Down::ChunkedIO by terminating chunk retrieval and deleting the cached content.

# File lib/down/chunked_io.rb, line 253
def close
  return if @closed

  chunks_fiber.resume(:terminate) if chunks_fiber.alive?
  cache.close! if cache
  @buffer = nil
  @closed = true
end

closed?()

Returns whether the Down::ChunkedIO has been closed.

# File lib/down/chunked_io.rb, line 263
def closed?
  !!@closed
end

each_chunk() { |chunk| ... }

Yields elements of the underlying enumerator. Returns an enumerator if no block is given. Raises IOError if closed.

# File lib/down/chunked_io.rb, line 44
def each_chunk
  fail IOError, "closed stream" if closed?

  return enum_for(__method__) unless block_given?

  yield retrieve_chunk until chunks_depleted?
end
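
The `enum_for(__method__) unless block_given?` line is a common Ruby idiom that lets one method serve both block and enumerator callers. The sketch below shows the idiom in isolation; `ChunkSource` is a hypothetical class made up for this example and is not part of the library.

```ruby
# Hypothetical class used only to demonstrate the idiom.
class ChunkSource
  def initialize(chunks)
    @chunks = chunks
  end

  def each_chunk
    # Without a block, hand back an Enumerator bound to this same method.
    return enum_for(__method__) unless block_given?

    @chunks.each { |chunk| yield chunk }
  end
end

source = ChunkSource.new(["ab", "cd"])

# Block form: chunks are yielded one by one.
collected = []
source.each_chunk { |chunk| collected << chunk }

# Blockless form: an Enumerator that supports the usual Enumerable methods.
enumerator = source.each_chunk
joined     = enumerator.to_a.join
```
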
eof?()

Implements IO#eof? semantics. Returns whether we've reached end of file. It returns true if cache is at the end and there is no more content to retrieve. Raises IOError if closed.

# File lib/down/chunked_io.rb, line 233
def eof?
  fail IOError, "closed stream" if closed?

  return false if cache && !cache.eof?
  @buffer.nil? && chunks_depleted?
end

gets(separator_or_limit = $/, limit = nil)

Implements IO#gets semantics. Without arguments it retrieves lines of content separated by newlines.

With `separator` argument it does the following:

  • if `separator` is a nonempty string, returns content up to and including that sequence of bytes

  • if `separator` is an empty string, returns paragraphs of content (content delimited by two newlines)

  • if `separator` is nil and `limit` is nil, returns all content

With the `limit` argument it returns at most that many bytes.

Returns nil if end of file is reached. Raises IOError if closed.

# File lib/down/chunked_io.rb, line 96
def gets(separator_or_limit = $/, limit = nil)
  fail IOError, "closed stream" if closed?

  if separator_or_limit.is_a?(Integer)
    separator = $/
    limit     = separator_or_limit
  else
    separator = separator_or_limit
  end

  return read(limit) if separator.nil?

  separator = "\n\n" if separator.empty?

  data = String.new

  until data.include?(separator) || data.bytesize == limit || eof?
    remaining_length = limit - data.bytesize if limit
    data << readpartial(remaining_length, buffer ||= String.new)
  end

  buffer.clear if buffer # deallocate buffer

  line, extra = data.split(separator, 2)
  line << separator if data.include?(separator)

  data.clear # deallocate data

  if extra
    if cache
      cache.pos -= extra.bytesize
    else
      if @buffer
        @buffer.prepend(extra)
      else
        @buffer = extra
      end
    end
  end

  line.force_encoding(@encoding) if line
end
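
The line-extraction step in the listing above can be looked at on its own. The following sketch, assuming a hypothetical helper named `split_line`, reproduces only the `split(separator, 2)` logic: it returns the line (with its separator re-attached) plus whatever was read past it, which the real method then pushes back to the cache or buffer.

```ruby
# Hypothetical helper: splits already-read data into the requested line and the
# leftover bytes that were read past it.
def split_line(data, separator)
  separator = "\n\n" if separator.empty? # empty separator means paragraph mode

  line, extra = data.split(separator, 2)
  line << separator if data.include?(separator) # split drops the separator
  [line, extra]
end

line, extra      = split_line("one\ntwo\n", "\n")
paragraph, _rest = split_line("a\n\nb", "")
last, nothing    = split_line("no newline", "\n")
```
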
inspect()

Returns useful information about the Down::ChunkedIO object.

# File lib/down/chunked_io.rb, line 273
def inspect
  string  = String.new
  string << "#<#{self.class.name}"
  string << " chunks=#{@chunks.inspect}"
  string << " size=#{size.inspect}"
  string << " encoding=#{encoding.inspect}"
  string << " data=#{data.inspect}"
  string << " on_close=#{@on_close.inspect}"
  string << " rewindable=#{@rewindable.inspect}"
  string << " (closed)" if closed?
  string << ">"
end

pos()

Implements IO#pos semantics. Returns the current position of the Down::ChunkedIO.

# File lib/down/chunked_io.rb, line 225
def pos
  @position
end
Also aliased as: tell

read(length = nil, outbuf = nil)

Implements IO#read semantics. Without arguments it retrieves and returns all content.

With the `length` argument it returns exactly that number of bytes if they are available, or fewer if end of file is reached first.

With `outbuf` argument each call will return that same string object, where the value is replaced with retrieved content.

If end of file is reached, returns empty string if called without arguments, or nil if called with arguments. Raises IOError if closed.

# File lib/down/chunked_io.rb, line 63
def read(length = nil, outbuf = nil)
  fail IOError, "closed stream" if closed?

  data   = outbuf.clear.force_encoding(Encoding::BINARY) if outbuf
  data ||= "".b

  remaining_length = length

  until remaining_length == 0 || eof?
    data << readpartial(remaining_length, buffer ||= String.new)
    remaining_length = length - data.bytesize if length
  end

  buffer.clear if buffer # deallocate string

  data.force_encoding(@encoding) unless length
  data unless data.empty? && length && length > 0
end
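
The return-value rules described above are those of Ruby's own IO#read. Because StringIO follows the same contract, it can be used to illustrate them without any chunked source; this is only an illustration with StringIO from the standard library, not with Down::ChunkedIO itself.

```ruby
require "stringio"

io = StringIO.new("hello world")

head = io.read(5)             # reads exactly 5 bytes

buffer = String.new
result = io.read(3, buffer)   # fills and returns the given buffer object

rest = io.read                # no length: everything that is left

# At end of file the two call styles differ:
eof_without_length = io.read     # empty string
eof_with_length    = io.read(5)  # nil
```
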
readpartial(maxlen = nil, outbuf = nil)

Implements IO#readpartial semantics. If there is any content readily available reads from it, otherwise fetches and reads from the next chunk. It writes to and reads from the cache when needed.

Without arguments it either returns all content that's readily available, or the next chunk. This is useful when you don't care about the size of chunks and you want to minimize string allocations.

With the `maxlen` argument it returns at most that many bytes (the default is 16 KB).

With `outbuf` argument each call will return that same string object, where the value is replaced with retrieved content.

Raises EOFError if end of file is reached. Raises IOError if closed.

# File lib/down/chunked_io.rb, line 154
def readpartial(maxlen = nil, outbuf = nil)
  fail IOError, "closed stream" if closed?

  maxlen ||= 16*1024

  data   = cache.read(maxlen, outbuf) if cache && !cache.eof?
  data ||= outbuf.clear.force_encoding(Encoding::BINARY) if outbuf
  data ||= "".b

  return data if maxlen == 0

  if @buffer.nil? && data.empty?
    fail EOFError, "end of file reached" if chunks_depleted?
    @buffer = retrieve_chunk
  end

  remaining_length = maxlen - data.bytesize

  unless @buffer.nil? || remaining_length == 0
    if remaining_length < @buffer.bytesize
      buffered_data = @buffer.byteslice(0, remaining_length)
      @buffer       = @buffer.byteslice(remaining_length..-1)
    else
      buffered_data = @buffer
      @buffer       = nil
    end

    data << buffered_data

    cache.write(buffered_data) if cache

    buffered_data.clear unless buffered_data.frozen?
  end

  @position += data.bytesize

  data.force_encoding(Encoding::BINARY)
end
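
The buffer handling in the listing above comes down to splitting a chunk at a byte offset: either part of the chunk is handed out and the rest is kept, or the whole chunk is handed out. A minimal sketch of that step, assuming a hypothetical helper named `take_bytes`:

```ruby
# Hypothetical helper: returns [bytes to hand out, bytes to keep buffered].
def take_bytes(buffer, maxlen)
  if maxlen < buffer.bytesize
    # Only part of the chunk fits; keep the tail for the next call.
    [buffer.byteslice(0, maxlen), buffer.byteslice(maxlen..-1)]
  else
    # The whole chunk fits; nothing remains buffered.
    [buffer, nil]
  end
end

head, rest       = take_bytes("abcdef", 4)
whole, remainder = take_bytes("ab", 16 * 1024)
```

Working with `bytesize` and `byteslice` rather than character counts keeps the arithmetic correct for multi-byte content.
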
rewind()

Implements IO#rewind semantics. Rewinds the Down::ChunkedIO by rewinding the cache and setting the position to the beginning of the file. Raises IOError if closed or not rewindable.

# File lib/down/chunked_io.rb, line 243
def rewind
  fail IOError, "closed stream" if closed?
  fail IOError, "this Down::ChunkedIO is not rewindable" if cache.nil?

  cache.rewind
  @position = 0
end

rewindable?()

Returns whether the Down::ChunkedIO was specified as rewindable.

# File lib/down/chunked_io.rb, line 268
def rewindable?
  @rewindable
end

seek(amount, whence = IO::SEEK_SET)

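Implements IO#seek semantics. Seeking past the content cached so far retrieves the missing content first. Raises Errno::ESPIPE if the Down::ChunkedIO is not rewindable.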

# File lib/down/chunked_io.rb, line 194
def seek(amount, whence = IO::SEEK_SET)
  fail Errno::ESPIPE, "Illegal seek" if cache.nil?

  case whence
  when IO::SEEK_SET, :SET
    target_pos = amount
  when IO::SEEK_CUR, :CUR
    target_pos = @position + amount
  when IO::SEEK_END, :END
    unless chunks_depleted?
      cache.seek(0, IO::SEEK_END)
      IO.copy_stream(self, File::NULL)
    end

    target_pos = cache.size + amount
  else
    fail ArgumentError, "invalid whence: #{whence.inspect}"
  end

  if target_pos <= cache.size
    cache.seek(target_pos)
  else
    cache.seek(0, IO::SEEK_END)
    IO.copy_stream(self, File::NULL, target_pos - cache.size)
  end

  @position = cache.pos
end
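
How the target position depends on `whence` can be sketched as a pure function. The helper `target_position` and its `position:` and `size:` keywords are assumptions made for this example; in the listing above, the size used for IO::SEEK_END is only known after all remaining content has been retrieved into the cache.

```ruby
# Hypothetical helper mirroring the case statement in the listing above.
def target_position(amount, whence, position:, size:)
  case whence
  when IO::SEEK_SET, :SET then amount            # absolute offset
  when IO::SEEK_CUR, :CUR then position + amount # relative to current position
  when IO::SEEK_END, :END then size + amount     # relative to end of content
  else raise ArgumentError, "invalid whence: #{whence.inspect}"
  end
end

absolute = target_position(10, IO::SEEK_SET, position: 4, size: 100)
relative = target_position(3, :CUR, position: 4, size: 100)
from_end = target_position(-5, IO::SEEK_END, position: 4, size: 100)
```
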
tell()
Alias for: pos

Private Instance Methods

cache()

If the Down::ChunkedIO is specified as rewindable, returns a Tempfile (created on first use) to which read content is written. This allows the Down::ChunkedIO to be rewound.

# File lib/down/chunked_io.rb, line 290
def cache
  return if !rewindable?

  @cache ||= (
    tempfile = Tempfile.new("down-chunked_io", binmode: true)
    tempfile.chmod(0000)      # make sure nobody else can read or write to it
    tempfile.unlink if posix? # remove entry from filesystem if it's POSIX
    tempfile
  )
end
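
The caching idea can be tried out with Tempfile alone. The sketch below is a simplified stand-alone illustration, not the library's code path: it writes two chunks to a binary tempfile, unlinks the file on POSIX platforms so that only the open handle keeps it alive, then rewinds and reads the content back. The basename "chunk-cache" is an arbitrary choice for the example.

```ruby
require "tempfile"

cache = Tempfile.new("chunk-cache", binmode: true)

# On POSIX systems the directory entry can be removed while the file stays
# usable through the open handle; skip this on platforms without that behavior.
cache.unlink unless RUBY_PLATFORM =~ /(mswin|mingw|cygwin|java)/

# Every chunk that is read gets appended to the cache...
cache.write("first chunk, ")
cache.write("second chunk")

# ...so rewinding makes the same content readable again.
cache.rewind
replayed = cache.read

cache.close! # close the handle and delete the file if it still exists
```
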
chunks_depleted?()

Returns whether all chunks have been retrieved, i.e. there is no content left to retrieve.

# File lib/down/chunked_io.rb, line 310
def chunks_depleted?
  !chunks_fiber.alive?
end

chunks_fiber()

Creates a Fiber wrapper around the underlying enumerator. The advantage of using a Fiber here is that we can terminate the chunk retrieval, in a way that executes any cleanup code that the enumerator potentially carries. At the end of iteration the :on_close callback is executed.

# File lib/down/chunked_io.rb, line 318
def chunks_fiber
  @chunks_fiber ||= Fiber.new do
    begin
      @chunks.each do |chunk|
        action = Fiber.yield chunk
        break if action == :terminate
      end
    ensure
      @on_close.call if @on_close
    end
  end
end
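
The early-termination behavior described above can be reproduced with core Ruby only. In this sketch the enumerator carries its own ensure block, and a plain array named `log` stands in for the `:on_close` callback; both names are assumptions for the example. Resuming the fiber with `:terminate` breaks out of the iteration, which unwinds through the enumerator's ensure block before the outer one runs.

```ruby
log = []

# Enumerator with cleanup code, standing in for a streaming source.
chunks = Enumerator.new do |yielder|
  begin
    yielder << "a"
    yielder << "b"
    yielder << "c"
  ensure
    log << :cleanup
  end
end

fiber = Fiber.new do
  begin
    chunks.each do |chunk|
      action = Fiber.yield chunk     # hand the chunk out, wait to be resumed
      break if action == :terminate  # stop early when asked to
    end
  ensure
    log << :on_close                 # stands in for the :on_close callback
  end
end

first_chunk = fiber.resume  # retrieves "a" and suspends inside the iteration
fiber.resume(:terminate)    # stops iteration; both ensure blocks run
```

After termination the fiber is no longer alive, which is the condition chunks_depleted? checks.
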
find_encoding(encoding)

Finds an encoding by name. If the encoding can't be found, falls back to the generic binary encoding.

# File lib/down/chunked_io.rb, line 333
def find_encoding(encoding)
  Encoding.find(encoding)
rescue ArgumentError
  Encoding::BINARY
end
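
The fallback can be checked in isolation. This sketch copies the rescue pattern into a hypothetical top-level method named `find_encoding_or_binary` and calls it with a known name, the default name "binary", and an unknown name.

```ruby
# Hypothetical stand-alone copy of the lookup-with-fallback pattern.
def find_encoding_or_binary(name)
  Encoding.find(name)
rescue ArgumentError # raised for unknown encoding names
  Encoding::BINARY
end

utf8     = find_encoding_or_binary("utf-8")
binary   = find_encoding_or_binary("binary")
fallback = find_encoding_or_binary("no-such-encoding")
```
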
posix?()

Returns whether the filesystem has POSIX semantics.

# File lib/down/chunked_io.rb, line 340
def posix?
  RUBY_PLATFORM !~ /(mswin|mingw|cygwin|java)/
end

retrieve_chunk()

Returns the current chunk and retrieves the next one. If the next chunk is nil, we know we've reached EOF.

# File lib/down/chunked_io.rb, line 303
def retrieve_chunk
  chunk = @next_chunk
  @next_chunk = chunks_fiber.resume
  chunk
end
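
This one-chunk lookahead is why the constructor retrieves a chunk up front: by always holding the next chunk, the object can tell that the source is exhausted (or was empty to begin with) before the caller asks for more. The following sketch shows the pattern with a hypothetical `Lookahead` class built on a Fiber; it is an illustration under those assumptions rather than the library's implementation.

```ruby
# Hypothetical class demonstrating one-item lookahead over a Fiber.
class Lookahead
  def initialize(items)
    @fiber = Fiber.new do
      items.each { |item| Fiber.yield item }
      nil # value of the final resume, once iteration has finished
    end
    @next_item = @fiber.resume # prefetch so emptiness is known immediately
  end

  # Returns the item fetched earlier and fetches the one after it.
  def next_item
    item = @next_item
    @next_item = @fiber.resume
    item
  end

  def depleted?
    !@fiber.alive?
  end
end

empty = Lookahead.new([])
empty_depleted = empty.depleted? # known without reading anything

stream = Lookahead.new(["a", "b"])
items = []
items << stream.next_item until stream.depleted?
```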