class DaemonObjects::Amqp::Worker
Constants
- DEFAULTS
Attributes
arguments[RW]
channel[RW]
consumer[RW]
exchange[RW]
logger[RW]
queue_name[RW]
routing_key[RW]
Public Class Methods
new(channel, consumer, options={})
click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 11
#
# Builds a worker from an AMQP channel and a message consumer.
#
# channel  - assigned through #channel=, so the setter's side effects
#            run during construction.
# consumer - assigned through #consumer=.
# options  - Hash of overrides; merged on top of DEFAULTS and then
#            handed to #parse_options.
def initialize(channel, consumer, options={})
  self.consumer = consumer
  self.channel  = channel

  settings = DEFAULTS.merge(options)
  parse_options(settings)
end
Public Instance Methods
channel=(value)
click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 24
#
# Stores the channel after registering #handle_channel_exception as its
# on_error callback.
#
# value - the channel object; must respond to on_error and accept a block.
def channel=(value)
  error_handler = method(:handle_channel_exception)
  value.on_error(&error_handler)
  @channel = value
end
handle_channel_exception(channel, channel_close)
click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 45
#
# Turns a channel-level close notification into a raised StandardError.
#
# channel       - unused by this method.
# channel_close - object exposing reply_code and reply_text, both of which
#                 are embedded in the error message.
#
# Always raises StandardError; never returns normally.
def handle_channel_exception(channel, channel_close)
  message = "ERROR channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
  raise StandardError, message
end
handle_message(channel, delivery_tag, payload)
click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 49
#
# Passes one delivery to the consumer and settles it on the channel.
#
# channel      - object responding to acknowledge(tag, flag) and reject(tag).
# delivery_tag - tag identifying the delivery being settled.
# payload      - message body handed to consumer.handle_message.
#
# Returns the consumer's response when handling succeeds. When a
# StandardError is raised, the delivery is rejected, the error is logged,
# and the exception object itself is RETURNED (not re-raised), so callers
# must inspect the return value to tell success from failure.
#
# Only StandardError is rescued. Process-control exceptions such as
# Interrupt, SignalException and SystemExit propagate, so the worker can
# still be stopped while a message is in flight; the delivery is then left
# neither acknowledged nor rejected.
def handle_message(channel, delivery_tag, payload)
  response = consumer.handle_message(payload)
  # NOTE(review): the second argument is passed as `true`; in Bunny this is
  # the `multiple` flag (ack everything up to this tag) -- confirm intended.
  channel.acknowledge(delivery_tag, true)
  response
rescue StandardError => e
  channel.reject(delivery_tag)
  logger.error "Error occurred handling message, the payload was: #{payload}, the error was: '#{e}'."
  e
end
parse_options(options)
click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 18
#
# Applies each option through the matching "<key>=" writer on this object.
# Keys for which the object does not respond to such a writer are skipped.
#
# options - Hash of option name => value.
#
# Returns the options hash that was passed in.
def parse_options(options)
  options.each do |name, value|
    writer = "#{name}="
    next unless respond_to?(writer)

    send(writer, value)
  end
end
start()
click to toggle source
# File lib/daemon_objects/amqp/worker.rb, line 29
#
# Declares the queue, optionally binds it, and subscribes to deliveries.
#
# The queue is requested from the channel under queue_name with
# :durable => true and the configured arguments. It is bound to exchange
# with routing_key only when exchange is set. The subscription is opened
# with :block => true and :ack => true.
#
# For each delivery the payload goes through #handle_message. When the
# consumer responds to get_response, it is called with the payload and the
# value returned by #handle_message; a truthy result is published as JSON
# on the channel's default exchange, routed to properties.reply_to and
# correlated with properties.message_id.
#
# Returns whatever queue.subscribe returns.
def start
  work_queue = channel.queue(queue_name, :durable => true, :arguments => arguments)
  work_queue.bind(exchange, :routing_key => routing_key) if exchange

  work_queue.subscribe(:block => true, :ack => true) do |delivery_info, properties, payload|
    # Holds either the consumer's response or the exception it raised.
    outcome = handle_message(channel, delivery_info.delivery_tag, payload)

    reply = consumer.respond_to?(:get_response) ? consumer.get_response(payload, outcome) : nil

    if reply
      channel.default_exchange.publish(
        reply.to_json,
        :routing_key => properties.reply_to,
        :correlation_id => properties.message_id
      )
    end
  end
end