class Leveret::DelayQueue
Connects to a special queue which keeps messages in a holding pattern until a timeout expires and then publishes those messages back to the main queue for processing.
Attributes
queue[R]
Public Class Methods
new()
click to toggle source
# File lib/leveret/delay_queue.rb, line 11 def initialize @queue = connect_to_queue end
Public Instance Methods
republish(message)
click to toggle source
Place a message onto the delay queue, which will later be expired and sent back to the main exchange
@param [Message] A message received and processed already
# File lib/leveret/delay_queue.rb, line 18 def republish(message) delay_exchange.publish(message.params.serialize, expiration: configuration.delay_time, persistent: true, routing_key: message.routing_key, priority: message.priority) end
Private Instance Methods
connect_to_queue()
click to toggle source
# File lib/leveret/delay_queue.rb, line 25 def connect_to_queue queue = channel.queue(configuration.delay_queue_name, durable: true, arguments: { 'x-dead-letter-exchange': configuration.exchange_name }) queue.bind(delay_exchange) log.info "Connected to #{configuration.delay_queue_name}, bound to #{configuration.delay_exchange_name}" queue end
delay_exchange()
click to toggle source
# File lib/leveret/delay_queue.rb, line 33 def delay_exchange @delay_exchange ||= channel.exchange(Leveret.configuration.delay_exchange_name, type: :fanout, durable: :true) end