class TDL::RemoteBroker
Public Class Methods
new(hostname, port, request_queue_name, response_queue_name, request_timeout_millis)
click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 7
#
# Opens a STOMP client against the given host/port, derives the request and
# response destinations, and starts the timeout timer.
#
# hostname, port           - forwarded unchanged to Stomp::Client.new.
# request_queue_name       - appended to "/queue/" to form the request destination.
# response_queue_name      - appended to "/queue/" to form the response destination.
# request_timeout_millis   - forwarded unchanged to ThreadTimer.new.
def initialize(hostname, port, request_queue_name, response_queue_name, request_timeout_millis)
  # NOTE(review): the two empty strings are presumably login and passcode — confirm
  # against the Stomp::Client constructor.
  @stomp_client = Stomp::Client.new('', '', hostname, port)
  @request_queue = "/queue/#{request_queue_name}"
  @response_queue = "/queue/#{response_queue_name}"
  @serialization_provider = JSONRPCSerializationProvider.new

  # The callback handed to the timer closes this broker unless it is already
  # closed. It is passed directly; the original bound it to an unused local
  # named `lambda`, which shadowed Kernel#lambda for no benefit.
  @timer = ThreadTimer.new(request_timeout_millis, -> { close unless closed? })
  @timer.start
end
Public Instance Methods
close()
click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 35
#
# Unsubscribes from the request queue and closes the STOMP client.
#
# If unsubscribing raises, the client is still closed before the error is
# re-raised, so a failed unsubscribe does not leave the connection open.
# On success, returns whatever @stomp_client.close returns.
def close
  begin
    @stomp_client.unsubscribe(@request_queue)
  rescue StandardError
    # Release the connection on the failure path, then surface the original error.
    @stomp_client.close
    raise
  end
  @stomp_client.close
end
closed?()
click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 40
#
# Reports whether the underlying STOMP client is closed; the answer is
# whatever @stomp_client.closed? returns.
def closed?
  @stomp_client.closed?
end
join()
click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 31
#
# Delegates to @stomp_client.join and returns its result.
# NOTE(review): presumably blocks until the client's listener finishes —
# confirm against the Stomp::Client documentation.
def join
  @stomp_client.join
end
respond_to(request, response)
click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 25
#
# Serializes +response+, publishes it on the response queue, and then
# acknowledges the message that +request+ originated from.
#
# request  - object exposing #original_message (the message to acknowledge).
# response - object handed to the serialization provider.
def respond_to(request, response)
  payload = @serialization_provider.serialize(response)
  @stomp_client.publish(@response_queue, payload)

  # Acknowledge only after the response has been published.
  source_message = request.original_message
  @stomp_client.acknowledge(source_message)
end
subscribe(handling_strategy)
click to toggle source
# File lib/tdl/queue/transport/remote_broker.rb, line 16
#
# Subscribes to the request queue with 'client-individual' acknowledgement
# and an 'activemq.prefetchSize' of 1. For every delivered message the timer
# is stopped, the message is deserialized and passed to +handling_strategy+
# together with this broker, and the timer is started again.
#
# handling_strategy - object responding to
#                     #process_next_request_from(broker, request).
def subscribe(handling_strategy)
  subscription_headers = { :ack => 'client-individual', 'activemq.prefetchSize' => 1 }

  @stomp_client.subscribe(@request_queue, subscription_headers) do |message|
    # The timer is paused while a request is being handled.
    @timer.stop
    handling_strategy.process_next_request_from(self, @serialization_provider.deserialize(message))
    @timer.start
  end
end