class Krakow::ConnectionFeatures::SnappyFrames::Io

Snappy-able IO

Constants

CHUNK_TYPE

Mapping of types

IDENTIFIER

Header identifier

IDENTIFIER_SIZE

Size of identifier

Attributes

buffer[R]
io[R]

Public Class Methods

new(io, args={}) click to toggle source

Create new snappy-able IO

@param io [IO] IO to wrap @return [Io]

# File lib/krakow/connection_features/snappy_frames.rb, line 39
# Wrap an IO so that reads and writes go through snappy framing.
#
# @param io [IO] underlying IO to wrap
# @param args [Hash] accepted for interface compatibility; not read here
# @return [Io]
def initialize(io, args = {})
  # Decoded payload waiting to be handed out by #recv
  @buffer = ''
  # Starts out false: no stream identifier has been sent yet
  @snappy_write_ident = false
  @io = io
end

Public Instance Methods

checksum_mask(checksum) click to toggle source

Mask the checksum

@param checksum [Integer] @return [Integer]

# File lib/krakow/connection_features/snappy_frames.rb, line 57
# Mask a checksum before it is placed in (or compared against) a frame.
#
# ORs the value shifted right by 15 bits with the value shifted left by
# 17 bits, adds 0xa282ead8 and keeps the low 32 bits. For an input that
# fits in 32 bits this is a 32-bit rotate-right by 15 plus the constant.
#
# @param checksum [Integer] raw checksum
# @return [Integer] masked checksum in the range 0..0xffffffff
def checksum_mask(checksum)
  rotated = (checksum << 17) | (checksum >> 15)
  (rotated + 0xa282ead8) & 0xffffffff
end
method_missing(*args) click to toggle source

Proxy to underlying socket

@param args [Object] @return [Object]

# File lib/krakow/connection_features/snappy_frames.rb, line 49
# Proxy any method this wrapper does not define to the underlying IO.
#
# @param args [Array<Object>] method name followed by its arguments
# @yield block given by the caller, forwarded unchanged to the IO
# @return [Object] whatever the proxied call returns
def method_missing(*args, &block)
  # Forward the block as well; without it block-taking IO methods break
  io.__send__(*args, &block)
end

# Keep #respond_to? truthful about the methods proxied by #method_missing.
#
# @param name [Symbol] method name being queried
# @param include_private [Boolean] whether private methods count
# @return [Boolean]
def respond_to_missing?(name, include_private = false)
  io.respond_to?(name, include_private) || super
end
read(n)
Alias for: recv
read_stream() click to toggle source

Read contents from stream

@return [String]

# File lib/krakow/connection_features/snappy_frames.rb, line 75
# Read one frame from the wrapped IO and append its payload to the buffer.
#
# A frame is a 1-byte chunk type, a 3-byte little-endian length and then
# that many bytes of content. Stream identifier frames are validated and
# the next frame is read in their place. Compressed and uncompressed
# frames both start with a 4-byte masked checksum of the (uncompressed)
# data, which is verified before the data is buffered. Frames of any other
# type are consumed and ignored.
#
# @return [String, nil] the buffer after appending, or nil for an ignored frame
# @raise [EOFError] when the IO yields fewer bytes than the frame requires
# @raise [RuntimeError] on a bad stream identifier or a checksum mismatch
def read_stream
  header = io.recv(4).to_s
  if header.bytesize < 4
    raise EOFError, "Incomplete snappy frame header (#{header.bytesize} of 4 bytes)"
  end
  ident = CHUNK_TYPE[header.slice!(0)]
  # Three length bytes remain; pad to four so 'L<' can decode them. The pad
  # is the compressed chunk key (presumably "\x00" -- confirm against CHUNK_TYPE).
  size = (header << CHUNK_TYPE.key(:compressed)).unpack('L<').first
  content = io.recv(size).to_s
  if content.bytesize < size
    raise EOFError, "Incomplete snappy frame body (#{content.bytesize} of #{size} bytes)"
  end
  case ident
  when :identifier
    unless content == IDENTIFIER
      raise "Invalid stream identification encountered (content: #{content.inspect})"
    end
    read_stream
  when :compressed, :uncompressed
    # Both data chunk kinds lead with the masked checksum; strip it so it
    # never leaks into the payload.
    checksum = content.slice!(0, 4).unpack('L<').first
    data = ident == :compressed ? Snappy.inflate(content) : content
    digest = Digest::CRC32c.new
    digest << data
    raise 'Checksum mismatch!' unless checksum == checksum_mask(digest.checksum)
    buffer << data
  end
end
recv(n) click to toggle source

Receive bytes from the IO

@param n [Integer] number of bytes @return [String, nil]

# File lib/krakow/connection_features/snappy_frames.rb, line 65
# Hand out up to n bytes of decoded data.
#
# When the buffer holds fewer than n bytes, one more frame is read first;
# the result may still be shorter than n.
#
# @param n [Integer] number of bytes wanted
# @return [String, nil] the bytes taken from the buffer, nil if there were none
def recv(n)
  read_stream if buffer.size < n
  chunk = buffer.slice!(0, n)
  return nil if chunk.empty?
  chunk
end
Also aliased as: read
send_snappy_identifier() click to toggle source

Send the identifier for snappy content

@return [Integer] bytes written

# File lib/krakow/connection_features/snappy_frames.rb, line 122
# Write the stream identifier frame to the underlying IO.
#
# The frame is the identifier chunk type key, IDENTIFIER_SIZE and
# IDENTIFIER packed back to back as raw bytes.
#
# @return [Integer] bytes written, as reported by the IO
def send_snappy_identifier
  parts = [CHUNK_TYPE.key(:identifier), IDENTIFIER_SIZE, IDENTIFIER]
  frame = parts.pack('a*' * parts.size)
  io.write(frame)
end
write(string) click to toggle source

Write string to IO

@param string [String] @return [Integer] number of bytes written

# File lib/krakow/connection_features/snappy_frames.rb, line 104
# Compress a string and write it to the underlying IO as one compressed frame.
#
# The stream identifier is sent once, ahead of the first frame. Each frame
# is the compressed chunk type key, a 3-byte little-endian length covering
# checksum plus compressed data, the 4-byte masked checksum of the
# uncompressed string, and the compressed data.
#
# NOTE(review): the whole string goes out as a single chunk; the snappy
# framing format caps the uncompressed data per chunk at 65536 bytes --
# confirm callers never exceed that.
#
# @param string [String] data to send
# @return [Integer] number of bytes written, as reported by the IO
def write(string)
  # Same flag name that #initialize sets; record that the identifier went
  # out so it is not repeated before every frame.
  unless @snappy_write_ident
    send_snappy_identifier
    @snappy_write_ident = true
  end
  digest = Digest::CRC32c.new
  digest << string
  content = Snappy.deflate(string)
  # Frame length counts bytes, not characters; keep only the low three
  # bytes of the little-endian 32-bit value.
  size = [content.bytesize + 4].pack('L<').byteslice(0, 3)
  checksum = [checksum_mask(digest.checksum)].pack('L<')
  output = [CHUNK_TYPE.key(:compressed), size, checksum, content].pack('a*a*a*a*')
  io.write output
end