class RubyRabbitmqJanus::Rabbit::Listener::Base

Base for listeners

Attributes

rabbit[RW]
responses[RW]

Public Class Methods

new(rabbit) click to toggle source

Define a listener

@param [#channel] rabbit Connection to the RabbitMQ server; its channel is used by the listener

# File lib/rrj/rabbit/listener/base.rb, line 13
# Build the listener and immediately subscribe it to its queue.
#
# @param rabbit [#channel] connection-like object; only its +channel+ is kept
def initialize(rabbit)
  # Explicit empty parentheses: the parent initializer is called with no
  # arguments instead of implicitly receiving +rabbit+.
  super()
  # The instance variable holds the channel, not the object passed in.
  @rabbit = rabbit.channel
  # Subscribing happens at construction time, so the channel must be set first.
  subscribe_queue
end

Public Instance Methods

listen_events() { |event, response| ... } click to toggle source

Listen to a queue and yield the event and the response of the next message received

# File lib/rrj/rabbit/listener/base.rb, line 20
# Wait for the next response, validate it and hand it to the caller's block.
#
# Waits on +semaphore+ first, then takes the oldest entry from +responses+
# while holding +lock+ and runs +check+ on it (which may raise).
#
# @yield [event, response] the response's +event+ and the response itself
def listen_events
  semaphore.wait
  response = lock.synchronize do
    oldest = responses.shift
    check(oldest)
    oldest
  end
  yield response.event, response
end

Private Instance Methods

binding() click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 34
# Look up the 'amq.direct' direct exchange on the stored channel.
#
# NOTE(review): this method name shadows Kernel#binding inside the class.
#
# @return the value returned by +@rabbit.direct+
def binding
  exchange_name = 'amq.direct'
  @rabbit.direct(exchange_name)
end
check(response) click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 48
# Validate a response taken from the list of received responses.
#
# @param response the response to validate; must respond to +to_hash+ unless nil
# @raise [Errors::Rabbit::Listener::ResponseNil] when +response+ is nil
# @raise [Errors::Rabbit::Listener::ResponseEmpty] when +response.to_hash+ has no entries
# @return [nil] when the response passes both checks
def check(response)
  if response.nil?
    raise Errors::Rabbit::Listener::ResponseNil, response
  end

  content = response.to_hash
  return unless content.size.zero?

  raise Errors::Rabbit::Listener::ResponseEmpty, response
end
info_subscribe(info, _prop, payload) click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 42
# Log the reception of a message.
#
# Writes +info+ and a fixed marker line at debug level, then the payload at
# info level.
#
# @param info delivery information, logged as-is
# @param _prop message properties (unused)
# @param payload message body, logged as-is
# @return the value returned by +::Log.info+
def info_subscribe(info, _prop, payload)
  [info, '[X] Message reading'].each { |entry| ::Log.debug entry }
  ::Log.info payload
end
opts_subs() click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 38
# Options passed to +subscribe+ when attaching to the queue.
#
# NOTE(review): 'x-priority' looks like RabbitMQ's consumer-priority argument
# and +block+/+manual_ack+ like client subscribe options - confirm against the
# client library in use.
#
# @return [Hash] a freshly built options hash
def opts_subs
  consumer_arguments = { 'x-priority': 2 }
  { block: false, manual_ack: false, arguments: consumer_arguments }
end
subscribe_queue() click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 55
# Bind the reply queue to the exchange and subscribe to it.
#
# Sets the channel prefetch to 1, binds +reply+ to the exchange returned by
# +binding+ and subscribes with +opts_subs+. Each received message is logged
# and then stored through +synchronize_response+.
#
# @return the value returned by +subscribe+
def subscribe_queue
  rabbit.prefetch(1)
  bound_queue = reply.bind(binding)
  bound_queue.subscribe(opts_subs) do |delivery_info, properties, body|
    info_subscribe(delivery_info, properties, body)
    synchronize_response(body)
  end
end
synchronize_response(payload) click to toggle source
# File lib/rrj/rabbit/listener/base.rb, line 63
# Store a received payload as a response and signal the semaphore.
#
# The payload is wrapped with +response_class+ and appended to +responses+
# while holding +lock+; the semaphore is signalled after the lock is released.
#
# @param payload message body passed to +response_class+
# @return the value returned by +semaphore.signal+
def synchronize_response(payload)
  lock.synchronize do
    # Build the response first, then append it, as two separate steps.
    response_class(payload).tap { |built| responses.push(built) }
  end
  semaphore.signal
end