class Oni::Daemon

The Daemon class takes care of retrieving work to be processed, scheduling it, and dispatching it to a mapper and worker. In essence, a Daemon instance plays the role that a controller plays in a typical MVC framework.

The daemon starts a number of threads (5 by default), each of which performs work on its own using the corresponding mapper and worker class.

@!attribute [r] workers

@return [Array<Thread>]

Constants

DEFAULT_THREAD_AMOUNT

The default number of threads to start.

@return [Fixnum]

DEFAULT_WORKER_TIMEOUT

The default timeout for a single worker run, in seconds (the value is passed to `Timeout.timeout`).

@return [Fixnum]

Attributes

workers[R]

Public Class Methods

new()

Creates a new instance of the class and calls `#after_initialize` if it is defined.

# File lib/oni/daemon.rb, line 36
def initialize
  @workers = []

  after_initialize if respond_to?(:after_initialize)
end
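
The hook behaviour described above can be sketched without the gem. This is a minimal stand-in, not the library's own class: `SketchDaemon` copies the constructor shown above, and `LoggingDaemon` with its `log` attribute is a hypothetical subclass invented for illustration.

```ruby
# Stand-in that copies the constructor shown above; this is not Oni::Daemon itself.
class SketchDaemon
  attr_reader :workers

  def initialize
    @workers = []

    after_initialize if respond_to?(:after_initialize)
  end
end

# Hypothetical subclass that uses the hook to prepare its own state.
class LoggingDaemon < SketchDaemon
  attr_reader :log

  # The hook has to be public: respond_to? ignores private methods by default.
  def after_initialize
    @log = []
  end
end

plain  = SketchDaemon.new  # no hook defined, so only @workers is set up
hooked = LoggingDaemon.new # the hook runs at the end of initialize
```

Because the check uses `respond_to?`, a hook declared `private` would be silently skipped.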

Public Instance Methods

complete(message, output)

Called when a job has been completed. By default this method is a no-op. It is passed two arguments:

  1. The raw input message.

  2. The output of the worker (remapped by the mapper).

@param [Mixed] message The raw input message (e.g. an AWS SQS message)
@param [Mixed] output The output of the worker.

# File lib/oni/daemon.rb, line 144
def complete(message, output)
end
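
A subclass would typically override `complete` to acknowledge or record finished jobs. The sketch below assumes a stand-in class rather than the real daemon: `SketchDaemon#run_worker` is a hypothetical placeholder for the mapper/worker pipeline, and `RecordingDaemon` is an invented name.

```ruby
# Stand-in daemon reproducing the process/complete flow documented on this page.
class SketchDaemon
  def process(message)
    output = run_worker(message)

    complete(message, output)
  end

  # Hypothetical placeholder for the mapper and worker pipeline.
  def run_worker(message)
    message.upcase
  end

  # No-op by default, as in the documented class.
  def complete(message, output)
  end
end

# Hypothetical subclass that records every completed job.
class RecordingDaemon < SketchDaemon
  attr_reader :completed

  def initialize
    @completed = []
  end

  def complete(message, output)
    @completed << [message, output]
  end
end

daemon = RecordingDaemon.new
daemon.process('hello')
```

After the call, `daemon.completed` holds one pair: the raw message and the worker output.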

create_mapper()

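Creates a new mapper from the class set in the `:mapper` option and raises an `ArgumentError` if no mapper has been set. The description refers to {Oni::Daemon#mapper_arguments} for the constructor arguments, but the source shown here calls `new` without any.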

@return [Oni::Mapper]

# File lib/oni/daemon.rb, line 163
def create_mapper
  unless option(:mapper)
    raise ArgumentError, 'No mapper has been set in the `:mapper` option'
  end

  return option(:mapper).new
end

error(error)

Called whenever an error is raised in the daemon, mapper or worker. By default this method just re-raises the error.

@param [StandardError] error

# File lib/oni/daemon.rb, line 153
def error(error)
  raise error
end

process(message)

Processes the given message. Upon completion the `#complete` method is called and passed the resulting output.

@param [Mixed] message

# File lib/oni/daemon.rb, line 101
def process(message)
  output = run_worker(message)

  complete(message, output)
end

receive()

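Receives a message. By default this method raises an error, so subclasses must implement it and yield each received message to the given block (see `run_thread`).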

@raise [NotImplementedError]

# File lib/oni/daemon.rb, line 130
def receive
  raise NotImplementedError, 'You must manually implement #receive'
end
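
Since `run_thread` calls `receive` with a block, an implementation is expected to yield each message. A minimal sketch, assuming an in-memory `Queue` as the message source; `QueueDaemon` is a hypothetical class, and a real daemon would more likely poll an external service.

```ruby
require 'thread' # Queue; already loaded by default on current Rubies

# Hypothetical daemon-like class whose receive yields messages from a Queue.
class QueueDaemon
  def initialize(queue)
    @queue = queue
  end

  # Yields each message in turn. Queue#pop returns nil once the queue is
  # closed and empty, which ends the loop.
  def receive
    while (message = @queue.pop)
      yield message
    end
  end
end

queue = Queue.new
queue << 'first'
queue << 'second'
queue.close

received = []
QueueDaemon.new(queue).receive { |message| received << message }
```

Closing the queue is only there so the example terminates; a long-running daemon would normally keep blocking on `pop`.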

run_thread()

The main code to execute in individual threads.

If an error occurs in `receive` or while processing a job, the error handler is executed and the whole receive loop is retried. It is the responsibility of the `error` method to decide whether the thread should fail hard on the first error (the default, since `error` re-raises) or continue running.

# File lib/oni/daemon.rb, line 192
def run_thread
  receive do |message|
    process message
  end
rescue => error
  error(error)

  retry
end
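
The interaction between `error` and `retry` can be sketched with a stand-in class. Everything except the body of `run_thread` is an assumption made for illustration: the array-backed `receive`, the failing `process`, and an `error` override that records the exception instead of re-raising it.

```ruby
# Stand-in daemon; only run_thread is copied from the listing above.
class SketchDaemon
  attr_reader :processed, :errors

  def initialize(messages)
    @messages  = messages
    @processed = []
    @errors    = []
  end

  # Hypothetical receive: yields pending messages until none are left.
  def receive
    while (message = @messages.shift)
      yield message
    end
  end

  # Hypothetical job handling that fails for one particular message.
  def process(message)
    raise ArgumentError, "bad message: #{message}" if message == :bad

    @processed << message
  end

  # Records the error instead of re-raising, so run_thread carries on.
  def error(error)
    @errors << error
  end

  def run_thread
    receive do |message|
      process message
    end
  rescue => error
    error(error)

    retry
  end
end

daemon = SketchDaemon.new([:a, :bad, :b])
daemon.run_thread
```

Here the failing message is dropped and the loop resumes with the next one. With the default `error`, which re-raises, `retry` is never reached and the exception leaves `run_thread`.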

run_worker(message)

Maps the input, runs the worker and then maps the output into something that the daemon can understand.

@param [Mixed] message
@return [Mixed]

# File lib/oni/daemon.rb, line 114
def run_worker(message)
  mapper = create_mapper
  input  = mapper.map_input(message)
  worker = option(:worker).new(*input)
  output = Timeout.timeout worker_timeout do
    worker.process
  end

  mapper.map_output output
end
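
The map, run and remap steps can be tried out in isolation. The following is a sketch under stated assumptions: `SketchMapper`, `SketchWorker` and `SlowWorker` are hypothetical classes, and the free-standing `run_worker` takes the mapper class, worker class and timeout as parameters instead of reading them from daemon options.

```ruby
require 'timeout'

# Hypothetical mapper: turns a message Hash into positional worker arguments
# and wraps the worker output in a Hash.
class SketchMapper
  def map_input(message)
    [message.fetch('a'), message.fetch('b')]
  end

  def map_output(output)
    { 'sum' => output }
  end
end

# Hypothetical worker: receives the splatted mapper output.
class SketchWorker
  def initialize(a, b)
    @a = a
    @b = b
  end

  def process
    @a + @b
  end
end

# Hypothetical worker that takes longer than the timeout allows.
class SlowWorker
  def initialize(*)
  end

  def process
    sleep 1
  end
end

# Same steps as the documented method, with its dependencies passed in.
def run_worker(message, mapper_class, worker_class, timeout)
  mapper = mapper_class.new
  input  = mapper.map_input(message)
  worker = worker_class.new(*input)
  output = Timeout.timeout(timeout) { worker.process }

  mapper.map_output(output)
end

message = { 'a' => 2, 'b' => 3 }
result  = run_worker(message, SketchMapper, SketchWorker, 5)

timed_out = begin
  run_worker(message, SketchMapper, SlowWorker, 0.05)
  false
rescue Timeout::Error
  true
end
```

Because of the splat, an Array returned by `map_input` becomes the worker's positional arguments. A worker that exceeds the timeout raises `Timeout::Error`, which in the daemon would reach the `error` handler via `run_thread`.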

spawn_thread()

Spawns a new thread that waits for daemon input.

@return [Thread]

# File lib/oni/daemon.rb, line 176
def spawn_thread
  thread = Thread.new { run_thread }

  thread.abort_on_exception = true

  return thread
end

start()

Starts the daemon and blocks until all threads have finished execution. When the configured number of threads is 0, the work loop runs directly in the calling thread instead.

If the current class has a `before_start` method defined it's called before starting the daemon.

# File lib/oni/daemon.rb, line 49
def start
  before_start if respond_to?(:before_start)

  if threads > 0
    threads.times do
      workers << spawn_thread
    end

    workers.each(&:join)

  # If we don't have any threads run in non threaded mode.
  else
    run_thread
  end
rescue => error
  error(error)
end
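
The two modes of `start` can be sketched with a stand-in class. The names here are assumptions for illustration: `SketchDaemon` takes the thread count as a constructor argument instead of an option, its `run_thread` only records which thread ran it, and the error handling of the real method is left out.

```ruby
require 'thread' # Queue; already loaded by default on current Rubies

# Stand-in daemon reproducing the threaded and non-threaded branches of start.
class SketchDaemon
  attr_reader :workers, :ran_in

  def initialize(threads)
    @threads = threads
    @workers = []
    @ran_in  = Queue.new # thread-safe record of the threads that did work
    @started = false
  end

  # Optional hook, called once before any thread is spawned.
  def before_start
    @started = true
  end

  def started?
    @started
  end

  # Hypothetical work loop: only notes which thread executed it.
  def run_thread
    @ran_in << Thread.current
  end

  def start
    before_start if respond_to?(:before_start)

    if @threads > 0
      @threads.times { workers << Thread.new { run_thread } }

      workers.each(&:join) # blocks until every thread has finished
    else
      run_thread # non-threaded mode: runs in the calling thread
    end
  end
end

threaded = SketchDaemon.new(2)
threaded.start

inline = SketchDaemon.new(0)
inline.start
```

In the threaded case `start` returns only after both threads are done. With 0 threads no worker threads exist and the work runs in the caller.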

stop()

Terminates all the threads and clears up the list. Note that calling this method acts much like sending a SIGKILL signal to a process: threads will be shut down immediately.

# File lib/oni/daemon.rb, line 72
def stop
  workers.each(&:kill)
  workers.clear
end
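
The effect of killing worker threads can be illustrated with plain `Thread` objects. This is a sketch, not the daemon itself: the `sleep 5` stands in for a long-running job, and the extra `join` is only there so the example can inspect the threads after they have terminated.

```ruby
finished = []

# Two threads pretending to be in the middle of a long job.
workers = Array.new(2) do |index|
  Thread.new do
    sleep 5           # stand-in for work in progress
    finished << index # never reached when the thread is killed first
  end
end

workers.each(&:kill) # same call the documented stop method uses
workers.each(&:join) # wait for the killed threads to terminate

still_alive = workers.count(&:alive?)
workers.clear
```

No job gets to record its result, which is why `stop` is best reserved for cases where losing in-flight work is acceptable.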

threads()

Returns the number of threads to use.

@return [Fixnum]

# File lib/oni/daemon.rb, line 82
def threads
  return option(:threads, DEFAULT_THREAD_AMOUNT)
end

worker_timeout()

Returns the timeout to apply to each worker run, in seconds. Defaults to `DEFAULT_WORKER_TIMEOUT`.

@return [Fixnum]

# File lib/oni/daemon.rb, line 91
def worker_timeout
  option :worker_timeout, DEFAULT_WORKER_TIMEOUT
end