class TopologicalInventory::Providers::Common::Operations::AsyncWorker

Attributes

metrics[R]
processor[R]
queue[R]
thread[R]

Public Class Methods

new(processor, queue: nil, metrics: nil)
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 13
# Stores the collaborators the worker needs.
#
# @param processor [Object] object that will handle messages; kept as-is
# @param queue [Queue, nil] message queue to consume; a new Queue is created
#   when the argument is nil (or otherwise falsy)
# @param metrics [Object, nil] optional metrics collector; kept as-is
def initialize(processor, queue: nil, metrics: nil)
  queue ||= Queue.new
  @processor, @queue, @metrics = processor, queue, metrics
end

Public Instance Methods

enqueue(msg)
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 29
# Appends a message to the worker's queue for later processing.
#
# @param msg [Object] the message to queue
# @return [Object] whatever the queue's push returns (a stdlib Queue returns itself)
def enqueue(msg)
  queue.push(msg)
end
listen()
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 33
# Consumes the queue indefinitely: each popped message is passed straight to
# `process_message`. With a stdlib Queue, `pop` blocks the calling thread
# until a message is available, so the loop idles while the queue is empty.
def listen
  loop { process_message(queue.pop) }
end
start()
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 19
# Spawns the background thread that runs `listen`, unless a worker thread is
# already running.
#
# The guard checks that the stored thread is still alive rather than merely
# present, so the worker can be started again once a previous thread has
# terminated (killed or died from an unhandled error). Note that a thread
# that was only just asked to exit may still report alive for a moment.
#
# @return [Thread, nil] the newly created thread, or nil when a live worker
#   thread already exists
def start
  return if thread&.alive?

  @thread = Thread.new { listen }
end
stop()
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 25
# Terminates the worker thread, if one has been created.
#
# @return [Thread, nil] the thread that was asked to exit, or nil when no
#   thread exists
def stop
  worker = thread
  return if worker.nil?

  worker.exit
end

Private Instance Methods

process_message(message)
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 45
# Handles a single message end to end.
#
# The processor's result is recorded as the operation status. Any
# StandardError is logged (cause, message and backtrace) and recorded with the
# error status instead of propagating. Regardless of outcome the message is
# acked, the health-check file is touched, and the queue length is logged at
# debug level when it is at least 20 and a multiple of 5.
#
# @param message [Object] must respond to `message` (the operation name,
#   presumably "Model.method" -- confirm with the producer) and to `ack`
def process_message(message)
  status = processor.process!(message, metrics)
  metrics&.record_operation(message.message, :status => status)
rescue => error
  model, method = message.message.to_s.split(".")
  trace = error.backtrace.join("\n")
  logger.error("#{model}##{method}: async worker failure: #{error.cause}\n#{error}\n#{trace}")
  metrics&.record_operation(message.message, :status => operation_status[:error])
ensure
  message.ack
  TopologicalInventory::Providers::Common::Operations::HealthCheck.touch_file
  # Only report a growing backlog occasionally to keep the debug log quiet.
  if queue.length >= 20 && (queue.length % 5).zero?
    logger.debug("Operations::AsyncWorker queue length: #{queue.length}")
  end
end