class Nsq::Consumer
Attributes
max_in_flight[R]
Public Class Methods
new(opts = {})
click to toggle source
# File lib/nsq/consumer.rb, line 8 def initialize(opts = {}) if opts[:nsqlookupd] @nsqlookupds = [opts[:nsqlookupd]].flatten else @nsqlookupds = [] end @topic = opts[:topic] || raise(ArgumentError, 'topic is required') @channel = opts[:channel] || raise(ArgumentError, 'channel is required') @max_in_flight = opts[:max_in_flight] || 1 @discovery_interval = opts[:discovery_interval] || 60 @msg_timeout = opts[:msg_timeout] @max_attempts = opts[:max_attempts] @ssl_context = opts[:ssl_context] @tls_options = opts[:tls_options] @tls_v1 = opts[:tls_v1] # This is where we queue up the messages we receive from each connection @messages = opts[:queue] || Queue.new # This is where we keep a record of our active nsqd connections # The key is a string with the host and port of the instance (e.g. # '127.0.0.1:4150') and the key is the Connection instance. @connections = {} if !@nsqlookupds.empty? discover_repeatedly( nsqlookupds: @nsqlookupds, topic: @topic, interval: @discovery_interval ) else # normally, we find nsqd instances to connect to via nsqlookupd(s) # in this case let's connect to an nsqd instance directly add_connection(opts[:nsqd] || '127.0.0.1:4150', max_in_flight: @max_in_flight) end end
Public Instance Methods
pop()
click to toggle source
pop the next message off the queue
# File lib/nsq/consumer.rb, line 48 def pop @messages.pop end
pop_without_blocking()
click to toggle source
By default, if the internal queue is empty, pop will block until a new message comes in.
Calling this method won't block. If there are no messages, it just returns nil.
# File lib/nsq/consumer.rb, line 58 def pop_without_blocking @messages.pop(true) rescue ThreadError # When the Queue is empty calling `Queue#pop(true)` will raise a ThreadError nil end
size()
click to toggle source
returns the number of messages we have locally in the queue
# File lib/nsq/consumer.rb, line 67 def size @messages.size end
Private Instance Methods
add_connection(nsqd, options = {})
click to toggle source
Calls superclass method
Nsq::ClientBase#add_connection
# File lib/nsq/consumer.rb, line 73 def add_connection(nsqd, options = {}) super(nsqd, { topic: @topic, channel: @channel, queue: @messages, msg_timeout: @msg_timeout, max_in_flight: 1, max_attempts: @max_attempts }.merge(options)) end
connections_changed()
click to toggle source
# File lib/nsq/consumer.rb, line 89 def connections_changed redistribute_ready end
max_in_flight_per_connection(number_of_connections = @connections.length)
click to toggle source
Be conservative, but don't set a connection's max_in_flight
below 1
# File lib/nsq/consumer.rb, line 85 def max_in_flight_per_connection(number_of_connections = @connections.length) [@max_in_flight / number_of_connections, 1].max end
redistribute_ready()
click to toggle source
# File lib/nsq/consumer.rb, line 93 def redistribute_ready @connections.values.each do |connection| connection.max_in_flight = max_in_flight_per_connection connection.re_up_ready end end