class Nsq::Consumer

Attributes

max_in_flight[R]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/nsq/consumer.rb, line 8
# Builds a consumer for one topic/channel pair and starts connecting.
#
# Keys read from +opts+:
#   :topic              - required; ArgumentError when missing
#   :channel            - required; ArgumentError when missing
#   :nsqlookupd         - one nsqlookupd address or a (possibly nested) array
#   :nsqd               - nsqd address used only when no nsqlookupd is given;
#                         falls back to '127.0.0.1:4150'
#   :max_in_flight      - defaults to 1
#   :discovery_interval - defaults to 60 (unit is whatever
#                         discover_repeatedly expects; presumably seconds)
#   :msg_timeout        - stored as given, may be nil
#   :queue              - object that collects messages; defaults to a new Queue
def initialize(opts = {})
  lookupd = opts[:nsqlookupd]
  @nsqlookupds = lookupd ? [lookupd].flatten : []

  topic = opts[:topic]
  raise ArgumentError, 'topic is required' unless topic
  @topic = topic

  channel = opts[:channel]
  raise ArgumentError, 'channel is required' unless channel
  @channel = channel

  @max_in_flight = opts[:max_in_flight] || 1
  @discovery_interval = opts[:discovery_interval] || 60
  @msg_timeout = opts[:msg_timeout]

  # Messages received on any connection are collected in this one queue.
  @messages = opts[:queue] || Queue.new

  # Active nsqd connections. Each key is a "host:port" string
  # (e.g. '127.0.0.1:4150'); each value is the Connection instance.
  @connections = {}

  if @nsqlookupds.empty?
    # No nsqlookupd was configured, so skip discovery and talk to a single
    # nsqd instance directly.
    nsqd = opts[:nsqd] || '127.0.0.1:4150'
    add_connection(nsqd, max_in_flight: @max_in_flight)
  else
    # The usual path: let the nsqlookupd(s) tell us which nsqd instances
    # to connect to.
    discover_repeatedly(nsqlookupds: @nsqlookupds, topic: @topic, interval: @discovery_interval)
  end

  # Shut the consumer down when the process exits.
  at_exit { terminate }
end

Public Instance Methods

pop() click to toggle source

Pops the next message off the queue.

# File lib/nsq/consumer.rb, line 46
# Removes and returns the next message from the local message queue.
# With the default Queue this blocks while the queue is empty.
def pop
  queue = @messages
  queue.pop
end
pop_without_blocking() click to toggle source

By default, if the internal queue is empty, pop will block until a new message comes in.

Calling this method won’t block. If there are no messages, it just returns nil.

# File lib/nsq/consumer.rb, line 56
# Non-blocking variant of #pop.
#
# Returns the next queued message, or nil when nothing is queued.
def pop_without_blocking
  non_block = true
  @messages.pop(non_block)
rescue ThreadError
  # `Queue#pop(true)` signals an empty queue by raising ThreadError;
  # translate that into "no message".
  nil
end
size() click to toggle source

Returns the number of messages currently held in the local queue.

# File lib/nsq/consumer.rb, line 65
# Number of messages currently sitting in the local queue, as reported by
# the queue object itself.
def size
  queue = @messages
  queue.size
end

Private Instance Methods

add_connection(nsqd, options = {}) click to toggle source
Calls superclass method Nsq::ClientBase#add_connection
# File lib/nsq/consumer.rb, line 71
# Opens a connection to +nsqd+ through the superclass implementation,
# supplying this consumer's topic, channel, shared message queue and message
# timeout. max_in_flight starts at 1 unless +options+ says otherwise; any key
# in +options+ overrides the corresponding default.
def add_connection(nsqd, options = {})
  consumer_defaults = {
    topic: @topic,
    channel: @channel,
    queue: @messages,
    msg_timeout: @msg_timeout,
    max_in_flight: 1
  }
  super(nsqd, consumer_defaults.merge(options))
end
connections_changed() click to toggle source
# File lib/nsq/consumer.rb, line 86
# Hook that re-balances the per-connection max_in_flight by delegating to
# redistribute_ready.
# NOTE(review): presumably invoked by Nsq::ClientBase whenever a connection
# is added or dropped -- the caller is not visible here; confirm.
def connections_changed
  redistribute_ready
end
max_in_flight_per_connection(number_of_connections = @connections.length) click to toggle source

Be conservative, but don’t set a connection’s max_in_flight below 1

# File lib/nsq/consumer.rb, line 82
# Computes each connection's share of the consumer-wide max_in_flight.
#
# The share is the integer quotient of @max_in_flight by the number of
# connections (rounding down keeps us conservative), but never less than 1.
#
# number_of_connections - how many connections share the budget; defaults to
#                         the current number of entries in @connections.
#                         Values below 1 are treated as 1, so calling this
#                         before any connection exists no longer raises
#                         ZeroDivisionError.
#
# Returns the per-connection max_in_flight (always >= 1).
def max_in_flight_per_connection(number_of_connections = @connections.length)
  divisor = [number_of_connections, 1].max
  [@max_in_flight / divisor, 1].max
end
redistribute_ready() click to toggle source
# File lib/nsq/consumer.rb, line 90
# Gives every current connection its share of max_in_flight (as computed by
# max_in_flight_per_connection) and then calls re_up_ready on it.
#
# The share is computed inside the loop on purpose: with no connections the
# computation is never reached.
def redistribute_ready
  current = @connections.values
  current.each do |conn|
    share = max_in_flight_per_connection
    conn.max_in_flight = share
    conn.re_up_ready
  end
end