class Babs::Client
Attributes
call_id[RW]
condition[R]
lock[R]
reply_queue[R]
response[RW]
route[RW]
Public Class Methods
new(options = {})
click to toggle source
# File lib/babs/client.rb, line 13
# Opens a Bunny connection and channel and prepares the per-client state
# used to wait for replies.
#
# options - Hash merged over the default routing options
#           (+server_queue+ defaults to nil, +exchange+ defaults to the
#           channel's default exchange).
def initialize(options = {})
  # Synchronization primitives shared by the reply consumer and the caller
  # that waits for a reply.
  @lock = Mutex.new
  @condition = ConditionVariable.new

  # make this connection part a singleton and a LOT of time is saved, as
  # well as reusing the same connection
  @conn = Bunny.new(automatically_recover: false)
  @conn.start
  @ch = @conn.create_channel

  base_options = { server_queue: nil, exchange: @ch.default_exchange }
  @defaults = Hashie::Mash.new(base_options.merge(options))
end
Public Instance Methods
close_connection()
click to toggle source
# File lib/babs/client.rb, line 30
# Closes the channel and then the underlying connection.
#
# The connection is closed in an +ensure+ clause so that an error raised
# while closing the channel does not leave the connection open. Any such
# error still propagates to the caller.
def close_connection
  @ch.close
ensure
  @conn.close
end
listen_for_response()
click to toggle source
# File lib/babs/client.rb, line 35
# Subscribes to an exclusive reply queue and hands matching replies to the
# thread waiting on +condition+.
#
# The queue and its consumer are created on the first call only and reused
# afterwards. The consumer block compares each delivery against the current
# +call_id+, so one subscription can serve every request made by this
# client; declaring a fresh queue and consumer per call would accumulate
# them on the channel.
#
# Returns the value returned by the queue's +subscribe+ call.
def listen_for_response
  return @reply_consumer if @reply_consumer

  # listen on a new queue for this response
  @reply_queue = @ch.queue("", :exclusive => true)
  @reply_consumer = @reply_queue.subscribe do |_delivery_info, properties, payload|
    correlation_id = properties[:correlation_id]
    matches_call = correlation_id == call_id

    puts "response_id #{correlation_id}"
    puts matches_call ? "correct id" : "BAD id"
    next unless matches_call

    # Store the payload and signal while holding the lock so the waiting
    # thread observes the response as soon as it wakes up.
    lock.synchronize do
      self.response = payload
      condition.signal
    end
  end
end
request(options = {})
click to toggle source
# File lib/babs/client.rb, line 65
# Performs one request/response round trip and returns the parsed reply.
#
# options - Hash of request options. +:method+ and +:params+ are removed
#           and sent as the request body; every remaining key is merged
#           over the client's default routing options.
#
# Returns a Hashie::Mash built from the JSON-decoded reply.
#
# NOTE(review): when send_request returns nil (no reply stored), that nil
# is passed straight to JSON.parse, which is expected to raise - confirm
# callers handle this.
def request(options = {})
  options = Hashie::Mash.new(options)

  # grab out the expected data
  method = options.delete(:method)
  params = options.delete(:params)

  # merge the connection options with the defaults
  routing_options = @defaults.merge(options)

  # Called only for its side effect of subscribing to the reply queue; its
  # return value is not the response.
  listen_for_response
  raw_response = send_request(routing_options, method, params)

  # parse and return response
  Hashie::Mash.new(JSON.parse(raw_response))
end
send_request(routing_options, method, params)
click to toggle source
# File lib/babs/client.rb, line 48
# Publishes one request and blocks until a reply is stored in +response+
# or five seconds have passed.
#
# routing_options - object responding to +exchange+ and +server_queue+.
# method          - value sent under the "method" key of the JSON body.
# params          - value sent under the "params" key of the JSON body.
#
# Returns the stored response, or nil when none arrived before the timeout.
def send_request(routing_options, method, params)
  timeout_seconds = 5

  self.call_id = SecureRandom.uuid
  # Clear any previous reply BEFORE publishing. Clearing it afterwards could
  # erase a reply that arrived between the publish and the reset.
  self.response = nil

  data_string = { method: method, params: params }.to_json
  routing_options.exchange.publish(
    data_string,
    routing_key: routing_options.server_queue,
    correlation_id: call_id,
    reply_to: @reply_queue.name
  )
  puts "call id #{call_id}"

  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
  lock.synchronize do
    # Wait only while no reply is stored: a signal sent before this point
    # is not lost, and a spurious wakeup does not end the wait early.
    while response.nil?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break unless remaining > 0

      # params to wait are mutex, timeout_in_seconds
      condition.wait(lock, remaining)
    end
  end

  response
end