class DaemonObjects::Amqp::Worker

Constants

DEFAULTS

Attributes

arguments[RW]
channel[RW]
consumer[RW]
exchange[RW]
logger[RW]
queue_name[RW]
routing_key[RW]

Public Class Methods

new(channel, consumer, options={}) click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 11
# Builds a worker around an AMQP channel and a message consumer.
#
# channel  - channel object; assigned through #channel=, which also
#            registers the channel-level error handler on it.
# consumer - object whose #handle_message is invoked for each payload.
# options  - Hash of overrides merged on top of DEFAULTS; each entry is
#            applied through the matching writer by #parse_options.
def initialize(channel, consumer, options={})
  self.consumer = consumer
  self.channel  = channel

  # Caller-supplied options win over the class-level defaults.
  effective_options = DEFAULTS.merge(options)
  parse_options(effective_options)
end

Public Instance Methods

channel=(value) click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 24
# Assigns the channel and registers #handle_channel_exception as its
# error callback.
#
# The callback is registered before the instance variable is written, so
# @channel is left untouched if registration raises.
#
# value - channel object responding to #on_error with a block.
def channel=(value)
  error_callback = method(:handle_channel_exception)
  value.on_error(&error_callback)
  @channel = value
end
handle_channel_exception(channel, channel_close) click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 45
# Error callback registered on the channel by #channel=.
#
# channel       - the channel that reported the error (not used here).
# channel_close - object exposing #reply_code and #reply_text.
#
# Always raises StandardError with the reply code and text in the message.
def handle_channel_exception(channel, channel_close)
  code = channel_close.reply_code
  text = channel_close.reply_text
  raise StandardError, "ERROR channel-level exception: code = #{code}, message = #{text}"
end
handle_message(channel, delivery_tag, payload) click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 49
# Hands one payload to the consumer and settles the delivery on the channel.
#
# channel      - channel used to acknowledge or reject the delivery.
# delivery_tag - tag identifying the delivery being settled.
# payload      - message body passed to consumer.handle_message.
#
# Returns the consumer's response when handling and acknowledging succeed.
# When anything in that path raises, the error is logged, the delivery is
# rejected, and the exception object itself is returned (not raised) so the
# caller can pass it on to the consumer's response builder.
#
# Raises SystemExit / SignalException unchanged: those are requests to stop
# the process, not message failures, so they are re-raised after the delivery
# has been rejected. An error raised by channel.reject itself also propagates.
def handle_message(channel, delivery_tag, payload)
  response = consumer.handle_message(payload)
  # Second argument is passed as true (in Bunny this is the `multiple`
  # flag - confirm against the AMQP client in use).
  channel.acknowledge(delivery_tag, true)
  response
rescue Exception => e # rubocop:disable Lint/RescueException
  # Log first: if rejecting fails (e.g. the channel is already closed) the
  # original error would otherwise never reach the log.
  logger.error "Error occurred handling message, the payload was: #{payload}, the error was: '#{e}'."
  channel.reject(delivery_tag)

  # Never convert a shutdown request into an ordinary return value.
  raise if e.is_a?(SystemExit) || e.is_a?(SignalException)

  e
end
parse_options(options) click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 18
# Applies each option through the writer method of the same name.
#
# options - Hash of option name => value. Entries for which this object
#           does not respond to "<name>=" are skipped silently.
#
# Returns the options hash that was passed in.
def parse_options(options)
  options.each do |name, value|
    writer = "#{name}="
    next unless respond_to?(writer)

    send(writer, value)
  end
end
start() click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 29
# Declares the queue, optionally binds it, and consumes messages from it.
#
# The queue is declared durable with the configured arguments and is bound
# to `exchange` with `routing_key` only when an exchange is configured.
# Each delivery is passed to #handle_message; if the consumer implements
# #get_response and returns a truthy value, that value is published as JSON
# on the channel's default exchange, routed to the request's reply_to and
# carrying the request's message_id as correlation_id.
#
# Returns whatever queue.subscribe returns.
# NOTE(review): :block => true presumably keeps the calling thread inside
# subscribe until consumption stops - confirm against the AMQP client.
def start
  work_queue = channel.queue(queue_name, :durable => true, :arguments => arguments)
  work_queue.bind(exchange, :routing_key => routing_key) if exchange

  work_queue.subscribe(:block => true, :ack => true) do |delivery_info, properties, payload|
    # handle_message returns the consumer's response on success and the
    # rescued exception object on failure; both are forwarded as-is.
    outcome = handle_message(channel, delivery_info.delivery_tag, payload)

    # Building a reply is optional: only consumers that define it are asked.
    reply = consumer.respond_to?(:get_response) ? consumer.get_response(payload, outcome) : nil

    channel.default_exchange.publish(reply.to_json,
                                     :routing_key    => properties.reply_to,
                                     :correlation_id => properties.message_id) if reply
  end
end