class Lagomorph::RpcCall
Public Class Methods
new(session)
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 8
# Builds an RPC caller around an existing broker session.
#
# @param session the session object; it is only stored here and is later
#   asked for a channel via +create_channel+ when a call is dispatched
def initialize(session)
  # Pending replies, one entry per in-flight correlation id.
  @results = {}
  @session = session
  # Lock used when entries are registered in @results.
  @mutex = Monitor.new
end
Public Instance Methods
close_channel()
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 34
# Closes the channel if one has been prepared; otherwise does nothing,
# so calling it repeatedly is harmless.
#
# @return [false, nil] +false+ after closing, +nil+ when nothing was open
def close_channel
  if @prepared_channel
    @channel.close
    # Mark as unprepared so the next dispatch opens a fresh channel.
    @prepared_channel = false
  end
end
dispatch(queue_name, method, *params)
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 14
# Sends an RPC request to +queue_name+ and blocks until its reply arrives.
#
# A fresh correlation id is generated per call and a Queue is registered
# under it in @results so the reply can be handed back to this caller.
#
# @param queue_name routing key the request is published with
# @param method name of the remote method, passed to prepare_payload
# @param params arguments for the remote method
# @return the value stored under 'result' in the parsed response
# @raise [RpcError] when the response has no 'result' key; the message is
#   the response's 'error' entry, or 'Unknown error' if that is absent too
def dispatch(queue_name, method, *params)
  # NOTE(review): @queue_name is instance state read by publish_rpc_call;
  # concurrent dispatches to different queues through one instance could
  # interleave here - confirm the intended threading model.
  @queue_name = queue_name
  correlation_id = calculate_correlation_id
  @mutex.synchronize { @results[correlation_id] = ::Queue.new }

  begin
    prepare_channel
    payload = prepare_payload(method, *params)
    publish_rpc_call(payload, correlation_id)
    response = block_till_receive_response(correlation_id)
  ensure
    # Always unregister the pending entry, so a failed publish or an
    # unparseable reply does not leave a Queue behind in @results.
    @mutex.synchronize { @results.delete(correlation_id) }
  end

  return response['result'] if response.key?('result')

  raise RpcError, response.fetch('error', 'Unknown error')
end
Private Instance Methods
block_till_receive_response(correlation_id)
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 76
# Waits for the reply registered under +correlation_id+ and returns it parsed.
#
# @param correlation_id key of the pending Queue in @results
# @return the value produced by parse_response for the raw reply
def block_till_receive_response(correlation_id)
  # Look the queue up under the same lock that guards its registration,
  # but release the lock before blocking so other callers are not stalled.
  pending = @mutex.synchronize { @results[correlation_id] }
  parse_response(pending.pop) # blocks until a reply has been pushed
ensure
  # Unregister even when parsing raises, so the entry cannot leak.
  @mutex.synchronize { @results.delete(correlation_id) }
end
calculate_correlation_id()
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 66
# Produces the identifier that pairs a request with its reply.
#
# @return [String] a random UUID obtained from SecureRandom
def calculate_correlation_id
  SecureRandom.uuid
end
listen_for_responses()
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 70
# Subscribes (without blocking) to the reply queue and routes each incoming
# payload to the Queue registered in @results under its correlation id.
#
# @return whatever @reply_queue.subscribe returns
def listen_for_responses
  @reply_queue.subscribe(block: false) do |metadata, payload|
    # Read @results under the lock that guards registration of entries.
    waiting = @mutex.synchronize { @results[metadata.correlation_id] }
    # A reply with no registered waiter (unknown or already removed id) is
    # dropped; pushing to nil would raise inside the consumer callback.
    waiting.push(payload) if waiting
  end
end
parse_response(response)
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 84
# Decodes a raw reply by delegating to a newly created JsonParser.
#
# @param response the raw reply payload
# @return the value returned by JsonParser#parse_response
def parse_response(response)
  parser = JsonParser.new
  parser.parse_response(response)
end
prepare_channel()
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 42
# Opens the channel, default exchange and reply queue on first use, then
# starts listening for replies. Later calls are no-ops until the prepared
# flag is cleared again.
#
# @return [true, nil] +true+ when the channel was set up by this call,
#   +nil+ when it was already prepared
def prepare_channel
  unless @prepared_channel
    channel = @session.create_channel(1)
    @channel = channel
    @exchange = channel.default_exchange
    @reply_queue = QueueBuilder.new(channel).reply_queue
    listen_for_responses
    @prepared_channel = true
  end
end
prepare_payload(method, *params)
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 62
# Encodes a request by delegating to a newly created JsonParser.
#
# @param method name of the remote method
# @param params arguments forwarded to the remote method
# @return the value returned by JsonParser#build_request
def prepare_payload(method, *params)
  builder = JsonParser.new
  builder.build_request(method, *params)
end
publish_rpc_call(request, correlation_id)
click to toggle source
# File lib/lagomorph/rpc_call.rb, line 55
# Publishes the encoded request on @exchange, routed to @queue_name and
# tagged with the correlation id and the name of the reply queue.
#
# @param request the encoded request body
# @param correlation_id identifier the reply is expected to carry
# @return whatever @exchange.publish returns
def publish_rpc_call(request, correlation_id)
  options = {
    routing_key: @queue_name,
    correlation_id: correlation_id,
    reply_to: @reply_queue.name
  }
  @exchange.publish(request, **options)
end