class Synapses::Consumer
@author Alexander Semyonov <al@semyonov.us>
Attributes
channel[RW]
@return [AMQP::Channel]
Public Class Methods
inherited(child)
click to toggle source
Calls superclass method
# File lib/synapses/consumer.rb, line 26
# Subclass hook: gives +child+ its own copy of this class's subscriptions.
#
# A plain +dup+ of the registry is shallow, so the per-key handler lists
# would still be shared with the parent; a child registering a handler for
# a key the parent already has would then append to the parent's list too.
# Each handler list is therefore duplicated as well.
#
# NOTE(review): assumes +subscriptions+ is a Hash whose values are Arrays,
# which is how +on+ and +message_handler+ use it -- confirm against its
# definition.
#
# @param [Class] child the class that has just inherited from this one
def self.inherited(child)
  super
  copy = subscriptions.dup
  # Only existing keys are reassigned, which is safe while iterating.
  copy.each_key { |key| copy[key] = copy[key].dup }
  child.subscriptions = copy
end
new(channel = Synapses.default_channel)
click to toggle source
@param [AMQP::Channel] channel
# File lib/synapses/consumer.rb, line 47
# Stores the channel and subscribes #message_handler to this consumer's
# queue, so every delivery on the queue is passed to #message_handler.
#
# @param [AMQP::Channel] channel defaults to Synapses.default_channel
def initialize(channel = Synapses.default_channel)
  @channel = channel
  handler = method(:message_handler)
  queue.subscribe(&handler)
end
on(message_type = nil, &block)
click to toggle source
# File lib/synapses/consumer.rb, line 38
# Registers +block+ as a handler in the class-level subscriptions.
#
# With a +message_type+, the pair [message_type, block] is appended under
# the key +message_type.message_type+. Without one, [nil, block] is
# appended under the +nil+ key.
#
# @param message_type optional object responding to +message_type+
# @return the handler list the entry was appended to
def self.on(message_type = nil, &block)
  return subscriptions[nil] << [nil, block] unless message_type

  subscriptions[message_type.message_type] << [message_type, block]
end
queue(name, contract=Synapses.default_contract)
click to toggle source
@param [String] name
@param [Synapses::Contract] contract
# File lib/synapses/consumer.rb, line 33
# Class-level declaration of the queue this consumer uses: assigns
# +queue_name+ and +contract+ on the class.
#
# @param [String] name
# @param [Synapses::Contract] contract defaults to Synapses.default_contract
# @return the assigned contract
def self.queue(name, contract = Synapses.default_contract)
  self.queue_name = name
  self.contract = contract
end
Public Instance Methods
message_handler(metadata, payload)
click to toggle source
@param [AMQP::Header] metadata
# File lib/synapses/consumer.rb, line 54
# Dispatches one delivery to the handlers registered in +subscriptions+.
#
# 1. Handlers registered for +metadata.type+ receive the message parsed by
#    the class they were registered with.
# 2. Handlers registered without a type (under the +nil+ key) receive
#    +(metadata, payload)+ when their block takes exactly two arguments,
#    otherwise the result of +Messages.parse+.
# 3. If neither list has a handler, the delivery is printed to stdout.
#
# Any StandardError raised along the way is printed with +puts+ rather than
# re-raised.
#
# @param [AMQP::Header] metadata
# @param payload the message body, passed through to parsers and handlers
def message_handler(metadata, payload)
  message_type = metadata.type

  # An untyped message must not be looked up as "typed": the nil key holds
  # the typeless handlers, whose message class is nil, so treating them as
  # typed would call +parse+ on nil and skip the typeless dispatch below.
  typed_subscriptions = message_type.nil? ? [] : subscriptions[message_type]
  typed_subscriptions.each do |message_class, block|
    block.call(message_class.parse(metadata, payload))
  end

  typeless_subscriptions = subscriptions[nil]
  typeless_subscriptions.each do |_, block|
    if block.arity == 2
      block.call(metadata, payload)
    else
      block.call(Messages.parse(metadata, payload))
    end
  end

  if typed_subscriptions.empty? && typeless_subscriptions.empty?
    puts "#{self} -> #{metadata.type}, #{payload}"
  end
rescue => e
  # NOTE(review): errors are only printed; presumably so a failing handler
  # does not propagate into the queue subscription callback -- confirm.
  puts e
end
queue()
click to toggle source
# File lib/synapses/consumer.rb, line 96
# Returns this consumer's queue, creating it on first use.
#
# The queue is obtained from +contract.queue(queue_name, channel)+, bound
# to +exchange+, and cached in +@queue+ for later calls.
#
# @return the queue object returned by the contract
def queue
  @queue ||= contract.queue(queue_name, channel).tap do |declared|
    declared.bind(exchange)
  end
end