class Smith::Messaging::Foo

This class gets passed into the receive block and is a representation of both the message and the message metadata. It also handles requeues and retries. In short it's very much a convenience class which is why I have no idea what to call it!

Attributes

metadata[RW]

Public Class Methods

new(metadata, data, opts={}, requeue_opts, &blk) click to toggle source
# File lib/smith/messaging/receiver.rb, line 247
def initialize(metadata, data, opts={}, requeue_opts, &blk)
  @opts = opts
  @metadata = metadata
  @reply_queue = @opts[:reply_queue]
  @requeue_opts = requeue_opts

  @acl_type_cache = AclTypeCache.instance

  @time = Time.now

  # TODO add some better error checking/diagnostics.
  clazz = @acl_type_cache.get_by_hash(metadata.type)

  @message = clazz.new.parse_from_string(data)

  if @opts[:threading]
    EM.defer do
      blk.call(@message, self)
      ack if @opts[:auto_ack]
    end
  else
    blk.call(@message, self)
    ack if @opts[:auto_ack]
  end
end

Public Instance Methods

ack(multiple=false) click to toggle source

acknowledge the message.

# File lib/smith/messaging/receiver.rb, line 293
def ack(multiple=false)
  @metadata.ack(multiple)
end
Also aliased as: call
call(multiple=false)
Alias for: ack
correlation_id() click to toggle source

the correlation_id

# File lib/smith/messaging/receiver.rb, line 349
def correlation_id
  @metadata.correlation_id
end
fail(acl=nil, opts={:ack => true}, &blk) click to toggle source

Publish the ACL to the error queue set up for this queue. This method is only available if the :error_queue option is set to true. Note this is the receive queue name which in most cases is the same at the sender name but if you are using fanout queues it will be different.

@param [ACL] acl Optional ACL. With any options this method will fail the entire

ACL. This may not be what you want though. So this opton allows
you to fail another ACL. WARNING: there is no type checking at
the moment. If you publish an ACL that this agent can't process
and republish that ACL at a future date the agent will blow up.

@param [Hash] opts Options hash. This currently only supports on option:

:ack. If you publish a different ACL from the one received you will have to
ack that message yourself and make sure `:ack => nil`

@yieldparam [Fixnum] The number of ACLs on the error queue.

# File lib/smith/messaging/receiver.rb, line 315
def fail(acl=nil, opts={:ack => true}, &blk)
  if @opts[:error_queue]
    message = (acl) ? acl : @message
    Sender.new("#{queue_name}.error") do |queue|
      logger.debug { "Republishing ACL to error queue: \"#{queue.queue_name}\"" }
      queue.publish(message) do
        queue.number_of_messages do |count|
          @metadata.ack if opts[:ack]
          blk && blk && blk.call(count + 1)
        end
      end
    end
  else
    raise ArgumentError, "You cannot fail this queue as you haven't specified the :error_queue option"
  end
end
queue_name() click to toggle source

Return the queue_name. Note this is the receive queue name which in most cases is the same at the sender name but if you are using fanout queues it will be different.

@return [String] the name of the queue

# File lib/smith/messaging/receiver.rb, line 359
def queue_name
  @a561facf ||= begin
                  queue_name = @metadata.channel.queues.keys.detect { |q| q == @metadata.exchange }
                  if queue_name
                    @a561facf = remove_namespace(queue_name)
                  else
                    raise UnknownQueue, "Queue not found. You are probably you are using fanout queues: #{remove_namespace(@metadata.exchange)}"
                  end
                end
end
reject(opts={}) click to toggle source

reject the message. Optionally requeuing it.

# File lib/smith/messaging/receiver.rb, line 344
def reject(opts={})
  @metadata.reject(opts)
end
remove_namespace(queue_name) click to toggle source

Remove the Smith namespace prefix (the default is `smith.`)

@param [String] queue_name The name of the queue

@param [String] The name of the queue with the namespace prefix.

# File lib/smith/messaging/receiver.rb, line 376
def remove_namespace(queue_name)
  queue_name.gsub(/^#{Smith.config.smith.namespace}\./, '')
end
reply(acl=nil, &blk) click to toggle source

Send a message to the reply_to queue as specified in the message header.

# File lib/smith/messaging/receiver.rb, line 274
def reply(acl=nil, &blk)
  raise ArgumentError, "you cannot supply an ACL and a blcok." if acl && blk
  raise ArgumentError, "you must supply either an ACL or a blcok." if acl.nil? && blk.nil?

  if @metadata.reply_to
    @reply_queue.publish((blk) ? blk.call : acl, :correlation_id => @metadata.message_id)
  else
    logger.error { "Cannot reply to a message that has no reply_to: #{@metadata.exchange}." }
  end
end
requeue() click to toggle source

Requeue the current mesage on the current queue. A requeue number is added to the message header which is used to ensure the correct number of requeues.

# File lib/smith/messaging/receiver.rb, line 288
def requeue
  Requeue.new(@message, @metadata, @requeue_opts).requeue
end
to_proc() click to toggle source

Make call invoke ack. This makes the following idiom possible:

receiver('queue').subscribe do |payload, receiver|

blah(payload, &receiver)

end

which will ensure that ack is called properly.

# File lib/smith/messaging/receiver.rb, line 339
def to_proc
  proc { |obj| ack(obj) }
end