class Rabbitek::Starter

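Main server startup: binds the work queue and the retry/delayed queue to their exchanges for every consumer, then subscribes to the work queue and hands each delivery to a MessageProcessor.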

Attributes

connection[R]
consumers[R]
opts[R]
queue_name[R]

Public Class Methods

new(connection, configuration) click to toggle source
# File lib/rabbitek/server/starter.rb, line 9
# Stores the connection and unpacks the configuration into readers.
#
# @param connection the broker connection; it is only stored here and is
#   used later by #channel
# @param configuration [Hash] indexed with :parameters (which is itself
#   indexed with :queue) and with :consumers
def initialize(connection, configuration)
  parameters = configuration[:parameters]

  @connection = connection
  @consumers = configuration[:consumers]
  @queue_name = parameters[:queue]
  @opts = parameters
end

Public Instance Methods

channel() click to toggle source
# File lib/rabbitek/server/starter.rb, line 26
# Returns the memoized channel, creating it from the connection on first use.
# When opts[:basic_qos] is present, that value is passed to the new channel's
# basic_qos before the channel is cached.
def channel
  return @channel if @channel

  @channel = connection.create_channel.tap do |new_channel|
    qos = opts[:basic_qos]
    new_channel.basic_qos(qos) if qos.present?
  end
end
retry_or_delayed_exchange() click to toggle source
# File lib/rabbitek/server/starter.rb, line 50
# Returns the memoized retry-or-delayed exchange. On first use it is obtained
# from Utils::Common.exchange with type :direct and the name that
# Utils::RabbitObjectNames derives from opts[:bind_exchange].
def retry_or_delayed_exchange
  return @retry_or_delayed_exchange if @retry_or_delayed_exchange

  amqp_channel = channel
  exchange_name = Utils::RabbitObjectNames.retry_or_delayed_bind_exchange(opts[:bind_exchange])
  @retry_or_delayed_exchange = Utils::Common.exchange(amqp_channel, :direct, exchange_name)
end
retry_or_delayed_queue() click to toggle source
# File lib/rabbitek/server/starter.rb, line 42
# Returns the memoized retry-or-delayed queue. On first use it is obtained
# from Utils::Common.queue, named by Utils::RabbitObjectNames from
# opts[:queue], with x-dead-letter-exchange set to opts[:bind_exchange].
def retry_or_delayed_queue
  return @retry_or_delayed_queue if @retry_or_delayed_queue

  amqp_channel = channel
  name = Utils::RabbitObjectNames.retry_or_delayed_queue(opts[:queue])
  dead_letter = { 'x-dead-letter-exchange': opts[:bind_exchange] }
  @retry_or_delayed_queue = Utils::Common.queue(amqp_channel, name, arguments: dead_letter)
end
start() click to toggle source
# File lib/rabbitek/server/starter.rb, line 16
# Sets up the bindings, then subscribes to the work queue with manual
# acknowledgement. Each delivery is wrapped in Rabbitek.reloader.call and
# handed to MessageProcessor#process together with this starter.
#
# @return whatever work_queue.subscribe returns
def start
  setup_bindings!

  work_queue.subscribe(manual_ack: true) do |delivery_info, metadata, body|
    Rabbitek.reloader.call { MessageProcessor.new(self, delivery_info, metadata, body).process }
  end
end
work_exchange() click to toggle source
# File lib/rabbitek/server/starter.rb, line 34
# Returns the memoized work exchange: a 'direct' exchange named by
# opts[:bind_exchange], obtained through Utils::Common.exchange.
def work_exchange
  return @work_exchange if @work_exchange

  amqp_channel = channel
  exchange_name = opts[:bind_exchange]
  @work_exchange = Utils::Common.exchange(amqp_channel, 'direct', exchange_name)
end
work_queue() click to toggle source
# File lib/rabbitek/server/starter.rb, line 38
# Returns the memoized work queue, obtained through Utils::Common.queue using
# queue_name and opts[:queue_attributes] (passed through as-is, possibly nil).
def work_queue
  return @work_queue if @work_queue

  amqp_channel = channel
  name = queue_name
  @work_queue = Utils::Common.queue(amqp_channel, name, opts[:queue_attributes])
end

Private Instance Methods

setup_bindings!() click to toggle source
# File lib/rabbitek/server/starter.rb, line 62
# For every consumer, binds the work queue to the work exchange and the
# retry-or-delayed queue to the retry-or-delayed exchange, using the
# consumer's to_s as the routing key.
#
# Queues and exchanges are looked up inside the loop, so nothing is touched
# when there are no consumers.
def setup_bindings!
  consumers.each do |consumer|
    routing_key = consumer.to_s

    work_queue.bind(work_exchange, routing_key: routing_key)
    retry_or_delayed_queue.bind(retry_or_delayed_exchange, routing_key: routing_key)
  end
end