class EventQ::QueueWorker
Attributes
is_running[RW]
worker_adapter[R]
worker_status[R]
Public Class Methods
new()
click to toggle source
# File lib/eventq/queue_worker.rb, line 10
# Builds a worker that is not yet running.
#
# Creates a fresh EventQ::WorkerStatus, clears the running flag, and primes
# the GC-flush bookkeeping: the last flush is stamped with the current time
# and the flush interval defaults to 10 (compared against elapsed seconds
# in #gc_flush).
def initialize
  @is_running = false
  @worker_status = EventQ::WorkerStatus.new
  @gc_flush_interval = 10
  @last_gc_flush = Time.now
end
Public Instance Methods
call_on_error_block(error:, message: nil)
click to toggle source
# File lib/eventq/queue_worker.rb, line 259
# Runs the callback stored in @on_error_block (if one is set) via #call_block.
#
# @param error the error to pass to the callback
# @param message the message to pass along with the error; nil when omitted
# @return whatever #call_block returns
def call_on_error_block(error:, message: nil)
  call_block(:on_error_block, error, message)
end
call_on_retry_block(message)
click to toggle source
# File lib/eventq/queue_worker.rb, line 267
# Runs the callback stored in @on_retry_block (if one is set) via #call_block.
#
# @param message the message to pass to the callback
# @return whatever #call_block returns
def call_on_retry_block(message)
  call_block(:on_retry_block, message)
end
call_on_retry_exceeded_block(message)
click to toggle source
# File lib/eventq/queue_worker.rb, line 263
# Runs the callback stored in @on_retry_exceeded_block (if one is set) via
# #call_block.
#
# @param message the message to pass to the callback
# @return whatever #call_block returns
def call_on_retry_exceeded_block(message)
  call_block(:on_retry_exceeded_block, message)
end
configure(queue, options = {})
click to toggle source
# File lib/eventq/queue_worker.rb, line 207
# Reads worker settings from +options+ into instance variables and logs a
# one-line summary of the resulting configuration.
#
# @param queue accepted for interface compatibility; not read by this method
# @param options [Hash] recognised keys: :thread_count, :sleep (deprecated),
#   :fork_count, :gc_flush_interval, :block_process
def configure(queue, options = {})
  # Thread count defaults to 1; only a positive :thread_count replaces it.
  @thread_count = 1
  if options.key?(:thread_count) && options[:thread_count] > 0
    @thread_count = options[:thread_count]
  end

  # Sleep time (seconds) always stays 0; passing :sleep only triggers a warning.
  @sleep = 0
  EventQ.logger.warn("[#{self.class}] - :sleep is deprecated.") if options.key?(:sleep)

  @fork_count = options.key?(:fork_count) ? options[:fork_count] : 0

  # @gc_flush_interval keeps its existing value unless the option is supplied.
  @gc_flush_interval = options[:gc_flush_interval] if options.key?(:gc_flush_interval)

  # By default the process that starts the worker is blocked. An application
  # that runs several things at once (for example a background worker next
  # to a web service) can pass block_process: false to avoid that.
  @block_process = options.key?(:block_process) ? options[:block_process] : true

  summary = [
    "Process Count: #{@fork_count}",
    "Thread Count: #{@thread_count}",
    "Interval Sleep: #{@sleep}",
    "GC Flush Interval: #{@gc_flush_interval}",
    "Block process: #{@block_process}"
  ].join(' | ')

  EventQ.logger.info("[#{self.class}] - Configuring. #{summary}")
end
deserialize_message(payload)
click to toggle source
# File lib/eventq/queue_worker.rb, line 186
# Deserializes +payload+ with the provider selected by
# EventQ::Configuration.serialization_provider.
#
# NOTE(review): @serialization_provider_manager is not assigned by any
# method shown with this class (including #initialize) — confirm where it
# is set before relying on this method.
#
# @param payload the serialized message
# @return the provider's #deserialize result
def deserialize_message(payload)
  @serialization_provider_manager
    .get_provider(EventQ::Configuration.serialization_provider)
    .deserialize(payload)
end
gc_flush()
click to toggle source
# File lib/eventq/queue_worker.rb, line 196
# Triggers a garbage collection when more than @gc_flush_interval seconds
# have passed since the last recorded flush, then records the new flush time.
#
# @return [Time, nil] the new flush timestamp, or nil when no flush was due
def gc_flush
  return unless Time.now - last_gc_flush > @gc_flush_interval

  GC.start
  @last_gc_flush = Time.now
end
last_gc_flush()
click to toggle source
# File lib/eventq/queue_worker.rb, line 203
# Reader for the timestamp recorded by the most recent GC flush
# (initially the time the worker was constructed).
#
# @return the value of @last_gc_flush
def last_gc_flush
  @last_gc_flush
end
on_error(&block)
click to toggle source
# File lib/eventq/queue_worker.rb, line 255
# Registers the callback that #call_on_error_block invokes.
# Calling it without a block stores nil, clearing any earlier callback.
#
# @return the stored block (or nil)
def on_error(&block)
  @on_error_block = block
end
on_retry(&block)
click to toggle source
# File lib/eventq/queue_worker.rb, line 251
# Registers the callback that #call_on_retry_block invokes.
# Calling it without a block stores nil, clearing any earlier callback.
#
# @return the stored block (or nil)
def on_retry(&block)
  @on_retry_block = block
end
on_retry_exceeded(&block)
click to toggle source
# File lib/eventq/queue_worker.rb, line 247
# Registers the callback that #call_on_retry_exceeded_block invokes.
# Calling it without a block stores nil, clearing any earlier callback.
#
# @return the stored block (or nil)
def on_retry_exceeded(&block)
  @on_retry_exceeded_block = block
end
process_message(block, message, retry_attempts, acceptance_args)
click to toggle source
Method to be called by an adapter. This defines the common logic for processing a message. @param [Array] acceptance_args list of arguments that the adapter uses to acknowledge (accept) the message. @return [Array(Symbol, MessageArgs)] a two-element array: the status (:accepted, :duplicate or :reject) followed by the message arguments.
# File lib/eventq/queue_worker.rb, line 114
# Common message-processing logic, intended to be called by an adapter.
#
# Builds an EventQ::MessageArgs for the message, skips it when
# EventQ::NonceManager does not allow its id, and otherwise hands the content
# to +block+. A message is acknowledged through the adapter unless the block
# set +abort+ on the message args or raised an error.
#
# @param block callable invoked with (message.content, message_args)
# @param message the received message
# @param retry_attempts retry count recorded in the message args and the log
# @param acceptance_args [Array] arguments splatted into the adapter's
#   acknowledge_message call
# @return [Array] [status, message_args], where status is :accepted,
#   :duplicate or :reject
def process_message(block, message, retry_attempts, acceptance_args)
  message_args = EventQ::MessageArgs.new(
    type: message.type,
    retry_attempts: retry_attempts,
    context: message.context,
    content_type: message.content_type,
    id: message.id,
    sent: message.created
  )

  EventQ.logger.debug("[#{self.class}] - Message received. Id: #{message.id}. Retry Attempts: #{retry_attempts}")

  unless EventQ::NonceManager.is_allowed?(message.id)
    EventQ.logger.warn("[#{self.class}] - Duplicate Message received. Id: #{message.id}. Ignoring message.")
    return :duplicate, message_args
  end

  # A single flag covers both ways a message ends up rejected:
  # the handler asked to abort, or something raised.
  rejected = false
  status = nil

  begin
    block.call(message.content, message_args)

    if message_args.abort == true
      rejected = true
      EventQ.logger.debug("[#{self.class}] - Message aborted. Id: #{message.id}.")
    else
      # Mark as accepted first; a failure while acknowledging is caught
      # below and turns the status into :reject.
      status = :accepted
      worker_adapter.acknowledge_message(*acceptance_args)
      EventQ.logger.debug("[#{self.class}] - Message acknowledged. Id: #{message.id}.")
    end
  rescue => e
    EventQ.logger.error do
      "[#{self.class}] - Unhandled error while attempting to process a queue message. Id: #{message.id}. " \
      "Error: #{e.message} #{e.backtrace.join("\n")}"
    end

    rejected = true
    call_on_error_block(error: e, message: message)
  end

  if rejected
    EventQ::NonceManager.failed(message.id)
    status = :reject
  else
    EventQ::NonceManager.complete(message.id)
  end

  [status, message_args]
end
running?()
click to toggle source
# File lib/eventq/queue_worker.rb, line 182
# Reports the running flag: set when the worker is started and cleared
# by #stop.
#
# @return the current value of @is_running
def running?
  @is_running
end
serialize_message(msg)
click to toggle source
# File lib/eventq/queue_worker.rb, line 191
# Serializes +msg+ with the provider selected by
# EventQ::Configuration.serialization_provider.
#
# NOTE(review): @serialization_provider_manager is not assigned by any
# method shown with this class (including #initialize) — confirm where it
# is set before relying on this method.
#
# @param msg the message to serialize
# @return the provider's #serialize result
def serialize_message(msg)
  @serialization_provider_manager
    .get_provider(EventQ::Configuration.serialization_provider)
    .serialize(msg)
end
start(queue, options = {}, &block)
click to toggle source
# File lib/eventq/queue_worker.rb, line 17
# Validates the options, configures the worker and its adapter, and begins
# listening for messages on +queue+.
#
# @param queue the queue to listen on (also passed to
#   EventQ.create_queue_name for the log line)
# @param options [Hash] must include :worker_adapter and :client; the hash is
#   also handed to #configure and to the adapter's #configure
# @param block message handler forwarded to #start_worker
# @return [true]
# @raise [RuntimeError] when a mandatory option is missing or the worker is
#   already running
def start(queue, options = {}, &block)
  EventQ.logger.info("[#{self.class}] - Preparing to start listening for messages.")

  # Make sure mandatory options are specified
  mandatory = [:worker_adapter, :client]
  missing = mandatory - options.keys
  raise "[#{self.class}] - Missing options. #{missing} must be specified." unless missing.empty?

  # Reject a second start before touching any state, so a refused call
  # cannot replace the adapter (or its context) of the running worker.
  raise "[#{self.class}] - Worker is already running." if running?

  @worker_adapter = options[:worker_adapter]
  worker_adapter.context = self

  configure(queue, options)
  worker_adapter.configure(options)

  queue_name = EventQ.create_queue_name(queue)
  EventQ.logger.info("[#{self.class}] - Listening for messages on queue: #{queue_name}")

  # Raise the flag before launching: start_worker blocks when @block_process
  # is set, so flagging afterwards would mark the worker as running only
  # once it had already finished.
  @is_running = true

  # Allow the worker to be started on a thread or on the main process.
  # Using the thread won't block the parent process, whereas starting on the main process will.
  if @block_process
    start_worker(block, options, queue)
  else
    Thread.new { start_worker(block, options, queue) }
  end

  true
end
start_process(options, queue, block)
click to toggle source
# File lib/eventq/queue_worker.rb, line 66
# Runs the worker inside the current process: installs INT/TERM handlers,
# registers the process with the worker status, lets the adapter prepare,
# then launches the worker threads and (usually) waits for them.
#
# @param options [Hash] worker options; :wait and :fork_count decide whether
#   the threads are joined
# @param queue the queue handed to each worker thread
# @param block the message handler handed to each worker thread
def start_process(options, queue, block)
  %w[INT TERM].each do |signal_name|
    Signal.trap(signal_name) do
      stop
      exit
    end
  end

  # Set again here because this may be running inside a fork.
  @is_running = true

  process_tracker = track_process(Process.pid)

  # Adapter-specific preparation (instance variables, extra options, ...)
  # happens before any worker thread is launched.
  worker_adapter.pre_process(self, options)

  if @thread_count > 0
    @thread_count.times do
      worker_thread = Thread.new { start_thread(queue, options, block) }

      # Allow the thread to kill the parent process if an error occurs
      worker_thread.abort_on_exception = true
      track_thread(process_tracker, worker_thread)
    end
  else
    start_thread(queue, options, block)
  end

  # Only the main process may skip waiting on its threads; a forked process
  # that did not wait would quit immediately.
  # NOTE(review): options[:fork_count] is nil when the option is omitted, so
  # the join is skipped only when the caller passes fork_count: 0 explicitly
  # — confirm this is intended rather than a check against @fork_count.
  skip_join = options[:wait] == false && options[:fork_count] == 0
  worker_status.threads.each { |tracked| tracked.thread.join } unless skip_join
end
start_thread(queue, options, block)
click to toggle source
# File lib/eventq/queue_worker.rb, line 103
# Body of a worker thread: delegates to the adapter's
# thread_process_iteration. Any exception is logged, reported through the
# on_error callback, and re-raised as Exceptions::WorkerThreadError carrying
# the original message and backtrace.
#
# @raise [Exceptions::WorkerThreadError] when the adapter iteration raises
def start_thread(queue, options, block)
  begin
    worker_adapter.thread_process_iteration(queue, options, block)
  rescue Exception => failure # rubocop:disable Lint/RescueException
    EventQ.logger.error(failure)
    call_on_error_block(error: failure, message: failure.message)
    raise Exceptions::WorkerThreadError, failure.message, failure.backtrace
  end
end
start_worker(block, options, queue)
click to toggle source
# File lib/eventq/queue_worker.rb, line 46
# Starts the worker either directly in this process or in @fork_count
# forked child processes, waiting for all children when forking.
#
# @param block the message handler
# @param options [Hash] worker options forwarded to #start_process
# @param queue the queue forwarded to #start_process
def start_worker(block, options, queue)
  # Without forks everything runs in this (parent) process, and
  # start_process takes care of tracking the process and its threads.
  return start_process(options, queue, block) unless @fork_count > 0

  @fork_count.times do
    child_pid = fork { start_process(options, queue, block) }

    # The parent has to record the child's PID after the fork is created:
    # inside the fork @worker_status is a copy, not a shared reference, so
    # each child only ever sees its own collection.
    track_process(child_pid)
  end

  Process.waitall
end
stop()
click to toggle source
# File lib/eventq/queue_worker.rb, line 169
# Clears the running flag and sends TERM to every tracked process other
# than the current one.
def stop
  EventQ.logger.info("[#{self.class}] - Stopping.")
  @is_running = false

  # Every tracked process (fork) has to be told to stop as well.
  worker_status.pids.each do |pid|
    next unless Process.pid != pid

    begin
      Process.kill('TERM', pid)
    rescue Errno::ESRCH
      # The process already died and can't be found; keep stopping the rest.
    end
  end
end
Private Instance Methods
call_block(block_name, *args)
click to toggle source
# File lib/eventq/queue_worker.rb, line 273
# Looks up the callback stored in the instance variable "@<block_name>" and
# calls it with +args+. Errors (StandardError) raised by the callback are
# logged and swallowed; a missing callback is only noted at debug level.
#
# @param block_name name of the instance variable, without the leading "@"
# @param args arguments passed through to the callback
def call_block(block_name, *args)
  block_variable = "@#{block_name}"
  callback = instance_variable_get(block_variable)

  unless callback
    return EventQ.logger.debug { "[#{self.class}] - No #{block_variable} specified." }
  end

  EventQ.logger.debug { "[#{self.class}] - Executing #{block_variable}." }
  begin
    callback.call(*args)
  rescue => e
    EventQ.logger.error("[#{self.class}] - An error occurred executing the #{block_variable}. Error: #{e}")
  end
end
track_process(pid)
click to toggle source
# File lib/eventq/queue_worker.rb, line 287
# Wraps +pid+ in an EventQ::WorkerProcess and appends it to
# worker_status.processes.
#
# @param pid the process id to track
# @return [EventQ::WorkerProcess] the newly created tracker
def track_process(pid)
  EventQ::WorkerProcess.new(pid).tap do |process_tracker|
    worker_status.processes.push(process_tracker)
  end
end
track_thread(process_tracker, thread)
click to toggle source
# File lib/eventq/queue_worker.rb, line 293
# Wraps +thread+ in an EventQ::WorkerThread and appends it to the given
# process tracker's thread list.
#
# @param process_tracker the tracker whose #threads collection receives the entry
# @param thread the thread to track
# @return [EventQ::WorkerThread] the newly created tracker
def track_thread(process_tracker, thread)
  EventQ::WorkerThread.new(thread).tap do |thread_tracker|
    process_tracker.threads.push(thread_tracker)
  end
end