class Chore::Strategy::ThreadedConsumerStrategy

Attributes

batcher[RW]

Public Class Methods

new(fetcher) click to toggle source
# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 11
# Builds a strategy bound to +fetcher+.
#
# A Batcher sized by <tt>Chore.config.batch_size</tt> is created, given a
# callback that passes each batch to <tt>fetcher.manager.assign</tt>, and
# scheduled with <tt>Chore.config.batch_timeout</tt>. The strategy is then
# flagged as running.
#
# fetcher:: object exposing a +manager+ that responds to +assign+
def initialize(fetcher)
  @fetcher = fetcher
  @batcher = Batcher.new(Chore.config.batch_size)
  # The manager is looked up when the callback fires, not when it is built.
  @batcher.callback = ->(batch) { @fetcher.manager.assign(batch) }
  @batcher.schedule(Chore.config.batch_timeout)
  @running = true
end

Public Instance Methods

fetch() click to toggle source

Begins fetching from queues by spinning up the configured <tt>threads_per_queue</tt> number of threads for each queue you're consuming from. Once all threads have been started, they are joined.

# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 22
# Starts the consumer threads and blocks until they have all finished.
#
# For every queue in <tt>Chore.config.queues</tt>, up to
# <tt>Chore.config.threads_per_queue</tt> threads are started through
# +start_consumer_thread+. A thread is only started while +running?+ is
# truthy. Returns the array of joined threads.
def fetch
  Chore.logger.debug "Starting up consumer strategy: #{self.class.name}"

  threads = Chore.config.queues.each_with_object([]) do |queue, spawned|
    Chore.config.threads_per_queue.times do
      # The flag is re-checked before each spawn.
      next unless running?

      spawned << start_consumer_thread(queue)
    end
  end

  threads.each(&:join)
end
running?() click to toggle source

Returns whether or not the ThreadedConsumerStrategy is running.

# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 48
# Reports whether this strategy is still running.
#
# Returns the current value of the @running flag unchanged.
def running?
  @running
end
stop!() click to toggle source

If the ThreadedConsumerStrategy is currently running, stop! will begin signalling it to stop. It will stop the batcher from forking more work, as well as set a flag which will disable its own consuming threads once they finish with their current work.

# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 39
# Signals the strategy to shut down.
#
# Does nothing (and returns nil) when the strategy is not running.
# Otherwise logs the shutdown, stops the batcher, clears the running
# flag, and returns false.
def stop!
  return unless running?

  Chore.logger.info "Shutting down fetcher: #{self.class.name.to_s}"
  @batcher.stop
  @running = false
end

Private Instance Methods

start_consumer_thread(queue) click to toggle source

Starts a consumer thread for polling the given queue. If <tt>stop!</tt> is called, the threads will shut themselves down.

# File lib/chore/strategies/consumer/threaded_consumer_strategy.rb, line 55
# Spawns and returns a Thread that consumes messages from +queue+.
#
# Inside the thread a consumer is built from <tt>Chore.config.consumer</tt>.
# Each message it yields is wrapped in a UnitOfWork, passed to the
# <tt>:consumed_from_source</tt> hooks, and added to the batcher.
#
# A Chore::TerribleMistake stops this strategy and shuts down the
# fetcher's manager. Any other StandardError is logged, after which the
# thread ends.
def start_consumer_thread(queue)
  Thread.new(queue) do |thread_queue|
    begin
      consumer = Chore.config.consumer.new(thread_queue)
      consumer.consume do |message_id, receipt_handle, queue_name, queue_timeout, body, previous_attempts|
        # Ask the consumer to wind down once shutdown has been requested.
        # The message already in hand is still batched below. The request
        # may take effect late because SQS calls sometimes block.
        consumer.stop unless running?
        Chore.logger.debug { "Got message: #{message_id}"}

        work = UnitOfWork.new(
          message_id,
          receipt_handle,
          queue_name,
          queue_timeout,
          body,
          previous_attempts,
          consumer
        )
        Chore.run_hooks_for(:consumed_from_source, work)
        @batcher.add(work)
      end
    rescue Chore::TerribleMistake
      Chore.logger.error "I've made a terrible mistake... shutting down Chore"
      stop!
      @fetcher.manager.shutdown!
    rescue => e
      Chore.logger.error "ThreadedConsumerStrategy#consumer thread raised an exception: #{e.inspect} at #{e.backtrace}"
    end
  end
end