class Kafka::Protocol::Decoder

A decoder wraps an IO object, making it easy to read specific data types from it. The Kafka protocol is not self-describing, so a client must call these methods in exactly the order the message format defines; reading fields in the wrong order yields wrong values rather than an error.
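As a rough illustration of why call order matters, the sketch below decodes the same six bytes twice. `SketchDecoder` is a hypothetical stand-in that copies two methods from this page so the example runs without the gem, and the int16-then-int32 layout is an assumption made up for the example.

```ruby
require "stringio"

# Hypothetical stand-in that copies two methods from this page so the
# example runs without the gem installed.
class SketchDecoder
  def initialize(io)
    @io = io
  end

  def int16
    @io.read(2).unpack("s>").first
  end

  def int32
    @io.read(4).unpack("l>").first
  end
end

# Assumed wire layout for this example: an int16 followed by an int32.
MESSAGE = [7, 16909060].pack("s>l>")

def decode_in_order
  decoder = SketchDecoder.new(StringIO.new(MESSAGE))
  [decoder.int16, decoder.int32]
end

# Swapping the calls still succeeds, but the bytes are split at the wrong
# boundary, so both values come out wrong.
def decode_out_of_order
  decoder = SketchDecoder.new(StringIO.new(MESSAGE))
  [decoder.int32, decoder.int16]
end

p decode_in_order     # => [7, 16909060]
p decode_out_of_order # => [459010, 772]
```

Nothing in the byte stream signals the mistake, which is why the caller has to know the layout in advance.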

Public Class Methods

from_string(str)
# File lib/kafka/protocol/decoder.rb, line 9
def self.from_string(str)
  new(StringIO.new(str))
end
new(io)

Initializes a new decoder.

@param io [IO] an object that acts as an IO.

# File lib/kafka/protocol/decoder.rb, line 16
def initialize(io)
  @io = io
end

Public Instance Methods

array(&block)

Decodes an array from the IO object.

The provided block is called once for each item in the array. The block is responsible for decoding the proper type, since there is no information that allows the type to be inferred automatically.

@return [Array]

# File lib/kafka/protocol/decoder.rb, line 77
def array(&block)
  size = int32
  size.times.map(&block)
end
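A minimal sketch of the size-prefixed array layout, assuming int16 elements. `decode_array` is a hypothetical helper that mirrors the `array` logic shown here against a plain StringIO, so it runs without the gem.

```ruby
require "stringio"

# Hypothetical helper mirroring the `array` logic: read an int32 count,
# then call the block once per element.
def decode_array(io)
  size = io.read(4).unpack("l>").first
  size.times.map { yield io }
end

# An int32 count of 3 followed by three int16 elements.
def sample_array_payload
  [3].pack("l>") + [10, 20, 30].pack("s>*")
end

def decode_sample_array
  io = StringIO.new(sample_array_payload)
  decode_array(io) { |stream| stream.read(2).unpack("s>").first }
end

p decode_sample_array # => [10, 20, 30]
```

With this logic a size of -1 produces an empty array, because `Integer#times` does nothing for a negative count.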
boolean()

Decodes an 8-bit boolean from the IO object.

@return [Boolean]

# File lib/kafka/protocol/decoder.rb, line 37
def boolean
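  # `read` returns a String, so compare the decoded byte rather than the
  # raw String, which never equals an Integer.
  int8 == 0x1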
end
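In Ruby a String never compares equal to an Integer, so the byte has to be unpacked before it is tested. A small sketch, assuming a single byte of 1 means true; `decode_boolean` is a hypothetical helper, not part of this class.

```ruby
require "stringio"

# Hypothetical helper: unpack the byte to an Integer before comparing.
def decode_boolean(io)
  io.read(1).unpack("C").first == 1
end

def boolean_samples
  [
    decode_boolean(StringIO.new("\x01".b)),
    decode_boolean(StringIO.new("\x00".b)),
  ]
end

p boolean_samples # => [true, false]
p "\x01" == 1     # => false
```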
bytes()

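Decodes a byte string from the IO object. The data is prefixed by its size as an int32; a size of -1 denotes a null value.

@return [String, nil] the bytes, or nil if the size is -1.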

# File lib/kafka/protocol/decoder.rb, line 135
def bytes
  size = int32

  if size == -1
    nil
  else
    read(size)
  end
end
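The three cases (null, empty, and non-empty) can be sketched as follows. `encode_bytes` and `decode_bytes` are hypothetical helpers written for this example; only the decoding side mirrors the method shown here.

```ruby
require "stringio"

# Hypothetical encoder: int32 size prefix, or -1 for a null value.
def encode_bytes(value)
  return [-1].pack("l>") if value.nil?
  [value.bytesize].pack("l>") + value.b
end

# Hypothetical helper mirroring the `bytes` logic.
def decode_bytes(io)
  size = io.read(4).unpack("l>").first
  return nil if size == -1
  return "" if size.zero?
  io.read(size)
end

def bytes_round_trip(value)
  decode_bytes(StringIO.new(encode_bytes(value)))
end

p bytes_round_trip(nil)   # => nil
p bytes_round_trip("")    # => ""
p bytes_round_trip("abc") # => "abc"
```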
eof?()
# File lib/kafka/protocol/decoder.rb, line 20
def eof?
  @io.eof?
end
int16()

Decodes a 16-bit integer from the IO object.

@return [Integer]

# File lib/kafka/protocol/decoder.rb, line 51
def int16
  read(2).unpack("s>").first
end
int32()

Decodes a 32-bit integer from the IO object.

@return [Integer]

# File lib/kafka/protocol/decoder.rb, line 58
def int32
  read(4).unpack("l>").first
end
int64()

Decodes a 64-bit integer from the IO object.

@return [Integer]

# File lib/kafka/protocol/decoder.rb, line 65
def int64
  read(8).unpack("q>").first
end
int8()

Decodes an 8-bit integer from the IO object. The byte is unpacked with the "C" directive, so the result is unsigned (0 to 255).

@return [Integer]

# File lib/kafka/protocol/decoder.rb, line 44
def int8
  read(1).unpack("C").first
end
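The fixed-width readers differ only in the `String#unpack` directive they use. The sketch below applies those directives to hand-written byte strings; `unpack_first` is a hypothetical helper, and the signed "c" directive is included only for comparison with the unsigned "C".

```ruby
# Hypothetical helper: decode the first value from a binary string.
def unpack_first(bytes, directive)
  bytes.unpack(directive).first
end

p unpack_first("\xFF\xFE".b, "s>")                         # => -2  (int16, big-endian, signed)
p unpack_first("\x00\x00\x01\x00".b, "l>")                 # => 256 (int32, big-endian, signed)
p unpack_first("\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFE".b, "q>") # => -2  (int64, big-endian, signed)
p unpack_first("\xFE".b, "C")                              # => 254 (8-bit, unsigned)
p unpack_first("\xFE".b, "c")                              # => -2  (8-bit, signed)
```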
peek(offset, length)

Returns upcoming bytes without advancing the current IO offset. Skips offset bytes, then returns up to length bytes as integers.

@return [Array<Integer>]

# File lib/kafka/protocol/decoder.rb, line 27
def peek(offset, length)
  data = @io.read(offset + length)
  return [] if data.nil?
  @io.ungetc(data)
  # The second argument to Array#[] is a length, not an end index.
  data.bytes[offset, length] || []
end
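A sketch of the read-then-push-back approach against a StringIO. `peek_bytes` is a hypothetical helper mirroring the logic shown here; it assumes the IO object accepts a multi-byte string in `ungetc`, as StringIO does.

```ruby
require "stringio"

# Hypothetical helper mirroring the `peek` logic.
def peek_bytes(io, offset, length)
  data = io.read(offset + length)
  return [] if data.nil?
  io.ungetc(data) # push the bytes back so the next read sees them again
  data.bytes[offset, length] || []
end

# Returns the peeked bytes, the stream position afterwards, and what a
# subsequent full read still sees.
def peek_demo
  io = StringIO.new([1, 2, 3, 4, 5].pack("C*"))
  peeked = peek_bytes(io, 1, 2)
  [peeked, io.pos, io.read.bytes]
end

p peek_demo
```

The intent is that the position is back at 0 after the peek and the full payload is still readable.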
read(number_of_bytes)

Reads the specified number of bytes from the IO object, returning them as a String. Raises EOFError if fewer bytes are available.

@return [String]

# File lib/kafka/protocol/decoder.rb, line 162
def read(number_of_bytes)
  return "" if number_of_bytes == 0

  data = @io.read(number_of_bytes) or raise EOFError

  # If the `read` call returned less data than expected we should not
  # proceed.
  raise EOFError if data.size != number_of_bytes

  data
end
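The short-read check can be exercised with a truncated input. `read_exactly` is a hypothetical copy of the logic shown here, applied to a plain StringIO.

```ruby
require "stringio"

# Hypothetical copy of the `read` logic, taking the IO as an argument.
def read_exactly(io, number_of_bytes)
  return "" if number_of_bytes == 0

  data = io.read(number_of_bytes) or raise EOFError
  raise EOFError if data.size != number_of_bytes

  data
end

# True when asking for more bytes than the input holds raises EOFError.
def short_read_raises?(input, number_of_bytes)
  read_exactly(StringIO.new(input), number_of_bytes)
  false
rescue EOFError
  true
end

p short_read_raises?("ab", 4)   # => true  (only 2 of 4 bytes available)
p short_read_raises?("", 1)     # => true  (IO#read returns nil at EOF)
p short_read_raises?("abcd", 4) # => false
```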
string()

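Decodes a string from the IO object. The data is prefixed by its size as an int16; a size of -1 denotes a null value.

@return [String, nil] the string, or nil if the size is -1.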

# File lib/kafka/protocol/decoder.rb, line 94
def string
  size = int16

  if size == -1
    nil
  else
    read(size)
  end
end
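Because `IO#read` with a length returns a binary (ASCII-8BIT) String, a caller that expects text may need to set the encoding afterwards. The sketch below assumes the payload is UTF-8; `encode_string` and `decode_string` are hypothetical helpers written for this example.

```ruby
require "stringio"

# Hypothetical encoder: int16 size prefix (in bytes), or -1 for null.
def encode_string(value)
  return [-1].pack("s>") if value.nil?
  [value.bytesize].pack("s>") + value.b
end

# Hypothetical helper mirroring the `string` logic.
def decode_string(io)
  size = io.read(2).unpack("s>").first
  return nil if size == -1
  return "" if size.zero?
  io.read(size)
end

# Returns the encoding of the raw result and the text after relabelling
# the bytes as UTF-8 (an assumption about the payload).
def decode_utf8_sample
  raw = decode_string(StringIO.new(encode_string("caf\u00e9")))
  [raw.encoding, raw.dup.force_encoding("UTF-8")]
end

p decode_utf8_sample
```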
varint()

Reads a variable-length, zigzag-encoded integer (varint) from the IO object. See developers.google.com/protocol-buffers/docs/encoding#varints

@return [Integer]

# File lib/kafka/protocol/decoder.rb, line 121
def varint
  group = 0
  data = 0
  while (chunk = int8) & 0x80 != 0
    data |= (chunk & 0x7f) << group
    group += 7
  end
  data |= chunk << group
  data & 0b1 != 0 ? ~(data >> 1) : (data >> 1)
end
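A worked round trip may make the encoding easier to follow. `decode_varint` mirrors the logic shown here; `encode_varint` is a hypothetical inverse written for this example (it is not part of this class) and assumes the value fits in 64 bits.

```ruby
require "stringio"

# Hypothetical encoder: zigzag-map the signed value to a non-negative one,
# then emit 7 bits per byte, setting the high bit on all but the last byte.
def encode_varint(value)
  zigzag = (value << 1) ^ (value >> 63)
  bytes = []
  while zigzag >= 0x80
    bytes << ((zigzag & 0x7f) | 0x80)
    zigzag >>= 7
  end
  bytes << zigzag
  bytes.pack("C*")
end

# Mirrors the `varint` logic, reading from the given IO.
def decode_varint(io)
  group = 0
  data = 0
  while (chunk = io.read(1).unpack("C").first) & 0x80 != 0
    data |= (chunk & 0x7f) << group
    group += 7
  end
  data |= chunk << group
  data.odd? ? ~(data >> 1) : (data >> 1)
end

def varint_round_trip(value)
  decode_varint(StringIO.new(encode_varint(value)))
end

p encode_varint(300).bytes # => [216, 4]  (zigzag 600 = 0x58 | 0x80, then 0x04)
p encode_varint(-3).bytes  # => [5]       (zigzag maps -3 to 5)
p varint_round_trip(300)   # => 300
p varint_round_trip(-3)    # => -3
```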
varint_array(&block)

Decodes an array from the IO object. Works like array, except that the size is read as a varint.

@return [Array]

# File lib/kafka/protocol/decoder.rb, line 86
def varint_array(&block)
  size = varint
  size.times.map(&block)
end
varint_bytes()

Decodes a byte string from the IO object. The size is read as a varint; a size of -1 denotes a null value.

@return [String, nil] the bytes, or nil if the size is -1.

# File lib/kafka/protocol/decoder.rb, line 148
def varint_bytes
  size = varint

  if size == -1
    nil
  else
    read(size)
  end
end
varint_string()

Decodes a string from the IO object. The size is read as a varint; a size of -1 denotes a null value.

@return [String, nil] the string, or nil if the size is -1.

# File lib/kafka/protocol/decoder.rb, line 107
def varint_string
  size = varint

  if size == -1
    nil
  else
    read(size)
  end
end
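For the varint-prefixed readers, the size on the wire is zigzag-encoded, so the prefix byte is not the plain length. A small sketch with hand-built payloads; `read_varint` and `read_varint_bytes` are hypothetical helpers mirroring the logic on this page.

```ruby
require "stringio"

# Mirrors the `varint` logic on this page, reading from the given IO.
def read_varint(io)
  group = 0
  data = 0
  while (chunk = io.read(1).unpack("C").first) & 0x80 != 0
    data |= (chunk & 0x7f) << group
    group += 7
  end
  data |= chunk << group
  data.odd? ? ~(data >> 1) : (data >> 1)
end

# Hypothetical helper mirroring `varint_bytes` and `varint_string`.
def read_varint_bytes(io)
  size = read_varint(io)
  size == -1 ? nil : io.read(size)
end

def varint_prefixed_samples
  null_payload = [1].pack("C")         # zigzag 1 decodes to a size of -1
  abc_payload = [6].pack("C") + "abc"  # zigzag 6 decodes to a size of 3
  [
    read_varint_bytes(StringIO.new(null_payload)),
    read_varint_bytes(StringIO.new(abc_payload)),
  ]
end

p varint_prefixed_samples # => [nil, "abc"]
```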