class Shoryuken::Manager

Constants

BATCH_LIMIT
MIN_DISPATCH_INTERVAL

See github.com/phstc/shoryuken/issues/348#issuecomment-292847028

Attributes

group[R]

Public Class Methods

new(group, fetcher, polling_strategy, concurrency, executor) click to toggle source
# File lib/shoryuken/manager.rb, line 11
def initialize(group, fetcher, polling_strategy, concurrency, executor)
  @group            = group
  @fetcher          = fetcher
  @polling_strategy = polling_strategy
  @max_processors   = concurrency
  @busy_processors  = Concurrent::AtomicFixnum.new(0)
  @executor         = executor
  @running          = Concurrent::AtomicBoolean.new(true)
end

Public Instance Methods

running?() click to toggle source
# File lib/shoryuken/manager.rb, line 26
def running?
  @running.true? && @executor.running?
end
start() click to toggle source
# File lib/shoryuken/manager.rb, line 21
def start
  fire_utilization_update_event
  dispatch_loop
end

Private Instance Methods

assign(queue_name, sqs_msg) click to toggle source
# File lib/shoryuken/manager.rb, line 75
def assign(queue_name, sqs_msg)
  return unless running?

  logger.debug { "Assigning #{sqs_msg.message_id}" }

  @busy_processors.increment
  fire_utilization_update_event

  Concurrent::Promise
    .execute(executor: @executor) { Processor.process(queue_name, sqs_msg) }
    .then { processor_done(queue_name) }
    .rescue { processor_done(queue_name) }
end
batched_queue?(queue) click to toggle source
# File lib/shoryuken/manager.rb, line 102
def batched_queue?(queue)
  Shoryuken.worker_registry.batch_receive_messages?(queue.name)
end
busy() click to toggle source
# File lib/shoryuken/manager.rb, line 56
def busy
  @busy_processors.value
end
dispatch() click to toggle source
# File lib/shoryuken/manager.rb, line 38
def dispatch
  return unless running?

  if ready <= 0 || (queue = @polling_strategy.next_queue).nil?
    return sleep(MIN_DISPATCH_INTERVAL)
  end

  fire_event(:dispatch, false, queue_name: queue.name)

  logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

  batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
rescue => e
  handle_dispatch_error(e)
ensure
  dispatch_loop
end
dispatch_batch(queue) click to toggle source
# File lib/shoryuken/manager.rb, line 89
def dispatch_batch(queue)
  batch = @fetcher.fetch(queue, BATCH_LIMIT)
  @polling_strategy.messages_found(queue.name, batch.size)
  assign(queue.name, patch_batch!(batch)) if batch.any?
end
dispatch_loop() click to toggle source
# File lib/shoryuken/manager.rb, line 32
def dispatch_loop
  return unless running?

  @executor.post { dispatch }
end
dispatch_single_messages(queue) click to toggle source
# File lib/shoryuken/manager.rb, line 95
def dispatch_single_messages(queue)
  messages = @fetcher.fetch(queue, ready)

  @polling_strategy.messages_found(queue.name, messages.size)
  messages.each { |message| assign(queue.name, message) }
end
fire_utilization_update_event() click to toggle source
# File lib/shoryuken/manager.rb, line 125
def fire_utilization_update_event
  fire_event :utilization_update, false, {
    group: @group,
    max_processors: @max_processors,
    busy_processors: busy
  }
end
handle_dispatch_error(ex) click to toggle source
# File lib/shoryuken/manager.rb, line 116
def handle_dispatch_error(ex)
  logger.error { "Manager failed: #{ex.message}" }
  logger.error { ex.backtrace.join("\n") } unless ex.backtrace.nil?

  Process.kill('USR1', Process.pid)

  @running.make_false
end
message_id() click to toggle source
# File lib/shoryuken/manager.rb, line 108
def message_id
  "batch-with-#{size}-messages"
end
patch_batch!(sqs_msgs) click to toggle source
# File lib/shoryuken/manager.rb, line 106
def patch_batch!(sqs_msgs)
  sqs_msgs.instance_eval do
    def message_id
      "batch-with-#{size}-messages"
    end
  end

  sqs_msgs
end
processor_done(queue) click to toggle source
# File lib/shoryuken/manager.rb, line 64
def processor_done(queue)
  @busy_processors.decrement
  fire_utilization_update_event

  client_queue = Shoryuken::Client.queues(queue)
  return unless client_queue.fifo?
  return unless @polling_strategy.respond_to?(:message_processed)

  @polling_strategy.message_processed(queue)
end
ready() click to toggle source
# File lib/shoryuken/manager.rb, line 60
def ready
  @max_processors - busy
end