module DatWorkerPool::Worker::InstanceMethods
Attributes
dwp_number[R]
Public Class Methods
new(runner, queue, number)
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 60
# Set up a new worker. The worker starts out not running and without a
# thread; call `dwp_start` to spin it up.
#
# @param runner [Object] stored as @dwp_runner
# @param queue  [Object] stored as @dwp_queue
# @param number [Object] stored as @dwp_number (exposed via `dwp_number`)
def initialize(runner, queue, number)
  @dwp_runner  = runner
  @dwp_queue   = queue
  @dwp_number  = number
  @dwp_running = false
  @dwp_thread  = nil
end
Public Instance Methods
dwp_join(*args)
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 91
# Join the worker's thread, but only while that thread is alive.
#
# The thread is read from `@dwp_thread` exactly once: `dwp_teardown` runs on
# the worker thread and resets `@dwp_thread` to nil, so reading the ivar
# again after the liveness check could yield nil and blow up with a
# NoMethodError in the calling thread.
#
# @param args forwarded untouched to `Thread#join` (e.g. a timeout)
# @return whatever `Thread#join` returns, or nil when there is no live thread
def dwp_join(*args)
  thread = @dwp_thread
  thread.join(*args) if thread && thread.alive?
end
dwp_raise(*args)
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 95
# Raise an exception inside the worker's thread, but only while that thread
# is alive.
#
# The thread is read from `@dwp_thread` exactly once: `dwp_teardown` runs on
# the worker thread and resets `@dwp_thread` to nil, so reading the ivar
# again after the liveness check could yield nil and blow up with a
# NoMethodError in the calling thread.
#
# @param args forwarded untouched to `Thread#raise`
# @return whatever `Thread#raise` returns, or nil when there is no live thread
def dwp_raise(*args)
  thread = @dwp_thread
  thread.raise(*args) if thread && thread.alive?
end
dwp_running?()
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 75
# Report the running flag as a strict boolean.
#
# @return [Boolean] true when @dwp_running is truthy, false otherwise
def dwp_running?
  @dwp_running ? true : false
end
dwp_shutdown?()
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 79
# Inverse of `dwp_running?`.
#
# @return [Boolean] true when the worker is not running
def dwp_shutdown?
  return false if dwp_running?

  true
end
dwp_signal_shutdown()
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 71
# Ask the worker to stop by clearing the running flag. Only the flag is
# touched here; the thread itself is not joined or interrupted.
#
# @return [false]
def dwp_signal_shutdown
  @dwp_running = false
end
dwp_start()
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 66
# Flag the worker as running and make sure it has a thread executing
# `dwp_work_loop`. An already-assigned thread is reused rather than replaced.
#
# @return [Thread] the worker's thread
def dwp_start
  @dwp_running = true
  return @dwp_thread if @dwp_thread

  @dwp_thread = Thread.new { dwp_work_loop }
end
dwp_thread_alive?()
click to toggle source
This is needed because even if the running flag has been set to false (meaning the worker has been shut down), the thread may still be alive because its `work` is taking a long time or it's still trying to shut down.
# File lib/dat-worker-pool/worker.rb, line 87
# Whether the worker currently has a live thread. This can be true even after
# the running flag has been cleared, e.g. while a long work item finishes or
# the worker is still shutting down.
#
# `@dwp_thread` is read once into a local: `dwp_teardown` runs on the worker
# thread and resets the ivar to nil, so a second read could see nil and
# raise NoMethodError.
#
# @return [Boolean]
def dwp_thread_alive?
  thread = @dwp_thread
  !!(thread && thread.alive?)
end
Private Instance Methods
dwp_handle_exception(exception, work_item = nil)
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 184
# Log an exception and hand it to the 'on_error' callbacks.
#
# Errors raised while logging or running the on-error callbacks are logged
# but otherwise ignored so they cannot crash the worker; ideally the
# callbacks catch their own errors and nothing ever reaches the rescue.
#
# @param exception [Exception] the error being handled
# @param work_item [Object, nil] the item being worked when it was raised
def dwp_handle_exception(exception, work_item = nil)
  dwp_log_exception(exception)
  dwp_run_callback('on_error', exception, work_item)
rescue *STANDARD_ERROR_CLASSES => callback_error
  dwp_log_exception(callback_error)
end
dwp_log(&message_block)
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 202
# Delegate logging to the runner, passing this worker along with the block
# that produces the message.
#
# @yieldreturn the message to log
# @return whatever the runner's `worker_log` returns
def dwp_log(&message_block)
  runner = @dwp_runner
  runner.worker_log(self, &message_block)
end
dwp_log_exception(exception)
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 206
# Log an exception as "Class: message", followed by one log entry per
# backtrace line. An exception without a backtrace logs only the summary.
#
# @param exception [Exception]
# @return [Array] the backtrace lines that were logged (empty when none)
def dwp_log_exception(exception)
  dwp_log { "#{exception.class}: #{exception.message}" }

  trace_lines = exception.backtrace || []
  trace_lines.each do |trace_line|
    dwp_log { trace_line }
  end
end
dwp_make_available()
click to toggle source
this is a separate method so the test runner can call it individually
# File lib/dat-worker-pool/worker.rb, line 158
# Register this worker as available with the runner, then run the
# 'on_available' callbacks and log it. Kept as its own method so the test
# runner can call it individually.
def dwp_make_available
  @dwp_runner.make_worker_available(self)
  dwp_run_callback('on_available')
  dwp_log { "Available" }
end
dwp_run_callback(callback, *args)
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 196
# Run every callback registered on the worker class under the given name.
# The list is fetched from the class method "<callback>_callbacks" (nil is
# treated as no callbacks) and each block is evaluated in the context of
# this worker instance.
#
# @param callback [String] callback name, e.g. 'on_error'
# @param args passed through to every callback block
# @return [Array] the list of callbacks that were run
def dwp_run_callback(callback, *args)
  # the block parameter has its own name so it no longer shadows `callback`
  (self.class.send("#{callback}_callbacks") || []).each do |callback_block|
    self.instance_exec(*args, &callback_block)
  end
end
dwp_setup()
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 139
# Log that the worker is starting, run the 'on_start' callbacks and mark the
# worker available. A standard error raised by the callbacks or by
# `dwp_make_available` is handled and then re-raised in the current thread.
# The "Starting" log call sits outside the rescue on purpose: errors from it
# are not handled here.
def dwp_setup
  dwp_log { "Starting" }
  begin
    dwp_run_callback('on_start')
    dwp_make_available
  rescue *STANDARD_ERROR_CLASSES => setup_error
    dwp_handle_exception(setup_error)
    Thread.current.raise(setup_error)
  end
end
dwp_teardown()
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 172
# Shut the worker down: mark it unavailable, run the 'on_shutdown' callbacks
# (standard errors from either step are handled, not raised), log it, and
# reset the running flag and thread reference.
#
# The state reset lives in `ensure` so it also happens when logging raises
# or when an exception outside STANDARD_ERROR_CLASSES escapes the callbacks.
# Without it the worker would keep a stale `@dwp_thread`, and `dwp_start`
# (which only creates a thread when none is set) could never start it again.
#
# @return [nil]
def dwp_teardown
  begin
    dwp_make_unavailable
    dwp_run_callback 'on_shutdown'
  rescue *STANDARD_ERROR_CLASSES => exception
    dwp_handle_exception(exception)
  end
  dwp_log{ "Shutdown" }
  nil
ensure
  @dwp_running = false
  @dwp_thread  = nil
end
dwp_work(work_item)
click to toggle source
this is a separate method so the test runner can call it individually
# File lib/dat-worker-pool/worker.rb, line 165
# Process one work item: log it, run the 'before_work' callbacks, call
# `work!`, then run the 'after_work' callbacks. Kept as its own method so
# the test runner can call it individually.
#
# @param work_item [Object] the item to hand to `work!`
def dwp_work(work_item)
  dwp_log { "Working, item: #{work_item.inspect}" }

  dwp_run_callback('before_work', work_item)
  work!(work_item)
  dwp_run_callback('after_work', work_item)
end
dwp_work_loop()
click to toggle source
Rescue `ShutdownError`, but re-raise it after calling the on-error callbacks; this ensures it causes the loop to exit.
# File lib/dat-worker-pool/worker.rb, line 114
# The worker thread's main loop: set up, then keep popping work items off the
# queue and working them for as long as the worker is running. Teardown
# always runs on the way out.
#
# A `ShutdownError` raised while working an item is handled (so the on-error
# callbacks run) and then re-raised so that it causes the loop to exit. Other
# standard errors are handled and the loop carries on. The worker is made
# available again after every popped item, whether or not working it failed.
def dwp_work_loop
  dwp_setup
  while self.dwp_running?
    # `work_item` is scoped to the whole method, so clear it every pass;
    # otherwise an error raised by `queue.dwp_pop` would be reported against
    # the item from the previous iteration
    work_item = nil
    begin
      if !(work_item = queue.dwp_pop).nil?
        begin
          dwp_make_unavailable
          dwp_work(work_item)
        rescue ShutdownError => exception
          dwp_handle_exception(exception, work_item)
          Thread.current.raise exception
        rescue *STANDARD_ERROR_CLASSES => exception
          dwp_handle_exception(exception, work_item)
        ensure
          dwp_make_available
        end
      end
    rescue *STANDARD_ERROR_CLASSES => exception
      # errors from popping the queue or from making the worker available
      dwp_handle_exception(exception, work_item)
    end
  end
ensure
  dwp_teardown
end
number()
click to toggle source
Helpers
# File lib/dat-worker-pool/worker.rb, line 102
# Helper: the number this worker was created with.
def number
  @dwp_number
end
params()
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 103
# Helper: the worker params, as provided by the runner.
def params
  @dwp_runner.worker_params
end
queue()
click to toggle source
# File lib/dat-worker-pool/worker.rb, line 104
# Helper: the queue, as provided by the runner.
def queue
  @dwp_runner.queue
end
work!(work_item)
click to toggle source
Override this method to add custom work logic; it must be overridden or the workers will not know how to handle a work item.
# File lib/dat-worker-pool/worker.rb, line 108
# Hook for the actual work logic. Worker classes must override this; the
# default implementation only raises.
#
# @param work_item [Object] the item to work
# @raise [NotImplementedError] always, unless overridden
def work!(work_item)
  raise NotImplementedError
end