module Fix::Engine::Connection

The client connection handling logic and method overrides

Constants

TEST_REQ_GRACE_TIME

Grace time before we disconnect a client that doesn’t reply to a test request

Attributes

comp_id[RW]
hrtbt_int[RW]
ip[RW]
last_request_at[RW]
msg_buf[RW]
port[RW]
target_comp_id[RW]

Public Instance Methods

keep_alive() click to toggle source

Keeps the connection alive by sending regular heartbeats, and test request messages whenever the connection has been idling for too long

# File lib/fix/engine/connection.rb, line 56
# Keeps the connection alive by sending a heartbeat when nothing has been sent
# for longer than the heartbeat interval, and a test request when nothing has
# been received for longer than the heartbeat interval.
#
# A heartbeat interval that is zero or negative disables both behaviours,
# otherwise the "idle" condition would be true on almost every call and the
# peer would be flooded with test requests.
def keep_alive
  @last_send_at    ||= 0
  @last_request_at ||= 0
  @hrtbt_int       ||= 0

  # Nothing to do when heartbeating is disabled
  return unless @hrtbt_int > 0

  # Anything older than this timestamp counts as "idle for too long"
  idle_threshold = Time.now.to_i - @hrtbt_int

  # Send a regular heartbeat when we don't send anything down the line for a while
  send_heartbeat if @last_send_at < idle_threshold

  # Trigger a test request when we haven't received anything for a while,
  # unless a previous test request is still waiting for its answer
  send_test_request if !@pending_test_req_id && @last_request_at < idle_threshold
end
kill!() click to toggle source

Kills the connection after sending a logout message, if applicable

# File lib/fix/engine/connection.rb, line 125
# Kills the connection after sending a logout message, if applicable.
#
# A logout is only sent when a target comp ID is known. The connection is
# closed even when sending the logout raises, the exception is then
# propagated to the caller once the close has been requested.
def kill!
  if @target_comp_id
    log("Logging out #{peer}")

    logout = FP::Messages::Logout.new
    logout.text = 'Bye!'

    send_msg(logout)
  end
ensure
  # Always request the close, a failed logout must not leave the connection open
  close_connection_after_writing
end
peer() click to toggle source

The way we refer to our connection peer in various logs and messages

# File lib/fix/engine/connection.rb, line 35
# Label used for the connection peer in logs and messages.
#
# @return [String] Always the literal "server" in this implementation
def peer
  'server'
end
peer_error(error_msg, msg_seq_num) click to toggle source

Notifies the connected peer that it sent something erroneous and kills the connection

@param error_msg [String] The reason to embed in the reject message
@param msg_seq_num [Fixnum] The message sequence number this error pertains to

# File lib/fix/engine/connection.rb, line 152
# Sends a reject message describing the error to the peer, then kills the
# connection.
#
# @param error_msg [String] The reason to embed in the reject message
# @param msg_seq_num [Fixnum] The message sequence number this error pertains to
def peer_error(error_msg, msg_seq_num)
  log("Notifying #{peer} of error: <#{error_msg}> and terminating")

  reject = FP::Messages::Reject.new.tap do |r|
    r.text        = error_msg
    r.ref_seq_num = msg_seq_num
  end

  send_msg(reject)
  kill!
end
post_init() click to toggle source

Initializes the sent-messages array and the expected message sequence number

# File lib/fix/engine/connection.rb, line 25
# Sets up the per-connection state: the next sequence number expected from
# the peer starts at 1 and the list of sent messages starts empty.
def post_init
  # Sequence number the next incoming message must carry
  @expected_seq_num = 1

  # Messages sent so far
  @messages = Array.new
end
process_msg(msg) click to toggle source

Maintains the message sequence consistency before handing off the message to #run_message_handler

# File lib/fix/engine/connection.rb, line 166
# Maintains the message sequence consistency before handing off the message
# to the relevant handling code.
#
# - A message carrying the expected sequence number is processed: heartbeats,
#   test requests and resend requests are handled here, any other FP::Message
#   is passed to #run_message_handler. The expected sequence number is then
#   incremented.
# - A message carrying a lower sequence number than expected is logged and ignored.
# - A message carrying a higher sequence number triggers a resend request
#   starting at the expected sequence number, provided a target comp ID is known.
#
# The time of the last received message is updated in every case.
#
# @param msg [FP::Message] The parsed incoming message
def process_msg(msg)
  @recv_seq_num = msg.msg_seq_num

  log("Received a <#{msg.class}> from #{peer} with sequence number <#{msg.msg_seq_num}>")

  if @expected_seq_num == @recv_seq_num
    if @comp_id && msg.target_comp_id != @comp_id
      # Record the sender before rejecting, outgoing messages are addressed to @target_comp_id
      @target_comp_id = msg.sender_comp_id

      # Whoops, incorrect COMP_ID received, kill it with fire
      peer_error("Incorrect TARGET_COMP_ID in message, expected <#{@comp_id}>, got <#{msg.target_comp_id}>", msg.header.msg_seq_num)

    else
      case msg
      when FP::Messages::Heartbeat
        # If we were expecting an answer to a test request we can sign it off and
        # cancel the scheduled connection termination
        if @pending_test_req_id && msg.test_req_id && (@pending_test_req_id == msg.test_req_id)
          @pending_test_req_id = nil
        end

      when FP::Messages::TestRequest
        # Answer test requests with a matching heartbeat
        hb = FP::Messages::Heartbeat.new
        hb.test_req_id = msg.test_req_id
        send_msg(hb)

      when FP::Messages::ResendRequest
        # Sent messages are stored at the index equal to their sequence number,
        # so the requested range starts at index begin_seq_no. An end_seq_no of
        # zero requests everything from begin_seq_no onwards.
        first = msg.begin_seq_no
        count = msg.end_seq_no.zero? ? @messages.length : (msg.end_seq_no - first + 1)

        # The slice is nil when the range is out of bounds, and may contain
        # empty slots (index 0 is never used), both are skipped
        (@messages[first, count] || []).compact.each do |m|
          log("Re-sending <#{m.class}> to <#{ip}:#{port}> with sequence number <#{m.msg_seq_num}>")
          send_data(m.dump)
          @last_send_at = Time.now.to_i
        end

      when FP::Message
        run_message_handler(msg)
      end
    end

    @expected_seq_num += 1

  elsif @expected_seq_num > @recv_seq_num
    log("Ignoring message <#{msg}> with stale sequence number <#{msg.msg_seq_num}>, expecting <#{@expected_seq_num}>")

  elsif (@expected_seq_num < @recv_seq_num) && @target_comp_id
    # Request missing range when detect a gap
    rr = FP::Messages::ResendRequest.new
    rr.begin_seq_no = @expected_seq_num
    send_msg(rr)
  end

  self.last_request_at = Time.now.to_i
end
receive_data(data) click to toggle source

Runs when a client has sent a chunk of data: the chunk is appended to a buffer and an attempt is made to parse the buffered data

@param data [String] The received data chunk

# File lib/fix/engine/connection.rb, line 239
# Run when a chunk of data has been received: the chunk is handed to a lazily
# created MessageBuffer whose block is called with each parsed result.
#
# A parse failure, or a parsed message carrying errors, is logged and reported
# to the peer through #peer_error. Any other parsed message goes through
# #process_msg. If adding the data raises, the buffered data and the exception
# are logged and the connection is killed.
#
# @param data [String] The received data chunk
def receive_data(data)
  @buf ||= MessageBuffer.new do |parsed|
    if (parsed.class == FP::ParseFailure) || !parsed.errors.count.zero?
      # Log the details first so they are not lost should notifying the peer fail
      log("Failed to parse message <#{parsed.message}>")
      parsed.errors.each { |err| log(" >>> #{err}") }
      peer_error("#{parsed.message} -- #{parsed.errors.join(", ")}", @expected_seq_num)

    else
      process_msg(parsed)

    end
  end

  begin
    @buf.add_data(data)
  rescue StandardError => e
    log("Raised exception by #{peer} when parsing data <#{@buf.msg_buf.gsub(/\x01/, '|')}>, terminating.")
    # Keep the message and the backtrace on separate lines
    log("#{e.message}\n#{Array(e.backtrace).join("\n")}")
    kill!
  end
end
run_message_handler(msg) click to toggle source

Runs the defined message handler for the message’s class

@param msg [FP::Message] The message to handle

# File lib/fix/engine/connection.rb, line 228
# Runs the handler matching the message's class, if one is defined.
#
# The handler name is "on_" followed by the snake-cased last segment of the
# class name. Only handlers that respond_to? reports (i.e. public ones) are run.
#
# @param msg [FP::Message] The message to handle
def run_message_handler(msg)
  short_name = msg.class.to_s.split('::').last
  handler    = "on_#{short_name.gsub(/(.)([A-Z])/, '\1_\2').downcase}".to_sym

  return unless respond_to?(handler)

  send(handler, msg)
end
send_heartbeat(test_req_id = nil) click to toggle source

Sends a heartbeat message with an optional test_req_id parameter

@param test_req_id [String] Sets the test request ID if sent in response to a test request

# File lib/fix/engine/connection.rb, line 91
# Builds a heartbeat message and sends it, echoing the test request ID when
# one is given.
#
# @param test_req_id [String] Sets the test request ID if sent in response to a test request
def send_heartbeat(test_req_id = nil)
  heartbeat = FP::Messages::Heartbeat.new
  heartbeat.test_req_id = test_req_id if test_req_id
  send_msg(heartbeat)
end
send_msg(msg) click to toggle source

Sends a Fix::Protocol::Message to the connected peer

@param msg [Fix::Protocol::Message] The message to send

# File lib/fix/engine/connection.rb, line 102
# Stamps the message with the next outgoing sequence number and both comp IDs,
# then sends it to the connected peer.
#
# A valid message is stored in the sent messages list at the index matching
# its sequence number, written to the connection, and the outgoing sequence
# number and last send time are updated.
#
# @param msg [Fix::Protocol::Message] The message to send
# @raise [RuntimeError] If the message is not valid once stamped
def send_msg(msg)
  @send_seq_num ||= 1

  msg.msg_seq_num    = @send_seq_num
  msg.sender_comp_id = @comp_id
  msg.target_comp_id = @target_comp_id

  log("Sending <#{msg.class}> to #{peer} with sequence number <#{msg.msg_seq_num}>")

  unless msg.valid?
    reasons = msg.errors.join(', ')
    log(reasons)
    raise "Tried to send invalid message! <#{reasons}>"
  end

  @messages[msg.msg_seq_num] = msg
  send_data(msg.dump)
  @send_seq_num += 1
  @last_send_at = Time.now.to_i
end
send_test_request() click to toggle source

Sends a test request and expects an answer before TEST_REQ_GRACE_TIME

# File lib/fix/engine/connection.rb, line 75
# Sends a test request carrying a random ID and schedules a check after
# TEST_REQ_GRACE_TIME: the connection is killed if that same test request is
# still pending when the timer fires.
#
# The timer is tied to the ID of the request it was created for, so a timer
# left over from an already answered request does not kill the connection
# while a more recent request is still within its own grace time.
def send_test_request
  tr = FP::Messages::TestRequest.new
  tr.test_req_id = SecureRandom.hex(6)
  send_msg(tr)

  req_id = tr.test_req_id
  @pending_test_req_id = req_id

  EM.add_timer(TEST_REQ_GRACE_TIME) do
    kill! if @pending_test_req_id == req_id
  end
end
set_heartbeat_interval(interval) click to toggle source

Sets the heartbeat interval and schedules the keep alive call

@param interval [Fixnum] The frequency in seconds at which a heartbeat should be emitted

# File lib/fix/engine/connection.rb, line 44
# Records the heartbeat interval and starts a timer calling #keep_alive
# every second.
#
# @param interval [Fixnum] The frequency in seconds at which a heartbeat should be emitted
# @raise [RuntimeError] If a heartbeat interval has already been set
def set_heartbeat_interval(interval)
  raise("Can't set heartbeat interval twice") if @hrtbt_int

  @hrtbt_int = interval
  log("Heartbeat interval for #{peer} : <#{hrtbt_int}s>")

  @keep_alive_timer = EM.add_periodic_timer(1) do
    keep_alive
  end
end
unbind() click to toggle source

Cleans up after we’re done

# File lib/fix/engine/connection.rb, line 141
# Logs the termination and cancels the keep alive timer when one was started.
def unbind
  log("Terminating connection to #{peer}")
  @keep_alive_timer.cancel if @keep_alive_timer
end