class Chore::Strategy::ThreadedConsumerStrategy
Attributes
batcher[RW]
Public Class Methods
new(fetcher)
click to toggle source
# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 11
#
# Builds the strategy around the given fetcher and prepares its batcher.
#
# The batcher is sized from Chore.config.batch_size, hands every batch it
# emits to the fetcher's manager via +assign+, and is scheduled with
# Chore.config.batch_timeout. The strategy is flagged as running.
#
# fetcher:: object whose +manager+ receives the batches
def initialize(fetcher)
  @fetcher = fetcher
  @running = true

  @batcher = Batcher.new(Chore.config.batch_size)
  @batcher.callback = ->(batch) { @fetcher.manager.assign(batch) }
  @batcher.schedule(Chore.config.batch_timeout)
end
Public Instance Methods
fetch()
click to toggle source
Begins fetching from queues by spinning up the configured :threads_per_queue:
count of threads for each queue you're consuming from. Once all threads are spun up and running, the threads are then joined.
# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 22
#
# Starts Chore.config.threads_per_queue consumer threads for every queue in
# Chore.config.queues, then joins all of them. A thread is only started
# while the strategy still reports itself as running.
#
# Returns the array of started (and joined) threads.
def fetch
  Chore.logger.debug "Starting up consumer strategy: #{self.class.name}"

  consumer_threads = []
  Chore.config.queues.each do |queue|
    Chore.config.threads_per_queue.times do
      # Skip the spawn if a stop was requested while threads were being started.
      next unless running?

      consumer_threads << start_consumer_thread(queue)
    end
  end

  consumer_threads.each(&:join)
end
running?()
click to toggle source
Returns whether or not the ThreadedConsumerStrategy
is running.
# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 48
#
# Reports the current value of the strategy's running flag.
def running?
  @running
end
stop!()
click to toggle source
If the ThreadedConsumerStrategy
is currently running, stop!
will begin signalling it to stop. It will stop the batcher from forking more work, as well as set a flag which will disable its own consuming threads once they finish with their current work.
# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 39
#
# Signals the strategy to stop. Does nothing when it is not running;
# otherwise logs the shutdown, stops the batcher and clears the running
# flag that the consumer threads check.
def stop!
  return unless running?

  Chore.logger.info "Shutting down fetcher: #{self.class.name.to_s}"
  @batcher.stop
  @running = false
end
Private Instance Methods
start_consumer_thread(queue)
click to toggle source
Starts a consumer thread for polling the given queue.
If <tt>stop!</tt> is called, the threads will shut themselves down.
# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 55
#
# Spawns and returns a thread that consumes from the given queue.
#
# Inside the thread a consumer is built from Chore.config.consumer. Every
# message it yields is wrapped in a UnitOfWork, passed through the
# :consumed_from_source hooks and added to the batcher.
#
# A Chore::TerribleMistake stops this strategy and shuts the fetcher's
# manager down. Any other StandardError is logged and the thread's block
# ends.
#
# queue:: the queue handed to the consumer's constructor
def start_consumer_thread(queue)
  Thread.new(queue) do |thread_queue|
    begin
      consumer = Chore.config.consumer.new(thread_queue)

      consumer.consume do |message_id, message_receipt_handle, queue_name, queue_timeout, body, previous_attempts|
        # Quick hack to force this thread to end its work if we're
        # shutting down. Could be delayed due to the weird
        # sometimes-blocking nature of SQS. The message already in hand
        # is still processed below.
        consumer.stop unless running?

        Chore.logger.debug { "Got message: #{message_id}" }

        work = UnitOfWork.new(message_id, message_receipt_handle, queue_name, queue_timeout, body, previous_attempts, consumer)
        Chore.run_hooks_for(:consumed_from_source, work)
        @batcher.add(work)
      end
    rescue Chore::TerribleMistake
      Chore.logger.error "I've made a terrible mistake... shutting down Chore"
      stop!
      @fetcher.manager.shutdown!
    rescue => e
      Chore.logger.error "ThreadedConsumerStrategy#consumer thread raised an exception: #{e.inspect} at #{e.backtrace}"
    end
  end
end