class RabbitFeed::ConsumerConnection
Constants
- QUEUE_OPTIONS
- SEVEN_DAYS_IN_MS
- SUBSCRIPTION_OPTIONS
Attributes
queue[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
RabbitFeed::Connection::new
# File lib/rabbit_feed/consumer_connection.rb, line 20
# Builds the consumer side of the connection.
#
# Runs the superclass initializer, sets the channel prefetch count to 1,
# declares the configured queue (stored in @queue) using #queue_options,
# logs the declaration and finally binds the queue to the exchange.
def initialize
  super
  # NOTE(review): presumably limits this channel to one unacknowledged
  # message at a time -- confirm against the Bunny documentation.
  channel.prefetch(1)
  @queue = channel.queue(RabbitFeed.configuration.queue, queue_options)
  RabbitFeed.log.info do
    { event: :queue_declared, queue: RabbitFeed.configuration.queue, options: queue_options }
  end
  bind_on_accepted_routes
end
Public Instance Methods
consume(&block)
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 28
# Subscribes to the queue and hands every delivered payload to the given
# block (via #handle_message), then blocks the calling thread.
#
# Raises a RuntimeError when connection_in_use? reports that a consumer is
# already subscribed. SystemExit and SignalException are rescued and logged
# as an unsubscribe event; the subscription, if one was created, is always
# cancelled on the way out.
def consume(&block)
  raise 'This connection already has a consumer subscribed' if connection_in_use?

  synchronized do
    begin
      RabbitFeed.log.info { { event: :subscribe_to_queue, queue: RabbitFeed.configuration.queue } }
      subscription = queue.subscribe(SUBSCRIPTION_OPTIONS) do |delivery_info, _properties, payload|
        handle_message(delivery_info, payload, &block)
      end
      sleep # Sleep indefinitely, as the consumer runs in its own thread
    rescue SystemExit, SignalException
      RabbitFeed.log.info { { event: :unsubscribe_from_queue, queue: RabbitFeed.configuration.queue } }
    ensure
      # subscription is nil when subscribe itself failed; nothing to cancel then.
      cancel_consumer(subscription) if subscription.present?
    end
  end
end
purge_queue()
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 51
# Purges the declared queue by delegating to its #purge method.
def purge_queue
  queue.purge
end
queue_depth()
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 47
# Returns the message count reported by the declared queue.
def queue_depth
  queue.message_count
end
Private Instance Methods
acknowledge(delivery_info)
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 77
# Acknowledges the delivery on the queue's channel and logs the
# acknowledged delivery tag at debug level.
def acknowledge(delivery_info)
  tag = delivery_info.delivery_tag
  queue.channel.ack(tag)
  RabbitFeed.log.debug { { event: :acknowledge, delivery_tag: tag } }
end
bind_on_accepted_routes()
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 65
# Binds the queue to the configured exchange.
#
# When the consumer has event routing defined, one binding is created per
# accepted route (used as the routing key). Otherwise a single binding with
# no routing key is created. Every binding is logged at info level.
def bind_on_accepted_routes
  routing = RabbitFeed::Consumer.event_routing

  if routing.present?
    routing.accepted_routes.each do |route|
      queue.bind(RabbitFeed.configuration.exchange, routing_key: route)
      RabbitFeed.log.info do
        {
          event: :queue_bound,
          queue: RabbitFeed.configuration.queue,
          exchange: RabbitFeed.configuration.exchange,
          routing_key: route
        }
      end
    end
  else
    queue.bind(RabbitFeed.configuration.exchange)
    RabbitFeed.log.info do
      {
        event: :queue_bound,
        queue: RabbitFeed.configuration.queue,
        exchange: RabbitFeed.configuration.exchange
      }
    end
  end
end
cancel_consumer(consumer)
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 93
# Cancels the given consumer and logs whatever #cancel returned as the
# status of the cancellation.
def cancel_consumer(consumer)
  status = consumer.cancel
  RabbitFeed.log.debug do
    { event: :consumer_cancelled, status: status, queue: RabbitFeed.configuration.queue }
  end
end
handle_message(delivery_info, payload) { |payload| ... }
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 82
# Processes one delivery: yields the payload to the caller's block and
# acknowledges the delivery when the block returns normally.
#
# Any StandardError raised while processing (or acknowledging) is passed to
# #handle_processing_exception; the process then exits with status 1 when
# the consumer_exit_after_fail configuration flag is set.
def handle_message(delivery_info, payload)
  RabbitFeed.log.debug { { event: :handling_message, delivery_tag: delivery_info.delivery_tag } }

  # The debug log above is deliberately outside the rescue scope.
  begin
    yield(payload)
    acknowledge(delivery_info)
  rescue => error
    handle_processing_exception(delivery_info, error)
    exit(1) if RabbitFeed.configuration.consumer_exit_after_fail
  end
end
handle_processing_exception(delivery_info, exception)
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 105
# Reacts to a failure while processing a delivery: negatively acknowledges
# the delivery, logs the exception at error level and forwards it to
# RabbitFeed.exception_notify.
#
# delivery_info - object responding to #delivery_tag
# exception     - the exception that interrupted processing
def handle_processing_exception(delivery_info, exception)
  negative_acknowledge(delivery_info)
  RabbitFeed.log.error do
    {
      event: :processing_exception,
      delivery_tag: delivery_info.delivery_tag,
      message: exception.message,
      # Exception#backtrace is nil for an exception that was never raised;
      # Array() keeps the error handler itself from failing in that case.
      backtrace: Array(exception.backtrace).join(',')
    }
  end
  RabbitFeed.exception_notify(exception)
end
negative_acknowledge(delivery_info)
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 98
# Negatively acknowledges the delivery on the queue's channel and logs the
# delivery tag at debug level.
def negative_acknowledge(delivery_info)
  # Tell rabbit that we were unable to process the message
  # This will re-queue the message
  multiple = false
  requeue = true
  queue.channel.nack(delivery_info.delivery_tag, multiple, requeue)
  RabbitFeed.log.debug do
    { event: :negative_acknowledge, delivery_tag: delivery_info.delivery_tag }
  end
end
queue_options()
click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 59
# Returns the options hash used to declare the queue: the configured
# auto_delete flag merged with QUEUE_OPTIONS. On a key collision the value
# from QUEUE_OPTIONS wins, because it is the argument to Hash#merge.
def queue_options
  configured = { auto_delete: RabbitFeed.configuration.auto_delete_queue }
  configured.merge(QUEUE_OPTIONS)
end