class Shoryuken::Manager

Constants

BATCH_LIMIT
MIN_DISPATCH_INTERVAL

See github.com/phstc/shoryuken/issues/348#issuecomment-292847028

Attributes

group[R]

Public Class Methods

new(group, fetcher, polling_strategy, concurrency, executor) click to toggle source
# File lib/shoryuken/manager.rb, line 11
def initialize(group, fetcher, polling_strategy, concurrency, executor)
  @group                      = group
  @fetcher                    = fetcher
  @polling_strategy           = polling_strategy
  @max_processors             = concurrency
  @busy_processors            = Concurrent::AtomicFixnum.new(0)
  @executor                   = executor
  @running                    = Concurrent::AtomicBoolean.new(true)
  @stop_new_dispatching       = Concurrent::AtomicBoolean.new(false)
  @dispatching_release_signal = ::Queue.new
end

Public Instance Methods

await_dispatching_in_progress() click to toggle source
# File lib/shoryuken/manager.rb, line 32
def await_dispatching_in_progress
  # There might still be a dispatching on-going, as the response from SQS could take some time
  # We don't want to stop the process before processing incoming messages, as they would stay "in-flight" for some time on SQS
  # We use a queue, as the dispatch_loop is running on another thread, and this is a efficient way of communicating between threads.
  @dispatching_release_signal.pop
end
running?() click to toggle source
# File lib/shoryuken/manager.rb, line 39
def running?
  @running.true? && @executor.running?
end
start() click to toggle source
# File lib/shoryuken/manager.rb, line 23
def start
  fire_utilization_update_event
  dispatch_loop
end
stop_new_dispatching() click to toggle source
# File lib/shoryuken/manager.rb, line 28
def stop_new_dispatching
  @stop_new_dispatching.make_true
end

Private Instance Methods

assign(queue_name, sqs_msg) click to toggle source
# File lib/shoryuken/manager.rb, line 91
def assign(queue_name, sqs_msg)
  return unless running?

  logger.debug { "Assigning #{sqs_msg.message_id}" }

  @busy_processors.increment
  fire_utilization_update_event

  Concurrent::Promise
    .execute(executor: @executor) { Processor.process(queue_name, sqs_msg) }
    .then { processor_done(queue_name) }
    .rescue { processor_done(queue_name) }
end
batched_queue?(queue) click to toggle source
# File lib/shoryuken/manager.rb, line 117
def batched_queue?(queue)
  Shoryuken.worker_registry.batch_receive_messages?(queue.name)
end
busy() click to toggle source
# File lib/shoryuken/manager.rb, line 72
def busy
  @busy_processors.value
end
dispatch() click to toggle source
# File lib/shoryuken/manager.rb, line 54
def dispatch
  return unless running?

  if ready <= 0 || (queue = @polling_strategy.next_queue).nil?
    return sleep(MIN_DISPATCH_INTERVAL)
  end

  fire_event(:dispatch, false, queue_name: queue.name)

  logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

  batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
rescue => e
  handle_dispatch_error(e)
ensure
  dispatch_loop
end
dispatch_batch(queue) click to toggle source
# File lib/shoryuken/manager.rb, line 105
def dispatch_batch(queue)
  batch = @fetcher.fetch(queue, BATCH_LIMIT)
  @polling_strategy.messages_found(queue.name, batch.size)
  assign(queue.name, patch_batch!(batch)) if batch.any?
end
dispatch_loop() click to toggle source
# File lib/shoryuken/manager.rb, line 45
def dispatch_loop
  if @stop_new_dispatching.true? || !running?
    @dispatching_release_signal << 1
    return
  end

  @executor.post { dispatch }
end
dispatch_single_messages(queue) click to toggle source
# File lib/shoryuken/manager.rb, line 111
def dispatch_single_messages(queue)
  messages = @fetcher.fetch(queue, ready)
  @polling_strategy.messages_found(queue.name, messages.size)
  messages.each { |message| assign(queue.name, message) }
end
fire_utilization_update_event() click to toggle source
# File lib/shoryuken/manager.rb, line 140
def fire_utilization_update_event
  fire_event :utilization_update, false, {
    group: @group,
    max_processors: @max_processors,
    busy_processors: busy
  }
end
handle_dispatch_error(ex) click to toggle source
# File lib/shoryuken/manager.rb, line 131
def handle_dispatch_error(ex)
  logger.error { "Manager failed: #{ex.message}" }
  logger.error { ex.backtrace.join("\n") } unless ex.backtrace.nil?

  Process.kill('USR1', Process.pid)

  @running.make_false
end
message_id() click to toggle source
# File lib/shoryuken/manager.rb, line 123
def message_id
  "batch-with-#{size}-messages"
end
patch_batch!(sqs_msgs) click to toggle source
# File lib/shoryuken/manager.rb, line 121
def patch_batch!(sqs_msgs)
  sqs_msgs.instance_eval do
    def message_id
      "batch-with-#{size}-messages"
    end
  end

  sqs_msgs
end
processor_done(queue) click to toggle source
# File lib/shoryuken/manager.rb, line 80
def processor_done(queue)
  @busy_processors.decrement
  fire_utilization_update_event

  client_queue = Shoryuken::Client.queues(queue)
  return unless client_queue.fifo?
  return unless @polling_strategy.respond_to?(:message_processed)

  @polling_strategy.message_processed(queue)
end
ready() click to toggle source
# File lib/shoryuken/manager.rb, line 76
def ready
  @max_processors - busy
end