class Warren::DelayExchange

Configures and wraps a delay exchange on a Bunny channel/queue.

A delay exchange routes messages immediately onto a queue with a TTL. Once messages on this queue expire, they are dead-lettered back onto the original exchange.

Note: This does not currently support the rabbitmq-delayed-message-exchange plugin.
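
The TTL and dead-letter behaviour described above is normally expressed as queue arguments. The following is a minimal sketch of such arguments, not taken from Warren itself: the exchange name and TTL value are hypothetical, and only the `x-message-ttl` and `x-dead-letter-exchange` keys are standard RabbitMQ queue arguments.

```ruby
# Hypothetical delay of 30 seconds, expressed in milliseconds.
DELAY_TTL_MS = 30_000

# Options for a delay queue. Messages wait here until the TTL expires,
# then RabbitMQ dead-letters them to the named exchange.
delay_queue_options = {
  durable: true,
  arguments: {
    'x-message-ttl' => DELAY_TTL_MS,
    # Assumed name of the exchange the message originally arrived on.
    'x-dead-letter-exchange' => 'example.exchange'
  }
}
```

Because no `x-dead-letter-routing-key` is set in this sketch, RabbitMQ keeps the message's original routing key when it dead-letters it.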

Attributes

channel[R]

Public Class Methods

new(channel:, config:)

Create a new delay exchange. Handles queue creation, binding and attaching consumers to the queues.

@param channel [Warren::Handler::Broadcast::Channel] A channel on which to register queues
@param config [Hash] Queue configuration hash

# File lib/warren/delay_exchange.rb, line 22
def initialize(channel:, config:)
  @channel = channel
  @exchange_config = config&.fetch('exchange', nil)
  @bindings = config&.fetch('bindings', []) || []
end
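
As a rough illustration of the hash shape the constructor reads, the sketch below builds a config with the `'exchange'` and `'bindings'` keys and applies the same `fetch` expressions. All names and option values are hypothetical, and whether the option keys should be symbols or strings depends on how the configuration is loaded, which is an assumption here.

```ruby
# Hypothetical configuration; only the 'exchange', 'bindings', 'queue',
# 'name' and 'options' keys come from the source shown on this page.
config = {
  'exchange' => {
    'name' => 'example.delay',
    'options' => { type: 'topic', durable: true }
  },
  'bindings' => [
    {
      'queue' => { 'name' => 'example.delay.queue', 'options' => { durable: true } },
      'options' => { routing_key: '%<routing_key_prefix>s.example.#' }
    }
  ]
}

# The same expressions used in initialize.
exchange_config = config&.fetch('exchange', nil)
bindings = config&.fetch('bindings', []) || []

# With no config at all, safe navigation yields nil and an empty list.
missing = nil
no_exchange = missing&.fetch('exchange', nil)
no_bindings = missing&.fetch('bindings', []) || []
```

The trailing `|| []` matters in two cases: when `config` is `nil`, and when the `'bindings'` key is present but explicitly set to `nil`.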

Public Instance Methods

activate!()

Ensures the queues and channels are set up to receive messages.

keys: additional routing_keys to bind

# File lib/warren/delay_exchange.rb, line 32
def activate!
  establish_bindings!
end

publish(payload, routing_key:, headers: {})

Post a message to the delay exchange.

@param payload [String] The message payload
@param routing_key [String] The routing key of the re-sent message
@param headers [Hash] A hash of headers. Typically: { attempts: <Integer> }
@option headers [Integer] :attempts The number of times the message has been processed

@return [Void]

# File lib/warren/delay_exchange.rb, line 46
def publish(payload, routing_key:, headers: {})
  raise StandardError, 'No delay queue configured' unless configured?

  message = Warren::Message::Simple.new(routing_key, payload, headers)
  channel.publish(message, exchange: exchange)
end
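
A caller would typically increment the attempts counter before re-sending a message through the delay exchange. The sketch below shows that call shape only: `RecordingDelayExchange` and `next_attempt_headers` are hypothetical stand-ins invented for illustration, the routing key and payload are made up, and the use of a symbol `:attempts` key follows the parameter documentation above rather than any confirmed behaviour.

```ruby
# Hypothetical stand-in that records calls instead of talking to RabbitMQ.
# It mirrors only the publish signature documented above.
class RecordingDelayExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, routing_key:, headers: {})
    @published << { payload: payload, routing_key: routing_key, headers: headers }
    nil
  end
end

# Hypothetical helper: bump the attempts counter carried in the headers,
# treating a missing counter as zero.
def next_attempt_headers(headers)
  headers.merge(attempts: headers.fetch(:attempts, 0) + 1)
end

delay_exchange = RecordingDelayExchange.new
delay_exchange.publish(
  '{"id":1}',
  routing_key: 'example.created',
  headers: next_attempt_headers({ attempts: 1 })
)
```

With the real class, the same call raises a StandardError when no exchange name is configured, so callers may want to rescue that case.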

Private Instance Methods

add_binding(queue, options)

# File lib/warren/delay_exchange.rb, line 59
def add_binding(queue, options)
  queue.bind(exchange, options)
end

configured?()

# File lib/warren/delay_exchange.rb, line 55
def configured?
  @exchange_config&.key?('name')
end

establish_bindings!()

# File lib/warren/delay_exchange.rb, line 71
def establish_bindings!
  @bindings.each do |binding_config|
    queue = queue(binding_config['queue'])
    transformed_options = merge_routing_key_prefix(binding_config['options'])
    add_binding(queue, transformed_options)
  end
end

exchange()

# File lib/warren/delay_exchange.rb, line 63
def exchange
  @exchange ||= channel.exchange(*@exchange_config.values_at('name', 'options'))
end

merge_routing_key_prefix(options)

# File lib/warren/delay_exchange.rb, line 79
def merge_routing_key_prefix(options)
  options.transform_values do |value|
    format(value, routing_key_prefix: channel.routing_key_prefix)
  end
end
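
The substitution above relies on Ruby's named format references. A minimal sketch of the same transformation outside the class, assuming a binding option that contains a `%<routing_key_prefix>s` placeholder (the option value and prefix here are hypothetical):

```ruby
# Standalone version of the transformation, taking the prefix as an
# argument instead of reading it from a channel.
def merge_prefix(options, prefix)
  options.transform_values do |value|
    format(value, routing_key_prefix: prefix)
  end
end

merged = merge_prefix(
  { routing_key: '%<routing_key_prefix>s.queue_broadcast.#' },
  'development'
)
# merged[:routing_key] is now 'development.queue_broadcast.#'
```

Every value in the options hash is passed through `format`, so a literal percent sign in a value would need to be written as `%%`.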

queue(config)

# File lib/warren/delay_exchange.rb, line 67
def queue(config)
  channel.queue(*config.values_at('name', 'options'))
end
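
Both `exchange` and `queue` splat the result of `values_at('name', 'options')` into the channel call. The sketch below shows what arguments that produces, using a hypothetical `describe_call` helper in place of the channel and made-up config values.

```ruby
# Hypothetical helper that simply returns the arguments it receives,
# standing in for channel.exchange or channel.queue.
def describe_call(*args)
  args
end

full = { 'name' => 'example.delay', 'options' => { type: 'topic', durable: true } }
name_only = { 'name' => 'example.delay' }

full_args = describe_call(*full.values_at('name', 'options'))
partial_args = describe_call(*name_only.values_at('name', 'options'))
# partial_args is ['example.delay', nil]: a missing 'options' key is
# passed on as nil rather than being omitted.
```

A config entry without an `'options'` key therefore passes `nil` as the second argument, so supplying an explicit empty hash in the config may be the safer choice.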