class Warren::DelayExchange
Configures and wraps up a delay exchange on a Bunny Channel/Queue.
A delay exchange routes messages immediately onto a queue with a TTL. Once messages on this queue expire, they are dead-lettered back onto the original exchange.
Note: This does not currently support the rabbitmq-delayed-message-exchange plugin.
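The TTL and dead-letter behaviour described above can be pictured with a small in-memory model. This is only a sketch of the idea, not RabbitMQ or Warren code: `ToyDelayQueue`, its `push` and `tick` methods, and the explicit `now:` clock are all hypothetical names invented for illustration.

```ruby
# Toy in-memory model of a delay queue. It is not RabbitMQ; it only
# illustrates "hold for a TTL, then dead-letter back to the original exchange".
class ToyDelayQueue
  Entry = Struct.new(:message, :expires_at)

  def initialize(ttl:, dead_letter_to:)
    @ttl = ttl                       # seconds a message is held
    @dead_letter_to = dead_letter_to # array standing in for the original exchange
    @entries = []
  end

  # Accept a message and stamp it with its expiry time.
  def push(message, now:)
    @entries << Entry.new(message, now + @ttl)
  end

  # Move every expired message back to the original exchange.
  def tick(now:)
    expired, @entries = @entries.partition { |entry| entry.expires_at <= now }
    expired.each { |entry| @dead_letter_to << entry.message }
  end
end

original_exchange = []
delay_queue = ToyDelayQueue.new(ttl: 30, dead_letter_to: original_exchange)
delay_queue.push('retry me', now: 0)
delay_queue.tick(now: 10) # too early: nothing is returned yet
delay_queue.tick(now: 30) # TTL reached: the message is handed back
```

In a real broker the expiry and hand-back are performed by RabbitMQ itself, driven by queue arguments, so no polling step like `tick` is needed.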
Attributes
Public Class Methods
Create a new delay exchange. Handles queue creation, binding, and attaching consumers to the queues.
@param channel [Warren::Handler::Broadcast::Channel] A channel on which to register queues
@param config [Hash] queue configuration hash
# File lib/warren/delay_exchange.rb, line 22
def initialize(channel:, config:)
  @channel = channel
  @exchange_config = config&.fetch('exchange', nil)
  @bindings = config&.fetch('bindings', []) || []
end
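The shape of the `config` hash can be inferred from the keys this class reads ('exchange', 'bindings', 'queue', 'name', 'options'). The following is a sketch under that assumption: the exchange and queue names, the TTL value, and the option keys (`type`, `durable`, `arguments`, `routing_key`) are illustrative guesses at Bunny-style options, not values taken from Warren itself.

```ruby
# Hypothetical configuration; names, TTL and option keys are illustrative only.
config = {
  'exchange' => {
    'name' => 'example.delay',
    'options' => { type: 'topic', durable: true }
  },
  'bindings' => [
    {
      'queue' => {
        'name' => 'example.delay.queue',
        'options' => {
          durable: true,
          arguments: {
            'x-message-ttl' => 30_000,            # hold messages for 30 seconds
            'x-dead-letter-exchange' => 'example' # then return them to this exchange
          }
        }
      },
      # Every value here is later passed through format, so it must be a String.
      'options' => { routing_key: '%{routing_key_prefix}.#' }
    }
  ]
}

# Mirrors the lookups performed by the initializer.
exchange_config = config&.fetch('exchange', nil)
bindings = config&.fetch('bindings', []) || []

# A nil config is tolerated: it yields an empty binding list.
missing = nil
empty_bindings = missing&.fetch('bindings', []) || [] # => []
```

`x-message-ttl` and `x-dead-letter-exchange` are standard RabbitMQ queue arguments; how Warren forwards them to Bunny is assumed here rather than confirmed by this page.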
Public Instance Methods
Ensures the queues and channels are set up to receive messages.
keys: additional routing_keys to bind
# File lib/warren/delay_exchange.rb, line 32
def activate!
  establish_bindings!
end
Post a message to the delay exchange.
@param payload [String] The message payload
@param routing_key [String] The routing key of the re-sent message
@param headers [Hash] A hash of headers. Typically: { attempts: <Integer> }
@option headers [Integer] :attempts The number of times the message has been processed
@return [Void]
# File lib/warren/delay_exchange.rb, line 46
def publish(payload, routing_key:, headers: {})
  raise StandardError, 'No delay queue configured' unless configured?
  message = Warren::Message::Simple.new(routing_key, payload, headers)
  channel.publish(message, exchange: exchange)
end
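One way a consumer might use `publish` is to re-queue a failed message with an incremented `attempts` header. The sketch below is an assumption about usage, not Warren's own retry logic: `FakeDelayExchange`, `retry_later`, and `MAX_ATTEMPTS` are hypothetical, and the fake only mimics the `publish` signature shown above so the example runs without RabbitMQ.

```ruby
# Stand-in for Warren::DelayExchange so the sketch runs without a broker.
# It records what would have been published.
class FakeDelayExchange
  attr_reader :published

  def initialize
    @published = []
  end

  # Same signature as the publish method documented above.
  def publish(payload, routing_key:, headers: {})
    @published << { payload: payload, routing_key: routing_key, headers: headers }
    nil
  end
end

MAX_ATTEMPTS = 3 # hypothetical retry limit

# Re-queues a failed message via the delay exchange, bumping the attempt count.
# Returns true if the message was re-queued, false once attempts are exhausted.
def retry_later(delay_exchange, payload, routing_key, headers)
  attempts = headers.fetch(:attempts, 0) + 1
  return false if attempts >= MAX_ATTEMPTS

  delay_exchange.publish(payload, routing_key: routing_key, headers: { attempts: attempts })
  true
end

delay_exchange = FakeDelayExchange.new
retry_later(delay_exchange, '{"id":1}', 'example.message', { attempts: 1 })
```

Capping the attempts keeps a permanently failing message from cycling between the delay queue and the original exchange indefinitely.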
Private Instance Methods
# File lib/warren/delay_exchange.rb, line 59
def add_binding(queue, options)
  queue.bind(exchange, options)
end
# File lib/warren/delay_exchange.rb, line 55
def configured?
  @exchange_config&.key?('name')
end
# File lib/warren/delay_exchange.rb, line 71
def establish_bindings!
  @bindings.each do |binding_config|
    queue = queue(binding_config['queue'])
    transformed_options = merge_routing_key_prefix(binding_config['options'])
    add_binding(queue, transformed_options)
  end
end
# File lib/warren/delay_exchange.rb, line 63
def exchange
  @exchange ||= channel.exchange(*@exchange_config.values_at('name', 'options'))
end
# File lib/warren/delay_exchange.rb, line 79
def merge_routing_key_prefix(options)
  options.transform_values do |value|
    format(value, routing_key_prefix: channel.routing_key_prefix)
  end
end
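The substitution performed by merge_routing_key_prefix relies on Ruby's named format references. A minimal sketch of that step in isolation, where the option key, the template string, and the `'development'` prefix are made-up values standing in for real binding options and `channel.routing_key_prefix`:

```ruby
# Hypothetical binding options; the template and key are illustrative.
options = { 'routing_key' => '%{routing_key_prefix}.delay.#' }
prefix = 'development' # stands in for channel.routing_key_prefix

# Same transformation as merge_routing_key_prefix, without the channel.
transformed = options.transform_values do |value|
  format(value, routing_key_prefix: prefix)
end
# transformed => { 'routing_key' => 'development.delay.#' }
```

Because every value is passed to `format`, each one needs to be a String, and a literal percent sign in a value would have to be written as `%%`.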
# File lib/warren/delay_exchange.rb, line 67
def queue(config)
  channel.queue(*config.values_at('name', 'options'))
end