module DatWorkerPool::Worker::InstanceMethods

Attributes

dwp_number[R]

Public Class Methods

new(runner, queue, number) click to toggle source
# File lib/dat-worker-pool/worker.rb, line 60
# Set up a worker in its initial, not-yet-started state.
#
# runner - stored as @dwp_runner
# queue  - stored as @dwp_queue
# number - stored as @dwp_number
#
# The worker starts out not running and without a thread.
def initialize(runner, queue, number)
  @dwp_runner  = runner
  @dwp_queue   = queue
  @dwp_number  = number
  @dwp_running = false
  @dwp_thread  = nil
end

Public Instance Methods

dwp_join(*args) click to toggle source
# File lib/dat-worker-pool/worker.rb, line 91
# Wait for this worker's thread to finish.
#
# args - forwarded unchanged to the thread's `join`.
#
# Returns whatever `join` returns, or nil when there is no live thread.
def dwp_join(*args)
  # Grab the thread before checking liveness: `@dwp_thread` can be reset to
  # nil by the worker thread itself while it is finishing, so re-reading the
  # ivar after the check could call `join` on nil.
  thread = @dwp_thread
  thread.join(*args) if thread && self.dwp_thread_alive?
end
dwp_raise(*args) click to toggle source
# File lib/dat-worker-pool/worker.rb, line 95
# Raise an exception inside this worker's thread.
#
# args - forwarded unchanged to the thread's `raise`.
#
# Returns whatever the thread's `raise` returns, or nil when there is no
# live thread.
def dwp_raise(*args)
  # Grab the thread before checking liveness: `@dwp_thread` can be reset to
  # nil by the worker thread itself while it is finishing, so re-reading the
  # ivar after the check could call `raise` on nil.
  thread = @dwp_thread
  thread.raise(*args) if thread && self.dwp_thread_alive?
end
dwp_running?() click to toggle source
# File lib/dat-worker-pool/worker.rb, line 75
# Whether the running flag is currently set.
#
# Always returns a strict boolean (true/false), whatever is stored in
# @dwp_running.
def dwp_running?
  @dwp_running ? true : false
end
dwp_shutdown?() click to toggle source
# File lib/dat-worker-pool/worker.rb, line 79
# Logical opposite of `dwp_running?`.
#
# Returns true when the worker is not running, false otherwise.
def dwp_shutdown?
  return false if self.dwp_running?
  true
end
dwp_signal_shutdown() click to toggle source
# File lib/dat-worker-pool/worker.rb, line 71
# Clear the running flag. This only flips @dwp_running to false; it does not
# touch the thread itself.
def dwp_signal_shutdown
  @dwp_running = false
end
dwp_start() click to toggle source
# File lib/dat-worker-pool/worker.rb, line 66
# Flag the worker as running and make sure it has a thread executing
# `dwp_work_loop`.
#
# An already-stored thread is reused; otherwise a new one is spawned and
# remembered in @dwp_thread. Returns the thread.
def dwp_start
  @dwp_running = true
  existing_thread = @dwp_thread
  return existing_thread if existing_thread
  @dwp_thread = Thread.new{ dwp_work_loop }
end
dwp_thread_alive?() click to toggle source

This is needed because even if the running flag has been set to false (meaning the worker has been shut down), the thread may still be alive because its `work` is taking a long time or it's still trying to shut down.

# File lib/dat-worker-pool/worker.rb, line 87
# Whether the worker currently has a thread that reports itself alive.
#
# Returns a strict boolean: false when no thread is stored or the stored
# thread is not alive, true otherwise.
def dwp_thread_alive?
  # Read the ivar once: it can be reset to nil from the worker thread, and a
  # second read after the nil check could then call `alive?` on nil.
  thread = @dwp_thread
  !!(thread && thread.alive?)
end

Private Instance Methods

dwp_handle_exception(exception, work_item = nil) click to toggle source
# File lib/dat-worker-pool/worker.rb, line 184
# Log an exception and run the 'on_error' callbacks for it.
#
# exception - the error being handled
# work_item - the item being processed when it happened, if any; passed on
#             to the callbacks as-is (defaults to nil)
#
# A failure raised while logging or while running the callbacks is itself
# logged and then swallowed, provided it matches STANDARD_ERROR_CLASSES, so
# that handling an error does not raise a second one out of this method.
def dwp_handle_exception(exception, work_item = nil)
  dwp_log_exception(exception)
  dwp_run_callback('on_error', exception, work_item)
rescue *STANDARD_ERROR_CLASSES => callback_failure
  # ideally on-error callbacks rescue their own errors and never get here
  dwp_log_exception(callback_failure)
end
dwp_log(&message_block) click to toggle source
# File lib/dat-worker-pool/worker.rb, line 202
# Hand a log message to the runner on behalf of this worker.
#
# message_block - block producing the message; it is forwarded to the
#                 runner's `worker_log` rather than evaluated here.
def dwp_log(&message_block)
  @dwp_runner.worker_log(self, &message_block)
end
dwp_log_exception(exception) click to toggle source
# File lib/dat-worker-pool/worker.rb, line 206
# Write an exception to the worker log: one "Class: message" line followed
# by one line per backtrace entry. An exception without a backtrace only
# produces the first line.
def dwp_log_exception(exception)
  dwp_log do
    "#{exception.class}: #{exception.message}"
  end
  frames = exception.backtrace || []
  frames.each do |frame|
    dwp_log{ frame }
  end
end
dwp_make_available() click to toggle source

this is a separate method so the test runner can call it individually

# File lib/dat-worker-pool/worker.rb, line 158
# Mark this worker as available: tell the runner first, then run the
# 'on_available' callbacks, then log the state change.
def dwp_make_available
  @dwp_runner.make_worker_available(self)
  dwp_run_callback('on_available')
  dwp_log do
    "Available"
  end
end
dwp_make_unavailable() click to toggle source

this is a separate method so the test runner can call it individually

# File lib/dat-worker-pool/worker.rb, line 151
# Mark this worker as unavailable: tell the runner first, then run the
# 'on_unavailable' callbacks, then log the state change.
def dwp_make_unavailable
  @dwp_runner.make_worker_unavailable(self)
  dwp_run_callback('on_unavailable')
  dwp_log do
    "Unavailable"
  end
end
dwp_run_callback(callback, *args) click to toggle source
# File lib/dat-worker-pool/worker.rb, line 196
# Run every callback registered on the worker's class under the given name.
#
# callback - callback name, e.g. 'on_error'; the list is fetched by calling
#            "<callback>_callbacks" on the class
# args     - passed through to each callback
#
# Each callback is evaluated with `instance_exec`, so `self` inside it is
# this worker. A nil callback list is treated as empty.
def dwp_run_callback(callback, *args)
  # the block parameter gets its own name so it does not shadow `callback`
  (self.class.send("#{callback}_callbacks") || []).each do |callback_block|
    self.instance_exec(*args, &callback_block)
  end
end
dwp_setup() click to toggle source
# File lib/dat-worker-pool/worker.rb, line 139
# Startup sequence: log "Starting", run the 'on_start' callbacks and mark
# the worker available.
#
# An error matching STANDARD_ERROR_CLASSES raised by the callbacks or by
# making the worker available goes through `dwp_handle_exception` and is
# then re-raised on the current thread. The initial log call sits outside
# that handling.
def dwp_setup
  dwp_log{ "Starting" }
  begin
    dwp_run_callback('on_start')
    dwp_make_available
  rescue *STANDARD_ERROR_CLASSES => setup_error
    dwp_handle_exception(setup_error)
    Thread.current.raise(setup_error)
  end
end
dwp_teardown() click to toggle source
# File lib/dat-worker-pool/worker.rb, line 172
# Shutdown sequence: mark the worker unavailable and run the 'on_shutdown'
# callbacks, then log "Shutdown" and reset the worker's state.
#
# An error matching STANDARD_ERROR_CLASSES from the first two steps is
# passed to `dwp_handle_exception` and not re-raised, so the log line and
# the state reset still happen.
def dwp_teardown
  begin
    dwp_make_unavailable
    dwp_run_callback('on_shutdown')
  rescue *STANDARD_ERROR_CLASSES => teardown_error
    dwp_handle_exception(teardown_error)
  end
  dwp_log do
    "Shutdown"
  end
  # back to the initial state: not running and no thread stored
  @dwp_running = false
  @dwp_thread  = nil
end
dwp_work(work_item) click to toggle source

this is a separate method so the test runner can call it individually

# File lib/dat-worker-pool/worker.rb, line 165
# Process a single work item: log it, run the 'before_work' callbacks, call
# `work!` with the item, then run the 'after_work' callbacks.
#
# Nothing is rescued here, so an error from any step skips the steps after
# it and propagates to the caller.
def dwp_work(work_item)
  dwp_log do
    "Working, item: #{work_item.inspect}"
  end
  dwp_run_callback('before_work', work_item)
  work!(work_item)
  dwp_run_callback('after_work', work_item)
end
dwp_work_loop() click to toggle source

Rescue `ShutdownError` but re-raise it after calling the on-error callbacks; this ensures it causes the loop to exit.

# File lib/dat-worker-pool/worker.rb, line 114
# Body of the worker thread: run setup, then keep popping items from the
# queue and working them for as long as the worker is running; teardown
# always runs on the way out.
#
# Per iteration:
# * a nil pop result is skipped;
# * for a non-nil item the worker is made unavailable, the item is worked,
#   and the worker is made available again no matter how the work ended;
# * a `ShutdownError` raised while working is handled and then re-raised so
#   that it ends the loop;
# * errors matching STANDARD_ERROR_CLASSES are handled and the loop goes on.
#   The outer rescue covers failures outside the inner begin block, such as
#   the pop itself or the `dwp_make_available` call in the ensure.
def dwp_work_loop
  dwp_setup
  while self.dwp_running?
    # `while` does not open a new scope, so the local must be cleared on
    # every pass; otherwise a failing `dwp_pop` would be reported to the
    # error handling together with the previous iteration's item.
    work_item = nil
    begin
      if !(work_item = queue.dwp_pop).nil?
        begin
          dwp_make_unavailable
          dwp_work(work_item)
        rescue ShutdownError => exception
          dwp_handle_exception(exception, work_item)
          Thread.current.raise exception
        rescue *STANDARD_ERROR_CLASSES => exception
          dwp_handle_exception(exception, work_item)
        ensure
          dwp_make_available
        end
      end
    rescue *STANDARD_ERROR_CLASSES => exception
      dwp_handle_exception(exception, work_item)
    end
  end
ensure
  dwp_teardown
end
number() click to toggle source

Helpers

# File lib/dat-worker-pool/worker.rb, line 102
# The number this worker was created with.
def number
  @dwp_number
end
params() click to toggle source
# File lib/dat-worker-pool/worker.rb, line 103
# The worker params, as provided by the runner.
def params
  @dwp_runner.worker_params
end
queue() click to toggle source
# File lib/dat-worker-pool/worker.rb, line 104
# The queue, as provided by the runner.
def queue
  @dwp_runner.queue
end
work!(work_item) click to toggle source

Override this method to add custom work logic; it has to be overridden or the workers will not know how to handle a work item.

# File lib/dat-worker-pool/worker.rb, line 108
# Placeholder for the actual work logic; as written it always raises
# NotImplementedError and ignores the item.
#
# work_item - the item to process (unused here)
def work!(work_item)
  raise NotImplementedError
end