class RabbitRPC::SynchronousConnection
Connects to RabbitMQ for blocking RPC calls. Waits for a response when querying other services.
Constants
- DEFAULT_HEARTBEAT
Attributes
callback_queue_name[R]
RabbitMQ related info
heartbeat[R]
RabbitMQ related info
message_id[R]
Message
unique identifier and response
queue_name[R]
RabbitMQ related info
rabbit_mq_url[R]
RabbitMQ related info
response[R]
Message
unique identifier and response
Public Class Methods
new(queue_name, callback_queue_name, rabbit_mq_url, heart_beat = nil)
click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 20
# Stores the connection settings and assigns this instance its message id.
#
# @param queue_name [Object] stored as-is in @queue_name
# @param callback_queue_name [Object] stored as-is in @callback_queue_name
# @param rabbit_mq_url [Object] stored as-is in @rabbit_mq_url
# @param heart_beat [Object, nil] heartbeat setting; DEFAULT_HEARTBEAT is used
#   when this is nil or false
def initialize(queue_name, callback_queue_name, rabbit_mq_url, heart_beat = nil)
  @queue_name, @callback_queue_name, @rabbit_mq_url =
    queue_name, callback_queue_name, rabbit_mq_url

  # `||` (not a nil check) so that an explicit false also selects the default.
  @heartbeat = heart_beat || DEFAULT_HEARTBEAT

  # The id is taken once per instance from Message.generate_id.
  @message_id = Message.generate_id
end
Public Instance Methods
publish!(unpacked_message)
click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 29
# Publishes the packed form of +unpacked_message+ and, when
# wait_for_response? says a reply is expected, blocks on the callback queue
# until a delivery arrives whose :correlation_id equals message_id.
#
# connect! opens a new connection and channel on every call, so the
# connection is closed again before this method returns (also when an error
# is raised). Without that, each call would leave a connection open.
#
# @param unpacked_message [#pack] message to send
# @return [Object, nil] the value of +response+: the unpacked reply when one
#   was awaited, otherwise whatever @response already held (nil on a fresh
#   instance)
def publish!(unpacked_message)
  connect!
  # An exchange memoized during an earlier call belongs to a channel that has
  # since been closed; drop it so the request goes through the new channel.
  @exchange = nil
  send_request(unpacked_message.pack)

  if wait_for_response?(unpacked_message)
    # NOTE(review): newer Bunny releases name this option :manual_ack -
    # confirm against the Bunny version in use before renaming.
    callback_queue.subscribe(block: true, ack: true) do |delivery_info, properties, payload|
      # Deliveries addressed to other requests are left unacknowledged.
      next unless message_id == properties.try(:[], :correlation_id)

      @channel.acknowledge(delivery_info.delivery_tag, false)
      @response = Message.unpack(payload)
      logger.info "Received message #{@response}"
      # Cancelling the consumer ends the blocking subscribe above.
      delivery_info.consumer.cancel
    end
  end

  response
ensure
  # @connection is nil when connect! failed on a first call, and may already
  # be closed when connect! failed on a later one.
  @connection.close if @connection && @connection.open?
end
Private Instance Methods
callback_queue()
click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 58
# Asks the current channel for the queue named @callback_queue_name, passing
# auto_delete: false.
#
# @return [Object] whatever @channel.queue returns
def callback_queue
  queue_options = { auto_delete: false }
  @channel.queue(@callback_queue_name, queue_options)
end
connect!()
click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 53
# Builds a Bunny session for @rabbit_mq_url with the configured heartbeat,
# starts it, and creates a channel on it. The results are kept in
# @connection and @channel.
#
# @return [Object] the newly created channel
def connect!
  session = Bunny.new(@rabbit_mq_url, heartbeat: @heartbeat)
  # @connection holds the value returned by #start, not the object from .new.
  @connection = session.start
  @channel = @connection.create_channel
end
exchange()
click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 62
# Returns the default exchange of the current channel.
#
# The result is memoized per channel: connect! replaces @channel on every
# call, so an exchange cached for an earlier channel must not be reused once
# @channel points at a different object.
#
# @return [Object] the value of @channel.default_exchange
def exchange
  if @exchange.nil? || !@exchange_channel.equal?(@channel)
    @exchange = @channel.default_exchange
    # Remember which channel the cached exchange belongs to.
    @exchange_channel = @channel
  end
  @exchange
end
send_request(message)
click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 66
# Publishes +message+ through #exchange, routed to @queue_name, tagged with
# @message_id and naming @callback_queue_name as the reply queue.
#
# @param message [Object] payload handed to exchange.publish unchanged
# @return [Object] whatever exchange.publish returns
def send_request(message)
  publish_options = {
    routing_key: @queue_name,
    message_id: @message_id,
    reply_to: @callback_queue_name,
    # NOTE(review): auto_delete is usually a queue-declaration option; confirm
    # it has any effect when passed to publish.
    auto_delete: false
  }
  exchange.publish(message, publish_options)
end
wait_for_response?(message)
click to toggle source
# File lib/rabbit_rpc/synchronous_connection.rb, line 76
# Tells whether a reply should be awaited for +message+.
#
# @param message [Object] message handed to MessageParser.new
# @return [Boolean] false when the parser reports the message as one-way,
#   true otherwise
def wait_for_response?(message)
  parser = MessageParser.new(message)
  !parser.one_way?
end