class Shoryuken::Manager
Constants
- BATCH_LIMIT
- MIN_DISPATCH_INTERVAL
See https://github.com/phstc/shoryuken/issues/348#issuecomment-292847028
Attributes
group[R]
Public Class Methods
new(group, fetcher, polling_strategy, concurrency, executor)
click to toggle source
# File lib/shoryuken/manager.rb, line 11
#
# Builds a manager for a single processing group.
#
# @param group            stored as-is and exposed through the +group+ reader
# @param fetcher          object whose +fetch(queue, limit)+ is called by the dispatch methods
# @param polling_strategy object asked for +next_queue+ and informed via +messages_found+
# @param concurrency      maximum number of processors that may be busy at once
# @param executor         executor on which dispatching and message processing are posted
def initialize(group, fetcher, polling_strategy, concurrency, executor)
  # Collaborators supplied by the caller.
  @group            = group
  @fetcher          = fetcher
  @polling_strategy = polling_strategy
  @executor         = executor

  # Capacity bookkeeping: no processor is busy right after construction.
  @max_processors  = concurrency
  @busy_processors = Concurrent::AtomicFixnum.new(0)

  # Lifecycle flags, plus the queue that dispatch_loop pushes to once it stops.
  @running                    = Concurrent::AtomicBoolean.new(true)
  @stop_new_dispatching       = Concurrent::AtomicBoolean.new(false)
  @dispatching_release_signal = ::Queue.new
end
Public Instance Methods
await_dispatching_in_progress()
click to toggle source
# File lib/shoryuken/manager.rb, line 32
#
# Blocks the caller until the dispatch loop signals that it has stopped.
#
# A dispatch may still be in progress because the response from SQS can take
# some time. The process should not stop before incoming messages have been
# processed, otherwise they would stay "in-flight" on SQS for a while.
#
# dispatch_loop runs on another thread, so a queue is used as an efficient way
# of communicating between the two threads: dispatch_loop pushes a value when
# it stops, and this method waits for that value.
#
# @return the value pushed onto the release queue
def await_dispatching_in_progress
  release_signal = @dispatching_release_signal
  release_signal.pop
end
running?()
click to toggle source
# File lib/shoryuken/manager.rb, line 39
#
# Tells whether this manager can still do work.
#
# The manager counts as running only while its own +@running+ flag is set AND
# the executor it posts work to reports that it is running. The executor is
# not consulted once the flag is off.
#
# @return [Boolean]
def running?
  flag_set = @running.true?
  flag_set && @executor.running?
end
start()
click to toggle source
# File lib/shoryuken/manager.rb, line 23
#
# Starts the manager: publishes the current utilization figures, then enters
# the dispatch loop.
#
# @return whatever dispatch_loop returns
def start
  # Announce the initial utilization before any message is dispatched.
  fire_utilization_update_event

  dispatch_loop
end
stop_new_dispatching()
click to toggle source
# File lib/shoryuken/manager.rb, line 28
#
# Raises the flag that tells dispatch_loop not to schedule another dispatch.
# Work that has already been posted is not touched by this method.
def stop_new_dispatching
  stop_flag = @stop_new_dispatching
  stop_flag.make_true
end
Private Instance Methods
assign(queue_name, sqs_msg)
click to toggle source
# File lib/shoryuken/manager.rb, line 91
#
# Hands one message (or one patched batch) to a processor on the executor.
#
# Does nothing when the manager is no longer running. Otherwise the busy
# counter is incremented, a utilization event is fired, and processing is
# scheduled as a promise on +@executor+. When the promise settles,
# processor_done is invoked for the queue.
#
# @param queue_name name of the queue the message came from
# @param sqs_msg    object responding to +message_id+ (a message or a patched batch)
# @return the promise chained after processing, or nil when not running
def assign(queue_name, sqs_msg)
  return unless running?

  logger.debug { "Assigning #{sqs_msg.message_id}" }

  @busy_processors.increment
  fire_utilization_update_event

  # processor_done must run exactly once per assignment, because it
  # decrements the busy counter. Chaining `.then { ... }.rescue { ... }` would
  # run it a second time whenever the `then` callback itself raised, pushing
  # the counter below its real value. Supplying the rejection handler as the
  # rescuer of the same `then` makes the two paths mutually exclusive.
  on_failure = proc { processor_done(queue_name) }

  Concurrent::Promise
    .execute(executor: @executor) { Processor.process(queue_name, sqs_msg) }
    .then(on_failure) { processor_done(queue_name) }
end
batched_queue?(queue)
click to toggle source
# File lib/shoryuken/manager.rb, line 117
#
# Asks the worker registry whether messages for the given queue are received
# in batches.
#
# @param queue object responding to +name+
# @return the registry's answer for that queue name
def batched_queue?(queue)
  registry = Shoryuken.worker_registry
  registry.batch_receive_messages?(queue.name)
end
busy()
click to toggle source
# File lib/shoryuken/manager.rb, line 72
#
# Current value of the busy-processor counter (incremented in assign,
# decremented in processor_done).
#
# @return the counter's current value
def busy
  counter = @busy_processors
  counter.value
end
dispatch()
click to toggle source
# File lib/shoryuken/manager.rb, line 54
#
# Performs one dispatch round and then re-enters dispatch_loop.
#
# A round does nothing when the manager is not running. It sleeps for
# MIN_DISPATCH_INTERVAL when there is no free processor or the polling
# strategy has no queue to offer. Otherwise it fires a +:dispatch+ event and
# fetches from the chosen queue, either as a batch or as single messages.
#
# Any StandardError raised during the round goes to handle_dispatch_error.
# The +ensure+ clause calls dispatch_loop on every exit path, including the
# early returns and the error path.
def dispatch
  return unless running?

  # Only consult the polling strategy when at least one processor is free.
  queue = @polling_strategy.next_queue if ready > 0
  return sleep(MIN_DISPATCH_INTERVAL) if queue.nil?

  fire_event(:dispatch, false, queue_name: queue.name)

  logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

  if batched_queue?(queue)
    dispatch_batch(queue)
  else
    dispatch_single_messages(queue)
  end
rescue StandardError => e
  handle_dispatch_error(e)
ensure
  dispatch_loop
end
dispatch_batch(queue)
click to toggle source
# File lib/shoryuken/manager.rb, line 105
#
# Fetches up to BATCH_LIMIT messages from the queue and assigns them to a
# processor as one unit.
#
# The polling strategy is always told how many messages were found, even when
# the answer is zero. Nothing is assigned for an empty fetch.
#
# @param queue object responding to +name+
def dispatch_batch(queue)
  messages = @fetcher.fetch(queue, BATCH_LIMIT)
  @polling_strategy.messages_found(queue.name, messages.size)
  return unless messages.any?

  # The batch is patched so that it answers +message_id+ like a single message.
  assign(queue.name, patch_batch!(messages))
end
dispatch_loop()
click to toggle source
# File lib/shoryuken/manager.rb, line 45
#
# Schedules the next dispatch round on the executor, or ends the loop.
#
# The loop ends when new dispatching has been stopped or the manager is no
# longer running. In that case a value is pushed onto the release queue so
# that await_dispatching_in_progress can return, and nil is returned.
#
# @return the result of posting to the executor, or nil when the loop ends
def dispatch_loop
  keep_dispatching = !@stop_new_dispatching.true? && running?
  return @executor.post { dispatch } if keep_dispatching

  @dispatching_release_signal << 1
  nil
end
dispatch_single_messages(queue)
click to toggle source
# File lib/shoryuken/manager.rb, line 111
#
# Fetches at most as many messages as there are free processors and assigns
# each message individually.
#
# The polling strategy is told how many messages were found.
#
# @param queue object responding to +name+
# @return the fetched messages, as returned by +each+
def dispatch_single_messages(queue)
  fetched = @fetcher.fetch(queue, ready)
  @polling_strategy.messages_found(queue.name, fetched.size)

  fetched.each do |sqs_msg|
    assign(queue.name, sqs_msg)
  end
end
fire_utilization_update_event()
click to toggle source
# File lib/shoryuken/manager.rb, line 140
#
# Fires a +:utilization_update+ event carrying the group, the configured
# maximum number of processors and the number currently busy.
def fire_utilization_update_event
  payload = {
    group: @group,
    max_processors: @max_processors,
    busy_processors: busy
  }

  fire_event(:utilization_update, false, payload)
end
handle_dispatch_error(ex)
click to toggle source
# File lib/shoryuken/manager.rb, line 131
#
# Reacts to an error raised during a dispatch round.
#
# Logs the error message and, when available, the backtrace. It then sends
# USR1 to the current process and marks this manager as no longer running.
#
# NOTE(review): what USR1 triggers is not visible here; presumably a signal
# handler elsewhere initiates shutdown. Confirm.
#
# @param ex the rescued exception
def handle_dispatch_error(ex)
  logger.error { "Manager failed: #{ex.message}" }

  trace = ex.backtrace
  logger.error { trace.join("\n") } unless trace.nil?

  Process.kill('USR1', Process.pid)

  @running.make_false
end
message_id()
click to toggle source
# File lib/shoryuken/manager.rb, line 123
#
# Synthetic identifier for a batch of messages, built from the receiver's
# +size+.
#
# NOTE(review): this appears to be the singleton method that patch_batch!
# defines on a fetched batch, listed here as a separate entry. Confirm it is
# not meant to be called on the manager itself.
#
# @return [String] for example "batch-with-3-messages"
def message_id
  count = size
  "batch-with-#{count}-messages"
end
patch_batch!(sqs_msgs)
click to toggle source
# File lib/shoryuken/manager.rb, line 121
#
# Gives a batch of messages a +message_id+ of its own so that it can be
# handled like a single message (assign logs +sqs_msg.message_id+).
#
# The method is defined on this one object only. Other collections are not
# affected. The id reflects the batch's size at the time it is called.
#
# @param sqs_msgs the fetched batch; must respond to +size+
# @return the same object that was passed in, now responding to +message_id+
def patch_batch!(sqs_msgs)
  sqs_msgs.define_singleton_method(:message_id) do
    "batch-with-#{size}-messages"
  end

  sqs_msgs
end
processor_done(queue)
click to toggle source
# File lib/shoryuken/manager.rb, line 80
#
# Bookkeeping run after a processor has finished with a message.
#
# Decrements the busy counter and fires a utilization event. If the queue is
# a FIFO queue and the polling strategy implements +message_processed+, the
# strategy is also told that a message from this queue was processed.
#
# @param queue name of the queue the processed message came from
def processor_done(queue)
  @busy_processors.decrement
  fire_utilization_update_event

  # The polling strategy is consulted only for FIFO queues.
  fifo = Shoryuken::Client.queues(queue).fifo?
  notify = fifo && @polling_strategy.respond_to?(:message_processed)

  @polling_strategy.message_processed(queue) if notify
end
ready()
click to toggle source
# File lib/shoryuken/manager.rb, line 76
#
# Number of processors that are free to take new work: the configured
# maximum minus the number currently busy.
#
# @return the difference between +@max_processors+ and +busy+
def ready
  capacity = @max_processors
  capacity - busy
end