class Oni::Daemon
The Daemon class takes care of retrieving work to be processed, scheduling it and dispatching it to a mapper and worker. In essence, a Daemon instance can be seen as a controller when compared with typical MVC frameworks.
This daemon starts a number of threads (5 by default) that will each perform work on their own using the corresponding mapper and worker class.
@!attribute [r] workers
@return [Array<Thread>]
Constants
- DEFAULT_THREAD_AMOUNT
The default amount of threads to start.
@return [Fixnum]
- DEFAULT_WORKER_TIMEOUT
The default timeout applied to a worker (passed to `Timeout.timeout`, so expressed in seconds).
@return [Fixnum]
Attributes
Public Class Methods
Creates a new instance of the class and calls `#after_initialize` if it is defined.
    # File lib/oni/daemon.rb, line 36
    def initialize
      @workers = []

      after_initialize if respond_to?(:after_initialize)
    end
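The optional hook pattern used by the constructor can be sketched in isolation. The class names `HookedBase` and `WithHook` below are hypothetical stand-ins, not part of Oni; only the `respond_to?` check mirrors the source above.

```ruby
# Hypothetical base class that mirrors the constructor shown above.
class HookedBase
  attr_reader :workers

  def initialize
    @workers = []

    # Only call the hook when a subclass actually defines it.
    after_initialize if respond_to?(:after_initialize)
  end
end

# Hypothetical subclass that opts in to the hook.
class WithHook < HookedBase
  attr_reader :ready

  def after_initialize
    @ready = true
  end
end

plain  = HookedBase.new
hooked = WithHook.new
```

Because `respond_to?` ignores private methods by default, a hook written this way has to be public to be picked up.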
Public Instance Methods
Called when a job has been completed. By default this method is a no-op. It is passed two arguments:
- The raw input message.
- The output of the worker (remapped by the mapper).
@param [Mixed] message The raw input message (e.g. an AWS SQS message)
@param [Mixed] output The output of the worker.
    # File lib/oni/daemon.rb, line 144
    def complete(message, output)
    end
Creates a new mapper by instantiating the class set in the `:mapper` option. An ArgumentError is raised if no mapper has been set.
@return [Oni::Mapper]
    # File lib/oni/daemon.rb, line 163
    def create_mapper
      unless option(:mapper)
        raise ArgumentError, 'No mapper has been set in the `:mapper` option'
      end

      return option(:mapper).new
    end
Called whenever an error is raised in the daemon, mapper or worker. By default this method just re-raises the error.
@param [StandardError] error
    # File lib/oni/daemon.rb, line 153
    def error(error)
      raise error
    end
Processes the given message. Upon completion the `#complete` method is called and passed the resulting output.
@param [Mixed] message
    # File lib/oni/daemon.rb, line 101
    def process(message)
      output = run_worker(message)

      complete(message, output)
    end
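The hand-off from `process` to `complete` can be illustrated with a small stand-alone sketch. `ProcessSketch` is a hypothetical class, and its `run_worker` is a placeholder that reverses a string instead of dispatching to a real mapper and worker.

```ruby
# Hypothetical stand-in that mirrors the process -> complete hand-off.
class ProcessSketch
  attr_reader :completed

  def initialize
    @completed = []
  end

  # Placeholder for run_worker: the "work" here is reversing a string.
  def run_worker(message)
    message.reverse
  end

  # Overridden hook: remember which message produced which output.
  def complete(message, output)
    @completed << [message, output]
  end

  # Same two steps as the method shown above.
  def process(message)
    output = run_worker(message)

    complete(message, output)
  end
end

daemon = ProcessSketch.new
daemon.process('abc')
```

Since `complete` receives both the raw message and the output, an override could use the former to acknowledge or delete the message and the latter to store the result.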
Receives a message. By default this method raises a NotImplementedError; subclasses must implement it and yield each received message to the given block.
@raise [NotImplementedError]
    # File lib/oni/daemon.rb, line 130
    def receive
      raise NotImplementedError, 'You must manually implement #receive'
    end
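As a rough illustration of what an implementation might look like, the sketch below yields messages from an in-memory list. `ListReceiver` is a hypothetical class that does not inherit from the daemon; it only shows the block-yielding shape that `run_thread` relies on.

```ruby
# Hypothetical receiver backed by an in-memory list instead of a real queue.
class ListReceiver
  def initialize(messages)
    @messages = messages
  end

  # Hands every message to the caller's block, one at a time.
  def receive
    @messages.each { |message| yield message }
  end
end

seen = []

ListReceiver.new(%w[first second]).receive do |message|
  seen << message
end
```

A real implementation would typically poll a message queue in the same way, yielding each message as it arrives.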
The main code to execute in individual threads.
If an error occurs in the `receive` method or while processing a job, the error handler is executed and the process is retried. It's the responsibility of the `error` method to determine whether the process should fail once (and fail hard) or continue running.
    # File lib/oni/daemon.rb, line 192
    def run_thread
      receive do |message|
        process message
      end
    rescue => error
      error(error)

      retry
    end
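The interaction between the error handler and `retry` may be easier to see in a stand-alone sketch. `RetrySketch` is hypothetical: its `receive` fails twice before yielding, and its `error` method records the failure instead of re-raising, which is what allows the loop to continue.

```ruby
# Hypothetical stand-in; only the rescue/retry shape mirrors run_thread.
class RetrySketch
  attr_reader :attempts, :errors

  def initialize
    @attempts = 0
    @errors   = []
  end

  # Fails on the first two calls, then yields a message.
  def receive
    @attempts += 1

    raise 'queue unavailable' if @attempts < 3

    yield "message #{@attempts}"
  end

  # Records the error instead of re-raising, so run_thread keeps going.
  def error(error)
    @errors << error.message
  end

  def run_thread
    # Return the first message so the sketch terminates.
    receive { |message| return message }
  rescue => error
    error(error)

    retry
  end
end

sketch = RetrySketch.new
result = sketch.run_thread
```

With the default handler, which re-raises, the `retry` line is never reached and the first error propagates to the caller.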
Maps the input, runs the worker and then maps the output into something that the daemon can understand.
@param [Mixed] message
@return [Mixed]
    # File lib/oni/daemon.rb, line 114
    def run_worker(message)
      mapper = create_mapper
      input  = mapper.map_input(message)
      worker = option(:worker).new(*input)

      output = Timeout.timeout worker_timeout do
        worker.process
      end

      mapper.map_output output
    end
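The map, process, map sequence can be sketched without the daemon itself. `EchoMapper`, `UpcaseWorker` and `run_worker_sketch` are hypothetical names used only for illustration; the method names `map_input`, `map_output` and `process` are taken from the source above.

```ruby
require 'timeout'

# Hypothetical mapper: turns a raw message into worker arguments and
# wraps the worker's result in something the daemon can use.
class EchoMapper
  def map_input(message)
    [message.fetch(:text)]
  end

  def map_output(output)
    { result: output }
  end
end

# Hypothetical worker: receives the mapped input as constructor arguments.
class UpcaseWorker
  def initialize(text)
    @text = text
  end

  def process
    @text.upcase
  end
end

# Stand-alone version of the steps in run_worker. The timeout is in
# seconds; Timeout.timeout treats nil as "no timeout".
def run_worker_sketch(message, mapper_class, worker_class, timeout = nil)
  mapper = mapper_class.new
  input  = mapper.map_input(message)
  worker = worker_class.new(*input)
  output = Timeout.timeout(timeout) { worker.process }

  mapper.map_output(output)
end

result = run_worker_sketch({ text: 'hello' }, EchoMapper, UpcaseWorker, 2)
```

Because the mapped input is splatted into the worker's constructor, `map_input` should return an array whose elements match the worker's `initialize` arguments.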
Spawns a new thread that waits for daemon input.
@return [Thread]
    # File lib/oni/daemon.rb, line 176
    def spawn_thread
      thread = Thread.new { run_thread }

      thread.abort_on_exception = true

      return thread
    end
Starts the daemon and blocks until all threads have finished execution. If the thread amount is 0, the daemon runs in the calling thread instead.
If the current class has a `before_start` method defined it's called before starting the daemon.
    # File lib/oni/daemon.rb, line 49
    def start
      before_start if respond_to?(:before_start)

      if threads > 0
        threads.times do
          workers << spawn_thread
        end

        workers.each(&:join)

      # If we don't have any threads run in non threaded mode.
      else
        run_thread
      end
    rescue => error
      error(error)
    end
Terminates all the threads and clears up the list. Note that calling this method acts much like sending a SIGKILL signal to a process: threads will be shut down immediately.
    # File lib/oni/daemon.rb, line 72
    def stop
      workers.each(&:kill)
      workers.clear
    end
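The effect of killing the worker threads can be seen in a minimal sketch that uses plain Ruby threads rather than a daemon instance. The sleeping threads stand in for workers that are blocked waiting for input.

```ruby
# Two threads that would otherwise block forever.
workers = Array.new(2) { Thread.new { sleep } }

# Same two steps as stop, plus a join so the result can be inspected.
workers.each(&:kill)
workers.each(&:join)

alive = workers.map(&:alive?)

workers.clear
```

Any job that a thread was processing at that moment is abandoned, so a caller that needs a graceful shutdown would have to coordinate that separately.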
Returns the amount of threads to use.
@return [Fixnum]
    # File lib/oni/daemon.rb, line 82
    def threads
      return option(:threads, DEFAULT_THREAD_AMOUNT)
    end
Returns the timeout to apply to a worker, falling back to DEFAULT_WORKER_TIMEOUT when the `:worker_timeout` option is not set.
@return [Fixnum]
    # File lib/oni/daemon.rb, line 91
    def worker_timeout
      option :worker_timeout, DEFAULT_WORKER_TIMEOUT
    end