class Leveret::Queue

Facilitates the publishing or subscribing of messages to the message queue.

@!attribute [r] name

@return [String] Name of the queue. The value of Leveret.configuration.queue_name_prefix is prepended to it, joined
  with an underscore, when creating the corresponding queue in RabbitMQ.

@!attribute [r] queue

@return [Bunny::Queue] The backend RabbitMQ queue
@see http://reference.rubybunny.info/Bunny/Queue.html Bunny::Queue Documentation

Constants

PRIORITY_MAP

Map the symbol names for priorities to the integers that RabbitMQ requires.
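As a rough illustration of how such a map is consulted, the sketch below mirrors the lookup-with-fallback used in #publish. The concrete values (low: 0, normal: 1, high: 2) are an assumption, chosen only because the queue is declared with 'x-max-priority' => 2; the real contents of PRIORITY_MAP are not shown on this page. The module and method names are hypothetical.

```ruby
# Hypothetical sketch; the real PRIORITY_MAP values are not shown in this document.
module PrioritySketch
  # Assumed values, picked to fit within 'x-max-priority' => 2.
  MAP = { low: 0, normal: 1, high: 2 }.freeze

  # Same lookup shape as Queue#publish: a missing or unknown
  # priority falls back to :normal.
  def self.priority_id(options = {})
    MAP[options[:priority]] || MAP[:normal]
  end
end

PrioritySketch.priority_id(priority: :high)   # => 2
PrioritySketch.priority_id                    # => 1
PrioritySketch.priority_id(priority: :urgent) # => 1 (unknown symbol, falls back)
```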

Attributes

name[R]
queue[R]

Public Class Methods

new(name = nil)

Create a new queue with the given name. If no name is given, it defaults to {Configuration#default_queue_name}. On instantiation, the constructor immediately connects to the RabbitMQ backend and either creates a queue with the appropriate name or joins an existing one.

@param [String] name Name of the queue to connect to.

# File lib/leveret/queue.rb, line 25
def initialize(name = nil)
  @name = name || Leveret.configuration.default_queue_name
  @queue = connect_to_queue
end

Public Instance Methods

publish(payload, options = {})

Publish a message onto the queue. This method is fire and forget: it does not block and does not wait for confirmation that the message has reached the queue.

@param [Hash] payload The data to send onto the queue. It is serialized before publishing and automatically deserialized when received by a {#subscribe} block.

@option options [Symbol] :priority (:normal) The priority this message should be treated with on the queue. See {PRIORITY_MAP} for available options.

@return [void]

# File lib/leveret/queue.rb, line 39
def publish(payload, options = {})
  priority_id = PRIORITY_MAP[options[:priority]] || PRIORITY_MAP[:normal]
  payload = serialize_payload(payload)

  log.debug "Publishing #{payload.inspect} for queue #{name} (Priority: #{priority_id})"
  exchange.publish(payload, persistent: true, routing_key: name, priority: priority_id)
end
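To make the arguments handed to the exchange concrete, here is a minimal sketch that runs without RabbitMQ. RecordingExchange, SKETCH_PRIORITIES and sketch_publish are all hypothetical names; the priority values are assumed, and JSON.generate stands in for serialize_payload, whose exact output is not shown on this page.

```ruby
require 'json'

# Hypothetical stand-in for the Bunny exchange: it only records what it receives.
class RecordingExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, options = {})
    @published << [payload, options]
    self
  end
end

# Assumed priority values; the real PRIORITY_MAP is not shown in this document.
SKETCH_PRIORITIES = { low: 0, normal: 1, high: 2 }.freeze

# Same shape as Queue#publish, with JSON.generate standing in for serialize_payload.
def sketch_publish(exchange, queue_name, payload, options = {})
  priority_id = SKETCH_PRIORITIES[options[:priority]] || SKETCH_PRIORITIES[:normal]
  exchange.publish(JSON.generate(payload), persistent: true, routing_key: queue_name, priority: priority_id)
end

exchange = RecordingExchange.new
sketch_publish(exchange, 'standard', { job: 'resize', id: 7 }, priority: :high)

body, opts = exchange.published.first
body # => "{\"job\":\"resize\",\"id\":7}"
opts # => {:persistent=>true, :routing_key=>"standard", :priority=>2}
```

The queue name doubles as the routing key, which matches the binding created in connect_to_queue.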
subscribe() { |incoming| ... }

Subscribe to this queue and yield to the given block for every message received. This method does not block; receiving and dispatching of messages is handled in a separate thread.

The receiving block is responsible for acknowledging or rejecting the message. This must be done using the same channel the message was received on, {Leveret.channel}. {Worker#ack_message} provides an example implementation of this acknowledgement.

@note The receiving block is responsible for acking/rejecting the message. See the description above for details.

@yieldparam incoming [Message] Delivery info, properties and the params wrapped up into a convenient object

@return [void]

# File lib/leveret/queue.rb, line 59
def subscribe
  log.info "Subscribing to #{name}"
  queue.subscribe(manual_ack: true) do |delivery_info, properties, msg|
    log.debug "Received #{msg} from #{name}"
    incoming = Leveret::Message.new(delivery_info, properties, deserialize_payload(msg))
    yield incoming
  end
end
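The acknowledgement responsibility described above can be sketched without a broker. Everything here is a hypothetical stand-in: FakeChannel only borrows the ack and reject method names from Bunny::Channel, and the message shape (delivery_info, properties, params) is assumed from the arguments passed to Leveret::Message.new, since the class itself is not shown on this page.

```ruby
# Hypothetical channel: records acks and rejects instead of talking to RabbitMQ.
class FakeChannel
  attr_reader :acked, :rejected

  def initialize
    @acked = []
    @rejected = []
  end

  def ack(delivery_tag, _multiple = false)
    @acked << delivery_tag
  end

  def reject(delivery_tag, requeue = false)
    @rejected << [delivery_tag, requeue]
  end
end

# Assumed shape of the yielded message; the real Leveret::Message may differ.
FakeDeliveryInfo = Struct.new(:delivery_tag)
FakeMessage = Struct.new(:delivery_info, :properties, :params)

# A receiving handler: run the work, ack on success, reject without requeue on failure.
def handle(channel, incoming)
  yield incoming.params
  channel.ack(incoming.delivery_info.delivery_tag)
rescue StandardError
  channel.reject(incoming.delivery_info.delivery_tag, false)
end

channel = FakeChannel.new
handle(channel, FakeMessage.new(FakeDeliveryInfo.new(1), {}, { 'id' => 7 })) { |params| params.fetch('id') }
handle(channel, FakeMessage.new(FakeDeliveryInfo.new(2), {}, {})) { |params| params.fetch('id') }

channel.acked    # => [1]
channel.rejected # => [[2, false]]
```

Whether a failed message should be requeued is a design choice for the consumer; the sketch rejects without requeueing purely for illustration.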

Private Instance Methods

connect_to_queue()

Create or return a representation of the queue on the RabbitMQ backend

@return [Bunny::Queue] RabbitMQ queue

# File lib/leveret/queue.rb, line 91
def connect_to_queue
  queue = channel.queue(mq_name, durable: true, auto_delete: false, arguments: { 'x-max-priority' => 2 })
  queue.bind(exchange, routing_key: name)
  log.debug "Connected to #{mq_name}, bound on #{name}"
  queue
end
deserialize_payload(json)

Convert a set of serialized parameters into a {Parameters} object

@param [String] json JSON representation of the parameters

@return [Parameters] Useful object representation of the parameters

# File lib/leveret/queue.rb, line 84
def deserialize_payload(json)
  Leveret::Parameters.from_json(json)
end
mq_name()

Calculate the name of the queue that should be used on the RabbitMQ backend

@return [String] Backend queue name

# File lib/leveret/queue.rb, line 101
def mq_name
  @mq_name ||= [Leveret.configuration.queue_name_prefix, name].join('_')
end
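The naming rule, together with the default name used by the constructor, can be tried out in isolation. This is a sketch only: SketchConfiguration stands in for Leveret.configuration, and the prefix and default name values are made up for the example.

```ruby
# Hypothetical stand-in for Leveret.configuration; both values are invented.
SketchConfiguration = Struct.new(:queue_name_prefix, :default_queue_name)
SKETCH_CONFIG = SketchConfiguration.new('leveret_queue', 'standard')

class NamedQueueSketch
  attr_reader :name

  # Falls back to the configured default when no name is given.
  def initialize(name = nil)
    @name = name || SKETCH_CONFIG.default_queue_name
  end

  # Prefix and name joined with an underscore, memoized after the first call.
  def mq_name
    @mq_name ||= [SKETCH_CONFIG.queue_name_prefix, name].join('_')
  end
end

NamedQueueSketch.new.mq_name           # => "leveret_queue_standard"
NamedQueueSketch.new('emails').mq_name # => "leveret_queue_emails"
```

Note that the unprefixed name is what gets used as the routing key, while the prefixed form is the queue name on the backend.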
serialize_payload(params)

Convert a set of parameters passed into a serialized form suitable for transport on the message queue

@param [Hash] params Parameters to be serialized

@return [String] Encoded params ready to be sent onto the queue

# File lib/leveret/queue.rb, line 75
def serialize_payload(params)
  Leveret::Parameters.new(params).serialize
end
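For intuition about the serialize and deserialize pair, the round trip below uses plain JSON from the standard library. It is an assumption that Leveret::Parameters behaves similarly (the page only shows that it has from_json and serialize); the helper names are hypothetical, and the real class may, for example, treat keys differently.

```ruby
require 'json'

# Hypothetical stand-in for serialize_payload: Hash to JSON string.
def sketch_serialize(params)
  JSON.generate(params)
end

# Hypothetical stand-in for deserialize_payload: JSON string back to a Hash.
def sketch_deserialize(json)
  JSON.parse(json)
end

json = sketch_serialize({ user_id: 42, tags: %w[a b] })
json                     # => "{\"user_id\":42,\"tags\":[\"a\",\"b\"]}"
sketch_deserialize(json) # => {"user_id"=>42, "tags"=>["a", "b"]}
```

With plain JSON, symbol keys come back as strings, so a subscriber written against this sketch would read params['user_id'] rather than params[:user_id].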