class Nsq::Producer

Attributes

topic[R]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/nsq/producer.rb, line 7
# Builds a producer and sets up the connections it will publish through.
#
# Keys read from +opts+:
# * +:topic+              - stored as the default topic used by #write
# * +:discovery_interval+ - passed as +interval:+ to discover_repeatedly;
#                           falls back to 60 when missing
# * +:nsqlookupd+         - a single value or a (possibly nested) array of
#                           values, handed to discover_repeatedly
# * +:nsqd+               - a single value or a (possibly nested) array of
#                           values, each handed to add_connection; only
#                           consulted when +:nsqlookupd+ is not given
#
# When neither +:nsqlookupd+ nor +:nsqd+ is given, one connection to
# '127.0.0.1:4150' is added. In every case an at_exit hook that calls
# terminate is registered.
def initialize(opts = {})
  @connections = {}
  @topic = opts[:topic]
  @discovery_interval = opts[:discovery_interval] || 60

  if (lookupds = opts[:nsqlookupd])
    # Wrapping in an array and flattening accepts both one value and a list.
    discover_repeatedly(
      nsqlookupds: [lookupds].flatten,
      interval: @discovery_interval
    )
  elsif (nsqds = opts[:nsqd])
    [nsqds].flatten.each { |address| add_connection(address) }
  else
    add_connection('127.0.0.1:4150')
  end

  at_exit { terminate }
end

Public Instance Methods

write(*raw_messages) click to toggle source
# File lib/nsq/producer.rb, line 32
# Publishes the given message(s) to the topic stored in @topic.
#
# All arguments are forwarded unchanged to write_to_topic, and its return
# value is returned.
#
# Raises RuntimeError when @topic is nil or false.
def write(*raw_messages)
  unless @topic
    raise 'No topic specified. Either specify a topic when instantiating the Producer or use write_to_topic.'
  end

  write_to_topic(@topic, *raw_messages)
end
write_to_topic(topic, *raw_messages) click to toggle source
# File lib/nsq/producer.rb, line 41
# Publishes one or more messages to +topic+.
#
# Each message is converted with to_s. A connection is obtained from
# connection_for_write; a single message is sent with pub, several
# messages are sent together with mpub. The return value is whatever
# pub / mpub returns.
#
# Raises ArgumentError when no message is given.
def write_to_topic(topic, *raw_messages)
  raise ArgumentError, 'message not provided' if raw_messages.empty?

  payloads = raw_messages.map { |raw| raw.to_s }

  # Looked up only after the messages have been stringified.
  target = connection_for_write

  return target.pub(topic, payloads.first) if payloads.size == 1

  target.mpub(topic, payloads)
end

Private Instance Methods

connection_for_write() click to toggle source
# File lib/nsq/producer.rb, line 60
# Picks the connection a message should be written to.
#
# A random connection whose connected? is truthy is preferred; when none
# qualifies, a random one out of all known connections is used instead.
#
# Raises RuntimeError when there are no connections at all.
def connection_for_write
  pool = connections.values
  chosen = pool.select(&:connected?).sample || pool.sample

  raise 'No connections available' unless chosen

  chosen
end