class Synapses::Consumer

@author Alexander Semyonov <al@semyonov.us>

Attributes

channel[RW]

@return [AMQP::Channel]

Public Class Methods

inherited(child) click to toggle source
Calls superclass method
# File lib/synapses/consumer.rb, line 26
# Subclass hook: gives every subclass its own copy of the parent's
# subscriptions.
#
# The per-type handler lists are copied along with the table itself. `on`
# appends to those lists in place, so sharing them (as a plain `dup` of the
# table would) lets a subclass's registrations leak back into the parent.
#
# @param [Class] child the newly created subclass
def self.inherited(child)
  super
  inherited_subscriptions = subscriptions.dup
  subscriptions.each do |type, handlers|
    inherited_subscriptions[type] = handlers.dup
  end
  child.subscriptions = inherited_subscriptions
end
new(channel = Synapses.default_channel) click to toggle source

@param [AMQP::Channel] channel

# File lib/synapses/consumer.rb, line 47
# Stores the channel and immediately subscribes this consumer's queue,
# routing every delivery to #message_handler.
#
# @param [AMQP::Channel] channel
def initialize(channel = Synapses.default_channel)
  @channel = channel

  # Resolve the queue first, then attach the bound method as the block.
  consumer_queue = queue
  consumer_queue.subscribe(&method(:message_handler))
end
on(message_type = nil, &block) click to toggle source
# File lib/synapses/consumer.rb, line 38
# Registers a handler block.
#
# With a message type, the handler is stored under that type's
# `message_type` key together with the type itself. Without one, it is
# stored under the `nil` key.
#
# @param [#message_type, nil] message_type
def self.on(message_type = nil, &block)
  return subscriptions[nil] << [nil, block] unless message_type

  subscriptions[message_type.message_type] << [message_type, block]
end
queue(name, contract=Synapses.default_contract) click to toggle source

@param [String] name
@param [Synapses::Contract] contract

# File lib/synapses/consumer.rb, line 33
# Declares which queue this consumer class reads from by storing the queue
# name and the contract through the class-level writers.
#
# @param [String] name
# @param [Synapses::Contract] contract
def self.queue(name, contract = Synapses.default_contract)
  self.queue_name = name
  self.contract = contract
end

Public Instance Methods

message_handler(metadata, payload) click to toggle source

@param [AMQP::Header] metadata

# File lib/synapses/consumer.rb, line 54
# Dispatches one delivery to the registered handlers.
#
# Handlers registered for `metadata.type` receive the message parsed by
# their own message class. Handlers registered without a type receive the
# raw `(metadata, payload)` pair when they take exactly two arguments, and
# a message parsed by `Messages.parse` otherwise. When nothing is
# registered for the delivery, a one-line summary is printed instead.
#
# Any StandardError raised along the way is rescued and printed with `puts`.
#
# @param [AMQP::Header] metadata
# @param payload the raw message body
def message_handler(metadata, payload)
  typed_handlers = subscriptions[metadata.type]
  if typed_handlers.any?
    typed_handlers.each do |message_class, handler|
      handler.call(message_class.parse(metadata, payload))
    end
  end

  typeless_handlers = subscriptions[nil]
  if typeless_handlers.any?
    typeless_handlers.each do |_, handler|
      # Two-argument handlers opt in to the unparsed delivery.
      next handler.call(metadata, payload) if handler.arity == 2

      handler.call(Messages.parse(metadata, payload))
    end
  end

  return if (typed_handlers + typeless_handlers).any?

  puts "#{self} -> #{metadata.type}, #{payload}"
rescue => e
  puts e
end
queue() click to toggle source
# File lib/synapses/consumer.rb, line 96
# Returns this consumer's queue, creating it on first use.
#
# The queue is obtained from the contract using the queue name and channel,
# bound to the exchange, and then memoized in `@queue`.
def queue
  @queue ||= contract.queue(queue_name, channel).tap do |declared|
    declared.bind(exchange)
  end
end