class NATS::Protocol::Parser

Public Class Methods

new(nc) click to toggle source
# File lib/nats/io/parser.rb, line 40
# Creates a parser bound to a connection object.
#
# @param nc the object that will receive the process_* callbacks issued
#   while parsing; it is stored in @nc and not otherwise used here
def initialize(nc)
  # Start from a clean parsing state (empty buffer, awaiting a control line).
  reset!
  @nc = nc
end

Public Instance Methods

parse(data) click to toggle source
# File lib/nats/io/parser.rb, line 56
# Consumes a chunk of raw protocol data and dispatches every complete
# command found so far to the connection held in @nc.
#
# Data is accumulated in @buf across calls, so a control line or a message
# payload may arrive split over several chunks; parsing resumes from the
# saved @parse_state on the next call. Depending on the command matched, one
# of process_msg, process_err, process_ping, process_pong or process_info is
# invoked on @nc.
#
# @param data [String] the next chunk of data to parse
# @return [nil]
def parse(data)
  # Buffer a copy rather than the caller's object: later chunks are appended
  # in place with <<, which would otherwise mutate the caller's string (or
  # raise FrozenError when that string is frozen).
  @buf = @buf ? @buf << data : data.dup

  while @buf
    case @parse_state
    when AWAITING_CONTROL_LINE
      case @buf
      when MSG
        m = Regexp.last_match
        @buf = m.post_match
        @sub, @sid, @reply, @needed = m[1], m[2].to_i, m[4], m[5].to_i
        @parse_state = AWAITING_MSG_PAYLOAD
      when HMSG
        m = Regexp.last_match
        @buf = m.post_match
        @sub, @sid, @reply, @header_needed, @needed = m[1], m[2].to_i, m[4], m[5].to_i, m[6].to_i
        @parse_state = AWAITING_MSG_PAYLOAD
      when OK # No-op right now
        @buf = Regexp.last_match.post_match
      when ERR
        m = Regexp.last_match
        @buf = m.post_match
        @nc.process_err(m[1])
      when PING
        @buf = Regexp.last_match.post_match
        @nc.process_ping
      when PONG
        @buf = Regexp.last_match.post_match
        @nc.process_pong
      when INFO
        m = Regexp.last_match
        @buf = m.post_match
        # First INFO message is processed synchronously on connect,
        # and onwards we would be receiving asynchronously INFO commands
        # signaling possible changes in the topology of the NATS cluster.
        @nc.process_info(m[1])
      when UNKNOWN
        m = Regexp.last_match
        @buf = m.post_match
        @nc.process_err("Unknown protocol: #{m[1]}")
      else
        # If we are here we do not have a complete line yet that we understand.
        return
      end
      # A fully consumed buffer is dropped so the loop terminates.
      @buf = nil if @buf.empty?

    when AWAITING_MSG_PAYLOAD
      # Wait until the whole payload plus its trailing CRLF has been buffered.
      return unless @needed && @buf.bytesize >= @needed + CR_LF_SIZE

      # The guard above measures @needed in bytes, so the buffer is cut with
      # byteslice as well. String#slice counts characters and would over-read
      # a multibyte payload whenever the buffer is not binary-encoded.
      if @header_needed
        # The first @header_needed bytes are the header section; the rest of
        # the @needed bytes is the message body.
        hbuf = @buf.byteslice(0, @header_needed)
        payload = @buf.byteslice(@header_needed, @needed - @header_needed)
      else
        hbuf = nil
        payload = @buf.byteslice(0, @needed)
      end
      @nc.process_msg(@sub, @sid, @reply, payload, hbuf)

      # Skip past the payload and its CRLF terminator.
      @buf = @buf.byteslice(@needed + CR_LF_SIZE, @buf.bytesize)

      @sub = @sid = @reply = @needed = @header_needed = nil
      @parse_state = AWAITING_CONTROL_LINE
      @buf = nil if @buf.empty?
    end
  end
end
reset!() click to toggle source
# File lib/nats/io/parser.rb, line 45
# Returns the parser to its initial state: no buffered data, no pending
# message fields, and waiting for a control line.
#
# @return [nil]
def reset!
  @parse_state = AWAITING_CONTROL_LINE
  # Drop the buffer and every field captured from a MSG/HMSG control line.
  @buf = @sub = @sid = @reply = @needed = @header_needed = nil
end