class Barbeque::Runner

Part of barbeque-worker. Runner dequeues a message from MessageQueue (Amazon SQS) and dispatches it to the message handler that matches the message type.

Public Class Methods

new(job_queue) click to toggle source
# File lib/barbeque/runner.rb, line 13
# Builds a runner bound to a single job queue.
#
# @param job_queue the job queue this runner serves; it is later handed to
#   MessageQueue.new and used to count running executions
#   (presumably a Barbeque::JobQueue record -- confirm against callers)
def initialize(job_queue)
  @job_queue = job_queue
end

Public Instance Methods

run() click to toggle source
# File lib/barbeque/runner.rb, line 17
# Processes at most one message from the queue.
#
# Waits for the concurrency limit first, then dequeues a message, records its
# id and type on Barbeque::ExceptionHandler, and hands it to the handler class
# named by the message type.
#
# @return [Object, nil] the handler's +run+ result, or nil when nothing was dequeued
# @raise [NameError] when MessageHandler has no constant named after the message type
def run
  keep_maximum_concurrent_executions

  dequeued = message_queue.dequeue
  return unless dequeued

  Barbeque::ExceptionHandler.set_message_context(dequeued.id, dequeued.type)

  # inherit = false: only constants defined directly on MessageHandler are
  # accepted, so a message type cannot resolve to an unrelated top-level class.
  MessageHandler
    .const_get(dequeued.type, false)
    .new(message: dequeued, message_queue: message_queue)
    .run
end
stop() click to toggle source
# File lib/barbeque/runner.rb, line 28
# Forwards a stop request to the message queue by calling +stop!+ on it.
# NOTE(review): the effect of +stop!+ (presumably making further dequeues
# return nothing) is defined by MessageQueue and is not visible here.
def stop
  message_queue.stop!
end

Private Instance Methods

keep_maximum_concurrent_executions() click to toggle source
# File lib/barbeque/runner.rb, line 38
# Blocks until this job queue has fewer running executions (regular executions
# plus retries) than Barbeque.config.maximum_concurrent_executions.
#
# While the limit is reached, logs the current state and sleeps for
# Barbeque.config.runner_wait_seconds before counting again.
#
# @return [nil]
def keep_maximum_concurrent_executions
  limit = Barbeque.config.maximum_concurrent_executions
  # A nil limit means executions are unlimited, so there is nothing to wait for.
  return unless limit

  loop do
    running_executions = @job_queue.job_executions.where(status: :running).count
    # Retries are tied to a queue only through their job execution, hence the join.
    running_retries = Barbeque::JobRetry
      .where(status: :running)
      .joins(job_execution: :job_queue)
      .merge(Barbeque::JobQueue.where(id: @job_queue.id))
      .count
    running_total = running_executions + running_retries
    return if running_total < limit

    wait_seconds = Barbeque.config.runner_wait_seconds
    Rails.logger.info("#{running_total} executions are running but maximum_concurrent_executions is configured to #{limit}. Waiting #{wait_seconds} seconds...")
    sleep(wait_seconds)
  end
end
message_queue() click to toggle source
# File lib/barbeque/runner.rb, line 34
# Returns the MessageQueue for this runner's job queue, building it on first
# use and reusing the same instance on later calls.
def message_queue
  @message_queue ||= MessageQueue.new(@job_queue)
end