class EventHub::ActorListener
Listner Class
Public Class Methods
new(processor_instance)
click to toggle source
# File lib/eventhub/actor_listener.rb, line 9 def initialize(processor_instance) @actor_publisher = ActorPublisher.new_link @actor_watchdog = ActorWatchdog.new_link @connections = {} @processor_instance = processor_instance start end
Public Instance Methods
cleanup()
click to toggle source
# File lib/eventhub/actor_listener.rb, line 111 def cleanup EventHub.logger.info("Listener is cleaning up...") # close all open connections return unless @connections @connections.values.each do |connection| connection&.close end end
handle_payload(args = {})
click to toggle source
# File lib/eventhub/actor_listener.rb, line 72 def handle_payload(args = {}) response_messages = [] connection = args[:connection] # convert to EventHub message message = EventHub::Message.from_json(args[:payload]) # append to execution history message.append_to_execution_history(EventHub::Configuration.name) # return invalid messages to dispatcher if message.invalid? response_messages << message EventHub.logger.info("-> #{message} => return invalid to dispatcher") else begin response_messages = @processor_instance.send(:handle_message, message, pass_arguments(args)) rescue => exception # this catches unexpected exceptions in handle message method # deadletter the message via dispatcher message.status_code = EventHub::STATUS_DEADLETTER message.status_message = exception.to_s EventHub.logger.info("-> #{message} => return exception to dispatcher") response_messages << message end end Array(response_messages).each do |message| publish(message: message.to_json, connection: connection) end end
listen(args = {})
click to toggle source
# File lib/eventhub/actor_listener.rb, line 28 def listen(args = {}) with_listen(args) do |connection, channel, consumer, queue, queue_name| EventHub.logger.info("Listening to queue [#{queue_name}]") consumer.on_delivery do |delivery_info, metadata, payload| EventHub.logger.info("#{queue_name}: [#{delivery_info.delivery_tag}]"\ " delivery") @processor_instance.statistics.measure(payload.size) do handle_payload(payload: payload, connection: connection, queue_name: queue_name, content_type: metadata[:content_type], priority: metadata[:priority], delivery_tag: delivery_info.delivery_tag) channel.acknowledge(delivery_info.delivery_tag, false) end EventHub.logger.info("#{queue_name}: [#{delivery_info.delivery_tag}]"\ " acknowledged") end queue.subscribe_with(consumer, block: false) end rescue => error EventHub.logger.error("Unexpected exception: #{error}. It should restart now with this exception...") raise end
pass_arguments(args = {})
click to toggle source
# File lib/eventhub/actor_listener.rb, line 106 def pass_arguments(args = {}) keys_to_pass = [:queue_name, :content_type, :priority, :delivery_tag] args.select { |key| keys_to_pass.include?(key) } end
publish(args)
click to toggle source
# File lib/eventhub/actor_listener.rb, line 120 def publish(args) @actor_publisher.publish(args) end
restart()
click to toggle source
# File lib/eventhub/actor_listener.rb, line 24 def restart raise "Listener is restarting..." end
start()
click to toggle source
# File lib/eventhub/actor_listener.rb, line 17 def start EventHub.logger.info("Listener is starting...") EventHub::Configuration.processor[:listener_queues].each_with_index do |queue_name, index| async.listen(queue_name: queue_name, index: index) end end
with_listen(args = {}) { |connection, channel, consumer, queue, queue_name| ... }
click to toggle source
# File lib/eventhub/actor_listener.rb, line 55 def with_listen(args = {}, &block) connection = create_bunny_connection connection.start queue_name = args[:queue_name] @connections[queue_name] = connection channel = connection.create_channel channel.prefetch(1) queue = channel.queue(queue_name, durable: true) consumer = EventHub::Consumer.new(channel, queue, EventHub::Configuration.name + "-" + args[:index].to_s, false) yield connection, channel, consumer, queue, queue_name end