class RabbitFeed::ConsumerConnection

Constants

QUEUE_OPTIONS
SEVEN_DAYS_IN_MS
SUBSCRIPTION_OPTIONS

Attributes

queue[R]

Public Class Methods

new() click to toggle source
Calls superclass method RabbitFeed::Connection::new
# File lib/rabbit_feed/consumer_connection.rb, line 20
# Sets up a consumer connection: runs the superclass initializer, sets the
# channel prefetch count to 1, declares the configured queue with
# queue_options, logs the declaration and binds the queue to the exchange.
def initialize
  super
  channel.prefetch(1)

  queue_name = RabbitFeed.configuration.queue
  @queue = channel.queue(queue_name, queue_options)
  RabbitFeed.log.info do
    { event: :queue_declared, queue: queue_name, options: queue_options }
  end

  bind_on_accepted_routes
end

Public Instance Methods

consume(&block) click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 28
# Subscribes to the queue and passes every delivery to handle_message together
# with +block+, then blocks the calling thread until a SystemExit or
# SignalException arrives.
#
# Raises a RuntimeError when connection_in_use? reports that this connection
# already has a consumer subscribed.
def consume(&block)
  raise 'This connection already has a consumer subscribed' if connection_in_use?
  # NOTE(review): synchronized is defined outside this excerpt; presumably it
  # marks the connection as in use while the block runs -- confirm.
  synchronized do
    begin
      RabbitFeed.log.info { { event: :subscribe_to_queue, queue: RabbitFeed.configuration.queue } }

      consumer = queue.subscribe(SUBSCRIPTION_OPTIONS) do |delivery_info, _properties, payload|
        handle_message delivery_info, payload, &block
      end

      sleep # Sleep indefinitely, as the consumer runs in its own thread
    rescue SystemExit, SignalException
      # Shutdown request: it is logged and not re-raised, so control falls
      # through to the ensure clause below.
      RabbitFeed.log.info { { event: :unsubscribe_from_queue, queue: RabbitFeed.configuration.queue } }
    ensure
      # consumer is still nil if subscribe itself raised, so only a
      # subscription that was actually created gets cancelled.
      (cancel_consumer consumer) if consumer.present?
    end
  end
end
purge_queue() click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 51
# Calls purge on the queue declared by this connection and returns whatever
# the queue object returns.
def purge_queue
  queue.purge
end
queue_depth() click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 47
# Returns the message_count reported by the queue declared by this connection.
def queue_depth
  queue.message_count
end

Private Instance Methods

acknowledge(delivery_info) click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 77
# Acknowledges the delivery identified by delivery_info.delivery_tag on the
# queue's channel and writes a debug log entry for it.
def acknowledge(delivery_info)
  tag = delivery_info.delivery_tag
  queue.channel.ack(tag)
  RabbitFeed.log.debug do
    { event: :acknowledge, delivery_tag: tag }
  end
end
bind_on_accepted_routes() click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 65
# Binds the queue to the configured exchange. When the consumer declares
# event routing, one binding is created per accepted route (used as the
# routing key); otherwise a single binding without a routing key is created.
# Every binding is logged at info level.
def bind_on_accepted_routes
  routing = RabbitFeed::Consumer.event_routing
  settings = RabbitFeed.configuration

  if routing.present?
    routing.accepted_routes.each do |route|
      queue.bind(settings.exchange, routing_key: route)
      RabbitFeed.log.info do
        { event: :queue_bound, queue: settings.queue, exchange: settings.exchange, routing_key: route }
      end
    end
  else
    queue.bind(settings.exchange)
    RabbitFeed.log.info do
      { event: :queue_bound, queue: settings.queue, exchange: settings.exchange }
    end
  end
end
cancel_consumer(consumer) click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 93
# Cancels the given consumer and logs the value returned by its cancel call
# together with the configured queue name.
def cancel_consumer(consumer)
  status = consumer.cancel
  RabbitFeed.log.debug do
    { event: :consumer_cancelled, status: status, queue: RabbitFeed.configuration.queue }
  end
end
handle_message(delivery_info, payload) { |payload| ... } click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 82
# Processes one delivery: yields the payload to the caller's block and
# acknowledges the delivery when the block returns. If the block (or the
# acknowledgement) raises a StandardError, the failure is passed to
# handle_processing_exception and, when consumer_exit_after_fail is
# configured, the process exits with status 1.
def handle_message(delivery_info, payload)
  RabbitFeed.log.debug do
    { event: :handling_message, delivery_tag: delivery_info.delivery_tag }
  end

  # The log call above stays outside the rescue so a logging failure is not
  # treated as a processing failure.
  begin
    yield(payload)
    acknowledge(delivery_info)
  rescue StandardError => error
    handle_processing_exception(delivery_info, error)
    if RabbitFeed.configuration.consumer_exit_after_fail
      exit(1)
    end
  end
end
handle_processing_exception(delivery_info, exception) click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 105
# Reports a failed delivery: negatively acknowledges it, logs the exception's
# message and comma-joined backtrace at error level, then passes the
# exception to RabbitFeed.exception_notify.
#
# delivery_info - object responding to delivery_tag
# exception     - the exception raised while processing the delivery
def handle_processing_exception(delivery_info, exception)
  negative_acknowledge delivery_info
  RabbitFeed.log.error do
    {
      event: :processing_exception,
      delivery_tag: delivery_info.delivery_tag,
      message: exception.message,
      # backtrace is nil for an exception that was never raised or whose
      # backtrace was cleared; Array() turns that into [] so building the log
      # entry cannot fail and skip the notification below.
      backtrace: Array(exception.backtrace).join(',')
    }
  end
  RabbitFeed.exception_notify exception
end
negative_acknowledge(delivery_info) click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 98
# Negatively acknowledges the delivery on the queue's channel so the message
# is re-queued, then writes a debug log entry for it.
def negative_acknowledge(delivery_info)
  tag = delivery_info.delivery_tag

  # Tell rabbit that we were unable to process the message.
  # Positional flags per Bunny's Channel#nack(delivery_tag, multiple, requeue):
  # false rejects only this delivery, true re-queues the message.
  queue.channel.nack(tag, false, true)

  RabbitFeed.log.debug do
    { event: :negative_acknowledge, delivery_tag: tag }
  end
end
queue_options() click to toggle source
# File lib/rabbit_feed/consumer_connection.rb, line 59
# Builds the options used to declare the queue: the configured auto_delete
# setting overlaid with QUEUE_OPTIONS, whose entries win on a key clash.
def queue_options
  configured = { auto_delete: RabbitFeed.configuration.auto_delete_queue }
  configured.merge(QUEUE_OPTIONS)
end