class ActiveHook::Server::Worker

The Worker manages our two main processes, Queue and Retry. Each of these processes is allotted a number of threads, which the worker then creates and starts. Each worker object maintains control of these threads through the aptly named start and shutdown methods.

Attributes

id[RW]
queue_threads[RW]
retry_threads[RW]

Public Class Methods

new(options = {})
# File lib/activehook/server/worker.rb, line 11
def initialize(options = {})
  options.each { |key, value| send("#{key}=", value) }
  @pid = Process.pid
  @threads = []
  @_threads_real = []
  at_exit { shutdown }
end
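
The constructor assigns each option by sending it to the matching attribute writer, so only keys that have a writer (id, queue_threads, retry_threads) are accepted. A minimal stand-alone sketch of that pattern follows; SketchWorker is a hypothetical stand-in, not the real ActiveHook::Server::Worker.

```ruby
# Hypothetical stand-in that mirrors the option-assignment pattern above.
class SketchWorker
  attr_accessor :id, :queue_threads, :retry_threads

  def initialize(options = {})
    # Each key is sent to its writer, e.g. :id becomes self.id = value.
    options.each { |key, value| send("#{key}=", value) }
  end
end

worker = SketchWorker.new(id: 1, queue_threads: 2, retry_threads: 1)
puts worker.queue_threads

# A key without a matching writer raises NoMethodError.
begin
  SketchWorker.new(unknown: true)
rescue NoMethodError => e
  puts "rejected: #{e.name}"
end
```

Because the options hash drives the writers directly, a misspelled key fails at construction time rather than being silently ignored.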

Public Instance Methods

shutdown()

Shuts down our worker as well as its threads.

# File lib/activehook/server/worker.rb, line 30
def shutdown
  shutdown_message
  @threads.each(&:shutdown)
  @_threads_real.each(&:exit)
end
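
The shutdown sequence has two steps: each Queue or Retry object is asked to stop, and then the underlying threads are terminated. The constructor also registers this method with at_exit, so it runs when the process exits. A rough sketch of the two steps, assuming a hypothetical SketchLoop class whose start method loops until shutdown is called (the final join is added only so the sketch can observe the result):

```ruby
# Hypothetical process object: loops until asked to shut down.
class SketchLoop
  def initialize
    @running = true
  end

  def start
    sleep 0.01 while @running
  end

  def shutdown
    @running = false
  end
end

loops = Array.new(2) { SketchLoop.new }
real_threads = loops.map { |l| Thread.new { l.start } }

loops.each(&:shutdown)     # step 1: ask each object to stop
real_threads.each(&:exit)  # step 2: terminate the underlying threads
real_threads.each(&:join)  # wait for termination (sketch only)

puts real_threads.none?(&:alive?)
```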

start()

Starts our new worker.

# File lib/activehook/server/worker.rb, line 21
def start
  validate!
  start_message
  build_threads
  start_threads
end

Private Instance Methods

build_threads()

Instantiates our Queue and Retry objects based on the number of threads specified for each process type. We store these objects as an array in @threads.

# File lib/activehook/server/worker.rb, line 53
def build_threads
  @queue_threads.times { @threads << Queue.new }
  @retry_threads.times { @threads << Retry.new }
end
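
The effect of build_threads on @threads can be sketched with plain local variables. SketchQueue and SketchRetry below are hypothetical stand-ins for the Queue and Retry classes, which are not shown on this page.

```ruby
# Hypothetical stand-ins for the Queue and Retry process classes.
SketchQueue = Class.new
SketchRetry = Class.new

queue_threads = 2
retry_threads = 1
threads = []

# One object per requested thread, queue objects first.
queue_threads.times { threads << SketchQueue.new }
retry_threads.times { threads << SketchRetry.new }

puts threads.map { |t| t.class.name }.inspect
```

At this point nothing is running yet; the array only holds the objects that will later be started inside real threads.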

shutdown_message()

Logs information about the shutdown process, including the worker id and pid.

# File lib/activehook/server/worker.rb, line 66
def shutdown_message
  ActiveHook.log.info("* Worker #{@id} shutdown, pid: #{@pid}")
end

start_message()

Logs information about the start process, including the worker id and pid.

# File lib/activehook/server/worker.rb, line 60
def start_message
  ActiveHook.log.info("* Worker #{@id} started, pid: #{@pid}")
end

start_threads()

Creates the actual threads (@_threads_real) for our Queue and Retry objects and starts each object inside its own thread. We then join the threads, so the caller blocks until all of them have finished.

# File lib/activehook/server/worker.rb, line 42
def start_threads
  @threads.each do |thread|
    @_threads_real << Thread.new { thread.start }
  end
  @_threads_real.map(&:join)
end
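
The create-then-join pattern can be sketched as follows. SketchProcess is a hypothetical object whose start method returns immediately so that the example terminates; the real Queue and Retry objects presumably run until shut down.

```ruby
# Hypothetical process object whose start method returns quickly.
class SketchProcess
  attr_reader :started

  def start
    @started = true
  end
end

processes = Array.new(3) { SketchProcess.new }
real_threads = []

# Wrap each object's start call in its own thread.
processes.each do |process|
  real_threads << Thread.new { process.start }
end

# join blocks the caller until every thread has finished.
real_threads.each(&:join)

puts processes.all?(&:started)
```

Because of the join, a call to start on the worker does not return while any of its threads is still running.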

validate!()

Validates our data before starting the worker.

# File lib/activehook/server/worker.rb, line 72
def validate!
  raise Errors::Worker, 'Queue threads must be an Integer.' unless @queue_threads.is_a?(Integer)
  raise Errors::Worker, 'Retry threads must be an Integer.' unless @retry_threads.is_a?(Integer)
end
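
The check is a type check only: any Integer passes, including zero or a negative number, while nil or a numeric String is rejected. A small sketch of that behaviour, using a hypothetical SketchWorkerError in place of Errors::Worker and a free-standing method in place of the private instance method:

```ruby
# Hypothetical error class standing in for Errors::Worker.
class SketchWorkerError < StandardError; end

# Mirrors the two guard clauses shown above.
def validate_threads!(queue_threads, retry_threads)
  raise SketchWorkerError, 'Queue threads must be an Integer.' unless queue_threads.is_a?(Integer)
  raise SketchWorkerError, 'Retry threads must be an Integer.' unless retry_threads.is_a?(Integer)
end

validate_threads!(2, 1) # passes silently

begin
  validate_threads!('2', 1) # a String is rejected, even if numeric
rescue SketchWorkerError => e
  puts e.message
end
```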