class Basquiat::Adapters::RabbitMq::DelayedDelivery

Attributes

options[R]

Public Class Methods

new(session) click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 18
def initialize(session)
  super
  setup_delayed_delivery
  @queue_name = session.queue_name
end
setup(opts) click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 11
def setup(opts)
  @options = { retries:            5,
               exchange_name:      'basquiat.dlx',
               queue_name_preffix: 'basquiat.ddlq' }.deep_merge(opts)
end

Public Instance Methods

requeue(message) click to toggle source

@param [Message] message the, well, message to be requeued

# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 31
def requeue(message)
  @exchange.publish(Basquiat::Json.encode(message), routing_key: requeue_route_for(message.di.routing_key))
  ack(message)
end
run(message) { || ... } click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 24
def run(message)
  message.routing_key = extract_event_info(message.routing_key)[0]
  yield
  public_send(message.action, message)
end

Private Instance Methods

create_and_bind_rejected_queue() click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 68
def create_and_bind_rejected_queue
  queue = session.channel.queue("#{options[:queue_name_preffix]}_rejected", durable: true)
  queue.bind(@exchange, routing_key: 'rejected.#')
end
extract_event_info(key) click to toggle source

@param [#match] key the current routing key of the message @return [Array<String, Integer>] a 2 item array composed of the event.name (aka original routing_key) and

the current timeout
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 52
def extract_event_info(key)
  matched = key.match(/^(\d+)\.#{session.queue.name}\.(.+)$/)
  matched && [matched.captures[1], matched.captures[0].to_i] || [key, 500]
end
max_timeout() click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 86
def max_timeout
  2**(options[:retries] - 1) * 1_000
end
options() click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 57
def options
  self.class.options
end
prepare_timeout_queues() click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 73
def prepare_timeout_queues
  (0..options[:retries] - 1).each do |iteration|
    timeout = 2**iteration
    queue   = session.channel.queue("#{options[:queue_name_preffix]}_#{timeout}",
                                    durable:   true,
                                    arguments: {
                                      'x-dead-letter-exchange' => session.exchange.name,
                                      'x-message-ttl'          => timeout * 1_000
                                    })
    queue.bind(@exchange, routing_key: "#{timeout * 1_000}.#")
  end
end
requeue_route_for(key) click to toggle source

@param [#match] key the current routing key of the message @return [String] the calculated routing key for a republish / requeue

# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 40
def requeue_route_for(key)
  event_name, timeout = extract_event_info(key)
  if timeout == max_timeout
    "rejected.#{@queue_name}.#{event_name}"
  else
    "#{timeout * 2}.#{@queue_name}.#{event_name}"
  end
end
setup_delayed_delivery() click to toggle source
# File lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb, line 61
def setup_delayed_delivery
  @exchange = session.channel.topic(options[:exchange_name], durable: true)
  session.bind_queue("*.#{session.queue.name}.#")
  prepare_timeout_queues
  create_and_bind_rejected_queue
end