class Leveret::Queue
Facilitates the publishing or subscribing of messages to the message queue.
@!attribute [r] name
@return [String] Name of the queue. This will have Leveret.queue_name_prefix prepended to it when creating a corresponding queue in RabbitMQ.
@!attribute [r] queue
@return [Bunny::Queue] The backend RabbitMQ queue.
@see http://reference.rubybunny.info/Bunny/Queue.html Bunny::Queue Documentation
Constants
- PRIORITY_MAP
Map the symbol names for priorities to the integers that RabbitMQ requires.
Attributes
Public Class Methods
Create a new queue with the name given in the params. If no name is given, it defaults to {Configuration#default_queue_name}. On instantiation, the constructor immediately connects to the RabbitMQ backend and creates a queue with the appropriate name, or joins an existing one.
@param [String] name Name of the queue to connect to.
# File lib/leveret/queue.rb, line 25
def initialize(name = nil)
  @name = name || Leveret.configuration.default_queue_name
  @queue = connect_to_queue
end
Public Instance Methods
Publish a message onto the queue. This method is fire-and-forget: it is non-blocking and does not wait for confirmation that the message has reached the queue.
@param [Hash] payload The data we wish to send onto the queue, this will be serialized and automatically
deserialized when received by a {#subscribe} block.
@option options [Symbol] :priority (:normal) The priority this message should be treated with on the queue
see {PRIORITY_MAP} for available options.
@return [void]
# File lib/leveret/queue.rb, line 39
def publish(payload, options = {})
  priority_id = PRIORITY_MAP[options[:priority]] || PRIORITY_MAP[:normal]
  payload = serialize_payload(payload)

  log.debug "Publishing #{payload.inspect} for queue #{name} (Priority: #{priority_id})"
  exchange.publish(payload, persistent: true, routing_key: name, priority: priority_id)
end
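The priority lookup in publish falls back to :normal when the :priority option is missing or not a key of PRIORITY_MAP. The sketch below reproduces only that lookup. The map values are an assumption (this page does not show the contents of PRIORITY_MAP; they are chosen here to fit the 'x-max-priority' => 2 queue argument), and priority_for is a hypothetical helper, not part of Leveret.

```ruby
# Assumed values: the real PRIORITY_MAP is not shown on this page.
PRIORITY_MAP = { low: 0, normal: 1, high: 2 }.freeze

# Hypothetical helper mirroring the lookup inside Queue#publish.
def priority_for(options = {})
  PRIORITY_MAP[options[:priority]] || PRIORITY_MAP[:normal]
end

priority_for(priority: :high)   # => 2
priority_for                    # => 1 (no :priority given)
priority_for(priority: :bogus)  # => 1 (unknown symbol falls back to :normal)
```

Because of the fallback, a misspelled priority is not reported as an error; the message is published at normal priority.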
Subscribe to this queue and yield to the given block for every message received. This method does not block; messages are received and dispatched in a separate thread.
The receiving block is responsible for acknowledging or rejecting the message. This must be done using the same channel the message was received on, {#Leveret.channel}. {Worker#ack_message} provides an example implementation of this acknowledgement.
@note The receiving block is responsible for acking/rejecting the message. See the description above for details.
@yieldparam incoming [Message] Delivery info, properties and the params wrapped up into a convenient object
@return [void]
# File lib/leveret/queue.rb, line 59
def subscribe
  log.info "Subscribing to #{name}"
  queue.subscribe(manual_ack: true) do |delivery_info, properties, msg|
    log.debug "Received #{msg} from #{name}"
    incoming = Leveret::Message.new(delivery_info, properties, deserialize_payload(msg))
    yield incoming
  end
end
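Since the queue is subscribed with manual_ack: true, every message handed to the block has to be acknowledged or rejected by the block itself. The sketch below shows one possible shape for such a block. It runs without RabbitMQ by using stand-ins: Incoming and FakeChannel are hypothetical and are not Leveret or Bunny classes, and the delivery_tag and params accessors on the message are assumptions. The two channel methods are named after Bunny::Channel#acknowledge and Bunny::Channel#reject.

```ruby
# Stand-ins so the sketch runs without RabbitMQ (hypothetical, not Leveret classes).
Incoming = Struct.new(:delivery_tag, :params)

class FakeChannel
  attr_reader :acked, :rejected

  def initialize
    @acked = []
    @rejected = []
  end

  # Same argument shape as Bunny::Channel#acknowledge(delivery_tag, multiple)
  def acknowledge(delivery_tag, _multiple = false)
    @acked << delivery_tag
  end

  # Same argument shape as Bunny::Channel#reject(delivery_tag, requeue)
  def reject(delivery_tag, _requeue = false)
    @rejected << delivery_tag
  end
end

channel = FakeChannel.new
messages = [Incoming.new(1, { 'n' => 2 }), Incoming.new(2, { 'n' => 'oops' })]

# The body of this lambda is what a #subscribe block would need to do:
# process the message, then ack on success or reject on failure.
handler = lambda do |incoming|
  begin
    Integer(incoming.params['n'])                      # the "work"
    channel.acknowledge(incoming.delivery_tag, false)
  rescue ArgumentError, TypeError
    channel.reject(incoming.delivery_tag, false)
  end
end

messages.each(&handler)
channel.acked     # => [1]
channel.rejected  # => [2]
```

In real use the channel would be the one the message arrived on, as the description above requires, rather than a separate object.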
Private Instance Methods
Create or return a representation of the queue on the RabbitMQ backend
@return [Bunny::Queue] RabbitMQ queue
# File lib/leveret/queue.rb, line 91
def connect_to_queue
  queue = channel.queue(mq_name, durable: true, auto_delete: false, arguments: { 'x-max-priority' => 2 })
  queue.bind(exchange, routing_key: name)
  log.debug "Connected to #{mq_name}, bound on #{name}"
  queue
end
Convert a set of serialized parameters into a {Parameters} object
@param [String] json JSON representation of the parameters
@return [Parameters] Useful object representation of the parameters
# File lib/leveret/queue.rb, line 84
def deserialize_payload(json)
  Leveret::Parameters.from_json(json)
end
Calculate the name of the queue that should be used on the RabbitMQ backend
@return [String] Backend queue name
# File lib/leveret/queue.rb, line 101
def mq_name
  @mq_name ||= [Leveret.configuration.queue_name_prefix, name].join('_')
end
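To make the naming concrete, the sketch below repeats the join from mq_name with literal values. Both values are assumptions chosen for illustration; the real prefix comes from Leveret.configuration.queue_name_prefix.

```ruby
# Assumed values for illustration only.
prefix = 'leveret_queue'
name   = 'standard'

# Backend queue name, built the same way as in mq_name.
mq_name = [prefix, name].join('_')   # => "leveret_queue_standard"

# The routing key used when binding and publishing is the unprefixed name.
routing_key = name                   # => "standard"

# Array#join renders nil as an empty string, so a nil prefix
# would still leave a leading underscore.
[nil, name].join('_')                # => "_standard"
```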
Convert a set of parameters passed into a serialized form suitable for transport on the message queue
@param [Hash] params Parameters to be serialized
@return [String] Encoded params ready to be sent onto the queue
# File lib/leveret/queue.rb, line 75
def serialize_payload(params)
  Leveret::Parameters.new(params).serialize
end
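The internals of Leveret::Parameters are not shown on this page. As a rough illustration of the serialize/deserialize round trip, the sketch below uses Ruby's standard JSON library as a stand-in, on the assumption that the payload travels as a JSON string (as the from_json method name suggests). It is not the Parameters implementation.

```ruby
require 'json'

payload = { 'user_id' => 42, 'tags' => ['a', 'b'] }

# Publisher side: hash to string, ready to go onto the queue.
wire = JSON.generate(payload)

# Subscriber side: string back to a hash.
restored = JSON.parse(wire)
restored == payload   # => true

# With plain JSON, symbol keys come back as strings.
JSON.parse(JSON.generate({ user_id: 42 }))   # => {"user_id"=>42}
```

Whether Leveret::Parameters restores symbol keys or offers indifferent access is not stated here, so it is worth checking before relying on a particular key type inside a subscribe block.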