class SneakersRetry::Handlers::Maxretry2
Maxretry uses dead letter policies on Rabbitmq to requeue and retry messages after failure (rejections, errors and timeouts). When the maximum number of retries is reached it will put the message on an error queue. This handler will only retry at the queue level. To accomplish that, the setup is a bit complex.
Input:
worker_exchange (eXchange) worker_queue (Queue)
We create:
worker_queue-retry - (X) where we setup the worker queue to dead-letter. worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to worker_queue-retry-requeue. worker_queue-error - (X) where to send max-retry failures worker_queue-error - (Q) bound to worker_queue-error. worker_queue-retry-requeue - (X) exchange to bind worker_queue to for requeuing directly to the worker_queue.
This requires that you setup arguments to the worker queue to line up the dead letter queue. See the example for more information.
Many of these can be override with options:
-
retry_exchange - sets retry exchange & queue
-
retry_error_exchange - sets error exchange and queue
-
retry_requeue_exchange - sets the exchange created to re-queue things back to the worker queue.
Public Class Methods
# File lib/sneakers-retry/handlers/maxretry2.rb, line 35 def initialize(channel, queue, opts) puts "################################" @worker_queue_name = queue.name Sneakers.logger.debug do "#{log_prefix} creating handler, opts=#{opts}" end @channel = channel @opts = opts # Construct names, defaulting where suitable retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry" error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error" requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue" retry_routing_key = @opts[:retry_routing_key] || "#" # Create the exchanges @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name| Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" } @channel.exchange(name, :type => 'topic', :durable => exchange_durable?) end # Create the queues and bindings Sneakers.logger.debug do "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}" end @retry_queue = @channel.queue(retry_name, :durable => queue_durable?, :arguments => { :'x-dead-letter-exchange' => requeue_name, :'x-message-ttl' => @opts[:retry_timeout] || 60000 }) @retry_queue.bind(@retry_exchange, :routing_key => '#') Sneakers.logger.debug do "#{log_prefix} creating queue=#{error_name}" end @error_queue = @channel.queue(error_name, :durable => queue_durable?) @error_queue.bind(@error_exchange, :routing_key => '#') # Finally, bind the worker queue to our requeue exchange queue.bind(@requeue_exchange, :routing_key => retry_routing_key) @max_retries = @opts[:retry_max_times] || 5 end
Public Instance Methods
# File lib/sneakers-retry/handlers/maxretry2.rb, line 85 def acknowledge(hdr, props, msg) @channel.acknowledge(hdr.delivery_tag, false) end
# File lib/sneakers-retry/handlers/maxretry2.rb, line 100 def error(hdr, props, msg, err) handle_retry(hdr, props, msg, err) end
# File lib/sneakers-retry/handlers/maxretry2.rb, line 108 def noop(hdr, props, msg) end
# File lib/sneakers-retry/handlers/maxretry2.rb, line 89 def reject(hdr, props, msg, requeue = false) if requeue # This was explicitly rejected specifying it be requeued so we do not # want it to pass through our retry logic. @channel.reject(hdr.delivery_tag, requeue) else handle_retry(hdr, props, msg, :reject) end end
# File lib/sneakers-retry/handlers/maxretry2.rb, line 104 def timeout(hdr, props, msg) handle_retry(hdr, props, msg, :timeout) end
Private Instance Methods
# File lib/sneakers-retry/handlers/maxretry2.rb, line 201 def exchange_durable? queue_durable? end
Uses the x-death header to determine the number of failures this job has seen in the past. This does not count the current failure. So for instance, the first time the job fails, this will return 0, the second time, 1, etc. @param headers [Hash] Hash of headers that Rabbit delivers as part of
the message
@return [Integer] Count of number of failures.
# File lib/sneakers-retry/handlers/maxretry2.rb, line 170 def failure_count(headers) if headers.nil? || headers['x-death'].nil? 0 else x_death_array = headers['x-death'].select do |x_death| x_death['queue'] == @worker_queue_name end if x_death_array.count > 0 && x_death_array.first['count'] # Newer versions of RabbitMQ return headers with a count key x_death_array.inject(0) {|sum, x_death| sum + x_death['count']} else # Older versions return a separate x-death header for each failure x_death_array.count end end end
Helper logic for retry handling. This will reject the message if there are remaining retries left on it, otherwise it will publish it to the error exchange along with the reason. @param hdr [Bunny::DeliveryInfo] @param props [Bunny::MessageProperties] @param msg [String] The message @param reason [String, Symbol, Exception] Reason for the retry, included
in the JSON we put on the error exchange.
# File lib/sneakers-retry/handlers/maxretry2.rb, line 120 def handle_retry(hdr, props, msg, reason) # +1 for the current attempt num_attempts = failure_count(props[:headers]) + 1 if num_attempts <= @max_retries # We call reject which will route the message to the # x-dead-letter-exchange (ie. retry exchange) on the queue Sneakers.logger.info do "#{log_prefix} msg=retrying, count=#{num_attempts}, headers=#{props[:headers]}" end @channel.reject(hdr.delivery_tag, false) # TODO: metrics else # Retried more than the max times # Publish the original message with the routing_key to the error exchange Sneakers.logger.info do "#{log_prefix} msg=failing, retry_count=#{num_attempts}, reason=#{reason}" end data = { error: reason.to_s, num_attempts: num_attempts, failed_at: Time.now.iso8601, properties: props.to_hash }.tap do |hash| if reason.is_a?(Exception) hash[:error_class] = reason.class.to_s hash[:error_message] = "#{reason}" if reason.backtrace hash[:backtrace] = reason.backtrace.take(10) end end end.to_json @error_exchange.publish(msg, { routing_key: hdr.routing_key, headers: { retry_info: data } }) @channel.acknowledge(hdr.delivery_tag, false) # TODO: metrics end end
Prefix all of our log messages so they are easier to find. We don't have the worker, so the next best thing is the queue name.
# File lib/sneakers-retry/handlers/maxretry2.rb, line 190 def log_prefix "Maxretry handler [queue=#{@worker_queue_name}]" end
# File lib/sneakers-retry/handlers/maxretry2.rb, line 197 def queue_durable? @opts.fetch(:queue_options, {}).fetch(:durable, false) end