class PikaQ::Consumers::Base

Attributes

consumer[R]
consumer_tag[R]
exchange[R]
queue[R]
routing_key[R]

Public Class Methods

config(options = {})
# File lib/pika_q/consumers/base.rb, line 10
# Stores the consumer settings on the class.
#
# The supplied options are layered over +default_config+; a +nil+ argument
# is treated as "no overrides". The four settings (+consumer_tag+,
# +exchange+, +routing_key+, +queue+) are then copied into class-level
# instance variables of the same name.
#
# Returns the configured queue value (the last setting assigned).
def self.config(options = {})
  overrides = options.nil? ? {} : options
  settings = default_config.merge(overrides)

  # Same assignment order as before: consumer_tag, exchange, routing_key, queue.
  %i[consumer_tag exchange routing_key queue].each do |key|
    instance_variable_set(:"@#{key}", settings.fetch(key))
  end

  @queue
end
start(channel, &block)
# File lib/pika_q/consumers/base.rb, line 19
# Declares the exchange and queue on +channel+, binds them when required and
# subscribes the given block as the message handler.
#
# When the configured exchange responds to +establish+ (or the queue to
# +create+), it is replaced by the result of calling that method with
# +channel+. The queue is bound to the exchange with the configured routing
# key unless the exchange reports itself as predeclared.
#
# @param channel the channel handed to +establish+ / +create+
#   (presumably a Bunny channel -- confirm against callers)
# @yield the message handler, forwarded unchanged to +queue.subscribe+
# @return the consumer returned by +queue.subscribe+; on Interrupt, the
#   result of closing the channel
# @raise [ArgumentError] when no handler block is given
def self.start(channel, &block)
  # Fail before touching the broker. The previous `&Proc.new` also raised
  # ArgumentError without a block, but only after declaring exchange/queue.
  raise ArgumentError, 'a message handler block is required' unless block

  @exchange = exchange.establish(channel) if exchange.respond_to?(:establish)
  @queue = queue.create(channel) if queue.respond_to?(:create)

  queue.bind(exchange, { routing_key: routing_key }) unless exchange.predeclared?

  # Forward the captured block directly: `Proc.new` without a literal block
  # raises ArgumentError on Ruby 3.0+.
  @consumer = queue.subscribe(manual_ack: true,
                              block: true,
                              consumer_tag: consumer_tag,
                              &block)
  consumer
rescue Interrupt
  # @consumer is only assigned once subscribe returns, so an Interrupt raised
  # inside subscribe can leave it nil; fall back to the channel passed in.
  active = consumer
  active&.cancel
  (active ? active.channel : channel).close
end

Private Class Methods

default_config()
# File lib/pika_q/consumers/base.rb, line 43
# Builds the baseline settings that +config+ merges caller options over.
#
# Returns a frozen Hash with the keys +:consumer_tag+ (freshly generated via
# +default_consumer_tag+), +:exchange+ (PikaQ::Exchanges::Default),
# +:routing_key+ (nil) and +:queue+ (empty string).
def self.default_config
  defaults = {}
  defaults[:consumer_tag] = default_consumer_tag
  defaults[:exchange] = PikaQ::Exchanges::Default
  defaults[:routing_key] = nil
  defaults[:queue] = ''
  defaults.freeze
end
default_consumer_tag(unique_id = SecureRandom.uuid)
# File lib/pika_q/consumers/base.rb, line 39
# Generates a lower-cased consumer tag of the form
# "<class path with '::' replaced by '.'>.<ENV['RUBY_ENV']>.<unique_id>".
#
# An unset RUBY_ENV contributes an empty segment. +unique_id+ defaults to a
# random UUID.
def self.default_consumer_tag(unique_id = SecureRandom.uuid)
  class_path = to_s.gsub('::', '.')
  environment = ENV['RUBY_ENV']
  segments = [class_path, environment, unique_id].map(&:to_s)
  segments.join('.').downcase
end