class AMQP::Hermes::Receiver
Attributes
_listening[RW]
exchange[R]
messages[R]
queue[R]
routing_key[R]
Public Class Methods
new(queue, topic=nil, options={})
click to toggle source
# File lib/amqp-hermes/receiver.rb, line 9
#
# Sets up a receiver for +queue+: resolves the routing key and handler,
# obtains the exchange from <tt>channel.topic</tt>, then calls
# +open_connection+ and +listen+.
#
# queue::   Queue name; must be neither nil nor empty.
# topic::   Exchange name, "pub/sub" when omitted. If a Hash is passed here
#           it is used as the options and the name is read from its
#           <tt>:topic</tt> key.
# options:: <tt>:routing_key</tt> (defaults to the queue name followed by
#           ".*") and <tt>:handler</tt> (defaults to this receiver) are
#           consumed here; every remaining entry is handed to
#           <tt>channel.topic</tt>, with <tt>:auto_delete</tt> defaulting
#           to true. The caller's hash is left untouched.
#
# Raises RuntimeError when +queue+ is nil or empty.
def initialize(queue, topic=nil, options={})
  raise "You *MUST* specify a queue" if queue.nil? || queue.empty?
  @queue = queue

  if topic.is_a?(Hash)
    options = topic.dup
    topic = options.delete(:topic)
  else
    # Work on a copy: the deletes below must not leak into the caller's hash.
    options = (options || {}).dup
  end

  @routing_key = options.delete(:routing_key) || "#{queue}.*"

  # Escape the queue name so it is matched literally; characters such as
  # "." or "[" would otherwise be interpreted as regexp syntax.
  if @routing_key !~ /#{Regexp.escape(queue)}/
    # NOTE(review): the fallback ends in the literal text ".routing_key", not
    # in the key that was supplied -- confirm that this is intended.
    @routing_key = "#{queue}.routing_key"
  end

  @handler = options.delete(:handler) || self

  # Default only when unset; `||=` would turn an explicit false into true.
  options[:auto_delete] = true if options[:auto_delete].nil?
  topic ||= "pub/sub"

  @exchange = channel.topic(topic, options)
  @messages = []
  @_listening = false

  self.open_connection
  self.listen
end
Public Instance Methods
clear()
click to toggle source
# File lib/amqp-hermes/receiver.rb, line 62
#
# Discards the collected messages by pointing @messages at a brand-new
# empty Array (the previous Array object itself is not modified).
#
# Returns the new, empty Array.
def clear
  @messages = Array.new
end
inspect()
click to toggle source
# File lib/amqp-hermes/receiver.rb, line 66
#
# Returns a one-line, human-readable description of the receiver showing
# its queue, routing key and exchange together with the results of
# +open?+ and +listening?+.
def inspect
  "#<Hermes::Receiver @queue=\"#{@queue}\" @routing_key=\"#{@routing_key}\" " \
    "@exchange=\"#{@exchange}\" open=#{open?} listening=#{listening?}>"
end
listen()
click to toggle source
# File lib/amqp-hermes/receiver.rb, line 39
#
# Starts a new Thread that flags the receiver as listening, binds the
# receiver's queue to its exchange under its routing key and subscribes
# with <tt>:ack => true</tt>. Each delivery is wrapped in an
# AMQP::Hermes::Message, passed to the handler's +receive+, and only then
# acknowledged.
#
# Returns the Thread.
def listen
  Thread.new(self, @handler) do |rcv, sink|
    rcv._listening = true

    source = rcv.channel.queue(rcv.queue)
    bound = source.bind(rcv.exchange, :routing_key => rcv.routing_key)

    bound.subscribe(:ack => true) do |metadata, body|
      sink.receive(AMQP::Hermes::Message.new(metadata, body))
      # Acknowledge after the handler returns, never before.
      metadata.ack
    end
  end
end
listening?()
click to toggle source
# File lib/amqp-hermes/receiver.rb, line 52
#
# Returns true only when @_listening is exactly +true+, and false for
# every other value (false, nil or anything else), so the result is always
# a real boolean.
def listening?
  # The comparison already yields true/false; no ternary needed.
  @_listening == true
end
receive(message)
click to toggle source
implement the handler interface
# File lib/amqp-hermes/receiver.rb, line 57 def receive(message) return nil if !message.kind_of?(AMQP::Hermes::Message) @messages << message end