class Lagomorph::RpcCall

Public Class Methods

new(session) click to toggle source
# File lib/lagomorph/rpc_call.rb, line 8
# Builds an RPC caller bound to the given session.
#
# No channel is opened here; the flag below starts out false and the
# channel is created by prepare_channel on first use.
#
# @param session the connection object later asked to create a channel
def initialize(session)
  @session = session
  # Pending calls: correlation id => queue that will receive the raw reply.
  @results = {}
  # Lock used around accesses to @results.
  @mutex = Monitor.new
  # Set explicitly so close_channel / prepare_channel never read an
  # uninitialised instance variable.
  @prepared_channel = false
end

Public Instance Methods

close_channel() click to toggle source
# File lib/lagomorph/rpc_call.rb, line 34
# Closes the channel if one has been prepared and clears the prepared flag.
# Does nothing when no channel is currently prepared.
#
# @return [false, nil] false after closing, nil when there was nothing to close
def close_channel
  if @prepared_channel
    @channel.close
    @prepared_channel = false
  end
end
dispatch(queue_name, method, *params) click to toggle source
# File lib/lagomorph/rpc_call.rb, line 14
# Sends an RPC request and blocks until its reply has been received.
#
# A fresh correlation id is generated and a queue is registered under it in
# @results before the request is published; the reply is then awaited via
# block_till_receive_response.
#
# @param queue_name routing key the request is published with
# @param method remote method name, handed to prepare_payload
# @param params arguments handed to prepare_payload
# @return the value stored under 'result' in the parsed response
# @raise [RpcError] when the response has no 'result' key; the message is the
#   response's 'error' entry, or 'Unknown error' when that is absent too
def dispatch(queue_name, method, *params)
  # NOTE(review): publish_rpc_call reads @queue_name, so concurrent dispatch
  # calls with different queue names on one instance could interleave here.
  # Confirm whether instances are shared between threads.
  @queue_name = queue_name

  correlation_id = calculate_correlation_id
  @mutex.synchronize { @results[correlation_id] = ::Queue.new }

  begin
    prepare_channel
    payload = prepare_payload(method, *params)
    publish_rpc_call(payload, correlation_id)
  rescue StandardError
    # The request never went out, so nothing would ever consume this entry.
    @mutex.synchronize { @results.delete(correlation_id) }
    raise
  end

  response = block_till_receive_response(correlation_id)
  return response['result'] if response.key?('result')

  raise RpcError, response.fetch('error', 'Unknown error')
end

Private Instance Methods

block_till_receive_response(correlation_id) click to toggle source
# File lib/lagomorph/rpc_call.rb, line 76
# Waits for the raw reply registered under correlation_id, parses it and
# removes the bookkeeping entry from @results.
#
# @param correlation_id key under which the reply queue is stored in @results
# @return whatever parse_response returns for the raw reply
def block_till_receive_response(correlation_id)
  # Look the queue up under the same lock dispatch uses when registering it.
  pending = @mutex.synchronize { @results[correlation_id] }
  raw_response = pending.pop # blocks until a reply has been pushed

  begin
    parse_response(raw_response)
  ensure
    # The reply has been consumed, so drop the entry even when parsing fails;
    # otherwise a malformed reply would leave it in @results forever.
    @mutex.synchronize { @results.delete(correlation_id) }
  end
end
calculate_correlation_id() click to toggle source
# File lib/lagomorph/rpc_call.rb, line 66
# Produces the identifier used to pair a published request with its reply.
#
# @return [String] a random UUID from SecureRandom
def calculate_correlation_id
  SecureRandom.uuid
end
listen_for_responses() click to toggle source
# File lib/lagomorph/rpc_call.rb, line 70
# Subscribes (non-blocking) to the reply queue and hands each incoming
# payload to the queue registered in @results for its correlation id.
#
# @return whatever @reply_queue.subscribe returns
def listen_for_responses
  @reply_queue.subscribe(block: false) do |metadata, payload|
    pending = @mutex.synchronize { @results[metadata.correlation_id] }
    # A reply with no registered waiter (unknown or already-removed id) has
    # nowhere to go; skip it rather than raising NoMethodError in the callback.
    pending.push(payload) if pending
  end
end
parse_response(response) click to toggle source
# File lib/lagomorph/rpc_call.rb, line 84
# Decodes a raw reply by delegating to a new JsonParser.
#
# @param response the raw reply payload taken from the reply queue
# @return whatever JsonParser#parse_response returns for it
def parse_response(response)
  parser = JsonParser.new
  parser.parse_response(response)
end
prepare_channel() click to toggle source
# File lib/lagomorph/rpc_call.rb, line 42
# Sets up the channel, default exchange and reply queue once, then starts
# listening for replies. Later calls do nothing until close_channel resets
# the prepared flag.
#
# @return [true, nil] true when the channel was prepared by this call,
#   nil when it was already prepared
def prepare_channel
  unless @prepared_channel
    # NOTE(review): the literal 1 is passed positionally to create_channel;
    # confirm which parameter it is meant to bind to.
    @channel     = @session.create_channel(1)
    @exchange    = @channel.default_exchange
    @reply_queue = QueueBuilder.new(@channel).reply_queue

    listen_for_responses

    @prepared_channel = true
  end
end
prepare_payload(method, *params) click to toggle source
# File lib/lagomorph/rpc_call.rb, line 62
# Builds the request payload by delegating to a new JsonParser.
#
# @param method remote method name
# @param params arguments for the remote method
# @return whatever JsonParser#build_request returns
def prepare_payload(method, *params)
  builder = JsonParser.new
  builder.build_request(method, *params)
end
publish_rpc_call(request, correlation_id) click to toggle source
# File lib/lagomorph/rpc_call.rb, line 55
# Publishes the request on @exchange, routed to @queue_name and tagged with
# the correlation id and the name of the reply queue.
#
# @param request the payload to publish
# @param correlation_id identifier attached to the message
# @return whatever @exchange.publish returns
def publish_rpc_call(request, correlation_id)
  options = {
    routing_key:    @queue_name,
    correlation_id: correlation_id,
    reply_to:       @reply_queue.name
  }

  @exchange.publish(request, **options)
end