class PulsarSdk::Protocol::Reader

Constants

CMD_SIZE_LEN
FRAME_SIZE_LEN

Public Class Methods

new(io) click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 9
def initialize(io)
  ensure_interface_implemented!(io)
  @io = io

  @readed = 0
end

Public Instance Methods

read_command() click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 35
def read_command
  cmd_size = read(CMD_SIZE_LEN, 'N')
  cmd_bytes = read(cmd_size)
  Pulsar::Proto::BaseCommand.decode(cmd_bytes)
end
read_frame_size() click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 28
def read_frame_size
  frame_size = read(FRAME_SIZE_LEN, 'N')
  # reset cursor! let's read the frame
  @readed = 0
  frame_size
end
read_fully() click to toggle source

TODO add timeout?

# File lib/pulsar_sdk/protocol/reader.rb, line 17
def read_fully
  frame_szie = read_frame_size
  raise "IO reader is empty! maybe server error, please check server log for help." if frame_szie.nil?

  base_cmd = read_command

  buffer = read_remaining(frame_szie)

  [base_cmd, buffer]
end
read_remaining(frame_szie) click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 41
def read_remaining(frame_szie)
  meta_and_payload_size = frame_szie - @readed
  return if meta_and_payload_size <= 0
  read(meta_and_payload_size)
end

Private Instance Methods

ensure_interface_implemented!(io) click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 48
def ensure_interface_implemented!(io)
  [:read, :closed?].each do |x|
    raise "io must implement method: #{x}" unless io.respond_to?(x)
  end
end
read(size, unpack = nil) click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 54
def read(size, unpack = nil)
  raise Errno::ECONNRESET if @io.closed?
  raise Errno::ETIMEDOUT unless IO.select([@io], nil)

  bytes = @io.read(size)
  @readed = @readed.to_i + size.to_i

  return bytes if unpack.nil? || bytes.nil?

  bytes.unpack(unpack).first
end