class Rabbitek::Retryer

A service to retry a failed message consuming

Public Class Methods

call(*args) click to toggle source
# File lib/rabbitek/server/retryer.rb, line 9
def self.call(*args)
  new(*args).call
end
new(consumer, message) click to toggle source
# File lib/rabbitek/server/retryer.rb, line 13
def initialize(consumer, message)
  @consumer = consumer
  @message = message

  headers = message.properties.headers || {}
  dead_headers = headers.fetch('x-death', []).last || {}

  @retry_count = headers.fetch('x-retry-count', 0)
  @expiration = dead_headers.fetch('original-expiration', 1000).to_i
end

Public Instance Methods

call() click to toggle source
# File lib/rabbitek/server/retryer.rb, line 24
def call
  warn_log

  # acknowledge existing message
  @consumer.ack!(@message.delivery_info)

  if @retry_count <= 25
    # Set the new expiration with an increasing factor
    @expiration *= 1.5

    # Publish to retry queue with new expiration
    publish_to_retry_queue
  else
    publish_to_dead_queue
  end
end
publish_to_dead_queue() click to toggle source
# File lib/rabbitek/server/retryer.rb, line 60
def publish_to_dead_queue
  # TODO: implement dead queue
end
publish_to_retry_queue() click to toggle source
# File lib/rabbitek/server/retryer.rb, line 51
def publish_to_retry_queue
  @consumer.retry_or_delayed_exchange.publish(
    @message.raw_payload,
    expiration: @expiration.to_i,
    routing_key: @message.delivery_info.routing_key,
    headers: { 'x-retry-count': @retry_count + 1, 'x-dead-letter-routing-key': @message.delivery_info.routing_key }
  )
end
warn_log() click to toggle source
# File lib/rabbitek/server/retryer.rb, line 41
def warn_log
  warn(
    message: 'Failure!',
    retry_count: @retry_count,
    expiration: @expiration,
    consumer: @consumer.class.to_s,
    jid: @consumer.jid
  )
end