class RackRabbit::Client
Attributes
rabbit[R]
Public Class Methods
define_class_method_for(method_name)
click to toggle source
# File lib/rack-rabbit/client.rb, line 159 def self.define_class_method_for(method_name) define_singleton_method(method_name) do |*params| options = params.last.is_a?(Hash) ? params.pop : {} client = Client.new(options.delete(:rabbit)) response = client.send(method_name, *params, options) client.disconnect response end end
new(options = nil)
click to toggle source
# File lib/rack-rabbit/client.rb, line 15 def initialize(options = nil) @rabbit = Adapter.load(DEFAULT_RABBIT.merge(options || {})) connect end
Public Instance Methods
connect()
click to toggle source
# File lib/rack-rabbit/client.rb, line 22 def connect rabbit.connect end
default_content_encoding()
click to toggle source
# File lib/rack-rabbit/client.rb, line 149 def default_content_encoding 'utf-8' end
default_content_type()
click to toggle source
# File lib/rack-rabbit/client.rb, line 145 def default_content_type 'text/plain' end
default_timestamp()
click to toggle source
# File lib/rack-rabbit/client.rb, line 153 def default_timestamp Time.now.to_i end
delete(queue, path, options = {})
click to toggle source
# File lib/rack-rabbit/client.rb, line 44 def delete(queue, path, options = {}) request(queue, path, "", options.merge(:method => :DELETE)) end
disconnect()
click to toggle source
# File lib/rack-rabbit/client.rb, line 26 def disconnect rabbit.disconnect end
enqueue(queue, path, body, options = {})
click to toggle source
# File lib/rack-rabbit/client.rb, line 98 def enqueue(queue, path, body, options = {}) method = options[:method] || :POST headers = options[:headers] || {} rabbit.publish(body, :routing_key => queue, :priority => options[:priority], :content_type => options[:content_type] || default_content_type, :content_encoding => options[:content_encoding] || default_content_encoding, :timestamp => options[:timestamp] || default_timestamp, :headers => headers.merge({ RackRabbit::HEADER::METHOD => method.to_s.upcase, RackRabbit::HEADER::PATH => path }) ) true end
get(queue, path, options = {})
click to toggle source
# File lib/rack-rabbit/client.rb, line 32 def get(queue, path, options = {}) request(queue, path, "", options.merge(:method => :GET)) end
post(queue, path, body, options = {})
click to toggle source
# File lib/rack-rabbit/client.rb, line 36 def post(queue, path, body, options = {}) request(queue, path, body, options.merge(:method => :POST)) end
publish(exchange, path, body, options = {})
click to toggle source
# File lib/rack-rabbit/client.rb, line 121 def publish(exchange, path, body, options = {}) method = options[:method] || :POST headers = options[:headers] || {} rabbit.publish(body, :exchange => exchange, :exchange_type => options[:exchange_type] || options[:type] || :fanout, :routing_key => options[:routing_key] || options[:route], :priority => options[:priority], :content_type => options[:content_type] || default_content_type, :content_encoding => options[:content_encoding] || default_content_encoding, :timestamp => options[:timestamp] || default_timestamp, :headers => headers.merge({ RackRabbit::HEADER::METHOD => method.to_s.upcase, RackRabbit::HEADER::PATH => path }) ) true end
put(queue, path, body, options = {})
click to toggle source
# File lib/rack-rabbit/client.rb, line 40 def put(queue, path, body, options = {}) request(queue, path, body, options.merge(:method => :PUT)) end
request(queue, path, body, options = {})
click to toggle source
# File lib/rack-rabbit/client.rb, line 50 def request(queue, path, body, options = {}) id = options[:id] || SecureRandom.uuid # allow dependency injection for test purposes lock = Mutex.new condition = ConditionVariable.new method = options[:method] || :GET headers = options[:headers] || {} response = nil timeout = options[:timeout] rabbit.with_reply_queue do |reply_queue| rabbit.subscribe(:queue => reply_queue) do |message| if message.correlation_id == id lock.synchronize do response = Response.new(message.status, message.headers, message.body) reply_queue.delete(if_empty: true) condition.signal end end end rabbit.publish(body, :correlation_id => id, :routing_key => queue, :reply_to => reply_queue.name, :priority => options[:priority], :content_type => options[:content_type] || default_content_type, :content_encoding => options[:content_encoding] || default_content_encoding, :timestamp => options[:timestamp] || default_timestamp, :headers => headers.merge({ RackRabbit::HEADER::METHOD => method.to_s.upcase, RackRabbit::HEADER::PATH => path }) ) end lock.synchronize do condition.wait(lock, timeout) unless response end response end