class Smith::Messaging::Foo
An instance of this class is passed into the receive block and represents both the message and the message metadata. It also handles requeues and retries. In short, it's very much a convenience class, which is why I have no idea what to call it!
Attributes
Public Class Methods
# File lib/smith/messaging/receiver.rb, line 247
def initialize(metadata, data, opts={}, requeue_opts, &blk)
  @opts = opts
  @metadata = metadata
  @reply_queue = @opts[:reply_queue]
  @requeue_opts = requeue_opts
  @acl_type_cache = AclTypeCache.instance
  @time = Time.now
  # TODO add some better error checking/diagnostics.
  clazz = @acl_type_cache.get_by_hash(metadata.type)
  @message = clazz.new.parse_from_string(data)
  if @opts[:threading]
    EM.defer do
      blk.call(@message, self)
      ack if @opts[:auto_ack]
    end
  else
    blk.call(@message, self)
    ack if @opts[:auto_ack]
  end
end
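The dispatch order in the constructor (call the block with the message and the receiver object, then ack if `:auto_ack` is set) can be tried without EventMachine. This is only a rough sketch: `DispatchSketch` is a hypothetical class, and a plain `Thread` that is joined immediately stands in for `EM.defer`, so it is not asynchronous the way the real code is.

```ruby
# Hypothetical stand-in for the receiver object; not part of Smith.
class DispatchSketch
  attr_reader :acked

  def initialize(message, opts = {}, &blk)
    @acked = false

    # Same order as the constructor: run the block, then auto-ack.
    work = lambda do
      blk.call(message, self)
      ack if opts[:auto_ack]
    end

    if opts[:threading]
      # Assumption: a joined Thread replaces EM.defer for this sketch.
      Thread.new(&work).join
    else
      work.call
    end
  end

  def ack
    @acked = true
  end
end

seen = []

# With :auto_ack the message is acknowledged once the block has returned.
auto = DispatchSketch.new("payload", :auto_ack => true) { |msg, _receiver| seen << msg }

# Without :auto_ack the block has to call ack on the receiver itself.
manual = DispatchSketch.new("payload") { |msg, _receiver| seen << msg }

# The threaded path runs the same block and the same auto-ack step.
threaded = DispatchSketch.new("payload", :threading => true, :auto_ack => true) { |msg, _receiver| seen << msg }
```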
Public Instance Methods
Acknowledge the message.
# File lib/smith/messaging/receiver.rb, line 293
def ack(multiple=false)
  @metadata.ack(multiple)
end
Returns the correlation_id of the message.
# File lib/smith/messaging/receiver.rb, line 349
def correlation_id
  @metadata.correlation_id
end
Publish the ACL to the error queue set up for this queue. This method is only available if the :error_queue option is set to true. Note that the error queue name is derived from the receive queue name, which in most cases is the same as the sender name but will be different if you are using fanout queues.
@param [ACL] acl Optional ACL. Without this argument the method fails the entire received ACL. This may not be what you want, so this option allows you to fail another ACL. WARNING: there is no type checking at the moment. If you publish an ACL that this agent can't process and republish that ACL at a future date, the agent will blow up.
@param [Hash] opts Options hash. This currently supports only one option: :ack. If you publish a different ACL from the one received, you will have to ack that message yourself and make sure to pass `:ack => nil`.
@yieldparam [Fixnum] The number of ACLs on the error queue.
# File lib/smith/messaging/receiver.rb, line 315
def fail(acl=nil, opts={:ack => true}, &blk)
  if @opts[:error_queue]
    message = (acl) ? acl : @message
    Sender.new("#{queue_name}.error") do |queue|
      logger.debug { "Republishing ACL to error queue: \"#{queue.queue_name}\"" }
      queue.publish(message) do
        queue.number_of_messages do |count|
          @metadata.ack if opts[:ack]
          blk && blk.call(count + 1)
        end
      end
    end
  else
    raise ArgumentError, "You cannot fail this queue as you haven't specified the :error_queue option"
  end
end
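The `acl` and `:ack` handling can be tried without a broker. The following is a minimal sketch, not Smith's implementation: `FailSketch` is a hypothetical class, an in-memory array stands in for the `<queue_name>.error` queue, and the yielded count is simply the size of that array.

```ruby
# Hypothetical stand-in for the receiver object; an Array plays the error queue.
class FailSketch
  attr_reader :error_queue, :acked

  def initialize(message, opts = {})
    @message = message
    @opts = opts
    @error_queue = []
    @acked = false
  end

  # Mirrors the documented signature: optional ACL, then an options hash.
  def fail(acl = nil, opts = {:ack => true}, &blk)
    unless @opts[:error_queue]
      raise ArgumentError, "the :error_queue option is not set"
    end

    @error_queue << (acl || @message)   # publish the given ACL, else the received one
    @acked = true if opts[:ack]         # ack the received message only when asked to
    blk.call(@error_queue.size) if blk  # yield the number of ACLs now on the error queue
  end
end

# Fail the received ACL: it is republished and acknowledged.
received = FailSketch.new(:received_acl, :error_queue => true)
count_seen = nil
received.fail { |count| count_seen = count }

# Fail a different ACL and leave the ack to the caller.
other = FailSketch.new(:received_acl, :error_queue => true)
other.fail(:other_acl, :ack => nil)
```

Because the default for `opts` is a whole hash, passing any hash of your own that lacks `:ack` (including an empty one) also skips the ack.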
Return the queue_name. Note that this is the receive queue name, which in most cases is the same as the sender name but will be different if you are using fanout queues.
@return [String] the name of the queue
# File lib/smith/messaging/receiver.rb, line 359
def queue_name
  @a561facf ||= begin
    queue_name = @metadata.channel.queues.keys.detect { |q| q == @metadata.exchange }
    if queue_name
      @a561facf = remove_namespace(queue_name)
    else
      raise UnknownQueue, "Queue not found. You are probably using fanout queues: #{remove_namespace(@metadata.exchange)}"
    end
  end
end
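The lookup can be illustrated in isolation. This is a sketch under assumptions: `resolve_queue_name` is a hypothetical helper, its `queues` argument stands in for `@metadata.channel.queues.keys`, `exchange` for `@metadata.exchange`, and the namespace is passed in rather than read from the Smith configuration. `UnknownQueue` is redefined locally so the example runs on its own.

```ruby
# Local stand-in for Smith's UnknownQueue error.
class UnknownQueue < StandardError; end

# Find the queue whose name equals the exchange name, then strip the namespace.
def resolve_queue_name(queues, exchange, namespace = "smith")
  found = queues.detect { |q| q == exchange }
  raise UnknownQueue, "Queue not found: #{exchange}" unless found

  found.sub(/\A#{Regexp.escape(namespace)}\./, "")
end

# Direct queue: the exchange name matches a queue on the channel.
direct = resolve_queue_name(["smith.orders"], "smith.orders")

# Fanout-style case: no queue carries the exchange's name, so the lookup fails.
fanout_error = begin
  resolve_queue_name(["smith.orders.worker1"], "smith.orders")
  nil
rescue UnknownQueue => e
  e
end
```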
Reject the message, optionally requeuing it.
# File lib/smith/messaging/receiver.rb, line 344
def reject(opts={})
  @metadata.reject(opts)
end
Remove the Smith namespace prefix (the default is `smith.`)
@param [String] queue_name The name of the queue
@return [String] The name of the queue without the namespace prefix.
# File lib/smith/messaging/receiver.rb, line 376
def remove_namespace(queue_name)
  queue_name.gsub(/^#{Smith.config.smith.namespace}\./, '')
end
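A standalone sketch of the prefix stripping, for experimenting outside Smith. `strip_namespace` is a hypothetical name, and it takes the namespace as an argument instead of reading `Smith.config.smith.namespace`. It also differs from the method above in two deliberate ways: it anchors with `\A` and escapes the namespace with `Regexp.escape`.

```ruby
# Remove a leading "<namespace>." from a queue name, if present.
def strip_namespace(queue_name, namespace = "smith")
  # \A matches only at the very start of the string; Regexp.escape keeps
  # characters such as "." in the namespace from acting as wildcards.
  queue_name.sub(/\A#{Regexp.escape(namespace)}\./, "")
end

strip_namespace("smith.agent.lifecycle")  # => "agent.lifecycle"
strip_namespace("agent.lifecycle")        # => "agent.lifecycle" (no prefix, unchanged)
strip_namespace("prod.orders", "prod")    # => "orders"
```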
Send a message to the reply_to queue as specified in the message header.
# File lib/smith/messaging/receiver.rb, line 274
def reply(acl=nil, &blk)
  raise ArgumentError, "you cannot supply an ACL and a block." if acl && blk
  raise ArgumentError, "you must supply either an ACL or a block." if acl.nil? && blk.nil?
  if @metadata.reply_to
    @reply_queue.publish((blk) ? blk.call : acl, :correlation_id => @metadata.message_id)
  else
    logger.error { "Cannot reply to a message that has no reply_to: #{@metadata.exchange}." }
  end
end
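The two calling styles (an ACL argument or a block, never both) can be exercised with stand-ins. This is a sketch, not the library code: `FakeMetadata`, `FakeReplyQueue` and `ReplySketch` are hypothetical, and where the real method logs an error for a missing `reply_to`, the sketch just returns `nil`.

```ruby
# Hypothetical stand-ins for the AMQP metadata and the reply queue.
FakeMetadata = Struct.new(:reply_to, :message_id)

class FakeReplyQueue
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, opts = {})
    @published << [payload, opts]
  end
end

class ReplySketch
  def initialize(metadata, reply_queue)
    @metadata = metadata
    @reply_queue = reply_queue
  end

  def reply(acl = nil, &blk)
    raise ArgumentError, "you cannot supply an ACL and a block." if acl && blk
    raise ArgumentError, "you must supply either an ACL or a block." if acl.nil? && blk.nil?

    # The real method logs an error here; this sketch just returns nil.
    return nil unless @metadata.reply_to

    # The reply carries the incoming message_id as its correlation_id.
    @reply_queue.publish(blk ? blk.call : acl, :correlation_id => @metadata.message_id)
  end
end

queue = FakeReplyQueue.new
receiver = ReplySketch.new(FakeMetadata.new("replies", "msg-1"), queue)

receiver.reply(:pong)         # pass the ACL directly
receiver.reply { :pong_too }  # or build it inside a block
```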
Requeue the current message on the current queue. A requeue number is added to the message header which is used to ensure the correct number of requeues.
# File lib/smith/messaging/receiver.rb, line 288
def requeue
  Requeue.new(@message, @metadata, @requeue_opts).requeue
end