class RackRabbit::Client

Attributes

rabbit[R]

Public Class Methods

define_class_method_for(method_name) click to toggle source
# File lib/rack-rabbit/client.rb, line 159
def self.define_class_method_for(method_name)
  define_singleton_method(method_name) do |*params|
    options  = params.last.is_a?(Hash) ? params.pop : {}
    client   = Client.new(options.delete(:rabbit))
    response = client.send(method_name, *params, options)
    client.disconnect
    response
  end
end
new(options = nil) click to toggle source
# File lib/rack-rabbit/client.rb, line 15
def initialize(options = nil)
  @rabbit = Adapter.load(DEFAULT_RABBIT.merge(options || {}))
  connect
end

Public Instance Methods

connect() click to toggle source
# File lib/rack-rabbit/client.rb, line 22
def connect
  rabbit.connect
end
default_content_encoding() click to toggle source
# File lib/rack-rabbit/client.rb, line 149
def default_content_encoding
  'utf-8'
end
default_content_type() click to toggle source
# File lib/rack-rabbit/client.rb, line 145
def default_content_type
  'text/plain'
end
default_timestamp() click to toggle source
# File lib/rack-rabbit/client.rb, line 153
def default_timestamp
  Time.now.to_i
end
delete(queue, path, options = {}) click to toggle source
# File lib/rack-rabbit/client.rb, line 44
def delete(queue, path, options = {})
  request(queue, path, "", options.merge(:method => :DELETE))
end
disconnect() click to toggle source
# File lib/rack-rabbit/client.rb, line 26
def disconnect
  rabbit.disconnect
end
enqueue(queue, path, body, options = {}) click to toggle source
# File lib/rack-rabbit/client.rb, line 98
def enqueue(queue, path, body, options = {})

  method  = options[:method]  || :POST
  headers = options[:headers] || {}

  rabbit.publish(body,
    :routing_key      => queue,
    :priority         => options[:priority],
    :content_type     => options[:content_type]     || default_content_type,
    :content_encoding => options[:content_encoding] || default_content_encoding,
    :timestamp        => options[:timestamp]        || default_timestamp,
    :headers          => headers.merge({
      RackRabbit::HEADER::METHOD => method.to_s.upcase,
      RackRabbit::HEADER::PATH   => path
    })
  )

  true

end
get(queue, path, options = {}) click to toggle source
# File lib/rack-rabbit/client.rb, line 32
def get(queue, path, options = {})
  request(queue, path, "", options.merge(:method => :GET))
end
post(queue, path, body, options = {}) click to toggle source
# File lib/rack-rabbit/client.rb, line 36
def post(queue, path, body, options = {})
  request(queue, path, body, options.merge(:method => :POST))
end
publish(exchange, path, body, options = {}) click to toggle source
# File lib/rack-rabbit/client.rb, line 121
def publish(exchange, path, body, options = {})

  method  = options[:method]  || :POST
  headers = options[:headers] || {}

  rabbit.publish(body,
    :exchange         => exchange,
    :exchange_type    => options[:exchange_type] || options[:type] || :fanout,
    :routing_key      => options[:routing_key]   || options[:route],
    :priority         => options[:priority],
    :content_type     => options[:content_type]     || default_content_type,
    :content_encoding => options[:content_encoding] || default_content_encoding,
    :timestamp        => options[:timestamp]        || default_timestamp,
    :headers          => headers.merge({
      RackRabbit::HEADER::METHOD => method.to_s.upcase,
      RackRabbit::HEADER::PATH   => path
    })
  )

  true

end
put(queue, path, body, options = {}) click to toggle source
# File lib/rack-rabbit/client.rb, line 40
def put(queue, path, body, options = {})
  request(queue, path, body, options.merge(:method => :PUT))
end
request(queue, path, body, options = {}) click to toggle source
# File lib/rack-rabbit/client.rb, line 50
def request(queue, path, body, options = {})

  id        = options[:id] || SecureRandom.uuid    # allow dependency injection for test purposes
  lock      = Mutex.new
  condition = ConditionVariable.new
  method    = options[:method]  || :GET
  headers   = options[:headers] || {}
  response  = nil
  timeout   = options[:timeout]

  rabbit.with_reply_queue do |reply_queue|

    rabbit.subscribe(:queue => reply_queue) do |message|
      if message.correlation_id == id
        lock.synchronize do
          response = Response.new(message.status, message.headers, message.body)
          reply_queue.delete(if_empty: true)
          condition.signal
        end
      end
    end

    rabbit.publish(body,
      :correlation_id   => id,
      :routing_key      => queue,
      :reply_to         => reply_queue.name,
      :priority         => options[:priority],
      :content_type     => options[:content_type]     || default_content_type,
      :content_encoding => options[:content_encoding] || default_content_encoding,
      :timestamp        => options[:timestamp]        || default_timestamp,
      :headers          => headers.merge({
        RackRabbit::HEADER::METHOD => method.to_s.upcase,
        RackRabbit::HEADER::PATH   => path
      })
    )

  end

  lock.synchronize do
    condition.wait(lock, timeout) unless response
  end

  response

end