class RubyRabbitmqJanus::Rabbit::Listener::Base
Base
for listeners
Attributes
rabbit[RW]
responses[RW]
Public Class Methods
new(rabbit)
click to toggle source
Define a listener
@param [String] rabbit Connection information for the RabbitMQ server
Calls superclass method
RubyRabbitmqJanus::Rabbit::BaseEvent::new
# File lib/rrj/rabbit/listener/base.rb, line 13
# Set up the listener: run the superclass initializer, keep the channel
# returned by +rabbit.channel+ in @rabbit, then call +subscribe_queue+.
#
# @param rabbit object responding to +channel+
def initialize(rabbit)
  super()
  channel = rabbit.channel
  @rabbit = channel
  subscribe_queue
end
Public Instance Methods
listen_events() { |event, response| ... }
click to toggle source
Listen to a queue and yield the event and its response
# File lib/rrj/rabbit/listener/base.rb, line 20
# Wait on the semaphore, take the first entry of +responses+ and yield it.
#
# The entry is shifted out of +responses+ and passed to +check+ while
# +lock+ is held; the caller's block runs after the lock section has ended.
#
# @yieldparam event value of +response.event+
# @yieldparam response the entry taken from +responses+
# @raise whatever +check+ raises for a nil or empty response
def listen_events
  semaphore.wait
  message = nil
  lock.synchronize do
    message = responses.shift
    check(message)
  end
  yield(message.event, message)
end
Private Instance Methods
binding()
click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 34
# Exchange handed to +bind+ in +subscribe_queue+: the result of calling
# +direct+ on @rabbit with the name 'amq.direct'.
#
# NOTE(review): this method name shadows Kernel#binding for instances of
# this class; keep that in mind before using tools that rely on it here.
def binding
  exchange_name = 'amq.direct'
  @rabbit.direct(exchange_name)
end
check(response)
click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 48
# Validate a response taken from the queue.
#
# @param response object to validate; must respond to +to_hash+ when not nil
# @raise [Errors::Rabbit::Listener::ResponseNil] when +response+ is nil
# @raise [Errors::Rabbit::Listener::ResponseEmpty] when +response.to_hash+
#   has a size of zero
def check(response)
  if response.nil?
    raise Errors::Rabbit::Listener::ResponseNil, response
  end
  if response.to_hash.size.zero?
    raise Errors::Rabbit::Listener::ResponseEmpty, response
  end
end
info_subscribe(info, _prop, payload)
click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 42
# Log a received message: +info+ and a fixed marker at debug level, then
# +payload+ at info level.
#
# @param info delivery information passed by the subscribe block
# @param _prop unused
# @param payload message body passed by the subscribe block
def info_subscribe(info, _prop, payload)
  ::Log.debug(info)
  ::Log.debug('[X] Message reading')
  ::Log.info(payload)
end
opts_subs()
click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 38
# Options hash passed to +subscribe+ in +subscribe_queue+.
#
# @return [Hash] a new hash on every call, with +block+ and +manual_ack+ set
#   to false and an +arguments+ hash holding the 'x-priority' entry
def opts_subs
  arguments = { 'x-priority': 2 }
  { block: false, manual_ack: false, arguments: arguments }
end
subscribe_queue()
click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 55
# Set prefetch to 1 on +rabbit+, bind +reply+ to the exchange returned by
# +binding+ and subscribe with +opts_subs+.
#
# Every delivered message is logged through +info_subscribe+ and then
# handed to +synchronize_response+.
def subscribe_queue
  rabbit.prefetch(1)
  bound_queue = reply.bind(binding)
  bound_queue.subscribe(opts_subs) do |info, prop, payload|
    info_subscribe(info, prop, payload)
    synchronize_response(payload)
  end
end
synchronize_response(payload)
click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 63
# Wrap +payload+ with +response_class+ and append it to +responses+ while
# +lock+ is held, then signal the semaphore that +listen_events+ waits on.
#
# @param payload message body received from the queue
def synchronize_response(payload)
  lock.synchronize { responses.push(response_class(payload)) }
  semaphore.signal
end