class PikaQ::Consumers::Base
Attributes
consumer[R]
consumer_tag[R]
exchange[R]
queue[R]
routing_key[R]
Public Class Methods
config(options = {})
click to toggle source
# File lib/pika_q/consumers/base.rb, line 10
#
# Stores this consumer's settings on the receiver.
#
# The supplied options are merged over default_config, and the resulting
# :consumer_tag, :exchange, :routing_key and :queue values are saved in
# instance variables of the receiver (the class this method is defined on).
#
# options - Hash of overrides; nil is treated as an empty Hash.
#
# Returns the configured :queue value (the result of the last assignment).
# Raises KeyError when one of the four keys is absent after merging.
def self.config(options = {})
  overrides = options.nil? ? {} : options
  settings = default_config.merge(overrides)

  @consumer_tag = settings.fetch(:consumer_tag)
  @exchange = settings.fetch(:exchange)
  @routing_key = settings.fetch(:routing_key)
  @queue = settings.fetch(:queue)
end
start(channel, &block)
click to toggle source
# File lib/pika_q/consumers/base.rb, line 19
#
# Sets up the exchange and queue on the given channel and subscribes to the
# queue, forwarding the caller's block as the subscription handler.
#
# channel - object handed to exchange.establish and queue.create.
# block   - handler passed through to queue.subscribe.
#
# Steps:
# 1. If the configured exchange responds to :establish, replace it with the
#    result of establish(channel); likewise for queue and :create.
# 2. Bind the queue to the exchange with the configured routing key, unless
#    the exchange reports itself as predeclared.
# 3. Subscribe with manual_ack: true and block: true under consumer_tag.
#
# Returns the consumer returned by queue.subscribe. If an Interrupt is
# rescued, returns the result of closing the consumer's channel, or nil
# when no consumer had been recorded yet.
def self.start(channel, &block)
  @exchange = exchange.establish(channel) if exchange.respond_to?(:establish)
  @queue = queue.create(channel) if queue.respond_to?(:create)

  queue.bind(exchange, { routing_key: routing_key }) unless exchange.predeclared?

  # Forward the explicit block. The previous `&Proc.new` relied on implicit
  # block capture, which raises ArgumentError on Ruby 3.0 and later.
  @consumer = queue.subscribe(manual_ack: true,
                              block: true,
                              consumer_tag: consumer_tag,
                              &block)
  consumer
rescue Interrupt
  # The Interrupt can arrive while subscribe is still running (block: true
  # suggests it blocks the caller), i.e. before @consumer was assigned.
  # Only cancel and close when there is a consumer to act on.
  # NOTE(review): in that case the channel is left open for the caller to
  # close — confirm this is acceptable.
  if consumer
    consumer.cancel
    consumer.channel.close
  end
end
Private Class Methods
default_config()
click to toggle source
# File lib/pika_q/consumers/base.rb, line 43
#
# Builds the baseline settings that config merges caller options over.
#
# Returns a frozen Hash with the keys :consumer_tag (a freshly generated
# default tag), :exchange (PikaQ::Exchanges::Default), :routing_key (nil)
# and :queue (an empty String).
def self.default_config
  defaults = {
    consumer_tag: default_consumer_tag,
    exchange: PikaQ::Exchanges::Default,
    routing_key: nil,
    queue: ''
  }
  defaults.freeze
end
default_consumer_tag(unique_id = SecureRandom.uuid)
click to toggle source
# File lib/pika_q/consumers/base.rb, line 39
#
# Composes a consumer tag from the receiver's name, the RUBY_ENV environment
# variable and a unique id.
#
# unique_id - value appended as the final segment (default: a new UUID from
#             SecureRandom).
#
# Returns a lower-cased String of the form "<name>.<env>.<unique_id>", where
# <name> is the receiver's to_s with every "::" replaced by ".". An unset
# RUBY_ENV yields an empty middle segment.
def self.default_consumer_tag(unique_id = SecureRandom.uuid)
  dotted_name = to_s.gsub('::', '.')
  environment = ENV['RUBY_ENV']

  "#{dotted_name}.#{environment}.#{unique_id}".downcase
end