class Shoryuken::Manager
Constants
- BATCH_LIMIT
- MIN_DISPATCH_INTERVAL
See github.com/phstc/shoryuken/issues/348#issuecomment-292847028
Attributes
group[R]
Public Class Methods
new(group, fetcher, polling_strategy, concurrency, executor)
click to toggle source
# File lib/shoryuken/manager.rb, line 11 def initialize(group, fetcher, polling_strategy, concurrency, executor) @group = group @fetcher = fetcher @polling_strategy = polling_strategy @max_processors = concurrency @busy_processors = Concurrent::AtomicFixnum.new(0) @executor = executor @running = Concurrent::AtomicBoolean.new(true) end
Public Instance Methods
running?()
click to toggle source
# File lib/shoryuken/manager.rb, line 26 def running? @running.true? && @executor.running? end
start()
click to toggle source
# File lib/shoryuken/manager.rb, line 21 def start fire_utilization_update_event dispatch_loop end
Private Instance Methods
assign(queue_name, sqs_msg)
click to toggle source
# File lib/shoryuken/manager.rb, line 75 def assign(queue_name, sqs_msg) return unless running? logger.debug { "Assigning #{sqs_msg.message_id}" } @busy_processors.increment fire_utilization_update_event Concurrent::Promise .execute(executor: @executor) { Processor.process(queue_name, sqs_msg) } .then { processor_done(queue_name) } .rescue { processor_done(queue_name) } end
batched_queue?(queue)
click to toggle source
# File lib/shoryuken/manager.rb, line 102 def batched_queue?(queue) Shoryuken.worker_registry.batch_receive_messages?(queue.name) end
busy()
click to toggle source
# File lib/shoryuken/manager.rb, line 56 def busy @busy_processors.value end
dispatch()
click to toggle source
# File lib/shoryuken/manager.rb, line 38 def dispatch return unless running? if ready <= 0 || (queue = @polling_strategy.next_queue).nil? return sleep(MIN_DISPATCH_INTERVAL) end fire_event(:dispatch, false, queue_name: queue.name) logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" } batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) rescue => e handle_dispatch_error(e) ensure dispatch_loop end
dispatch_batch(queue)
click to toggle source
# File lib/shoryuken/manager.rb, line 89 def dispatch_batch(queue) batch = @fetcher.fetch(queue, BATCH_LIMIT) @polling_strategy.messages_found(queue.name, batch.size) assign(queue.name, patch_batch!(batch)) if batch.any? end
dispatch_loop()
click to toggle source
# File lib/shoryuken/manager.rb, line 32 def dispatch_loop return unless running? @executor.post { dispatch } end
dispatch_single_messages(queue)
click to toggle source
# File lib/shoryuken/manager.rb, line 95 def dispatch_single_messages(queue) messages = @fetcher.fetch(queue, ready) @polling_strategy.messages_found(queue.name, messages.size) messages.each { |message| assign(queue.name, message) } end
fire_utilization_update_event()
click to toggle source
# File lib/shoryuken/manager.rb, line 125 def fire_utilization_update_event fire_event :utilization_update, false, { group: @group, max_processors: @max_processors, busy_processors: busy } end
handle_dispatch_error(ex)
click to toggle source
# File lib/shoryuken/manager.rb, line 116 def handle_dispatch_error(ex) logger.error { "Manager failed: #{ex.message}" } logger.error { ex.backtrace.join("\n") } unless ex.backtrace.nil? Process.kill('USR1', Process.pid) @running.make_false end
message_id()
click to toggle source
# File lib/shoryuken/manager.rb, line 108 def message_id "batch-with-#{size}-messages" end
patch_batch!(sqs_msgs)
click to toggle source
# File lib/shoryuken/manager.rb, line 106 def patch_batch!(sqs_msgs) sqs_msgs.instance_eval do def message_id "batch-with-#{size}-messages" end end sqs_msgs end
processor_done(queue)
click to toggle source
# File lib/shoryuken/manager.rb, line 64 def processor_done(queue) @busy_processors.decrement fire_utilization_update_event client_queue = Shoryuken::Client.queues(queue) return unless client_queue.fifo? return unless @polling_strategy.respond_to?(:message_processed) @polling_strategy.message_processed(queue) end
ready()
click to toggle source
# File lib/shoryuken/manager.rb, line 60 def ready @max_processors - busy end