class AMQP::Hermes::Receiver

Attributes

_listening[RW]
exchange[R]
messages[R]
queue[R]
routing_key[R]

Public Class Methods

new(queue, topic=nil, options={}) click to toggle source
# File lib/amqp-hermes/receiver.rb, line 9
# Builds a receiver for +queue+, declares its topic exchange on the channel,
# then opens the connection and starts listening.
#
# @param queue [String] queue name; also the base of the default routing key
# @param topic [String, Hash, nil] exchange name, "pub/sub" when omitted. If a
#   Hash is passed here it is treated as +options+ and the exchange name is
#   taken from its :topic key.
# @param options [Hash] :routing_key and :handler are consumed here; the
#   remaining entries are handed to +channel.topic+. :auto_delete defaults to
#   true but an explicit false is respected.
# @raise [RuntimeError] if +queue+ is nil or empty
def initialize(queue, topic=nil, options={})
  raise "You *MUST* specify a queue" if queue.nil? || queue.empty?
  @queue = queue

  # Always work on a private copy so the caller's hash is never modified.
  if topic.is_a?(Hash)
    options = topic.dup
    topic = options.delete(:topic)
  else
    options = options.dup
  end

  @routing_key = options.delete(:routing_key) || "#{queue}.*"

  # The routing key must contain the queue name. The name is escaped so that
  # characters such as "." or "(" are matched literally instead of being
  # interpreted as regular-expression syntax.
  if @routing_key !~ Regexp.new(Regexp.escape(queue))
    @routing_key = "#{queue}.routing_key"
  end

  @handler = options.delete(:handler) || self

  # Only fill in the default when no value was given; `||=` would silently
  # turn an explicit `false` back into `true`.
  options[:auto_delete] = true if options[:auto_delete].nil?

  topic ||= "pub/sub"
  @exchange = channel.topic(topic, options)

  @messages = []
  @_listening = false

  open_connection
  listen
end

Public Instance Methods

clear() click to toggle source
# File lib/amqp-hermes/receiver.rb, line 62
# Discards every stored message by swapping in a brand-new empty array.
# The previous array object is left untouched, so references handed out
# earlier keep their contents.
#
# @return [Array] the new, empty message list
def clear
  @messages = Array.new
end
inspect() click to toggle source
# File lib/amqp-hermes/receiver.rb, line 66
# Short human-readable summary: queue, routing key, exchange, plus the
# current results of +open?+ and +listening?+.
#
# @return [String]
def inspect
  fields = [
    %(@queue="#{@queue}"),
    %(@routing_key="#{@routing_key}"),
    %(@exchange="#{@exchange}"),
    "open=#{open?}",
    "listening=#{listening?}"
  ]
  "#<Hermes::Receiver #{fields.join(' ')}>"
end
listen() click to toggle source
# File lib/amqp-hermes/receiver.rb, line 39
# Starts a new thread that marks the receiver as listening, binds the queue
# to the exchange under the routing key and subscribes with :ack => true.
# Each delivery is wrapped in an AMQP::Hermes::Message, passed to the
# handler's +receive+, and then acknowledged.
#
# @return [Thread] the thread running the subscription
def listen
  Thread.new(self, @handler) do |rx, sink|
    rx._listening = true

    bound_queue = rx.channel.queue(rx.queue).bind(rx.exchange, :routing_key => rx.routing_key)
    bound_queue.subscribe(:ack => true) do |metadata, body|
      sink.receive(AMQP::Hermes::Message.new(metadata, body))
      # acknowledge only after the handler has returned
      metadata.ack
    end
  end
end
listening?() click to toggle source
# File lib/amqp-hermes/receiver.rb, line 52
# Reports whether the listening flag has been set to exactly +true+.
# Any other value (false, nil, or a merely truthy object) counts as not
# listening.
#
# @return [Boolean]
def listening?
  return true if @_listening == true

  false
end
receive(message) click to toggle source

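Implements the handler interface: appends the incoming message to `messages`. Anything that is not an `AMQP::Hermes::Message` is ignored and `nil` is returned.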

# File lib/amqp-hermes/receiver.rb, line 57
# Handler-interface entry point: stores +message+ in the message list.
#
# @param message [AMQP::Hermes::Message] the delivered message
# @return [Array, nil] the message list after appending, or nil when
#   +message+ is not an AMQP::Hermes::Message (nothing is stored then)
def receive(message)
  return unless message.is_a?(AMQP::Hermes::Message)

  @messages.push(message)
end