class Rabbitek::MessageProcessor
Single message processor
Attributes
delivery_info[R]
payload[R]
properties[R]
starter[R]
Public Class Methods
new(starter, delivery_info, properties, payload)
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 12 def initialize(starter, delivery_info, properties, payload) @starter = starter @delivery_info = delivery_info @properties = properties @payload = payload end
Public Instance Methods
process()
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 19 def process consumer.set_context metrics_add_processed_count hook_walker = Utils::HookWalker.new(Rabbitek.config.server_hooks) hook_walker.call!(consumer, message) { |*args| run_job(*args) } rescue StandardError => e on_message_errored(e) end
Private Instance Methods
consumer()
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 68 def consumer @consumer ||= consumer_instance(message.delivery_info.routing_key) end
consumer_instance(routing_key)
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 72 def consumer_instance(routing_key) Thread.current[:worker_classes] ||= {} klass = Thread.current[:worker_classes][routing_key] ||= routing_key.constantize klass.new(channel, work_queue, retry_or_delayed_queue, retry_or_delayed_exchange) rescue NameError nil # TODO: to dead queue end
message()
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 64 def message @message ||= Message.new(delivery_info: delivery_info, properties: properties, payload: payload) end
metrics_add_errored_count()
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 60 def metrics_add_errored_count Yabeda.rabbitek.errored_messages_count.increment(metrics_tags, by: 1) end
metrics_add_processed_count()
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 56 def metrics_add_processed_count Yabeda.rabbitek.processed_messages_count.increment(metrics_tags, by: 1) end
on_message_errored(exception)
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 51 def on_message_errored(exception) error(message: exception.inspect, backtrace: exception.backtrace, consumer: consumer.class, jid: consumer.jid) metrics_add_errored_count end
run_job(modified_consumer, message)
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 35 def run_job(modified_consumer, message) if modified_consumer.opts[:batch] run_job_batched(modified_consumer, message) else modified_consumer.perform(message) modified_consumer.ack!(message.delivery_info) unless modified_consumer.opts[:manual_ack] end end
run_job_batched(modified_consumer, message)
click to toggle source
# File lib/rabbitek/server/message_processor.rb, line 44 def run_job_batched(modified_consumer, message) Batcher.new(modified_consumer).perform(message) do |batch| modified_consumer.perform(batch) modified_consumer.ack!(batch.last.delivery_info, true) unless modified_consumer.opts[:manual_ack] end end