class Rabbitek::MessageProcessor

Processes a single message.

Attributes

delivery_info[R]
payload[R]
properties[R]
starter[R]

Public Class Methods

new(starter, delivery_info, properties, payload)
# File lib/rabbitek/server/message_processor.rb, line 12
# Builds a processor for one delivered message.
#
# Each argument is stored untouched in the instance variable of the same
# name (and exposed through the matching reader).
#
# @param starter [Object] stored in @starter; its role is not visible in this method
# @param delivery_info [Object] delivery metadata, later passed to Message.new
# @param properties [Object] message properties, later passed to Message.new
# @param payload [Object] message body, later passed to Message.new
def initialize(starter, delivery_info, properties, payload)
  # The three parts that make up the message itself.
  @delivery_info = delivery_info
  @properties = properties
  @payload = payload

  @starter = starter
end

Public Instance Methods

process()
# File lib/rabbitek/server/message_processor.rb, line 19
# Runs the consumer for this message through the configured server hooks.
#
# In order: sets the consumer context, increments the processed-messages
# counter, then lets a Utils::HookWalker built from
# Rabbitek.config.server_hooks wrap the actual job (run_job).
#
# Any StandardError raised by those steps is passed to on_message_errored.
def process
  consumer.set_context
  metrics_add_processed_count

  Utils::HookWalker
    .new(Rabbitek.config.server_hooks)
    .call!(consumer, message) { |*job_args| run_job(*job_args) }
rescue StandardError => exception
  on_message_errored(exception)
end

Private Instance Methods

consumer()
# File lib/rabbitek/server/message_processor.rb, line 68
# Memoized consumer for this message, looked up by the routing key found in
# the message's delivery info.
#
# A nil result from consumer_instance is not cached, so a failed lookup is
# attempted again on the next call.
#
# @return [Object, nil] whatever consumer_instance returns for the routing key
def consumer
  return @consumer if @consumer

  routing_key = message.delivery_info.routing_key
  @consumer = consumer_instance(routing_key)
end
consumer_instance(routing_key)
# File lib/rabbitek/server/message_processor.rb, line 72
# Instantiates the consumer class named by the routing key.
#
# Resolved classes are cached per thread in Thread.current[:worker_classes],
# keyed by routing key, so constantize runs at most once per key and thread.
# The instance is built with channel, work_queue, retry_or_delayed_queue and
# retry_or_delayed_exchange, which are defined outside this excerpt.
#
# NOTE(review): the rescue spans the whole method, and NoMethodError is a
# NameError subclass, so a NoMethodError raised while constructing the
# consumer is also turned into nil.
#
# @param routing_key [String] name of the consumer class; must respond to
#   constantize (presumably ActiveSupport's String#constantize; confirm)
# @return [Object, nil] a new consumer instance, or nil when a NameError is raised
def consumer_instance(routing_key)
  class_cache = (Thread.current[:worker_classes] ||= {})
  worker_class = (class_cache[routing_key] ||= routing_key.constantize)

  worker_class.new(channel, work_queue, retry_or_delayed_queue, retry_or_delayed_exchange)
rescue NameError
  nil # TODO: to dead queue
end
message()
# File lib/rabbitek/server/message_processor.rb, line 64
# Memoized Message built from this processor's delivery info, properties and
# payload.
#
# @return [Message]
def message
  @message ||= Message.new(
    delivery_info: delivery_info,
    properties: properties,
    payload: payload
  )
end
metrics_add_errored_count()
# File lib/rabbitek/server/message_processor.rb, line 60
# Increments the Yabeda rabbitek errored_messages_count counter by one,
# tagged with metrics_tags.
def metrics_add_errored_count
  errored_counter = Yabeda.rabbitek.errored_messages_count
  errored_counter.increment(metrics_tags, by: 1)
end
metrics_add_processed_count()
# File lib/rabbitek/server/message_processor.rb, line 56
# Increments the Yabeda rabbitek processed_messages_count counter by one,
# tagged with metrics_tags.
def metrics_add_processed_count
  processed_counter = Yabeda.rabbitek.processed_messages_count
  processed_counter.increment(metrics_tags, by: 1)
end
metrics_tags()
# File lib/rabbitek/server/message_processor.rb, line 80
# Tags attached to the metric counters: the class of the current consumer
# (NilClass when no consumer could be resolved).
#
# @return [Hash{Symbol => Class}]
def metrics_tags
  consumer_class = consumer.class
  { consumer: consumer_class }
end
on_message_errored(exception)
# File lib/rabbitek/server/message_processor.rb, line 51
# Logs a failed message and increments the errored-messages counter.
#
# The consumer may be nil here: consumer_instance returns nil when the
# routing key cannot be resolved. The jid is therefore read with safe
# navigation; calling jid on nil would raise NoMethodError from inside the
# error handler and skip the errored-count metric.
#
# @param exception [Exception] the error rescued while processing the message
def on_message_errored(exception)
  failed_consumer = consumer

  error(
    message: exception.inspect,
    backtrace: exception.backtrace,
    consumer: failed_consumer.class,
    jid: failed_consumer&.jid
  )
  metrics_add_errored_count
end
run_job(modified_consumer, message)
# File lib/rabbitek/server/message_processor.rb, line 35
# Executes the consumer for one message.
#
# Consumers with the :batch option are delegated to run_job_batched. All
# others perform the message directly and then ack its delivery info, unless
# the consumer opted into :manual_ack.
#
# @param modified_consumer [Object] consumer responding to opts, perform and ack!
# @param message [Object] message responding to delivery_info
def run_job(modified_consumer, message)
  return run_job_batched(modified_consumer, message) if modified_consumer.opts[:batch]

  modified_consumer.perform(message)
  return if modified_consumer.opts[:manual_ack]

  modified_consumer.ack!(message.delivery_info)
end
run_job_batched(modified_consumer, message)
# File lib/rabbitek/server/message_processor.rb, line 44
# Executes a batch-enabled consumer.
#
# The message goes to a Batcher built for the consumer. For every batch the
# Batcher yields, the consumer performs the whole batch and then, unless
# :manual_ack is set, calls ack! with the delivery info of the batch's last
# message and a second argument of true.
#
# @param modified_consumer [Object] consumer responding to opts, perform and ack!
# @param message [Object] message passed on to Batcher#perform
def run_job_batched(modified_consumer, message)
  batcher = Batcher.new(modified_consumer)

  batcher.perform(message) do |batch|
    modified_consumer.perform(batch)
    next if modified_consumer.opts[:manual_ack]

    modified_consumer.ack!(batch.last.delivery_info, true)
  end
end