class PulsarSdk::Protocol::Reader
Constants
- CMD_SIZE_LEN
- FRAME_SIZE_LEN
Public Class Methods
new(io)
click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 9
# Builds a reader around +io+ once it has passed the interface check.
#
# @param io [#read, #closed?] stream the frames are read from
def initialize(io)
  ensure_interface_implemented!(io)

  # Nothing has been consumed from the stream yet.
  @readed = 0
  @io = io
end
Public Instance Methods
read_command()
click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 35
# Reads one size-prefixed command from the stream and decodes it.
#
# The size prefix is CMD_SIZE_LEN bytes long and is unpacked with the 'N'
# directive (32-bit unsigned integer, network byte order).
#
# @return [Object] whatever Pulsar::Proto::BaseCommand.decode returns
# @raise [RuntimeError] if the size prefix could not be read
def read_command
  cmd_size = read(CMD_SIZE_LEN, 'N')

  # A nil size means no prefix was available. Forwarding nil to #read would
  # ask the IO for everything up to EOF instead of a bounded command body,
  # so fail loudly here, the same way read_fully does for the frame size.
  raise "IO reader is empty! can not read the command size." if cmd_size.nil?

  cmd_bytes = read(cmd_size)
  Pulsar::Proto::BaseCommand.decode(cmd_bytes)
end
read_frame_size()
click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 28
# Reads the FRAME_SIZE_LEN-byte frame size prefix (unpacked with 'N').
#
# @return [Integer, nil] the value produced by #read for the prefix
def read_frame_size
  read(FRAME_SIZE_LEN, 'N').tap do
    # The prefix itself is not counted: restart the byte counter so it
    # tracks only what is consumed from the frame that follows.
    @readed = 0
  end
end
read_fully()
click to toggle source
TODO: add a timeout?
# File lib/pulsar_sdk/protocol/reader.rb, line 17
# Reads one complete frame: the size prefix, the command, and whatever
# bytes of the frame are left after the command.
#
# @return [Array] two elements: the decoded command, then the remaining
#   bytes of the frame (nil when nothing is left)
# @raise [RuntimeError] if no frame size could be read
def read_fully
  frame_size = read_frame_size
  if frame_size.nil?
    raise "IO reader is empty! maybe server error, please check server log for help."
  end

  # Array elements are evaluated left to right, so the command is consumed
  # before the rest of the frame.
  [read_command, read_remaining(frame_size)]
end
read_remaining(frame_szie)
click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 41
# Reads the part of the frame that has not been consumed yet.
#
# @param frame_szie [Integer] total size of the current frame
# @return [String, nil] the leftover bytes, or nil when none are left
def read_remaining(frame_szie)
  # @readed holds how much of this frame earlier reads already accounted for.
  leftover = frame_szie - @readed
  read(leftover) if leftover > 0
end
Private Instance Methods
ensure_interface_implemented!(io)
click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 48
# Verifies that +io+ responds to every method the reader calls on it.
#
# @param io [Object] candidate stream object
# @raise [RuntimeError] naming the first required method that is missing
def ensure_interface_implemented!(io)
  %i[read closed?].each do |required|
    next if io.respond_to?(required)

    raise "io must implement method: #{required}"
  end
end
read(size, unpack = nil)
click to toggle source
# File lib/pulsar_sdk/protocol/reader.rb, line 54
# Reads up to +size+ bytes from the underlying IO and optionally unpacks
# the first value out of them.
#
# @param size [Integer] number of bytes to request from the IO
# @param unpack [String, nil] String#unpack directive; when nil the raw
#   bytes are returned
# @return [String, Object, nil] the raw bytes, the first unpacked value, or
#   nil when the IO returned nothing
# @raise [Errno::ECONNRESET] if the IO is already closed
# @raise [Errno::ETIMEDOUT] if IO.select returns nil
def read(size, unpack = nil)
  raise Errno::ECONNRESET if @io.closed?
  # No timeout is passed to IO.select, so this waits until the IO is readable.
  raise Errno::ETIMEDOUT unless IO.select([@io], nil)

  bytes = @io.read(size)

  # Count what was actually received, not what was requested: at EOF the IO
  # may return fewer bytes than +size+ (or nil), and the counter feeds the
  # remaining-bytes calculation for the current frame.
  @readed = @readed.to_i + bytes.to_s.bytesize

  return bytes if unpack.nil? || bytes.nil?

  bytes.unpack(unpack).first
end