class DatWorkerPool::Runner

Attributes

logger_proxy[R]
num_workers[R]
queue[R]
worker_class[R]
worker_params[R]

Public Class Methods

new(args) click to toggle source
# File lib/dat-worker-pool/runner.rb, line 13
# Builds a runner from an options hash.
#
# Reads `:num_workers`, `:queue`, `:worker_class` and `:worker_params` from
# `args` and stores them untouched. A truthy `args[:logger]` is wrapped in a
# LoggerProxy; otherwise a NullLoggerProxy is used. Fresh LockedArray /
# LockedSet instances are created to track all workers and the available ones.
def initialize(args)
  @num_workers, @queue            = args[:num_workers], args[:queue]
  @worker_class, @worker_params   = args[:worker_class], args[:worker_params]

  logger        = args[:logger]
  @logger_proxy = logger ? LoggerProxy.new(logger) : NullLoggerProxy.new

  @workers           = LockedArray.new
  @available_workers = LockedSet.new
end

Public Instance Methods

available_worker_count() click to toggle source
# File lib/dat-worker-pool/runner.rb, line 76
# Returns the `size` of the available-workers set, i.e. how many worker ids
# are currently registered via `make_worker_available`.
def available_worker_count
  @available_workers.size
end
log(&message_block) click to toggle source
# File lib/dat-worker-pool/runner.rb, line 92
# Forwards `message_block` to the logger proxy's `runner_log`. The message is
# supplied as a block, so whether (and when) it is evaluated is up to the
# proxy.
def log(&message_block)
  @logger_proxy.runner_log(&message_block)
end
make_worker_available(worker) click to toggle source
# File lib/dat-worker-pool/runner.rb, line 84
# Registers `worker` as available. Only the worker's `object_id` is stored in
# the available set, not the worker itself.
def make_worker_available(worker)
  worker_id = worker.object_id
  @available_workers.add(worker_id)
end
make_worker_unavailable(worker) click to toggle source
# File lib/dat-worker-pool/runner.rb, line 88
# Unregisters `worker` from the available set. The set is keyed by
# `object_id`, so that is what gets removed.
def make_worker_unavailable(worker)
  worker_id = worker.object_id
  @available_workers.remove(worker_id)
end
shutdown(timeout = nil) click to toggle source

The workers should be told to shut down before the queue, because the queue shutdown will wake them up. A worker popping on a shut-down queue will always get `nil` back and will loop as fast as allowed until its shutdown flag is flipped, so signaling the workers first and then the queue keeps them from looping as fast as possible. If any kind of standard error or the expected timeout error (raised when the workers take too long to shut down) occurs, force a shutdown; this ensures we shut down as best we can instead of letting Ruby kill the threads when the process exits. Non-timeout errors are re-raised so they can be caught and handled (or shown when the process exits).

# File lib/dat-worker-pool/runner.rb, line 49
# Shuts the worker pool down, optionally bounded by `timeout` seconds.
#
# `timeout` is handed to `MuchTimeout.just_optional_timeout`; `nil` is
# logged as "none". The `:do` proc shuts the queue down and waits for the
# workers; the `:on_timeout` proc forces the workers to shut down with a
# ShutdownError.
#
# Any StandardError raised along the way also triggers a forced shutdown and
# is then re-raised to the caller. In that case the final "Finished shutting
# down" message is not logged.
def shutdown(timeout = nil)
  log do
    timeout_message = timeout ? "#{timeout} second(s)" : "none"
    "Shutting down worker pool (timeout: #{timeout_message})"
  end
  begin
    # signal the workers before the queue so workers woken by the queue
    # shutdown already have their shutdown flag set
    @workers.with_lock{ |_m, workers| workers.each(&:dwp_signal_shutdown) }
    @queue.dwp_signal_shutdown
    # all work is passed via the `:do` / `:on_timeout` procs; no block is
    # given to `just_optional_timeout`
    MuchTimeout.just_optional_timeout(timeout, {
      :do => proc{
        @queue.dwp_shutdown
        wait_for_workers_to_shutdown
      },
      :on_timeout => proc{
        e = ShutdownError.new("Timed out shutting down (#{timeout} seconds).")
        force_workers_to_shutdown(e, timeout)
      }
    })
  rescue StandardError => err
    # shut down as best we can, then surface the original error
    e = ShutdownError.new("Errored while shutting down: #{err.inspect}")
    force_workers_to_shutdown(e, timeout)
    raise err
  end
  log{ "Finished shutting down" }
end
start() click to toggle source
# File lib/dat-worker-pool/runner.rb, line 33
# Starts the pool: logs the configured worker count, starts the queue, then
# builds workers numbered 1 through `@num_workers`.
def start
  log{ "Starting worker pool with #{@num_workers} worker(s)" }
  @queue.dwp_start
  # `times` yields 0-based indexes; workers are numbered from 1
  @num_workers.times do |index|
    build_worker(index + 1)
  end
end
worker_available?() click to toggle source
# File lib/dat-worker-pool/runner.rb, line 80
# True when at least one worker is currently registered as available.
def worker_available?
  available_worker_count > 0
end
worker_log(worker, &message_block) click to toggle source
# File lib/dat-worker-pool/runner.rb, line 96
# Forwards `worker` and `message_block` to the logger proxy's `worker_log`.
# The message is supplied as a block, so whether it is evaluated is up to the
# proxy.
def worker_log(worker, &message_block)
  @logger_proxy.worker_log(worker, &message_block)
end
workers() click to toggle source
# File lib/dat-worker-pool/runner.rb, line 29
# Returns `@workers.values`.
# NOTE(review): presumably a plain-array view of the locked worker array --
# confirm against LockedArray#values.
def workers
  @workers.values
end

Private Instance Methods

build_worker(number) click to toggle source
# File lib/dat-worker-pool/runner.rb, line 102
# Instantiates a worker from `@worker_class` (given this runner, the queue and
# its `number`), starts it, and appends it to `@workers`.
def build_worker(number)
  worker = @worker_class.new(self, @queue, number)
  worker.dwp_start
  @workers.push(worker)
end
force_workers_to_shutdown(error, timeout) click to toggle source

Use a while loop instead of `each` to join all the workers: while we are joining one worker, a different worker can shut down and remove itself from the `@workers` array. Rescue when joining the workers: Ruby will raise any exception that wasn't handled by a thread when it's joined. This ensures that if the hard-shutdown error is raised and not rescued (for example, in the worker's `ensure`), it won't cause the forced shutdown to end prematurely.

# File lib/dat-worker-pool/runner.rb, line 132
# Forces every remaining worker to shut down by raising `error` in it,
# joining it, and then removing it from the pool.
#
# The loop re-reads `@workers.first` on each pass instead of iterating,
# because `@workers` can change while a worker is being joined.
#
# `error`   - the exception raised in each worker via `dwp_raise`
# `timeout` - accepted for interface compatibility; not used in this method
def force_workers_to_shutdown(error, timeout)
  log{ "Forcing #{@workers.size} workers to shutdown" }
  until (worker = @workers.first).nil?
    worker.dwp_raise(error)
    begin
      worker.dwp_join
    rescue ShutdownError
      # these are expected (because we raised them in the thread) so they
      # don't need to be logged; this clause is listed first so it is honored
      # regardless of whether ShutdownError descends from StandardError
    rescue StandardError
      log{ "An error occurred while waiting for worker " \
           "to shutdown ##{worker.dwp_number} (forced)" }
    end
    remove_worker(worker)
    log{ "Worker ##{worker.dwp_number} shutdown (forced)" }
  end
end
remove_worker(worker) click to toggle source

make sure the worker has been removed from the available workers, in case it errored before it was able to make itself unavailable

# File lib/dat-worker-pool/runner.rb, line 152
# Drops `worker` from the pool. It is first taken out of the available set
# (it may have errored before marking itself unavailable), then deleted from
# `@workers`.
def remove_worker(worker)
  make_worker_unavailable(worker)
  @workers.delete(worker)
end
wait_for_workers_to_shutdown() click to toggle source

Use a loop that re-reads the first remaining worker instead of `each` to join all the workers: while we are joining one worker, a different worker can shut down and remove itself from the `@workers` array. Rescue when joining the workers: Ruby will raise any exception that wasn't handled by a thread when it's joined; rescuing allows all the workers to be joined.

# File lib/dat-worker-pool/runner.rb, line 111
# Joins each remaining worker in turn and removes it from the pool.
#
# The head of `@workers` is re-read on every pass rather than iterated,
# because the collection can change while a worker is being joined. A
# StandardError raised by a join is logged and does not stop the loop.
def wait_for_workers_to_shutdown
  log{ "Waiting for #{@workers.size} workers to shutdown" }
  until (current = @workers.first).nil?
    begin
      current.dwp_join
    rescue StandardError
      log do
        "An error occurred while waiting for worker " \
        "to shutdown ##{current.dwp_number}"
      end
    end
    remove_worker(current)
    log{ "Worker ##{current.dwp_number} shutdown" }
  end
end