class Warren::Subscription

Configures and wraps up subscriptions on a Bunny Channel/Queue

Attributes

channel[R]

Public Class Methods

new(channel:, config:) click to toggle source

Great a new subscription. Handles queue creation, binding and attaching consumers to the queues

@param channel [Warren::Handler::Broadcast::Channel] A channel on which to register queues @param config [Hash] queue configuration hash

# File lib/warren/subscription.rb, line 17
def initialize(channel:, config:)
  @channel = channel
  @queue_name = config&.fetch('name')
  @queue_options = config&.fetch('options')
  @bindings = config&.fetch('bindings')
end

Public Instance Methods

activate!() click to toggle source

Ensures the queues and channels are set up to receive messages keys: additional routing_keys to bind

# File lib/warren/subscription.rb, line 44
def activate!
  establish_bindings!
end
subscribe(consumer_tag, &block) click to toggle source

Subscribes to the given queue

@param consumer_tag [String] Identifier for the consumer

@yieldparam [Bunny::DeliveryInfo] delivery_info Metadata about the delivery @yieldparam [Bunny::MessageProperties] properties @yieldparam [String] payload the contents of the message

@return [Bunny::Consumer] The bunny consumer object

# File lib/warren/subscription.rb, line 37
def subscribe(consumer_tag, &block)
  channel.prefetch(10)
  queue.subscribe(manual_ack: true, block: false, consumer_tag: consumer_tag, durable: true, &block)
end

Private Instance Methods

add_binding(exchange, options) click to toggle source
# File lib/warren/subscription.rb, line 50
def add_binding(exchange, options)
  queue.bind(exchange, options)
end
establish_bindings!() click to toggle source
# File lib/warren/subscription.rb, line 64
def establish_bindings!
  @bindings.each do |binding_config|
    exchange = exchange(binding_config['exchange'])
    transformed_options = merge_routing_key_prefix(binding_config['options'])
    add_binding(exchange, transformed_options)
  end
end
exchange(config) click to toggle source
# File lib/warren/subscription.rb, line 54
def exchange(config)
  channel.exchange(*config.values_at('name', 'options'))
end
merge_routing_key_prefix(options) click to toggle source
# File lib/warren/subscription.rb, line 72
def merge_routing_key_prefix(options)
  options.transform_values do |value|
    format(value, routing_key_prefix: channel.routing_key_prefix)
  end
end
queue() click to toggle source
# File lib/warren/subscription.rb, line 58
def queue
  raise StandardError, 'No queue configured' if @queue_name.nil?

  @queue ||= channel.queue(@queue_name, @queue_options)
end