class Bifrost::Manager
This class is used to handle the setup and execution of multiple listeners
Public Class Methods
Initialisation of the Manager
sets up a subscriber, which informs the manager when the worker is ready to begin work
# File lib/bifrost/manager.rb, line 18 def initialize subscribe('worker_ready', :worker_ready) end
Public Instance Methods
A supervised worker can be added to the current collection of supervised workers this also starts the actor
# File lib/bifrost/manager.rb, line 24 def add(topic, subscriber, proc, options = {}) if topic.nil? || subscriber.nil? || proc.nil? raise InvalidWorkerDefinitionError, 'Invalid worker' else Worker.supervise(as: Worker.handle(topic, subscriber), args: [topic, subscriber, proc, append_default_options(options)]) end end
When we run all the workers as actors in their own threads. This run also blocks to make sure the spawned threads remain operational indefinitely
# File lib/bifrost/manager.rb, line 34 def run # Put the supervisor thread to sleep indefinitely loop do # TODO: Perhaps there is a better way? sleep(60) end end
This callback is fired when the worker signals it is ready to commence work after initialisation or recommence after recovering from a failure. When a worker completes initialisation it can take a while for the worker to be registered as an Actor in Celluloid, for this reason we need need to put a minor delay in the initialisation procedure
# File lib/bifrost/manager.rb, line 46 def worker_ready(*args) info("Worker bootstrapping with #{args}...") sleep(ENV['BIFROST_BOOTSTRAP_DELAY'] || 2) # TODO: Perhaps there is a better way? worker = get_worker(args[1], args[2]) if worker # Link the worker to the supervisor so if the worker misbehaves the supervisor is alerted # to this poor behaviour, the supervisor decides how to handle recovery link(worker) worker.async.run else error("Worker bootstrap failure with #{args}") end end
Private Instance Methods
Create default options hash if custom options do not exit
# File lib/bifrost/manager.rb, line 64 def append_default_options(custom_options) options = { non_repeatable: false } options.merge(custom_options) end
Retrieve a worker through the supervisory structure, this can take a while as the worker might be going through a restart procedure by the actor framework
# File lib/bifrost/manager.rb, line 71 def get_worker(topic, subscriber) handle = Worker.handle(topic, subscriber) Celluloid::Actor[handle.to_sym] end
This callback function fires when an worker dies
# File lib/bifrost/manager.rb, line 77 def worker_died(worker, reason) error("Worker #{worker.inspect} has died: #{reason.class}") end