class Nsq::Connection
Constants
- FRAME_CLASSES
- RESPONSE_HEARTBEAT
- RESPONSE_OK
- USER_AGENT
Attributes
host[R]
max_in_flight[RW]
port[R]
presumed_in_flight[R]
Public Class Methods
new(opts = {})
click to toggle source
# File lib/nsq/connection.rb, line 25
# Builds the connection state, opens the connection immediately and starts
# the monitor thread.
#
# opts - Hash of options:
#   :host          - required
#   :port          - required
#   :queue         - optional; stored as @queue
#   :topic         - optional; stored as @topic
#   :channel       - optional; stored as @channel
#   :msg_timeout   - milliseconds, defaults to 60_000, must be >= 1000
#   :max_in_flight - defaults to 1
#
# Raises ArgumentError when :host or :port is missing, or when :msg_timeout
# is below 1000.
def initialize(opts = {})
  @host = opts[:host] || (raise ArgumentError, 'host is required')
  # A missing port used to be reported as "host is required".
  @port = opts[:port] || (raise ArgumentError, 'port is required')
  @queue = opts[:queue]
  @topic = opts[:topic]
  @channel = opts[:channel]
  @msg_timeout = opts[:msg_timeout] || 60_000 # 60s
  @max_in_flight = opts[:max_in_flight] || 1

  if @msg_timeout < 1000
    raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.'
  end

  # for outgoing communication
  @write_queue = Queue.new

  # For indicating that the connection has died.
  # We use a Queue so we don't have to poll. Used to communicate across
  # threads (from write_loop and read_loop to monitor_connection).
  @death_queue = Queue.new

  @connected = false
  @presumed_in_flight = 0

  open_connection
  start_monitoring_connection
end
Public Instance Methods
close()
click to toggle source
close the connection and don’t try to re-open it
# File lib/nsq/connection.rb, line 60
# Shuts the connection down for good: the monitor thread is stopped first,
# then the connection itself is closed.
def close
  stop_monitoring_connection
  close_connection
end
connected?()
click to toggle source
# File lib/nsq/connection.rb, line 54
# Returns the current value of the @connected flag.
def connected?
  @connected
end
fin(message_id)
click to toggle source
# File lib/nsq/connection.rb, line 76
# Queues a FIN command for message_id, then counts one message fewer as
# in flight.
def fin(message_id)
  write("FIN #{message_id}\n")
  decrement_in_flight
end
mpub(topic, messages)
click to toggle source
# File lib/nsq/connection.rb, line 98
# Queues a single MPUB command carrying several messages.
#
# topic    - topic name interpolated into the command line
# messages - collection of strings; each is prefixed with its own byte size
#
# The payload is: command line, 32-bit body size, 32-bit message count, body.
def mpub(topic, messages)
  framed = messages.map { |message| [message.bytesize, message].pack('Na*') }
  body = framed.join
  command = "MPUB #{topic}\n"
  write([command, body.bytesize, messages.size, body].pack('a*NNa*'))
end
pub(topic, message)
click to toggle source
# File lib/nsq/connection.rb, line 93
# Queues a PUB command: command line, 32-bit message byte size, message.
def pub(topic, message)
  command = "PUB #{topic}\n"
  write([command, message.bytesize, message].pack('a*Na*'))
end
rdy(count)
click to toggle source
# File lib/nsq/connection.rb, line 71
# Queues an RDY command with the given count.
def rdy(count)
  write("RDY #{count}\n")
end
re_up_ready()
click to toggle source
Tell the server we are ready for more messages!
# File lib/nsq/connection.rb, line 108
# Sends RDY with @max_in_flight and resets the in-flight estimate to match.
def re_up_ready
  rdy(@max_in_flight)

  # Assume the full batch is on its way. That may not be true, but treating
  # it that way keeps our RDY bookkeeping with the server much simpler.
  @presumed_in_flight = @max_in_flight
end
req(message_id, timeout)
click to toggle source
# File lib/nsq/connection.rb, line 82
# Queues a REQ command for message_id with the given timeout, then counts
# one message fewer as in flight.
def req(message_id, timeout)
  write("REQ #{message_id} #{timeout}\n")
  decrement_in_flight
end
sub(topic, channel)
click to toggle source
# File lib/nsq/connection.rb, line 66
# Queues a SUB command for the given topic and channel.
def sub(topic, channel)
  write("SUB #{topic} #{channel}\n")
end
touch(message_id)
click to toggle source
# File lib/nsq/connection.rb, line 88
# Queues a TOUCH command for message_id.
def touch(message_id)
  write("TOUCH #{message_id}\n")
end
Private Instance Methods
close_connection()
click to toggle source
closes the connection and stops listening for messages
# File lib/nsq/connection.rb, line 332
# Closes the connection and stops listening for messages.
#
# Queues CLS when still connected, stops the read and write loops, closes
# the socket and resets the connection state.
def close_connection
  cls if connected?
  stop_read_loop
  stop_write_loop

  # Release the descriptor instead of merely dropping the reference;
  # otherwise every close or reconnect leaks a socket until GC finalizes it.
  begin
    @socket.close if @socket
  rescue IOError, SystemCallError => ex
    # The socket is being discarded anyway; a failing close must not keep
    # the state below from being reset.
    debug "Ignoring error while closing socket: #{ex}"
  end

  @socket = nil
  @connected = false
end
cls()
click to toggle source
# File lib/nsq/connection.rb, line 119
# Queues a CLS command.
def cls
  write("CLS\n")
end
decrement_in_flight()
click to toggle source
# File lib/nsq/connection.rb, line 200
# Lowers the in-flight estimate by one. For servers that need RDY re-ups,
# sends a fresh RDY once the estimate drops to 20% of @max_in_flight
# (rounded up) or below.
def decrement_in_flight
  @presumed_in_flight -= 1
  return unless server_needs_rdy_re_ups?

  # now that we're less than @max_in_flight we might need to re-up our RDY state
  re_up_threshold = (@max_in_flight * 0.2).ceil
  re_up_ready if @presumed_in_flight <= re_up_threshold
end
die(reason)
click to toggle source
This is called when there’s a connection error in the read or write loop. It triggers `monitor_connection` to try to reconnect.
# File lib/nsq/connection.rb, line 343
# Marks the connection as down and pushes the reason onto @death_queue.
def die(reason)
  @connected = false
  @death_queue << reason
end
frame_class_for_type(type)
click to toggle source
# File lib/nsq/connection.rb, line 194
# Maps a numeric frame type to its frame class (0 => Response, 1 => Error,
# 2 => Message).
#
# Raises RuntimeError when type is beyond the last index of FRAME_CLASSES.
#
# NOTE(review): the bound comes from FRAME_CLASSES while the lookup uses a
# literal array - confirm the two lists are meant to stay identical.
def frame_class_for_type(type)
  highest_type = FRAME_CLASSES.length - 1
  raise "Bad frame type specified: #{type}" if type > highest_type

  frame_classes = [Response, Error, Message]
  frame_classes[type]
end
handle_response(frame)
click to toggle source
# File lib/nsq/connection.rb, line 170
# Reacts to a Response frame: a heartbeat is answered with NOP, an OK is
# only logged, and anything else kills the connection via die.
def handle_response(frame)
  if frame.data == RESPONSE_HEARTBEAT
    debug 'Received heartbeat'
    return nop
  end

  return debug('Received OK') if frame.data == RESPONSE_OK

  die "Received response we don't know how to handle: #{frame.data}"
end
identify()
click to toggle source
# File lib/nsq/connection.rb, line 140
# Sends IDENTIFY straight to the socket, reads the reply and stores the
# server version in @server_version.
#
# Raises RuntimeError when @max_in_flight exceeds the server's
# 'max_rdy_count'.
def identify
  hostname = Socket.gethostname
  metadata = {
    client_id: Socket.gethostbyname(hostname).flatten.compact.first,
    hostname: hostname,
    feature_negotiation: true,
    heartbeat_interval: 30_000, # 30 seconds
    output_buffer: 16_000, # 16kb
    output_buffer_timeout: 250, # 250ms
    tls_v1: false,
    snappy: false,
    deflate: false,
    sample_rate: 0, # disable sampling
    user_agent: USER_AGENT,
    msg_timeout: @msg_timeout
  }.to_json

  # The size prefix is a byte count (as in pub/mpub); String#length counts
  # characters and under-reports when the JSON holds multibyte characters.
  write_to_socket ["IDENTIFY\n", metadata.bytesize, metadata].pack('a*Na*')

  # Now wait for the response!
  frame = receive_frame
  server = JSON.parse(frame.data)

  if @max_in_flight > server['max_rdy_count']
    raise "max_in_flight is set to #{@max_in_flight}, server only supports #{server['max_rdy_count']}"
  end

  @server_version = server['version']
end
monitor_connection()
click to toggle source
# File lib/nsq/connection.rb, line 284
# Loops forever: blocks until something is pushed onto @death_queue, then
# reconnects.
def monitor_connection
  loop do
    # wait for death, hopefully it never comes
    reason = @death_queue.pop
    warn "Died from: #{reason}"

    debug 'Reconnecting...'
    reconnect
    debug 'Reconnected!'

    # Drop any death notices that piled up meanwhile; we are connected
    # again and must not reconnect straight away on the next iteration.
    @death_queue.clear
  end
end
nop()
click to toggle source
# File lib/nsq/connection.rb, line 124
# Queues a NOP command.
def nop
  write("NOP\n")
end
open_connection()
click to toggle source
# File lib/nsq/connection.rb, line 311
# Opens the TCP socket, sends the protocol magic and IDENTIFY, starts the
# read and write loops and, when a topic is configured, subscribes and
# sends the initial RDY.
def open_connection
  @socket = TCPSocket.new(@host, @port)

  # write the version and IDENTIFY directly to the socket to make sure
  # it gets to nsqd ahead of anything in the `@write_queue`.
  # The magic is exactly 4 bytes: two spaces followed by "V2".
  write_to_socket '  V2'
  identify

  start_read_loop
  start_write_loop
  @connected = true

  # we need to re-subscribe if there's a topic specified
  if @topic
    debug "Subscribing to #{@topic}"
    sub(@topic, @channel)
    re_up_ready
  end
end
read_loop()
click to toggle source
# File lib/nsq/connection.rb, line 222
# Reads frames forever and dispatches on their class. Responses go to
# handle_response, errors are logged, messages are pushed onto @queue when
# one is set. Every exception (not only StandardError) ends the loop and is
# handed to die.
def read_loop
  loop do
    frame = receive_frame
    case frame
    when Response
      handle_response(frame)
    when Error
      error "Error received: #{frame.data}"
    when Message
      debug "<<< #{frame.body}"
      @queue.push(frame) if @queue
    else
      # receive_frame returned something that is not a known frame
      raise 'No data from socket'
    end
  end
rescue Exception => ex
  die(ex)
end
receive_frame()
click to toggle source
# File lib/nsq/connection.rb, line 182
# Reads one frame from the socket: an 8-byte header (32-bit size, 32-bit
# type) followed by the data. Returns an instance of the matching frame
# class, or nil when the header read yields nothing.
def receive_frame
  header = @socket.read(8)
  return unless header

  size, type = header.unpack('NN')
  # size counts the 4 type bytes too; only the data part remains to be read
  payload = @socket.read(size - 4)
  frame_class_for_type(type).new(payload, self)
end
reconnect()
click to toggle source
close the connection if it’s not already closed and try to reconnect over and over until we succeed!
# File lib/nsq/connection.rb, line 303
# Closes the current connection, then keeps trying to open a new one using
# with_retries.
def reconnect
  close_connection
  with_retries { open_connection }
end
server_needs_rdy_re_ups?()
click to toggle source
# File lib/nsq/connection.rb, line 393
# True when @server_version is older than 0.3.0; those versions need RDY
# re-ups.
# see: https://github.com/bitly/nsq/blob/master/ChangeLog.md#030---2014-11-18
def server_needs_rdy_re_ups?
  version_parts = @server_version.split('.').map(&:to_i)
  major, minor = version_parts
  major == 0 && minor <= 2
end
snooze(t)
click to toggle source
So we can stub for testing and reconnect in a tight loop
# File lib/nsq/connection.rb, line 388
# Sleeps for t seconds. Kept as its own method so it can be stubbed.
def snooze(t)
  sleep t
end
start_monitoring_connection()
click to toggle source
Waits for death of connection
# File lib/nsq/connection.rb, line 272
# Starts the thread running monitor_connection unless one is already
# stored, and makes an exception in that thread abort the process.
def start_monitoring_connection
  @connection_monitor_thread ||= Thread.new { monitor_connection }
  @connection_monitor_thread.abort_on_exception = true
end
start_read_loop()
click to toggle source
# File lib/nsq/connection.rb, line 211
# Starts the thread running read_loop unless one is already stored.
def start_read_loop
  @read_loop_thread ||= Thread.new { read_loop }
end
start_write_loop()
click to toggle source
# File lib/nsq/connection.rb, line 241
# Starts the thread running write_loop unless one is already stored.
def start_write_loop
  @write_loop_thread ||= Thread.new { write_loop }
end
stop_monitoring_connection()
click to toggle source
# File lib/nsq/connection.rb, line 278
# Kills the monitor thread, if any, and forgets it.
def stop_monitoring_connection
  @connection_monitor_thread.kill if @connection_monitor_thread
  # Clear the variable that start_monitoring_connection actually checks;
  # the old code reset an unrelated @connection_monitor, so the killed
  # thread stayed referenced and `||=` would never start a new one.
  @connection_monitor_thread = nil
end
stop_read_loop()
click to toggle source
# File lib/nsq/connection.rb, line 216
# Kills the read loop thread, if any, and forgets it.
def stop_read_loop
  reader = @read_loop_thread
  reader.kill if reader
  @read_loop_thread = nil
end
stop_write_loop()
click to toggle source
# File lib/nsq/connection.rb, line 246
# Sets the @stop_write_loop flag, waits up to one second for the write
# loop thread to finish, then forgets the thread.
def stop_write_loop
  @stop_write_loop = true
  writer = @write_loop_thread
  writer.join(1) if writer
  @write_loop_thread = nil
end
with_retries(&block)
click to toggle source
Retry the supplied block with exponential backoff.
Borrowed liberally from: github.com/ooyala/retries/blob/master/lib/retries.rb
# File lib/nsq/connection.rb, line 353
# Calls the block, retrying with exponential backoff when it raises one of
# the listed connection errors.
#
# block - receives the attempt number (starting at 1)
#
# Returns the block's result. Re-raises the error after 100 attempts;
# any other exception propagates immediately.
def with_retries(&block)
  base_sleep_seconds = 0.5
  max_sleep_seconds = 300 # 5 minutes

  # Let's do this thing
  attempts = 0

  begin
    attempts += 1
    return block.call(attempts)
  rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH,
         Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error => ex
    raise ex if attempts >= 100

    # The sleep time is an exponentially-increasing function of base_sleep_seconds.
    # But, it never exceeds max_sleep_seconds.
    sleep_seconds = [base_sleep_seconds * (2 ** (attempts - 1)), max_sleep_seconds].min
    # Randomize to a random value in the range sleep_seconds/2 .. sleep_seconds
    sleep_seconds = sleep_seconds * (0.5 * (1 + rand()))
    # But never sleep less than base_sleep_seconds
    sleep_seconds = [base_sleep_seconds, sleep_seconds].max

    warn "Failed to connect: #{ex}. Retrying in #{sleep_seconds.round(1)} seconds."

    snooze(sleep_seconds)

    retry
  end
end
write(raw)
click to toggle source
# File lib/nsq/connection.rb, line 129
# Pushes raw onto @write_queue.
def write(raw)
  @write_queue << raw
end
write_loop()
click to toggle source
# File lib/nsq/connection.rb, line 253
# Pops items off @write_queue and writes each to the socket until
# @stop_write_loop is set and the queue is empty. On any exception the
# item being written is requeued if it is a PUB or MPUB command, and the
# exception is handed to die.
def write_loop
  @stop_write_loop = false
  pending = nil

  loop do
    pending = @write_queue.pop
    write_to_socket(pending)
    # the stop flag is only looked at after a write has gone out
    break if @stop_write_loop && @write_queue.empty?
  end
rescue Exception => ex
  # requeue PUB and MPUB commands
  if pending =~ /^M?PUB/
    debug "Requeueing to write_queue: #{pending.inspect}"
    @write_queue.push(pending)
  end
  die(ex)
end
write_to_socket(raw)
click to toggle source
# File lib/nsq/connection.rb, line 134
# Logs raw at debug level, then writes it to @socket.
def write_to_socket(raw)
  debug(">>> #{raw.inspect}")
  @socket.write(raw)
end