class TDL::RemoteBroker

Public Class Methods

new(hostname, port, request_queue_name, response_queue_name, request_timeout_millis) click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 7
def initialize(hostname, port, request_queue_name, response_queue_name, request_timeout_millis)
  @stomp_client = Stomp::Client.new('', '', hostname, port)
  @request_queue = "/queue/#{request_queue_name}"
  @response_queue = "/queue/#{response_queue_name}"
  @serialization_provider = JSONRPCSerializationProvider.new
  @timer = ThreadTimer.new(request_timeout_millis, lambda = ->() { close unless closed? })
  @timer.start
end

Public Instance Methods

close() click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 35
def close
  @stomp_client.unsubscribe(@request_queue)
  @stomp_client.close
end
closed?() click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 40
def closed?
  @stomp_client.closed?
end
join() click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 31
def join
  @stomp_client.join
end
respond_to(request, response) click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 25
def respond_to(request, response)
  serialized_response = @serialization_provider.serialize(response)
  @stomp_client.publish(@response_queue, serialized_response)
  @stomp_client.acknowledge(request.original_message)
end
subscribe(handling_strategy) click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 16
def subscribe(handling_strategy)
  @stomp_client.subscribe(@request_queue, {:ack => 'client-individual', 'activemq.prefetchSize' => 1}) do |msg|
    @timer.stop
    request = @serialization_provider.deserialize(msg)
    handling_strategy.process_next_request_from(self, request)
    @timer.start
  end
end