class Warren::Fox

A fox is a RabbitMQ consumer. It handles subscription to the queue and passes messages on to the registered Subscriber.

Constants

FOX

A little cute fox emoji to easily flag output from the consumers

MAX_RECONNECT_DELAY

Maximum wait time between database retries: 5 minutes

Attributes

consumer_tag[R]
delayed[R]
state[R]
subscription[R]

Public Class Methods

new(name:, subscription:, adaptor:, subscribed_class:, delayed:)

Creates a fox, a RabbitMQ consumer. Subscribes to the queues defined in `subscription` and passes messages on to the subscriber

@param name [String] The name of the consumer
@param subscription [Warren::Subscription] Describes the queue to subscribe to
@param adaptor [#recovered?,#handle,#env] An adaptor to handle framework specifics
@param subscribed_class [Warren::Subscriber::Base] The class to process received messages
@param delayed [Warren::DelayExchange] The details handling delayed message broadcast

# File lib/warren/fox.rb, line 36
def initialize(name:, subscription:, adaptor:, subscribed_class:, delayed:)
  @consumer_tag = "#{adaptor.env}_#{name}_#{Process.pid}"
  @subscription = subscription
  @delayed = delayed
  @logger = Warren::LogTagger.new(logger: adaptor.logger, tag: "#{FOX} #{@consumer_tag}")
  @adaptor = adaptor
  @subscribed_class = subscribed_class
  @state = :initialized
end
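
As a rough illustration of how the consumer tag is assembled, the sketch below reuses the interpolation from the constructor. `FakeAdaptor` and `consumer_tag_for` are hypothetical names introduced for this example and are not part of Warren.

```ruby
# Hypothetical stand-in for a framework adaptor; only #env is needed here.
FakeAdaptor = Struct.new(:env)

# Mirrors the string interpolation used in Warren::Fox#initialize:
# environment, consumer name and process id joined by underscores.
def consumer_tag_for(adaptor, name)
  "#{adaptor.env}_#{name}_#{Process.pid}"
end

tag = consumer_tag_for(FakeAdaptor.new('production'), 'my_consumer')
puts tag
```

Because the process id is part of the tag, two processes running the same consumer in the same environment should end up with distinct tags.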

Public Instance Methods

attempt_recovery()

If the fox is paused and a recovery attempt is due, prompts the framework adaptor to attempt recovery (such as reconnecting to the database). If recovery succeeds, the fox resubscribes to the queue; otherwise a further recovery attempt is scheduled. Successive recovery attempts are spaced progressively further apart, up to the MAX_RECONNECT_DELAY of 5 minutes.

# File lib/warren/fox.rb, line 100
def attempt_recovery
  return unless paused? && recovery_due?

  warn { "Attempting recovery: #{@recovery_attempts}" }
  if recovered?
    running!
    subscribe!
  else
    @recovery_attempts += 1
    @recover_at = Time.now + delay_for_attempt
  end
end
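
The retry spacing described above can be sketched in isolation. This is a minimal illustration, assuming MAX_RECONNECT_DELAY is expressed in seconds (5 minutes = 300); the formula is the one shown in `delay_for_attempt`, rewritten here to take the attempt count as an argument.

```ruby
# Assumption: the constant is a number of seconds (5 minutes).
MAX_RECONNECT_DELAY = 60 * 5

# Same formula as Warren::Fox#delay_for_attempt, with the counter passed in
# rather than read from an instance variable.
def delay_for_attempt(recovery_attempts)
  [2**recovery_attempts, MAX_RECONNECT_DELAY].min
end

# The counter is incremented before the delay is computed, so the first
# failed attempt waits 2**1 seconds.
delays = (1..10).map { |attempt| delay_for_attempt(attempt) }
puts delays.inspect
# => [2, 4, 8, 16, 32, 64, 128, 256, 300, 300]
```

Under that assumption the delay doubles after each failed attempt and reaches the cap on the ninth.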
pause!()

Temporarily unsubscribes the consumer and schedules an attempted recovery. Recovery is triggered by the {#attempt_recovery} method, which is called periodically by {Warren::Client}.

@return [Void]

# File lib/warren/fox.rb, line 85
def pause!
  return unless running?

  unsubscribe!
  @recovery_attempts = 0
  @recover_at = Time.now
  paused!
end
run!()

Starts up the fox, automatically registering the configured queues and bindings before subscribing to the queue.

@return [Void]

# File lib/warren/fox.rb, line 55
def run!
  starting!
  subscription.activate! # Set up the queues
  delayed.activate!
  running!            # Transition to running state
  subscribe!          # Subscribe to the queue

  info { 'Started consumer' }
end
stop!()

Stops the consumer and unsubscribes from the queue. Blocks until fully unsubscribed.

@return [Void]

# File lib/warren/fox.rb, line 70
def stop!
  info { 'Stopping consumer' }
  stopping!
  unsubscribe!
  info { 'Stopped consumer' }
  stopped!
end

Private Instance Methods

delay_for_attempt()
# File lib/warren/fox.rb, line 132
def delay_for_attempt
  [2**@recovery_attempts, MAX_RECONNECT_DELAY].min
end
log_message(payload) { || ... }
# File lib/warren/fox.rb, line 157
def log_message(payload)
  debug { 'Started message process' }
  debug { payload }
  yield
ensure
  debug { 'Finished message process' }
end
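
The `ensure` clause means the closing log line is written even when the block raises. A minimal sketch of that behaviour, using an array (`LOG`, a name introduced here) in place of the real tagged logger:

```ruby
# Collects log lines in an array instead of using a real logger
# (illustrative only; Warren logs through its own tagged logger).
LOG = []

def log_message(payload)
  LOG << 'Started message process'
  LOG << payload
  yield
ensure
  # Runs whether the block returns normally or raises.
  LOG << 'Finished message process'
end

begin
  log_message('{"id":1}') { raise 'boom' }
rescue RuntimeError
  # The error still reaches the caller, after the ensure clause has run.
end

puts LOG.last
```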
process(delivery_info, properties, payload)
# File lib/warren/fox.rb, line 144
def process(delivery_info, properties, payload)
  log_message(payload) do
    message = @subscribed_class.new(self, delivery_info, properties, payload)
    @adaptor.handle { message._process_ }
  rescue Warren::Exceptions::TemporaryIssue => e
    warn { "Temporary Issue: #{e.message}" }
    pause!
    message.requeue(e)
  rescue StandardError => e
    message.dead_letter(e)
  end
end
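
The error routing in `process` can be illustrated without a broker. The sketch below is a simplified model, not the Warren implementation: `TemporaryIssue`, `FakeMessage` and `route` are hypothetical stand-ins, and the pausing of the consumer on a temporary issue is left out.

```ruby
# Hypothetical error class standing in for Warren::Exceptions::TemporaryIssue.
class TemporaryIssue < StandardError; end

# Hypothetical message double that records what happened to it.
class FakeMessage
  attr_reader :outcome

  def initialize(&behaviour)
    @behaviour = behaviour
    @outcome = :pending
  end

  def _process_
    @behaviour.call
    @outcome = :processed
  end

  def requeue(_error)
    @outcome = :requeued
  end

  def dead_letter(_error)
    @outcome = :dead_lettered
  end
end

# Same rescue ordering as Warren::Fox#process: the specific error first,
# then the catch-all for every other StandardError.
def route(message)
  message._process_
rescue TemporaryIssue => e
  message.requeue(e)
rescue StandardError => e
  message.dead_letter(e)
end

ok = FakeMessage.new { :fine }
flaky = FakeMessage.new { raise TemporaryIssue, 'database unavailable' }
broken = FakeMessage.new { raise ArgumentError, 'malformed payload' }

[ok, flaky, broken].each { |message| route(message) }
puts [ok, flaky, broken].map(&:outcome).inspect
# => [:processed, :requeued, :dead_lettered]
```

The ordering of the `rescue` clauses matters: if the catch-all came first, temporary issues would be dead-lettered rather than requeued.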
recovered?()
# File lib/warren/fox.rb, line 140
def recovered?
  @adaptor.recovered?
end
recovery_due?()
# File lib/warren/fox.rb, line 136
def recovery_due?
  Time.now > @recover_at
end
subscribe!()

Our consumer operates in another thread. It is non-blocking.

# File lib/warren/fox.rb, line 116
def subscribe!
  raise StandardError, 'Consumer already exists' unless @consumer.nil?

  @consumer = @subscription.subscribe(@consumer_tag) do |delivery_info, properties, payload|
    process(delivery_info, properties, payload)
  end
end
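
The guard at the top of `subscribe!` prevents a second consumer from being registered while one is still held. A small sketch of that pattern, using a hypothetical `TinyConsumer` class rather than the real subscription objects:

```ruby
# Hypothetical minimal consumer holder; not the Warren implementation.
class TinyConsumer
  def initialize
    @consumer = nil
  end

  def subscribe!
    raise StandardError, 'Consumer already exists' unless @consumer.nil?

    # Stands in for the object returned by the queue subscription.
    @consumer = Object.new
  end

  def unsubscribe!
    @consumer = nil
  end
end

tiny = TinyConsumer.new
tiny.subscribe!

# A second subscribe! without an unsubscribe! in between is rejected.
error = begin
  tiny.subscribe!
  nil
rescue StandardError => e
  e.message
end
puts error

# Clearing the held consumer makes subscribing possible again.
tiny.unsubscribe!
tiny.subscribe!
```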
unsubscribe!()

Cancels the consumer and un-registers it

# File lib/warren/fox.rb, line 125
def unsubscribe!
  info { 'Unsubscribing' }
  @consumer&.cancel
  @consumer = nil
  info { 'Unsubscribed' }
end