class Basquiat::Adapters::RabbitMq::DelayedDelivery
Attributes
options[R]
Public Class Methods
new(session)
click to toggle source
Calls superclass method
Basquiat::Adapters::RabbitMq::BaseStrategy::new
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 18 def initialize(session) super setup_delayed_delivery @queue_name = session.queue_name end
setup(opts)
click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 11 def setup(opts) @options = { retries: 5, exchange_name: 'basquiat.dlx', queue_name_preffix: 'basquiat.ddlq' }.deep_merge(opts) end
Public Instance Methods
requeue(message)
click to toggle source
@param [Message] message the, well, message to be requeued
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 31 def requeue(message) @exchange.publish(Basquiat::Json.encode(message), routing_key: requeue_route_for(message.di.routing_key)) ack(message) end
run(message) { || ... }
click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 24 def run(message) message.routing_key = extract_event_info(message.routing_key)[0] yield public_send(message.action, message) end
Private Instance Methods
create_and_bind_rejected_queue()
click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 68 def create_and_bind_rejected_queue queue = session.channel.queue("#{options[:queue_name_preffix]}_rejected", durable: true) queue.bind(@exchange, routing_key: 'rejected.#') end
extract_event_info(key)
click to toggle source
@param [#match] key the current routing key of the message @return [Array<String, Integer>] a 2 item array composed of the event.name (aka original routing_key) and
the current timeout
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 52 def extract_event_info(key) matched = key.match(/^(\d+)\.#{session.queue.name}\.(.+)$/) matched && [matched.captures[1], matched.captures[0].to_i] || [key, 500] end
max_timeout()
click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 86 def max_timeout 2**(options[:retries] - 1) * 1_000 end
options()
click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 57 def options self.class.options end
prepare_timeout_queues()
click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 73 def prepare_timeout_queues (0..options[:retries] - 1).each do |iteration| timeout = 2**iteration queue = session.channel.queue("#{options[:queue_name_preffix]}_#{timeout}", durable: true, arguments: { 'x-dead-letter-exchange' => session.exchange.name, 'x-message-ttl' => timeout * 1_000 }) queue.bind(@exchange, routing_key: "#{timeout * 1_000}.#") end end
requeue_route_for(key)
click to toggle source
@param [#match] key the current routing key of the message @return [String] the calculated routing key for a republish / requeue
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 40 def requeue_route_for(key) event_name, timeout = extract_event_info(key) if timeout == max_timeout "rejected.#{@queue_name}.#{event_name}" else "#{timeout * 2}.#{@queue_name}.#{event_name}" end end
setup_delayed_delivery()
click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 61 def setup_delayed_delivery @exchange = session.channel.topic(options[:exchange_name], durable: true) session.bind_queue("*.#{session.queue.name}.#") prepare_timeout_queues create_and_bind_rejected_queue end