class EventQ::QueueWorker

Attributes

is_running[RW]
worker_adapter[R]
worker_status[R]

Public Class Methods

new() click to toggle source
# File lib/eventq/queue_worker.rb, line 10
def initialize
  @worker_status = EventQ::WorkerStatus.new
  @is_running = false
  @last_gc_flush = Time.now
  @gc_flush_interval = 10
end

Public Instance Methods

call_on_error_block(error:, message: nil) click to toggle source
# File lib/eventq/queue_worker.rb, line 259
def call_on_error_block(error:, message: nil)
  call_block(:on_error_block, error, message)
end
call_on_retry_block(message) click to toggle source
# File lib/eventq/queue_worker.rb, line 267
def call_on_retry_block(message)
  call_block(:on_retry_block, message)
end
call_on_retry_exceeded_block(message) click to toggle source
# File lib/eventq/queue_worker.rb, line 263
def call_on_retry_exceeded_block(message)
  call_block(:on_retry_exceeded_block, message)
end
configure(queue, options = {}) click to toggle source
# File lib/eventq/queue_worker.rb, line 207
def configure(queue, options = {})
  # default thread count
  @thread_count = 1
  if options.key?(:thread_count)
    @thread_count = options[:thread_count] if options[:thread_count] > 0
  end

  # default sleep time in seconds
  @sleep = 0
  if options.key?(:sleep)
    EventQ.logger.warn("[#{self.class}] - :sleep is deprecated.")
  end

  @fork_count = 0
  if options.key?(:fork_count)
    @fork_count = options[:fork_count]
  end

  if options.key?(:gc_flush_interval)
    @gc_flush_interval = options[:gc_flush_interval]
  end

  # The default is to block the process where the worker starts.
  # You may not want it to block if an application needs to run multiple things at the same time.
  # Example:  Running a background worker and a web service on the same application.
  @block_process = true
  if options.key?(:block_process)
    @block_process = options[:block_process]
  end

  message_list = [
      "Process Count: #{@fork_count}",
      "Thread Count: #{@thread_count}",
      "Interval Sleep: #{@sleep}",
      "GC Flush Interval: #{@gc_flush_interval}",
      "Block process: #{@block_process}"
  ]
  EventQ.logger.info("[#{self.class}] - Configuring. #{message_list.join(' | ')}")
end
deserialize_message(payload) click to toggle source
# File lib/eventq/queue_worker.rb, line 186
def deserialize_message(payload)
  provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider)
  provider.deserialize(payload)
end
gc_flush() click to toggle source
# File lib/eventq/queue_worker.rb, line 196
def gc_flush
  if Time.now - last_gc_flush > @gc_flush_interval
    GC.start
    @last_gc_flush = Time.now
  end
end
last_gc_flush() click to toggle source
# File lib/eventq/queue_worker.rb, line 203
def last_gc_flush
  @last_gc_flush
end
on_error(&block) click to toggle source
# File lib/eventq/queue_worker.rb, line 255
def on_error(&block)
  @on_error_block = block
end
on_retry(&block) click to toggle source
# File lib/eventq/queue_worker.rb, line 251
def on_retry(&block)
  @on_retry_block = block
end
on_retry_exceeded(&block) click to toggle source
# File lib/eventq/queue_worker.rb, line 247
def on_retry_exceeded(&block)
  @on_retry_exceeded_block = block
end
process_message(block, message, retry_attempts, acceptance_args) click to toggle source

Method to be called by an adapter. This defines the common logic for processing a message. @param [Array] acceptance_args list of arguments that would be used to accept a message by an adapter. @return [Symbol, MessageArgs] :accepted, :duplicate, :reject

# File lib/eventq/queue_worker.rb, line 114
def process_message(block, message, retry_attempts, acceptance_args)
  abort = false
  error = false
  status = nil

  message_args = EventQ::MessageArgs.new(
      type: message.type,
      retry_attempts: retry_attempts,
      context: message.context,
      content_type: message.content_type,
      id: message.id,
      sent: message.created
  )

  EventQ.logger.debug("[#{self.class}] - Message received. Id: #{message.id}. Retry Attempts: #{retry_attempts}")

  if (!EventQ::NonceManager.is_allowed?(message.id))
    EventQ.logger.warn("[#{self.class}] - Duplicate Message received. Id: #{message.id}. Ignoring message.")
    status = :duplicate
    return status, message_args
  end

  # begin worker block for queue message
  begin
    block.call(message.content, message_args)

    if message_args.abort == true
      abort = true
      EventQ.logger.debug("[#{self.class}] - Message aborted. Id: #{message.id}.")
    else
      # accept the message as processed
      status = :accepted
      worker_adapter.acknowledge_message(*acceptance_args)
      EventQ.logger.debug("[#{self.class}] - Message acknowledged. Id: #{message.id}.")
    end
  rescue => e
    EventQ.logger.error do
      "[#{self.class}] - Unhandled error while attempting to process a queue message. Id: #{message.id}. " \
      "Error: #{e.message} #{e.backtrace.join("\n")}"
    end

    error = true
    call_on_error_block(error: e, message: message)
  end

  if error || abort
    EventQ::NonceManager.failed(message.id)
    status = :reject
  else
    EventQ::NonceManager.complete(message.id)
  end

  [status, message_args]
end
running?() click to toggle source
# File lib/eventq/queue_worker.rb, line 182
def running?
  @is_running
end
serialize_message(msg) click to toggle source
# File lib/eventq/queue_worker.rb, line 191
def serialize_message(msg)
  provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider)
  provider.serialize(msg)
end
start(queue, options = {}, &block) click to toggle source
# File lib/eventq/queue_worker.rb, line 17
def start(queue, options = {}, &block)
  EventQ.logger.info("[#{self.class}] - Preparing to start listening for messages.")

  # Make sure mandatory options are specified
  mandatory = [:worker_adapter, :client]
  missing = mandatory - options.keys
  raise "[#{self.class}] - Missing options. #{missing} must be specified." unless missing.empty?

  @worker_adapter = options[:worker_adapter]
  worker_adapter.context = self

  raise "[#{self.class}] - Worker is already running." if running?

  configure(queue, options)
  worker_adapter.configure(options)

  queue_name = EventQ.create_queue_name(queue)
  EventQ.logger.info("[#{self.class}] - Listening for messages on queue: #{queue_name}}")

  # Allow the worker to be started on a thread or on the main process.
  # Using the thread won't block the parent process, whereas starting on the main process will.
  if @block_process
    start_worker(block, options, queue)
  else
    Thread.new { start_worker(block, options, queue) }
  end
  @is_running = true
end
start_process(options, queue, block) click to toggle source
# File lib/eventq/queue_worker.rb, line 66
def start_process(options, queue, block)
  %w'INT TERM'.each do |sig|
    Signal.trap(sig) {
      stop
      exit
    }
  end

  # need to set it again since we might be in a fork.
  @is_running = true
  tracker = track_process(Process.pid)

  # Execute any specific adapter worker logic before the threads are launched.
  # This could range from setting instance variables, extra options, etc.
  worker_adapter.pre_process(self, options)

  if @thread_count > 0
    @thread_count.times do
      thr = Thread.new do
        start_thread(queue, options, block)
      end

      # Allow the thread to kill the parent process if an error occurs
      thr.abort_on_exception = true
      track_thread(tracker, thr)
    end
  else
    start_thread(queue, options, block)
  end

  # Only on the main process should you be able to not wait on a thread, otherwise
  # any forked process will just immediately quit
  unless options[:wait] == false && options[:fork_count] == 0
    worker_status.threads.each { |thr| thr.thread.join }
  end
end
start_thread(queue, options, block) click to toggle source
# File lib/eventq/queue_worker.rb, line 103
def start_thread(queue, options, block)
  worker_adapter.thread_process_iteration(queue, options, block)
rescue Exception => e # rubocop:disable Lint/RescueException
  EventQ.logger.error(e)
  call_on_error_block(error: e, message: e.message)
  raise Exceptions::WorkerThreadError, e.message, e.backtrace
end
start_worker(block, options, queue) click to toggle source
# File lib/eventq/queue_worker.rb, line 46
def start_worker(block, options, queue)
  if @fork_count > 0
    @fork_count.times do
      pid = fork do
        start_process(options, queue, block)
      end
      # For the parent worker to know about the list of PIDS of the forks, we have to track them after the fork
      # is created. In a fork the collection would be copied and there is no shared reference between processes.
      # So each fork gets its own copy of the @worker_status variable.
      track_process(pid)
    end

    Process.waitall
  else
    # No need to track process/threads separately as we are in the main parent process,
    # and the logic inside start_process will handle it correctly.
    start_process(options, queue, block)
  end
end
stop() click to toggle source
# File lib/eventq/queue_worker.rb, line 169
def stop
  EventQ.logger.info("[#{self.class}] - Stopping.")
  @is_running = false
  # Need to notify all processes(forks) to stop as well.
  worker_status.pids.each do |pid|
    begin
      Process.kill('TERM', pid) if Process.pid != pid
    rescue Errno::ESRCH
      # Continue on stopping if the process already died and can't be found.
    end
  end
end

Private Instance Methods

call_block(block_name, *args) click to toggle source
# File lib/eventq/queue_worker.rb, line 273
def call_block(block_name, *args)
  block_variable = "@#{block_name}"
  if instance_variable_get(block_variable)
    EventQ.logger.debug { "[#{self.class}] - Executing #{block_variable}." }
    begin
      instance_variable_get(block_variable).call(*args)
    rescue => e
      EventQ.logger.error("[#{self.class}] - An error occurred executing the #{block_variable}. Error: #{e}")
    end
  else
    EventQ.logger.debug { "[#{self.class}] - No #{block_variable} specified." }
  end
end
track_process(pid) click to toggle source
# File lib/eventq/queue_worker.rb, line 287
def track_process(pid)
  tracker = EventQ::WorkerProcess.new(pid)
  worker_status.processes.push(tracker)
  tracker
end
track_thread(process_tracker, thread) click to toggle source
# File lib/eventq/queue_worker.rb, line 293
def track_thread(process_tracker, thread)
  tracker = EventQ::WorkerThread.new(thread)
  process_tracker.threads.push(tracker)
  tracker
end