class Warren::Fox
A fox is a rabbitMQ consumer. It handles subscription to the queue and passing message on to the registered Subscriber
Constants
- FOX
A little cute fox emoji to easily flag output from the consumers
- MAX_RECONNECT_DELAY
Maximum wait time between database retries: 5 minutes
Attributes
Public Class Methods
Creates a fox, a RabbitMQ consumer. Subscribes to the queues defined in `subscription` and passes messages on to the subscriber
@param name [String] The name of the consumer @param subscription [Warren::Subscription] Describes the queue to subscribe to @param adaptor [#recovered?,#handle,#env] An adaptor to handle framework specifics @param subscribed_class [Warren::Subscriber::Base] The class to process received messages @param delayed [Warren::DelayExchange] The details handling delayed message broadcast
# File lib/warren/fox.rb, line 36 def initialize(name:, subscription:, adaptor:, subscribed_class:, delayed:) @consumer_tag = "#{adaptor.env}_#{name}_#{Process.pid}" @subscription = subscription @delayed = delayed @logger = Warren::LogTagger.new(logger: adaptor.logger, tag: "#{FOX} #{@consumer_tag}") @adaptor = adaptor @subscribed_class = subscribed_class @state = :initialized end
Public Instance Methods
If the fox is paused, and a recovery attempt is scheduled, will prompt the framework adaptor to attempt to recover. (Such as reconnecting to the database). If this operation is successful will resubscribe to the queue, otherwise a further recovery attempt will be scheduled. Successive recovery attempts will be gradually further apart, up to the MAX_RECONNECT_DELAY
of 5 minutes.
# File lib/warren/fox.rb, line 100 def attempt_recovery return unless paused? && recovery_due? warn { "Attempting recovery: #{@recovery_attempts}" } if recovered? running! subscribe! else @recovery_attempts += 1 @recover_at = Time.now + delay_for_attempt end end
Temporarily unsubscribes the consumer, and schedules an attempted recovery. Recovery is triggered by the {#attempt_recovery} method which gets called periodically by {Warren::Client}
@return [Void]
# File lib/warren/fox.rb, line 85 def pause! return unless running? unsubscribe! @recovery_attempts = 0 @recover_at = Time.now paused! end
Starts up the fox, automatically registering the configured queues and bindings before subscribing to the queue.
@return [Void]
# File lib/warren/fox.rb, line 55 def run! starting! subscription.activate! # Set up the queues delayed.activate! running! # Transition to running state subscribe! # Subscribe to the queue info { 'Started consumer' } end
Stop the consumer and unsubscribes from the queue. Blocks until fully unsubscribed.
@return [Void]
# File lib/warren/fox.rb, line 70 def stop! info { 'Stopping consumer' } stopping! unsubscribe! info { 'Stopped consumer' } stopped! end
Private Instance Methods
# File lib/warren/fox.rb, line 132 def delay_for_attempt [2**@recovery_attempts, MAX_RECONNECT_DELAY].min end
# File lib/warren/fox.rb, line 157 def log_message(payload) debug { 'Started message process' } debug { payload } yield ensure debug { 'Finished message process' } end
# File lib/warren/fox.rb, line 144 def process(delivery_info, properties, payload) log_message(payload) do message = @subscribed_class.new(self, delivery_info, properties, payload) @adaptor.handle { message._process_ } rescue Warren::Exceptions::TemporaryIssue => e warn { "Temporary Issue: #{e.message}" } pause! message.requeue(e) rescue StandardError => e message.dead_letter(e) end end
# File lib/warren/fox.rb, line 140 def recovered? @adaptor.recovered? end
# File lib/warren/fox.rb, line 136 def recovery_due? Time.now > @recover_at end
Our consumer operates in another thread. It is non blocking.
# File lib/warren/fox.rb, line 116 def subscribe! raise StandardError, 'Consumer already exists' unless @consumer.nil? @consumer = @subscription.subscribe(@consumer_tag) do |delivery_info, properties, payload| process(delivery_info, properties, payload) end end
Cancels the consumer and un-registers it
# File lib/warren/fox.rb, line 125 def unsubscribe! info { 'Unsubscribing' } @consumer&.cancel @consumer = nil info { 'Unsubscribed' } end