class Rabbitek::Retryer
A service that retries the consumption of a failed message
Public Class Methods
call(*args)
click to toggle source
# File lib/rabbitek/server/retryer.rb, line 9
# Convenience entry point: builds a retryer from the given arguments and
# immediately runs it.
#
# @param args [Array] forwarded unchanged to +new+
# @return whatever the instance-level #call returns
def self.call(*args)
  new(*args).call
end
new(consumer, message)
click to toggle source
# File lib/rabbitek/server/retryer.rb, line 13 def initialize(consumer, message) @consumer = consumer @message = message headers = message.properties.headers || {} dead_headers = headers.fetch('x-death', []).last || {} @retry_count = headers.fetch('x-retry-count', 0) @expiration = dead_headers.fetch('original-expiration', 1000).to_i end
Public Instance Methods
call()
click to toggle source
# File lib/rabbitek/server/retryer.rb, line 24
# Handles a failed message: logs the failure, acks the original delivery and
# then either republishes it to the retry exchange with a longer expiration
# or hands it to the dead queue once the retry budget is exhausted.
#
# @return the result of #publish_to_retry_queue or #publish_to_dead_queue
def call
  warn_log

  # acknowledge existing message
  # NOTE(review): the ack happens before the republish; if publishing raises,
  # the message appears to be neither retried nor dead-lettered -- confirm
  # this ordering is intended.
  @consumer.ack!(@message.delivery_info)

  if @retry_count <= 25
    # Set the new expiration with an increasing factor
    @expiration *= 1.5

    # Publish to retry queue with new expiration
    publish_to_retry_queue
  else
    publish_to_dead_queue
  end
end
publish_to_dead_queue()
click to toggle source
# File lib/rabbitek/server/retryer.rb, line 60
# Placeholder for routing a message that has exhausted its retries.
# Currently a no-op: nothing is published and nil is returned.
def publish_to_dead_queue
  # TODO: implement dead queue
end
publish_to_retry_queue()
click to toggle source
# File lib/rabbitek/server/retryer.rb, line 51
# Republishes the raw payload through the consumer's retry/delayed exchange.
#
# The message keeps its original routing key, carries the current expiration
# truncated to an integer, and has its 'x-retry-count' header bumped by one.
#
# @return whatever the exchange's +publish+ returns
def publish_to_retry_queue
  @consumer.retry_or_delayed_exchange.publish(
    @message.raw_payload,
    # @expiration may be a Float after being scaled in #call, hence to_i.
    expiration: @expiration.to_i,
    routing_key: @message.delivery_info.routing_key,
    headers: {
      'x-retry-count': @retry_count + 1,
      'x-dead-letter-routing-key': @message.delivery_info.routing_key
    }
  )
end
warn_log()
click to toggle source
# File lib/rabbitek/server/retryer.rb, line 41
# Emits a structured warning describing the failure: the current retry count
# and expiration, the consumer's class name and its jid.
#
# NOTE(review): +warn+ is called with keyword fields, so it is presumably a
# logging helper mixed into this class rather than Kernel#warn -- confirm.
def warn_log
  warn(
    message: 'Failure!',
    retry_count: @retry_count,
    expiration: @expiration,
    consumer: @consumer.class.to_s,
    jid: @consumer.jid
  )
end