class Krakow::Consumer
Consume messages from a server
Consume messages from a server
Attributes
Public Class Methods
@!endgroup
# File lib/krakow/consumer.rb, line 41 def initialize(args={}) super arguments[:connection_options] = {:features => {}, :config => {}}.merge( arguments[:connection_options] || {} ) @connections = {} @queue = Queue.new( current_actor, :removal_callback => :remove_message ) @distribution = Distribution::Default.new( :max_in_flight => max_in_flight, :backoff_interval => backoff_interval, :consumer => current_actor ) if(nsqlookupd) debug "Connections will be established via lookup #{nsqlookupd.inspect}" @discovery = Discovery.new(:nsqlookupd => nsqlookupd) discover elsif(host && port) direct_connect else abort Error::ConfigurationError.new('No connection information provided!') end end
Public Instance Methods
Build a new [Krakow::Connection]
@param host [String] remote host @param port [String, Integer] remote port @param queue [Queue] queue for messages @return [Krakow::Connection, nil] new connection or nil
# File lib/krakow/consumer.rb, line 131 def build_connection(host, port, queue) begin connection = Connection.new( :host => host, :port => port, :queue => queue, :topic => topic, :channel => channel, :notifier => notifier, :features => connection_options[:features], :features_args => connection_options[:config], :callbacks => { :handle => { :actor => current_actor, :method => :process_message } } ) queue.register_connection(connection) connection rescue => e error "Failed to build connection (host: #{host} port: #{port} queue: #{queue}) - #{e.class}: #{e}" debug "#{e.class}: #{e}\n#{e.backtrace.join("\n")}" nil end end
Confirm message has been processed
@param message_id [String, Krakow::FrameType::Message] @return [TrueClass] @raise [KeyError] connection not found
# File lib/krakow/consumer.rb, line 286 def confirm(message_id) message_id = message_id.message_id if message_id.respond_to?(:message_id) begin begin connection = distribution.in_flight_lookup(message_id) connection.transmit(Command::Fin.new(:message_id => message_id)) distribution.success(connection.identifier) rescue => e abort e end true rescue KeyError => e error "Message confirmation failed: #{e}" abort e rescue Error::LookupFailed => e error "Lookup of message for confirmation failed! <Message ID: #{message_id} - Error: #{e}>" abort e rescue Error::ConnectionUnavailable => e abort e rescue Celluloid::DeadActorError abort Error::ConnectionUnavailable.new ensure con = distribution.unregister_message(message_id) update_ready!(con) if con end end
@return [TrueClass, FalseClass] currently connected to at least
one nsqd
# File lib/krakow/consumer.rb, line 69 def connected? !!connections.values.any? do |con| begin con.connected? rescue Celluloid::DeadActorError false end end end
Returns [Krakow::Connection] associated to key
@param key [Object] identifier @return [Krakow::Connection] associated connection
# File lib/krakow/consumer.rb, line 98 def connection(key) @connections[key] end
Remove connection references when connection is terminated
@param actor [Object] terminated actor @param reason [Exception] reason for termination @return [nil]
# File lib/krakow/consumer.rb, line 252 def connection_failure(actor, reason) if(reason && key = connections.key(actor)) warn "Connection failure detected. Removing connection: #{key} - #{reason}" connections.delete(key) begin distribution.remove_connection(key) rescue Error::ConnectionUnavailable, Error::ConnectionFailure warn 'Caught connection unavailability' end queue.deregister_connection(key) distribution.redistribute! direct_connect unless discovery end nil end
Instance destructor
@return [nil]
# File lib/krakow/consumer.rb, line 110 def consumer_cleanup debug 'Tearing down consumer' if(distribution && distribution.alive?) distribution.terminate end if(queue && queue.alive?) queue.terminate end connections.values.each do |con| con.terminate if con.alive? end info 'Consumer torn down' nil end
Connect to nsqd instance directly
@return [Connection]
# File lib/krakow/consumer.rb, line 82 def direct_connect debug "Connection will be established via direct connection #{host}:#{port}" connection = build_connection(host, port, queue) if(register(connection)) info "Registered new connection #{connection}" distribution.redistribute! else abort Error::ConnectionFailure.new("Failed to establish subscription at provided end point (#{host}:#{port}") end connection end
Start the discovery interval lookup
@return [nil]
# File lib/krakow/consumer.rb, line 223 def discover init! after(discovery_interval + (discovery_jitter * rand)){ discover } end
Initialize the consumer by starting lookup and adding connections
@return [nil]
# File lib/krakow/consumer.rb, line 201 def init! debug 'Running consumer `init!` connection builds' found = discovery.lookup(topic) debug "Discovery results: #{found.inspect}" connection = nil found.each do |node| debug "Processing discovery result: #{node.inspect}" key = Connection.identifier(node[:broadcast_address], node[:tcp_port], topic, channel) unless(connections[key]) connection = build_connection(node[:broadcast_address], node[:tcp_port], queue) info "Registered new connection #{connection}" if register(connection) else debug "Discovery result already registered: #{node.inspect}" end end distribution.redistribute! if connection nil end
Process a given message if required
@param message [Krakow::FrameType] @param connection [Krakow::Connection] @return [Krakow::FrameType] @note If we receive a message that is already in flight, attempt
to scrub message from wait queue. If message is found, retry distribution registration. If message is not found, assume it is currently being processed and do not allow new message to be queued
# File lib/krakow/consumer.rb, line 168 def process_message(message, connection) discard = false if(message.is_a?(FrameType::Message)) message.origin = current_actor message.connection = connection retried = false begin distribution.register_message(message, connection.identifier) rescue KeyError => e if(!retried && queue.scrub_duplicate_message(message)) retried = true retry else error "Received message is currently in flight and not in wait queue. Discarding! (#{message})" discard = true end end end discard ? nil : message end
Register connection with distribution
@param connection [Krakow::Connection] @return [TrueClass, FalseClass] true if subscription was successful
# File lib/krakow/consumer.rb, line 232 def register(connection) begin connection.init! connection.transmit(Command::Sub.new(:topic_name => topic, :channel_name => channel)) self.link connection connections[connection.identifier] = connection distribution.add_connection(connection) true rescue Error::BadResponse => e debug "Failed to establish connection: #{e.result ? e.result.error : '<No Response!>'}" connection.terminate false end end
Remove message
@param messages [Array<FrameType::Message>] @return [NilClass] @note used mainly for queue callback
# File lib/krakow/consumer.rb, line 273 def remove_message(messages) [messages].flatten.compact.each do |msg| distribution.unregister_message(msg.message_id) update_ready!(msg.connection) end nil end
Requeue message (generally due to processing failure)
@param message_id [String, Krakow::FrameType::Message] @param timeout [Numeric] @return [TrueClass]
# File lib/krakow/consumer.rb, line 319 def requeue(message_id, timeout=0) message_id = message_id.message_id if message_id.respond_to?(:message_id) distribution.in_flight_lookup(message_id) do |connection| distribution.unregister_message(message_id) connection.transmit( Command::Req.new( :message_id => message_id, :timeout => timeout ) ) distribution.failure(connection.identifier) update_ready!(connection) end true end
@return [String] stringify object
# File lib/krakow/consumer.rb, line 103 def to_s "<#{self.class.name}:#{object_id} T:#{topic} C:#{channel}>" end
Touch message (to extend timeout)
@param message_id [String, Krakow::FrameType::Message] @return [TrueClass]
# File lib/krakow/consumer.rb, line 339 def touch(message_id) message_id = message_id.message_id if message_id.respond_to?(:message_id) begin distribution.in_flight_lookup(message_id) do |connection| connection.transmit( Command::Touch.new(:message_id => message_id) ) end true rescue Error::LookupFailed => e error "Lookup of message for touch failed! <Message ID: #{message_id} - Error: #{e}>" abort e end end
Send RDY for connection based on distribution rules
@param connection [Krakow::Connection] @return [nil]
# File lib/krakow/consumer.rb, line 193 def update_ready!(connection) distribution.set_ready_for(connection) nil end