class Nsq::Connection

Constants

FRAME_CLASSES
RESPONSE_HEARTBEAT
RESPONSE_OK
USER_AGENT

Attributes

host[R]
max_in_flight[RW]
port[R]
presumed_in_flight[R]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/nsq/connection.rb, line 25
# Builds a connection from an options hash, opens it, and starts the
# thread that monitors it.
#
# Options read from +opts+:
#   :host          - required
#   :port          - required
#   :queue         - optional; stored as @queue
#   :topic         - optional; stored as @topic
#   :channel       - optional; stored as @channel
#   :msg_timeout   - milliseconds, defaults to 60_000; must be >= 1000
#   :max_in_flight - defaults to 1
#
# Raises ArgumentError when :host or :port is missing, or when
# :msg_timeout is below 1000.
def initialize(opts = {})
  @host = opts[:host] || (raise ArgumentError, 'host is required')
  # The message must name the option that is actually missing.
  @port = opts[:port] || (raise ArgumentError, 'port is required')
  @queue = opts[:queue]
  @topic = opts[:topic]
  @channel = opts[:channel]
  @msg_timeout = opts[:msg_timeout] || 60_000 # 60s
  @max_in_flight = opts[:max_in_flight] || 1

  if @msg_timeout < 1000
    raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.'
  end

  # for outgoing communication
  @write_queue = Queue.new

  # For indicating that the connection has died.
  # We use a Queue so we don't have to poll. Used to communicate across
  # threads (from write_loop and read_loop to monitor_connection).
  @death_queue = Queue.new

  @connected = false
  @presumed_in_flight = 0

  open_connection
  start_monitoring_connection
end

Public Instance Methods

close() click to toggle source

close the connection and don’t try to re-open it

# File lib/nsq/connection.rb, line 60
# Shuts the connection down for good: the monitor is stopped first so
# nothing attempts a reconnect, then the connection itself is closed.
def close
  stop_monitoring_connection()
  close_connection()
end
connected?() click to toggle source
# File lib/nsq/connection.rb, line 54
# Returns the current value of @connected, the flag this class sets to
# true in open_connection and to false in close_connection and die.
def connected?
  @connected
end
fin(message_id) click to toggle source
# File lib/nsq/connection.rb, line 76
# Queues a FIN command for +message_id+ and then lowers the presumed
# in-flight count.
def fin(message_id)
  command = "FIN #{message_id}\n"
  write(command)
  decrement_in_flight
end
mpub(topic, messages) click to toggle source
# File lib/nsq/connection.rb, line 98
# Queues one MPUB command carrying every entry of +messages+.
#
# Each message is prefixed with its byte size as a 32-bit big-endian
# integer; the concatenated result is preceded by its own byte size and
# the number of messages, both packed the same way.
def mpub(topic, messages)
  framed_messages = messages.map { |msg| [msg.bytesize, msg].pack('Na*') }
  payload = framed_messages.join
  header = "MPUB #{topic}\n"

  write [header, payload.bytesize, messages.size, payload].pack('a*NNa*')
end
pub(topic, message) click to toggle source
# File lib/nsq/connection.rb, line 93
# Queues a PUB command for +topic+: the command line, the message's byte
# size as a 32-bit big-endian integer, then the message itself.
def pub(topic, message)
  frame = ["PUB #{topic}\n", message.bytesize, message].pack('a*Na*')
  write(frame)
end
rdy(count) click to toggle source
# File lib/nsq/connection.rb, line 71
# Queues an RDY command carrying +count+.
def rdy(count)
  write("RDY #{count}\n")
end
re_up_ready() click to toggle source

Tell the server we are ready for more messages!

# File lib/nsq/connection.rb, line 108
# Sends RDY with @max_in_flight and resets the presumed in-flight count
# to that same value.
def re_up_ready
  ready_count = @max_in_flight
  rdy(ready_count)
  # We optimistically treat all of these messages as already on their way.
  # That may not be true, but it keeps our RDY bookkeeping with the server
  # much simpler.
  @presumed_in_flight = ready_count
end
req(message_id, timeout) click to toggle source
# File lib/nsq/connection.rb, line 82
# Queues a REQ command for +message_id+ with +timeout+ and then lowers
# the presumed in-flight count.
def req(message_id, timeout)
  command = "REQ #{message_id} #{timeout}\n"
  write(command)
  decrement_in_flight
end
sub(topic, channel) click to toggle source
# File lib/nsq/connection.rb, line 66
# Queues a SUB command for +topic+ and +channel+.
def sub(topic, channel)
  write("SUB #{topic} #{channel}\n")
end
touch(message_id) click to toggle source
# File lib/nsq/connection.rb, line 88
# Queues a TOUCH command for +message_id+.
def touch(message_id)
  write("TOUCH #{message_id}\n")
end

Private Instance Methods

close_connection() click to toggle source

closes the connection and stops listening for messages

# File lib/nsq/connection.rb, line 332
# Closes the connection and stops listening for messages.
#
# Queues a CLS command if we are still connected, stops the read and
# write loops, closes the underlying socket, and resets @socket and
# @connected.
def close_connection
  cls if connected?
  stop_read_loop
  stop_write_loop
  # Actually close the socket; merely dropping the reference would leave
  # the descriptor open until the object happens to be garbage collected.
  @socket.close if @socket
  @socket = nil
  @connected = false
end
cls() click to toggle source
# File lib/nsq/connection.rb, line 119
# Queues a CLS command.
def cls
  write("CLS\n")
end
decrement_in_flight() click to toggle source
# File lib/nsq/connection.rb, line 200
# Lowers the presumed in-flight count by one. When the server is one
# that needs RDY re-ups, RDY is re-sent once the count has dropped to
# 20% of @max_in_flight (rounded up) or below.
def decrement_in_flight
  @presumed_in_flight -= 1
  return unless server_needs_rdy_re_ups?

  # we are now below @max_in_flight, so our RDY state may need a re-up
  low_water_mark = (@max_in_flight * 0.2).ceil
  re_up_ready if @presumed_in_flight <= low_water_mark
end
die(reason) click to toggle source

This is called when there’s a connection error in the read or write loop. It triggers `monitor_connection` to try to reconnect.

# File lib/nsq/connection.rb, line 343
# Marks the connection as no longer connected and posts +reason+ on the
# death queue, which monitor_connection is blocked waiting on.
def die(reason)
  @connected = false
  @death_queue << reason
end
frame_class_for_type(type) click to toggle source
# File lib/nsq/connection.rb, line 194
# Maps a numeric frame type to its frame class: 0 => Response,
# 1 => Error, 2 => Message.
#
# Raises a RuntimeError when +type+ is beyond the last index of
# FRAME_CLASSES.
#
# NOTE(review): the bounds check uses FRAME_CLASSES while the lookup uses
# a literal array; presumably the two are meant to be the same list -
# confirm against the constant's definition.
def frame_class_for_type(type)
  highest_known_type = FRAME_CLASSES.length - 1
  raise "Bad frame type specified: #{type}" if type > highest_known_type

  [Response, Error, Message][type]
end
handle_response(frame) click to toggle source
# File lib/nsq/connection.rb, line 170
# Reacts to a response frame based on its data:
# - a heartbeat is answered with a NOP
# - an OK is only logged
# - anything else is treated as fatal and reported through die
def handle_response(frame)
  payload = frame.data

  if payload == RESPONSE_HEARTBEAT
    debug 'Received heartbeat'
    return nop
  end

  return debug('Received OK') if payload == RESPONSE_OK

  die "Received response we don't know how to handle: #{payload}"
end
identify() click to toggle source
# File lib/nsq/connection.rb, line 140
# Sends the IDENTIFY command straight to the socket (bypassing the write
# queue), waits for the reply frame, and records the server version.
#
# The reply is parsed as JSON. Raises a RuntimeError when @max_in_flight
# exceeds the 'max_rdy_count' the server reports.
#
# NOTE(review): Socket.gethostbyname is documented as obsolete in Ruby's
# socket library; kept here so the client_id value does not change.
def identify
  hostname = Socket.gethostname
  metadata = {
    client_id: Socket.gethostbyname(hostname).flatten.compact.first,
    hostname: hostname,
    feature_negotiation: true,
    heartbeat_interval: 30_000, # 30 seconds
    output_buffer: 16_000, # 16kb
    output_buffer_timeout: 250, # 250ms
    tls_v1: false,
    snappy: false,
    deflate: false,
    sample_rate: 0, # disable sampling
    user_agent: USER_AGENT,
    msg_timeout: @msg_timeout
  }.to_json
  # The size prefix must count bytes, not characters: the JSON may contain
  # multibyte characters (e.g. in the hostname), and pub/mpub already use
  # bytesize for their size prefixes.
  write_to_socket ["IDENTIFY\n", metadata.bytesize, metadata].pack('a*Na*')

  # Now wait for the response!
  frame = receive_frame
  server = JSON.parse(frame.data)

  if @max_in_flight > server['max_rdy_count']
    raise "max_in_flight is set to #{@max_in_flight}, server only supports #{server['max_rdy_count']}"
  end

  @server_version = server['version']
end
monitor_connection() click to toggle source
# File lib/nsq/connection.rb, line 284
# Loops forever: blocks until something is posted on the death queue,
# logs it, reconnects, and then empties the death queue.
def monitor_connection
  loop do
    # block here until a death is reported; ideally that never happens
    reason = @death_queue.pop
    warn "Died from: #{reason}"

    debug 'Reconnecting...'
    reconnect
    debug 'Reconnected!'

    # Drop any death notices that piled up in the meantime. We are
    # connected again, so acting on them would just trigger another
    # immediate reconnect.
    @death_queue.clear
  end
end
nop() click to toggle source
# File lib/nsq/connection.rb, line 124
# Queues a NOP command.
def nop
  write("NOP\n")
end
open_connection() click to toggle source
# File lib/nsq/connection.rb, line 311
# Opens a TCP socket to @host:@port, performs the handshake, starts the
# read and write loops, and marks the connection as connected. When a
# topic is configured it also subscribes and sends RDY.
def open_connection
  @socket = TCPSocket.new(@host, @port)
  # The version and IDENTIFY go straight to the socket so they are
  # guaranteed to reach nsqd before anything waiting in `@write_queue`.
  write_to_socket '  V2'
  identify

  start_read_loop
  start_write_loop
  @connected = true

  # nothing to (re-)subscribe to unless a topic was specified
  return unless @topic

  debug "Subscribing to #{@topic}"
  sub(@topic, @channel)
  re_up_ready
end
read_loop() click to toggle source
# File lib/nsq/connection.rb, line 222
# Receives frames in an endless loop and dispatches on their class:
# responses go to handle_response, errors are logged, and messages are
# pushed onto @queue when one is set. A frame of any other kind
# (including nil) raises.
#
# Anything raised inside the loop - the rescue covers Exception, not just
# StandardError - ends the loop and is reported through die.
def read_loop
  loop do
    frame = receive_frame

    case frame
    when Response
      handle_response(frame)
    when Error
      error("Error received: #{frame.data}")
    when Message
      debug "<<< #{frame.body}"
      @queue.push(frame) if @queue
    else
      raise 'No data from socket'
    end
  end
rescue Exception => ex
  die(ex)
end
receive_frame() click to toggle source
# File lib/nsq/connection.rb, line 182
# Reads one frame from the socket and returns an instance of the
# matching frame class, or nil when the socket yields no header.
#
# The 8-byte header is unpacked as two 32-bit big-endian integers: the
# frame size and the frame type.
def receive_frame
  header = @socket.read(8)
  return unless header

  size, type = header.unpack('NN')
  # size covers the 4-byte type field too; only the data part is left to read
  payload = @socket.read(size - 4)
  frame_class_for_type(type).new(payload, self)
end
reconnect() click to toggle source

close the connection if it’s not already closed and try to reconnect over and over until we succeed!

# File lib/nsq/connection.rb, line 303
# Closes the current connection, then keeps trying to open a new one
# under with_retries.
def reconnect
  close_connection
  with_retries { open_connection }
end
server_needs_rdy_re_ups?() click to toggle source
# File lib/nsq/connection.rb, line 393
# True when @server_version has major version 0 and a minor version of
# 2 or lower.
def server_needs_rdy_re_ups?
  # versions less than 0.3.0 need RDY re-ups
  # see: https://github.com/bitly/nsq/blob/master/ChangeLog.md#030---2014-11-18
  version_parts = @server_version.split('.').map(&:to_i)
  major = version_parts[0]
  minor = version_parts[1]
  major == 0 && minor <= 2
end
snooze(t) click to toggle source

So we can stub for testing and reconnect in a tight loop

# File lib/nsq/connection.rb, line 388
# Sleeps for +t+ seconds. Kept as its own method so the delay can be
# stubbed out.
def snooze(t)
  sleep t
end
start_monitoring_connection() click to toggle source

Waits for death of connection

# File lib/nsq/connection.rb, line 272
# Starts the thread running monitor_connection unless one is already
# stored, and makes an exception in that thread abort the process.
def start_monitoring_connection
  @connection_monitor_thread = Thread.new { monitor_connection } unless @connection_monitor_thread
  @connection_monitor_thread.abort_on_exception = true
end
start_read_loop() click to toggle source
# File lib/nsq/connection.rb, line 211
# Starts the thread running read_loop unless one is already stored;
# returns the thread either way.
def start_read_loop
  return @read_loop_thread if @read_loop_thread

  @read_loop_thread = Thread.new { read_loop }
end
start_write_loop() click to toggle source
# File lib/nsq/connection.rb, line 241
# Starts the thread running write_loop unless one is already stored;
# returns the thread either way.
def start_write_loop
  return @write_loop_thread if @write_loop_thread

  @write_loop_thread = Thread.new { write_loop }
end
stop_monitoring_connection() click to toggle source
# File lib/nsq/connection.rb, line 278
# Kills the connection monitor thread, if there is one, and forgets it.
def stop_monitoring_connection
  @connection_monitor_thread.kill if @connection_monitor_thread
  # Clear the same ivar start_monitoring_connection guards with `||=`;
  # otherwise a later start would keep the dead thread instead of
  # spawning a new monitor.
  @connection_monitor_thread = nil
end
stop_read_loop() click to toggle source
# File lib/nsq/connection.rb, line 216
# Kills the read loop thread, if there is one, and forgets it.
def stop_read_loop
  reader = @read_loop_thread
  reader.kill if reader
  @read_loop_thread = nil
end
stop_write_loop() click to toggle source
# File lib/nsq/connection.rb, line 246
# Asks the write loop to stop, gives it +join_timeout+ seconds (default 1)
# to finish, and forgets the thread.
#
# write_loop only checks @stop_write_loop after popping an item, so a
# writer blocked on an empty @write_queue never notices the flag. If the
# thread is still running when the timeout expires it is killed;
# otherwise it would live on and keep popping from @write_queue alongside
# the next write loop started after a reconnect.
def stop_write_loop(join_timeout = 1)
  @stop_write_loop = true
  writer = @write_loop_thread
  if writer
    # Thread#join returns nil when the timeout elapses first.
    finished = writer.join(join_timeout)
    writer.kill unless finished
  end
  @write_loop_thread = nil
end
with_retries(&block) click to toggle source

Retry the supplied block with exponential backoff.

Borrowed liberally from: github.com/ooyala/retries/blob/master/lib/retries.rb

# File lib/nsq/connection.rb, line 353
# Calls +block+ with the current attempt number (starting at 1) and
# returns its result. When the block raises one of the connection errors
# listed below, the call is retried after a randomized, exponentially
# growing delay; after 100 failed attempts the error is re-raised. Any
# other exception propagates immediately.
def with_retries(&block)
  base_sleep_seconds = 0.5
  max_sleep_seconds = 300 # 5 minutes
  attempts = 0

  begin
    attempts += 1
    block.call(attempts)
  rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH,
         Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error => ex

    raise ex if attempts >= 100

    # Double the delay with every attempt, capped at max_sleep_seconds.
    capped_backoff = [base_sleep_seconds * (2 ** (attempts - 1)), max_sleep_seconds].min
    # Pick a random point between half of that delay and the full delay.
    jittered_backoff = capped_backoff * (0.5 * (1 + rand()))
    # Never go below base_sleep_seconds.
    sleep_seconds = [base_sleep_seconds, jittered_backoff].max

    warn "Failed to connect: #{ex}. Retrying in #{sleep_seconds.round(1)} seconds."

    snooze(sleep_seconds)

    retry
  end
end
write(raw) click to toggle source
# File lib/nsq/connection.rb, line 129
# Enqueues +raw+ on the write queue; write_loop is what pops from that
# queue and sends the data to the socket.
def write(raw)
  @write_queue << raw
end
write_loop() click to toggle source
# File lib/nsq/connection.rb, line 253
# Pops items off the write queue and writes each one to the socket. The
# loop ends once @stop_write_loop is set and the queue is empty; note
# that this is only checked after an item has been written.
#
# If anything is raised (the rescue covers Exception), an item matching
# /^M?PUB/ that was being handled is put back on the queue, and the
# failure is reported through die.
def write_loop
  @stop_write_loop = false
  pending = nil
  loop do
    pending = @write_queue.pop
    write_to_socket(pending)
    break if @stop_write_loop && @write_queue.empty?
  end
rescue Exception => ex
  # PUB and MPUB commands get another chance
  if pending =~ /^M?PUB/
    debug "Requeueing to write_queue: #{pending.inspect}"
    @write_queue.push(pending)
  end
  die(ex)
end
write_to_socket(raw) click to toggle source
# File lib/nsq/connection.rb, line 134
# Logs +raw+ at debug level and writes it straight to the socket,
# without going through the write queue.
def write_to_socket(raw)
  debug(">>> #{raw.inspect}")
  @socket.write(raw)
end