class NATS::Protocol::Parser
Public Class Methods
new(nc)
click to toggle source
# File lib/nats/io/parser.rb, line 40 def initialize(nc) @nc = nc reset! end
Public Instance Methods
parse(data)
click to toggle source
# File lib/nats/io/parser.rb, line 56 def parse(data) @buf = @buf ? @buf << data : data while (@buf) case @parse_state when AWAITING_CONTROL_LINE case @buf when MSG @buf = $' @sub, @sid, @reply, @needed = $1, $2.to_i, $4, $5.to_i @parse_state = AWAITING_MSG_PAYLOAD when HMSG @buf = $' @sub, @sid, @reply, @header_needed, @needed = $1, $2.to_i, $4, $5.to_i, $6.to_i @parse_state = AWAITING_MSG_PAYLOAD when OK # No-op right now @buf = $' when ERR @buf = $' @nc.process_err($1) when PING @buf = $' @nc.process_ping when PONG @buf = $' @nc.process_pong when INFO @buf = $' # First INFO message is processed synchronously on connect, # and onwards we would be receiving asynchronously INFO commands # signaling possible changes in the topology of the NATS cluster. @nc.process_info($1) when UNKNOWN @buf = $' @nc.process_err("Unknown protocol: #{$1}") else # If we are here we do not have a complete line yet that we understand. return end @buf = nil if (@buf && @buf.empty?) when AWAITING_MSG_PAYLOAD return unless (@needed && @buf.bytesize >= (@needed + CR_LF_SIZE)) if @header_needed hbuf = @buf.slice(0, @header_needed) payload = @buf.slice(@header_needed, (@needed-@header_needed)) @nc.process_msg(@sub, @sid, @reply, payload, hbuf) @buf = @buf.slice((@needed + CR_LF_SIZE), @buf.bytesize) else @nc.process_msg(@sub, @sid, @reply, @buf.slice(0, @needed), nil) @buf = @buf.slice((@needed + CR_LF_SIZE), @buf.bytesize) end @sub = @sid = @reply = @needed = @header_needed = nil @parse_state = AWAITING_CONTROL_LINE @buf = nil if (@buf && @buf.empty?) end end end
reset!()
click to toggle source
# File lib/nats/io/parser.rb, line 45 def reset! @buf = nil @parse_state = AWAITING_CONTROL_LINE @sub = nil @sid = nil @reply = nil @needed = nil @header_needed = nil end