class EventBus::Broker::Rabbit::Queue

Attributes

channel[R]

Public Class Methods

new(channel) click to toggle source
# File lib/event_bus/broker/rabbit/queue.rb, line 4
def initialize(channel)
  @channel = channel
  @channel.prefetch(1)
end
subscribe(channel, routing_key, &block) click to toggle source
# File lib/event_bus/broker/rabbit/queue.rb, line 9
def self.subscribe(channel, routing_key, &block)
  new(channel).subscribe(routing_key, &block)
end

Public Instance Methods

subscribe(routing_key, &block) click to toggle source
# File lib/event_bus/broker/rabbit/queue.rb, line 13
def subscribe(routing_key, &block)
  name = queue_name(routing_key)

  channel.queue(name, queue_options)
    .bind(topic, routing_key: routing_key)
    .subscribe(manual_ack: true) do |delivery_info, properties, payload|
      callback(delivery_info, properties, payload, &block)
    end
end

Private Instance Methods

callback(delivery_info, properties, payload, &block) click to toggle source
# File lib/event_bus/broker/rabbit/queue.rb, line 27
def callback(delivery_info, properties, payload, &block)
  event_name = delivery_info.routing_key

  event = EventBus::Event.new(event_name, payload)

  block.call(event, channel, delivery_info)
end
queue_name(routing_key) click to toggle source
# File lib/event_bus/broker/rabbit/queue.rb, line 43
def queue_name(routing_key)
  "#{EventBus::Config::APP_NAME.downcase}-#{routing_key.downcase}"
end
queue_options() click to toggle source
# File lib/event_bus/broker/rabbit/queue.rb, line 39
def queue_options
  { durable: true }
end
topic() click to toggle source
# File lib/event_bus/broker/rabbit/queue.rb, line 35
def topic
  Rabbit::Topic.topic(channel)
end