class Nsq::Producer
Attributes
topic[R]
Public Class Methods
new(opts = {})
click to toggle source
# File lib/nsq/producer.rb, line 7 def initialize(opts = {}) @connections = {} @topic = opts[:topic] @discovery_interval = opts[:discovery_interval] || 60 nsqlookupds = [] if opts[:nsqlookupd] nsqlookupds = [opts[:nsqlookupd]].flatten discover_repeatedly( nsqlookupds: nsqlookupds, interval: @discovery_interval ) elsif opts[:nsqd] nsqds = [opts[:nsqd]].flatten nsqds.each{|d| add_connection(d)} else add_connection('127.0.0.1:4150') end at_exit{terminate} end
Public Instance Methods
write(*raw_messages)
click to toggle source
# File lib/nsq/producer.rb, line 32 def write(*raw_messages) if !@topic raise 'No topic specified. Either specify a topic when instantiating the Producer or use write_to_topic.' end write_to_topic(@topic, *raw_messages) end
write_to_topic(topic, *raw_messages)
click to toggle source
# File lib/nsq/producer.rb, line 41 def write_to_topic(topic, *raw_messages) # return error if message(s) not provided raise ArgumentError, 'message not provided' if raw_messages.empty? # stringify the messages messages = raw_messages.map(&:to_s) # get a suitable connection to write to connection = connection_for_write if messages.length > 1 connection.mpub(topic, messages) else connection.pub(topic, messages.first) end end
Private Instance Methods
connection_for_write()
click to toggle source
# File lib/nsq/producer.rb, line 60 def connection_for_write # Choose a random Connection that's currently connected # Or, if there's nothing connected, just take any random one connections_currently_connected = connections.select{|_,c| c.connected?} connection = connections_currently_connected.values.sample || connections.values.sample # Raise an exception if there's no connection available unless connection raise 'No connections available' end connection end