class Toro::Manager
Attributes
busy[R]
ready[R]
Public Class Methods
new(options={})
click to toggle source
# File lib/toro/manager.rb, line 10 def initialize(options={}) defaults = { concurrency: 1, queues: [Toro.options[:default_queue]], } options = defaults.merge(options) @queues = options[:queues] @threads = {} @ready = options[:concurrency].times.map do processor = Processor.new_link(current_actor) processor.proxy_id = processor.object_id processor end @busy = [] @is_done = false @fetcher = Fetcher.new({ manager: current_actor, queues: options[:queues] }) @listener = Listener.new({ queues: @queues, fetcher: @fetcher, manager: current_actor }) end
Public Instance Methods
assign(job)
click to toggle source
# File lib/toro/manager.rb, line 52 def assign(job) raise 'No processors ready' if !is_ready? processor = @ready.pop @busy << processor processor.async.process(job) end
clean_up_for_graceful_shutdown()
click to toggle source
# File lib/toro/manager.rb, line 69 def clean_up_for_graceful_shutdown if @busy.empty? shutdown return true end after(Toro.options[:graceful_shutdown_time]) { clean_up_for_graceful_shutdown } false end
dispatch()
click to toggle source
# File lib/toro/manager.rb, line 63 def dispatch raise "No processors, cannot continue!" if @ready.empty? && @busy.empty? raise "No ready processor!?" if @ready.empty? @fetcher.async.fetch end
hard_shutdown_in(delay)
click to toggle source
# File lib/toro/manager.rb, line 79 def hard_shutdown_in(delay) Toro.logger.info "Pausing up to #{delay} seconds to allow workers to finish..." after(delay) do # We've reached the timeout and we still have busy processors. # They must die but their messages shall live on. Toro.logger.warn "Terminating #{@busy.size} busy worker threads" requeue @busy.each do |processor| if processor.alive? && thread = @threads.delete(processor.object_id) thread.raise Shutdown end end signal_shutdown end end
heartbeat()
click to toggle source
# File lib/toro/manager.rb, line 119 def heartbeat return if stopped? after(5) do heartbeat end end
is_ready?()
click to toggle source
# File lib/toro/manager.rb, line 59 def is_ready? !@ready.empty? end
processor_complete(processor)
click to toggle source
# File lib/toro/manager.rb, line 127 def processor_complete(processor) @threads.delete(processor.object_id) @busy.delete(processor) if stopped? processor.terminate if processor.alive? shutdown if @busy.empty? else @ready << processor if processor.alive? dispatch end end
requeue()
click to toggle source
# File lib/toro/manager.rb, line 104 def requeue Toro::Database.with_connection do Job.where(status: 'running', started_by: Toro.process_identity). update_all(status: 'queued', started_by: nil, started_at: nil) end end
set_thread(proxy_id, thread)
click to toggle source
# File lib/toro/manager.rb, line 115 def set_thread(proxy_id, thread) @threads[proxy_id] = thread end
shutdown()
click to toggle source
# File lib/toro/manager.rb, line 99 def shutdown requeue signal_shutdown end
signal_shutdown()
click to toggle source
# File lib/toro/manager.rb, line 111 def signal_shutdown after(0) { signal(:shutdown) } end
start()
click to toggle source
# File lib/toro/manager.rb, line 29 def start @is_done = false @listener.async.start @ready.each { dispatch } heartbeat end
stop()
click to toggle source
# File lib/toro/manager.rb, line 36 def stop @is_done = true Toro.logger.debug "Shutting down #{@ready.size} quiet workers" @ready.each { |processor| processor.terminate if processor.alive? } @ready.clear @fetcher.terminate if @fetcher.alive? if @listener.alive? actors[:listener].stop if actors[:listener] @listener.terminate end return if clean_up_for_graceful_shutdown hard_shutdown_in(Toro.options[:hard_shutdown_time]) end
stopped?()
click to toggle source
# File lib/toro/manager.rb, line 139 def stopped? @is_done end