class RabbitRPC::SynchronousConnection

Connects to RabbitMQ for blocking RPC calls. Wait for a response when querying other services

Constants

DEFAULT_HEARTBEAT

Attributes

callback_queue_name[R]

RabbitMQ related info

heartbeat[R]

RabbitMQ related info

message_id[R]

Message unique identifier and resposne

queue_name[R]

RabbitMQ related info

rabbit_mq_url[R]

RabbitMQ related info

response[R]

Message unique identifier and resposne

Public Class Methods

new(queue_name, callback_queue_name, rabbit_mq_url, heart_beat = nil) click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 20
def initialize(queue_name, callback_queue_name, rabbit_mq_url, heart_beat = nil)
  @queue_name          = queue_name
  @callback_queue_name = callback_queue_name
  @rabbit_mq_url       = rabbit_mq_url
  @heartbeat           = heart_beat || DEFAULT_HEARTBEAT

  @message_id          = Message.generate_id
end

Public Instance Methods

publish!(unpacked_message) click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 29
def publish!(unpacked_message)
  connect!

  send_request(unpacked_message.pack)

  if wait_for_response?(unpacked_message)
    callback_queue.subscribe(block: true, ack: true) do |delivery_info, properties, payload|
      if message_id == properties.try(:[],:correlation_id)
        @channel.acknowledge(delivery_info.delivery_tag, false)
        @response = Message.unpack payload

        logger.info "Received message #{@response}"
        delivery_info.consumer.cancel
      end
    end

    return @response
  end

  return response
end

Private Instance Methods

callback_queue() click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 58
def callback_queue
  @channel.queue(@callback_queue_name, auto_delete: false)
end
connect!() click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 53
def connect!
  @connection = Bunny.new(@rabbit_mq_url, heartbeat: @heartbeat).start
  @channel    = @connection.create_channel
end
exchange() click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 62
def exchange
  @exchange ||= @channel.default_exchange
end
send_request(message) click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 66
def send_request(message)
  exchange.publish(
    message,
    routing_key: @queue_name,
    message_id:  @message_id,
    reply_to:    @callback_queue_name,
    auto_delete: false
  )
end
wait_for_response?(message) click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 76
def wait_for_response?(message)
  !MessageParser.new(message).one_way?
end