class TopologicalInventory::Providers::Common::Operations::AsyncWorker
Attributes
metrics[R]
processor[R]
queue[R]
thread[R]
Public Class Methods
new(processor, queue: nil, metrics: nil)
click to toggle source
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 13 def initialize(processor, queue: nil, metrics: nil) @processor = processor @queue = queue || Queue.new @metrics = metrics end
Public Instance Methods
enqueue(msg)
click to toggle source
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 29 def enqueue(msg) queue << msg end
listen()
click to toggle source
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 33 def listen loop do # the queue thread waits for a message to come during `Queue#pop` msg = queue.pop process_message(msg) end end
start()
click to toggle source
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 19 def start return if thread.present? @thread = Thread.new { listen } end
stop()
click to toggle source
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 25 def stop thread&.exit end
Private Instance Methods
process_message(message)
click to toggle source
# File lib/topological_inventory/providers/common/operations/async_worker.rb, line 45 def process_message(message) result = processor.process!(message, metrics) metrics&.record_operation(message.message, :status => result) rescue => err model, method = message.message.to_s.split(".") logger.error("#{model}##{method}: async worker failure: #{err.cause}\n#{err}\n#{err.backtrace.join("\n")}") metrics&.record_operation(message.message, :status => operation_status[:error]) ensure message.ack TopologicalInventory::Providers::Common::Operations::HealthCheck.touch_file logger.debug("Operations::AsyncWorker queue length: #{queue.length}") if queue.length >= 20 && queue.length % 5 == 0 end