class Sneakers::Handlers::DelayedRetry
Public Class Methods
new(channel, queue, opts)
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 6 def initialize(channel, queue, opts) @worker_queue_name = queue.name Sneakers.logger.debug do "#{log_prefix} creating handler, opts=#{opts}" end @channel = channel @opts = opts error_exchange_name = @opts[:error_exchange_name] || 'error_exchange' Sneakers.logger.debug do "#{log_prefix} creating exchange=#{error_exchange_name}" end @error_exchange = @channel.exchange( error_exchange_name, type: 'direct', durable: exchange_durable? ) error_queue_name = @opts[:error_queue_name] || "error.#{@worker_queue_name}" Sneakers.logger.debug do "#{log_prefix} creating queue=#{error_queue_name}" end error_queue = @channel.queue( error_queue_name, durable: queue_durable? ) error_queue.bind(@error_exchange, routing_key: @worker_queue_name) @max_retries = @opts[:number_of_retries] || 5 @sleep_before_retry = @opts[:sleep_before_retry] || 0 @retriable_errors = @opts[:retriable_errors] || [] @on_retry = @opts[:on_retry] || proc {} @on_error = @opts[:on_error] || proc {} end
Public Instance Methods
acknowledge(hdr, _props, _msg)
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 43 def acknowledge(hdr, _props, _msg) @channel.acknowledge(hdr.delivery_tag, false) end
error(hdr, props, msg, err)
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 55 def error(hdr, props, msg, err) handle_retry(hdr, props, msg, err) end
noop(hdr, props, msg)
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 59 def noop(hdr, props, msg); end
reject(hdr, props, msg, requeue = false)
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 47 def reject(hdr, props, msg, requeue = false) if requeue @channel.reject(hdr.delivery_tag, requeue) else handle_retry(hdr, props, msg, :reject) end end
Private Instance Methods
exchange_durable?()
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 129 def exchange_durable? queue_durable? end
failure_count(headers)
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 106 def failure_count(headers) if headers.nil? || headers['x-death'].nil? 0 else x_death_array = headers['x-death'].select do |x_death| x_death['queue'] == @worker_queue_name end if x_death_array.count > 0 && x_death_array.first['count'] x_death_array.inject(0) { |sum, x_death| sum + x_death['count'] } else x_death_array.count end end end
handle_retry(hdr, props, msg, reason)
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 63 def handle_retry(hdr, props, msg, reason) num_attempts = failure_count(props[:headers]) + 1 if (num_attempts <= @max_retries) && retriable_on?(reason) Sneakers.logger.info do "#{log_prefix} msg=retrying, count=#{num_attempts}, headers=#{props[:headers]}" end sleep(@sleep_before_retry) @on_retry.call(reason, msg, num_attempts) rescue nil @channel.reject(hdr.delivery_tag, false) else Sneakers.logger.info do "#{log_prefix} msg=failing, retry_count=#{num_attempts}, reason=#{reason}" end @on_error.call(reason, msg, num_attempts) rescue nil error_data = { error: reason.to_s, num_attempts: num_attempts, failed_at: Time.now.iso8601, }.tap do |hash| if reason.is_a?(Exception) hash[:error_class] = reason.class.to_s hash[:error_message] = reason.to_s if reason.backtrace hash[:backtrace] = reason.backtrace.take(10).join(', ') end end end @error_exchange.publish(msg, routing_key: hdr.routing_key, headers: { error_data: error_data }) @channel.acknowledge(hdr.delivery_tag, false) end end
log_prefix()
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 121 def log_prefix "DelayedRetry handler [queue=#{@worker_queue_name}]" end
queue_durable?()
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 125 def queue_durable? @opts.fetch(:queue_options, {}).fetch(:durable, false) end
retriable_on?(exception)
click to toggle source
# File lib/sneakers/handlers/delayed_retry.rb, line 102 def retriable_on?(exception) @retriable_errors.map { |error_class| exception.is_a?(error_class) }.any? end