class Wire::RabbitMQTransport
Public Class Methods
new(host, port)
click to toggle source
# File lib/wire.rb, line 103
# Builds a transport bound to the AMQP broker at +host+:+port+.
#
# The broker connection is started immediately, so constructing the
# transport performs network I/O.
#
# host - broker host name or address, interpolated into the AMQP URL.
# port - broker port, interpolated into the AMQP URL.
def initialize(host, port)
  broker_url = "amqp://#{host}:#{port}"
  @connection = Bunny.new(broker_url)
  @connection.start

  # Guards the lazy per-service setup done in initialize_service.
  @semaphore = Mutex.new

  # One caller id per transport instance; it is used as the routing key of
  # the response queue binding and sent along with every published message.
  @uuid = UUID.new
  @caller_id = @uuid.generate

  @topic_hash = {}   # service name => request exchange
  @message_hash = {} # message id   => pending message callback
end
Public Instance Methods
initialize_service(service_name)
click to toggle source
# File lib/wire.rb, line 116
# Returns the request exchange ("<service_name>.request") for +service_name+,
# creating it on first use and reusing it afterwards.
#
# First use of a service also opens a second channel, declares an exclusive,
# auto-delete response queue bound to "<service_name>.response" with this
# transport's caller id as routing key, and subscribes to it. Incoming
# responses are JSON-parsed and handed to the callback registered in
# @message_hash under the response's correlation id; responses without a
# registered callback are ignored.
#
# The exchange is stored in @topic_hash only after the setup succeeded.
# Previously it was never stored, so every call opened two new channels and
# a new response queue/subscription.
#
# service_name - name of the target service (used as exchange name prefix).
#
# Returns the request exchange object created by the channel.
def initialize_service(service_name)
  @semaphore.synchronize do
    topic = @topic_hash[service_name]
    return topic if topic

    request_channel = @connection.create_channel
    topic = request_channel.topic("#{service_name}.request")

    response_queue_name = @uuid.generate
    response_channel = @connection.create_channel
    response_exchange = response_channel.topic("#{service_name}.response")
    response_queue = response_channel.queue(response_queue_name.to_s,
                                            :durable => false, :exclusive => true, :auto_delete => true)

    binding = response_queue.bind(response_exchange, :routing_key => @caller_id.to_s)
    binding.subscribe do |_delivery_info, metadata, payload|
      message_callback = @message_hash[metadata[:correlation_id]]
      # Parse only when somebody is waiting for this response.
      message_callback.set_result(JSON.parse(payload)) if message_callback
    end

    # Cache last, so a failed setup is retried on the next call.
    @topic_hash[service_name] = topic
  end
end
transmit(service_name, version, invocation_signal, timeout_seconds)
click to toggle source
# File lib/wire.rb, line 145
# Publishes +invocation_signal+ to +service_name+ and returns the Message
# object that was handed to the callback registered for this request.
#
# A MessageCallback is stored in @message_hash under a freshly generated
# message id before publishing; the response subscription set up by
# initialize_service looks callbacks up by correlation id.
#
# service_name      - target service; selects the request exchange.
# version           - used to build the routing key "<version>.2".
# invocation_signal - payload; serialized with #to_json.
# timeout_seconds   - passed through to MessageCallback (its handling is
#                     defined there, not in this method).
#
# Returns the newly created Message.
def transmit(service_name, version, invocation_signal, timeout_seconds)
  message_id = "#{@uuid.generate}".freeze
  message = Message.new
  callback = MessageCallback.new(@message_hash, message_id, message, timeout_seconds)
  @message_hash[message_id] = callback

  exchange = initialize_service(service_name)
  exchange.publish(
    invocation_signal.to_json,
    :routing_key => "#{version}.2",
    :headers => { 'x-opt-callerId' => "#{@caller_id}" },
    :content_type => 'application/json',
    :message_id => "#{message_id}",
    :timestamp => Time.new.to_i
  )

  message
end
transmit_in_only(service_name, version, invocation_signal)
click to toggle source
# File lib/wire.rb, line 141
# Publishes +invocation_signal+ to +service_name+ without registering a
# callback for a response.
#
# service_name      - target service; selects the request exchange.
# version           - used to build the routing key "<version>.2".
# invocation_signal - payload; serialized with #to_json.
#
# Returns whatever the exchange's #publish returns.
def transmit_in_only(service_name, version, invocation_signal)
  exchange = initialize_service(service_name)
  exchange.publish(
    invocation_signal.to_json,
    :routing_key => "#{version}.2",
    :headers => { 'x-opt-callerId' => "#{@caller_id}" },
    :content_type => 'application/json',
    :message_id => "#{@uuid.generate}",
    :timestamp => Time.new.to_i
  )
end