class Wire::RabbitMQTransport

Public Class Methods

new(host, port)
# File lib/wire.rb, line 103
# Connects to the broker and prepares the transport's bookkeeping state.
#
# A Bunny session is created for "amqp://<host>:<port>" and started
# immediately. The transport also keeps a mutex, a UUID generator, a caller
# id drawn from that generator, and two initially empty hashes
# (@topic_hash and @message_hash).
#
# host:: host part of the AMQP URL
# port:: port part of the AMQP URL
def initialize(host, port)
  broker_url = "amqp://#{host}:#{port}"
  @connection = Bunny.new(broker_url)
  @connection.start

  @semaphore = Mutex.new

  # The caller id is generated once and kept for the life of the transport.
  @uuid = UUID.new
  @caller_id = @uuid.generate

  @topic_hash, @message_hash = {}, {}
end

Public Instance Methods

initialize_service(service_name)
# File lib/wire.rb, line 116
# Returns the request exchange for +service_name+, setting it up on first use.
#
# The first call for a given service:
# * declares the "<service_name>.request" topic exchange on a new channel;
# * on a second new channel, declares the "<service_name>.response" topic
#   exchange and a non-durable, exclusive, auto-delete queue with a
#   generated name, bound with this transport's caller id as routing key;
# * subscribes to that queue, handing each JSON-decoded payload to the
#   callback stored in @message_hash under the delivery's correlation id
#   (deliveries with no matching callback are ignored without being parsed).
#
# The exchange is then stored in @topic_hash, so later calls for the same
# service return the same object instead of opening more channels, queues
# and consumers. The whole lookup-or-create step runs under @semaphore.
#
# service_name:: prefix of the request/response exchange names
#
# Returns the object produced by the channel's +topic+ call for the
# request exchange.
def initialize_service(service_name)
  @semaphore.synchronize do
    @topic_hash[service_name] ||= begin
      request_channel = @connection.create_channel
      request_exchange = request_channel.topic("#{service_name}.request")

      response_queue_name = @uuid.generate
      response_channel = @connection.create_channel
      response_exchange = response_channel.topic("#{service_name}.response")
      response_queue = response_channel.queue(
        response_queue_name.to_s,
        { :durable => false, :exclusive => true, :auto_delete => true }
      )

      response_queue.bind(response_exchange, :routing_key => "#{@caller_id}").subscribe do |_delivery_info, metadata, payload|
        message_callback = @message_hash[metadata[:correlation_id]]
        # Parse only when someone is waiting for this reply.
        message_callback.set_result(JSON.parse(payload)) if message_callback
      end

      # Value of the begin block: this is what gets cached and returned.
      request_exchange
    end
  end
end
transmit(service_name, version, invocation_signal, timeout_seconds)
# File lib/wire.rb, line 145
# Publishes +invocation_signal+ to a service and returns the Message that
# was handed to the registered callback.
#
# A fresh message id is generated, and a MessageCallback built from
# (@message_hash, message id, new Message, timeout_seconds) is stored in
# @message_hash under that id before anything is published. The signal is
# then sent as JSON to the exchange returned by initialize_service, with
# routing key "<version>.2", the caller id in the 'x-opt-callerId' header,
# the message id and the current epoch second as timestamp.
#
# service_name::      service whose request exchange receives the message
# version::           prefix of the routing key
# invocation_signal:: object serialized with +to_json+ as the payload
# timeout_seconds::   passed through to MessageCallback unchanged
#
# Returns the new Message instance.
# NOTE(review): presumably the callback fills this Message when a reply
# arrives or the timeout elapses -- confirm against MessageCallback.
def transmit(service_name, version, invocation_signal, timeout_seconds)
  message_id = "#{@uuid.generate}".freeze
  reply = Message.new
  @message_hash[message_id] = MessageCallback.new(@message_hash, message_id, reply, timeout_seconds)

  request_exchange = initialize_service(service_name)
  request_exchange.publish(
    invocation_signal.to_json,
    routing_key: "#{version}.2",
    headers: { 'x-opt-callerId' => "#{@caller_id}" },
    content_type: 'application/json',
    message_id: "#{message_id}",
    timestamp: Time.new.to_i
  )

  reply
end
transmit_in_only(service_name, version, invocation_signal)
# File lib/wire.rb, line 141
# Publishes +invocation_signal+ to a service without registering any
# callback for a reply.
#
# The signal is sent as JSON to the exchange returned by
# initialize_service, with routing key "<version>.2", the caller id in the
# 'x-opt-callerId' header, a freshly generated message id and the current
# epoch second as timestamp.
#
# service_name::      service whose request exchange receives the message
# version::           prefix of the routing key
# invocation_signal:: object serialized with +to_json+ as the payload
#
# Returns whatever the exchange's +publish+ call returns.
def transmit_in_only(service_name, version, invocation_signal)
  request_exchange = initialize_service(service_name)
  request_exchange.publish(
    invocation_signal.to_json,
    routing_key: "#{version}.2",
    headers: { 'x-opt-callerId' => "#{@caller_id}" },
    content_type: 'application/json',
    message_id: "#{@uuid.generate}",
    timestamp: Time.new.to_i
  )
end