class Warren::Subscription
Configures and wraps up subscriptions on a Bunny Channel/Queue
Attributes
channel[R]
Public Class Methods
new(channel:, config:)
click to toggle source
Great a new subscription. Handles queue creation, binding and attaching consumers to the queues
@param channel [Warren::Handler::Broadcast::Channel] A channel on which to register queues @param config [Hash] queue configuration hash
# File lib/warren/subscription.rb, line 17 def initialize(channel:, config:) @channel = channel @queue_name = config&.fetch('name') @queue_options = config&.fetch('options') @bindings = config&.fetch('bindings') end
Public Instance Methods
activate!()
click to toggle source
Ensures the queues and channels are set up to receive messages keys: additional routing_keys to bind
# File lib/warren/subscription.rb, line 44 def activate! establish_bindings! end
subscribe(consumer_tag, &block)
click to toggle source
Subscribes to the given queue
@param consumer_tag [String] Identifier for the consumer
@yieldparam [Bunny::DeliveryInfo] delivery_info Metadata about the delivery @yieldparam [Bunny::MessageProperties] properties @yieldparam [String] payload the contents of the message
@return [Bunny::Consumer] The bunny consumer object
# File lib/warren/subscription.rb, line 37 def subscribe(consumer_tag, &block) channel.prefetch(10) queue.subscribe(manual_ack: true, block: false, consumer_tag: consumer_tag, durable: true, &block) end
Private Instance Methods
add_binding(exchange, options)
click to toggle source
# File lib/warren/subscription.rb, line 50 def add_binding(exchange, options) queue.bind(exchange, options) end
establish_bindings!()
click to toggle source
# File lib/warren/subscription.rb, line 64 def establish_bindings! @bindings.each do |binding_config| exchange = exchange(binding_config['exchange']) transformed_options = merge_routing_key_prefix(binding_config['options']) add_binding(exchange, transformed_options) end end
exchange(config)
click to toggle source
# File lib/warren/subscription.rb, line 54 def exchange(config) channel.exchange(*config.values_at('name', 'options')) end
merge_routing_key_prefix(options)
click to toggle source
# File lib/warren/subscription.rb, line 72 def merge_routing_key_prefix(options) options.transform_values do |value| format(value, routing_key_prefix: channel.routing_key_prefix) end end
queue()
click to toggle source
# File lib/warren/subscription.rb, line 58 def queue raise StandardError, 'No queue configured' if @queue_name.nil? @queue ||= channel.queue(@queue_name, @queue_options) end