class Traffiq::AMQP

Attributes

exchange[R]

Public Class Methods

new(queue_url) click to toggle source
# File lib/traffiq/amqp.rb, line 7
def initialize(queue_url)
  @conn = Bunny.new(queue_url)
  @conn.start

  @channel = @conn.create_channel
end

Public Instance Methods

bind_queue(name, options = {}) click to toggle source

Binds a queue to the exchange and sets it's routing_key to `name`.

@param [String] name The name of the queue and routing key to use. @param [Hash] options Queue options.

For options look at Bunny's Queue options.

http://rubybunny.info/articles/queues.html
# File lib/traffiq/amqp.rb, line 59
def bind_queue(name, options = {})
  raise Traffiq::NoExchangeError.new if @exchange.nil?

  options = {
    durable: true,
    auto_delete: false,
  }.merge(options)

  @channel.queue(name, options)
          .bind(@exchange, routing_key: name)
end
close() click to toggle source

Closes connection to the Rabbit server.

# File lib/traffiq/amqp.rb, line 106
def close
  @channel.close
  @conn.close
end
connected?() click to toggle source

Checks if the connection to the Rabbit server is opened.

# File lib/traffiq/amqp.rb, line 15
def connected?
  @conn.connected? && @channel.open?
end
define_exchange(name, options = {}) click to toggle source

Defines a topic exchange.

This defines a durable topic exchange by default.

@param [String] name The name of the exchange. @param [Hash] options The options to define the exchange with.

For options, look at Bunny's exchange options.

http://rubybunny.info/articles/exchanges.html
# File lib/traffiq/amqp.rb, line 45
def define_exchange(name, options = {})
  options = {
    durable: true,
  }.merge(options)
  @exchange = @channel.topic(name, options)
end
exchanges() click to toggle source

Returns the exchanges that the channel has, except the default exchange.

# File lib/traffiq/amqp.rb, line 20
def exchanges
  @channel.exchanges
end
on_uncaught_exception(&block) click to toggle source

Sets up the block to run when an error happens.

@param [Block] block The block to run on uncaught exceptions.

# File lib/traffiq/amqp.rb, line 32
def on_uncaught_exception(&block)
  @channel.on_uncaught_exception(&block)
end
publish(routing_key, payload = {}, options = {}) click to toggle source

Publishes a message to a specific routing key.

@param [String] routing_key The routing key where you want to send the message to. @param [Hash] payload What you want the message to have. It'll be converted to JSON. @param [Hash] options Publish options

@option opts [Boolean] :bind_to_queue If you want to bind a queue just in case there are no subscribers.

# File lib/traffiq/amqp.rb, line 99
def publish(routing_key, payload = {}, options = {})
  raise Traffiq::NoExchangeError.new if @exchange.nil?
  bind_queue(routing_key) if options[:bind_to_queue]
  @exchange.publish(MultiJson.dump(payload), routing_key: routing_key, persistent: true)
end
queues() click to toggle source

Returns the queues that are binded on this connection.

# File lib/traffiq/amqp.rb, line 25
def queues
  @channel.queues
end
subscribe(routing_key, options = {}, &block) click to toggle source

Subscribes to a specific routing_key. Executes the block when there's a message routed there.

A queue will be binded with the `routing_key` name.

@param [String] routing_key Routing key to subscribe to. @param [Hash] options Subscribe options for the queue. @param [Block] &block the block you want to execute when a message arrive.

For options look at Bunny's Queue#subscribe options.

http://rubybunny.info/articles/queues.html
# File lib/traffiq/amqp.rb, line 82
def subscribe(routing_key, options = {}, &block)
  q = bind_queue(routing_key)
  options = options.merge(manual_ack: true)

  q.subscribe(options) do |delivery_info, metadata, payload|
    block.call(delivery_info, metadata, payload)
    @channel.ack(delivery_info.delivery_tag)
  end
end