class Chore::Consumer
Base class for a Chore Consumer. Provides the interface that a Chore::Consumer implementation should adhere to.
Attributes
Public Class Methods
@param [String] queue_name Name of queue to be consumed from
@param [Hash] opts
# File lib/chore/consumer.rb, line 15
def initialize(queue_name, opts={})
  @queue_name = queue_name
  @running = true
end
Causes the underlying connection for all consumers of this class to be reset. Useful when the consumer is used across a fork. Should be overridden in consumers (but this is not required).
# File lib/chore/consumer.rb, line 22
def self.reset_connection!
end
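As a rough illustration of why a subclass might override reset_connection!, the sketch below caches one connection object per class and drops it on reset, so the next caller (for example, a freshly forked child) builds its own. SketchConsumer, CachedConnectionConsumer, and the connection method are hypothetical names invented for this example; they are not part of Chore.

```ruby
# Stand-in for Chore::Consumer so the sketch runs without the chore gem.
class SketchConsumer
  def self.reset_connection!
  end
end

# Hypothetical consumer that caches a single connection at the class level.
class CachedConnectionConsumer < SketchConsumer
  class << self
    # Lazily build the shared connection (Object.new is a placeholder client).
    def connection
      @connection ||= Object.new
    end

    # Drop the cached connection so the next call to connection builds a new one.
    def reset_connection!
      @connection = nil
    end
  end
end

before = CachedConnectionConsumer.connection
CachedConnectionConsumer.reset_connection!
after = CachedConnectionConsumer.connection
puts before.equal?(after)
```

In a real consumer the placeholder object would be whatever client the queue backend provides, and the reset would typically run in the child process right after forking.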
Public Instance Methods
Complete should mark a message as finished.
@param [String] message_id Unique ID of the message
@param [Hash] receipt_handle Unique ID of the consuming transaction in non-filesystem implementations
# File lib/chore/consumer.rb, line 46
def complete(message_id, receipt_handle)
  raise NotImplementedError
end
Consume takes a block with an arity of two. The two params are |message_id, message_body|, where message_id is any object that the consumer will need in order to act on the message later (reject, complete, etc.).
@param [Block] &handler Message handler, used by the calling context (worker) to create and assign a UnitOfWork
# File lib/chore/consumer.rb, line 30
def consume(&handler)
  raise NotImplementedError
end
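To make the consume, complete, and reject contract concrete, here is a minimal in-memory sketch. It assumes a stand-in base class (SketchConsumer) so it runs without the chore gem, and it invents a :messages option and a pending_ids helper purely for illustration; a real implementation would talk to an actual queue.

```ruby
# Stand-in for Chore::Consumer so the sketch runs without the chore gem.
class SketchConsumer
  def initialize(queue_name, opts = {})
    @queue_name = queue_name
    @running = true
  end
end

# Hypothetical in-memory consumer; :messages is an invented option.
class InMemoryConsumer < SketchConsumer
  def initialize(queue_name, opts = {})
    super
    @pending = opts.fetch(:messages, {}).dup # message_id => message_body
    @in_flight = {}
  end

  # Hand each pending message to the handler as (message_id, message_body).
  def consume(&handler)
    until @pending.empty?
      message_id, message_body = @pending.shift
      @in_flight[message_id] = message_body
      handler.call(message_id, message_body)
    end
  end

  # Put an in-flight message back so it can be processed again later.
  def reject(message_id)
    return unless @in_flight.key?(message_id)
    @pending[message_id] = @in_flight.delete(message_id)
  end

  # Mark a message as finished; receipt_handle is unused in this sketch.
  def complete(message_id, receipt_handle)
    @in_flight.delete(message_id)
  end

  # Helper for inspecting the sketch; not part of the documented interface.
  def pending_ids
    @pending.keys
  end
end

messages = { "m1" => "hello", "m2" => "world" }
consumer = InMemoryConsumer.new("sketch_queue", { messages: messages })
seen = []
consumer.consume do |message_id, message_body|
  seen << [message_id, message_body]
  consumer.complete(message_id, nil)
end
```

The handler here completes every message immediately. A handler that always rejected would loop forever in this sketch, because rejected messages go straight back onto the pending list.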
Instance of duplicate detection implementation class
@return [DuplicateDetector]
# File lib/chore/consumer.rb, line 83
def dupe_detector
  @dupes ||= DuplicateDetector.new({:servers => Chore.config.dedupe_servers,
                                    :dupe_on_cache_failure => false})
end
Determine whether or not we have already seen this message
@param [String] dedupe_key
@param [Class] klass
@param [Integer] queue_timeout
@return [TrueClass, FalseClass]
# File lib/chore/consumer.rb, line 76
def duplicate_message?(dedupe_key, klass, queue_timeout)
  dupe_detector.found_duplicate?(:id=>dedupe_key, :queue=>klass.to_s, :visibility_timeout=>queue_timeout)
end
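The idea behind duplicate_message? can be sketched with an in-memory detector. This is only an assumption about the general behavior (remember an id per queue until its visibility timeout passes); InMemoryDuplicateDetector, DedupingConsumer, and the injectable clock are hypothetical and are not how Chore's DuplicateDetector is implemented.

```ruby
# Hypothetical in-memory stand-in for DuplicateDetector; not the gem's implementation.
class InMemoryDuplicateDetector
  # clock is injectable so the sketch can simulate time passing.
  def initialize(clock: -> { Time.now.to_f })
    @clock = clock
    @seen = {} # "queue|id" => expiry timestamp
  end

  # True when the same id was recorded for this queue and has not yet expired.
  def found_duplicate?(msg_data)
    key = "#{msg_data[:queue]}|#{msg_data[:id]}"
    current = @clock.call
    expiry = @seen[key]
    return true if expiry && expiry > current

    @seen[key] = current + msg_data[:visibility_timeout]
    false
  end
end

# Mirrors the shape of duplicate_message? above, using the stand-in detector.
class DedupingConsumer
  def initialize(detector)
    @dupes = detector
  end

  def duplicate_message?(dedupe_key, klass, queue_timeout)
    @dupes.found_duplicate?(:id => dedupe_key, :queue => klass.to_s, :visibility_timeout => queue_timeout)
  end
end

now = 0.0
detector = InMemoryDuplicateDetector.new(clock: -> { now })
consumer = DedupingConsumer.new(detector)
first = consumer.duplicate_message?("abc", DedupingConsumer, 30)  # first sighting
second = consumer.duplicate_message?("abc", DedupingConsumer, 30) # seen within the timeout
now = 31.0                                                        # simulate the timeout passing
third = consumer.duplicate_message?("abc", DedupingConsumer, 30)
p [first, second, third]
```

Keying on both queue and id keeps identical ids on different queues from colliding, which matches the :queue and :id arguments passed in the listing above.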
Returns up to n units of work.
@param n
# File lib/chore/consumer.rb, line 65
def provide_work(n)
  raise NotImplementedError
end
Reject should put a message back on a queue to be processed again later. It takes a message_id as returned via consume.
@param [String] message_id Unique ID of the message
# File lib/chore/consumer.rb, line 38
def reject(message_id)
  raise NotImplementedError
end
Returns true if the Consumer is currently running
@return [TrueClass, FalseClass]
# File lib/chore/consumer.rb, line 58
def running?
  @running
end
Perform any shutdown behavior and stop consuming messages
# File lib/chore/consumer.rb, line 51
def stop
  @running = false
end
Private Instance Methods
Gets messages from the queue implementation and invokes the provided block on each one. Afterwards, the :on_fetch hook is invoked once per message. The block call provides the data the worker (calling context) needs to populate a UnitOfWork struct.
@param [Block] &handler Message handler, passed along by consume
# File lib/chore/consumer.rb, line 95
def handle_messages(&handler)
  raise NotImplementedError
end
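One way the pieces could fit together is for consume to loop while running? is true and delegate each fetch to a private handle_messages. The sketch below assumes that arrangement; SketchConsumer, BatchConsumer, and the :batches option are hypothetical, and the :on_fetch hook is left out because its invocation mechanism is not shown here.

```ruby
# Stand-in for Chore::Consumer so the sketch runs without the chore gem.
class SketchConsumer
  def initialize(queue_name, opts = {})
    @queue_name = queue_name
    @running = true
  end

  def running?
    @running
  end

  def stop
    @running = false
  end
end

# Hypothetical consumer that reads from an in-memory list of batches.
class BatchConsumer < SketchConsumer
  def initialize(queue_name, opts = {})
    super
    @batches = opts.fetch(:batches, []).dup
  end

  # Keep fetching until stopped or the in-memory source runs dry.
  def consume(&handler)
    handle_messages(&handler) while running? && !@batches.empty?
  end

  private

  # Fetch one batch and invoke the handler once per message.
  def handle_messages(&handler)
    @batches.shift.each do |message_id, message_body|
      handler.call(message_id, message_body)
    end
  end
end

batches = [[["m1", "a"], ["m2", "b"]], [["m3", "c"]]]
consumer = BatchConsumer.new("sketch_queue", { batches: batches })
handled = []
consumer.consume do |message_id, message_body|
  handled << message_id
  consumer.stop if message_id == "m2" # ask the consumer to stop after the first batch
end
p handled
```

Because running? is only checked between batches, calling stop lets the current batch finish before the loop exits.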