class Smith::Messaging::Sender

Public Class Methods

new(queue_def, opts={}, &blk) click to toggle source
# File lib/smith/messaging/sender.rb, line 9
# Build a sender for the queue described by +queue_def+.
#
# Opens a channel, declares a :fanout or :direct exchange (selected by
# opts[:fanout]) named after the normalised queue definition and, in the
# non-fanout case, declares a queue of the same name and binds it to the
# exchange. The channel, exchange and queue are published through
# @channel_completion, @exchange_completion and @queue_completion so that the
# other methods can wait for them.
#
# NOTE: in the fanout case no queue is declared here, so @queue_completion is
# never succeeded by this method.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or a value
#   that is handed to QueueDefinition.new together with +opts+.
# @param opts [Hash] options; only consulted for QueueDefinition.new (when
#   +queue_def+ is not already a QueueDefinition) and for the :fanout flag.
# @yield [sender] if a block is given it is called with self as the last step
#   of construction. NOTE(review): whether the channel is already open at that
#   point depends on open_channel, which is not visible here.
def initialize(queue_def, opts={}, &blk)

  # This is for backward compatibility: accept either a ready-made
  # QueueDefinition or the arguments needed to build one.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  # Entries are added by #publish (keyed by message id) and removed by the
  # reply subscriber set up in #on_reply.
  @reply_container = {}

  # Prefer the queue definition's :prefetch option, falling back to the agent config.
  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  # Per-queue-name publish counters; unknown keys read as 0.
  @message_counts = Hash.new(0)

  @exchange_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @channel_completion = EM::Completion.new

  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    exchange_type = (opts[:fanout]) ? :fanout : :direct

    # Dynamically calls channel.fanout or channel.direct to declare the exchange.
    channel.send(exchange_type, @queue_def.normalise, @options.exchange) do |exchange|

      # Log any message that is returned to the exchange as undeliverable.
      exchange.on_return do |basic_return, metadata, payload|
        logger.error {
          # Best effort: if the ACL type lookup raises, fall back to a placeholder.
          acl_type = @acl_type_cache.get_by_hash(metadata.properties[:type]).to_s rescue "Unknown ACL type"
          "Cannot deliver message type: #{acl_type}, from exchange: \"#{basic_return.exchange}\", using routing key: \"#{basic_return.routing_key}\""
        }
      end

      if opts[:fanout]
        # Fanout: only the exchange becomes available; no queue is declared.
        @exchange_completion.succeed(exchange)
      else
        channel.queue(@queue_def.normalise, @options.queue) do |queue|
          queue.bind(exchange, :routing_key => @queue_def.normalise)

          # The exchange is only released once the queue has been bound to it.
          @queue_completion.succeed(queue)
          @exchange_completion.succeed(exchange)
        end
      end
    end
  end

  blk.call(self) if blk
end

Public Instance Methods

consumer_count(&blk) click to toggle source
# File lib/smith/messaging/sender.rb, line 166
# Yields the consumer count reported by #status.
#
# @yield [consumers] the second value passed on by #status.
def consumer_count(&blk)
  status { |_messages, consumers| blk.call(consumers) }
end
counter() click to toggle source
# File lib/smith/messaging/sender.rb, line 172
# Returns the number recorded in @message_counts for this sender's
# denormalised queue name (0 if nothing has been counted yet).
def counter
  key = @queue_def.denormalise
  @message_counts[key]
end
delete(&blk) click to toggle source
# File lib/smith/messaging/sender.rb, line 133
# Tears the sender down in sequence: deletes the queue, then the exchange,
# then closes the channel. Each step starts from the callback of the previous
# one; +blk+ is handed to channel.close.
#
# Nothing happens until @queue_completion has been succeeded.
#
# @yield passed straight through to channel.close.
def delete(&blk)
  # Steps are defined last-to-first so each can be passed to the one before it.
  close_channel = proc do
    @channel_completion.completion { |channel| channel.close(&blk) }
  end

  delete_exchange = proc do
    @exchange_completion.completion { |exchange| exchange.delete(&close_channel) }
  end

  @queue_completion.completion { |queue| queue.delete(&delete_exchange) }
end
message_count(&blk) click to toggle source
# File lib/smith/messaging/sender.rb, line 160
# Yields the message count reported by #status.
#
# @yield [messages] the first value passed on by #status.
def message_count(&blk)
  status { |count| blk.call(count) }
end
on_error(chain=false, &blk) click to toggle source

Define a channel error handler.

# File lib/smith/messaging/sender.rb, line 177
# Installs +blk+ as the channel's error handler once the channel is available.
#
# @param chain [Boolean] accepted but not used by this method.
# @yield forwarded to channel.on_error.
def on_error(chain=false, &blk)
  @channel_completion.completion { |channel| channel.on_error(&blk) }
end
on_reply(opts={}, &blk) click to toggle source

Set up a listener that will receive replies to the published messages. You must publish with intent to reply – tee hee.

If you pass in a :reply_queue_name option, the same queue name will be used for every reply. This means that there are no create and teardown costs for each message. If no :reply_queue_name is given, a random one will be assigned.

# File lib/smith/messaging/sender.rb, line 102
# Registers +blk+ as the reply handler and, on the first call only, creates a
# Receiver on a reply queue whose subscriber dispatches incoming replies.
#
# Incoming replies are matched against @reply_container by correlation id.
# A matching entry has its timeout cancelled and its stored :reply_proc
# called; an unmatched reply is logged as an error (and acked when
# opts[:auto_ack] is set).
#
# @param opts [Hash] :reply_queue_name picks the reply queue name (a random
#   name prefixed with this queue's name is used otherwise); :auto_ack acks
#   unmatched replies. The options, with :auto_delete => true and
#   :durable => false merged in, are also given to QueueDefinition.new.
# @yield stored in @reply_proc.
# @return the completion that is succeeded with the reply queue.
def on_reply(opts={}, &blk)
  @reply_proc = blk
  @timeout ||= Timeout.new(Smith.config.smith.timeout, :queue_name => @queue_def.denormalise)

  # Works on a copy, so :reply_queue_name is still present in opts below.
  reply_queue = opts.clone.delete(:reply_queue_name) { random("#{@queue_def.denormalise}.") }
  reply_opts = opts.merge(:auto_delete => true, :durable => false)
  queue_def = QueueDefinition.new(reply_queue, reply_opts)
  logger.debug { "reply queue: #{queue_def.denormalise}" }

  # The reply receiver is created at most once per sender.
  return @reply_queue_completion if @reply_queue_completion

  completion = EM::Completion.new
  Receiver.new(queue_def) do |queue|
    queue.subscribe do |payload, receiver|
      reply = @reply_container.delete(receiver.correlation_id)
      if reply
        reply[:timeout].cancel_timeout
        reply[:reply_proc].call(payload, receiver)
      else
        receiver.ack if opts[:auto_ack]
        logger.error { "No reply block for correlation_id: #{receiver.correlation_id}. This is probably a timed out message. Message: #{payload.to_json}" }
      end
      reply
    end

    EM.next_tick { completion.succeed(queue) }
  end
  @reply_queue_completion = completion
end
on_reply_error(&blk) click to toggle source

This gets called if there is a mismatch in the message_id & correlation_id.

# File lib/smith/messaging/sender.rb, line 148
# Stores the given block in @reply_error.
#
# @yield the handler to remember.
# @return [Proc, nil] the stored block.
def on_reply_error(&handler)
  @reply_error = handler
end
on_serialisation_error(&blk) click to toggle source

Called if there is a problem serialising an ACL. The given block is run when there is an error; it is passed the serialisation exception and the publish callback.

# File lib/smith/messaging/sender.rb, line 92
# Stores the given block in @on_serialisation_error, where #_publish looks
# for a handler when serialisation fails.
#
# @yield the handler to remember.
# @return [Proc, nil] the stored block.
def on_serialisation_error(&handler)
  @on_serialisation_error = handler
end
on_timeout(timeout=nil, &blk) click to toggle source
# File lib/smith/messaging/sender.rb, line 85
# Replaces @timeout with a new Timeout built from +timeout+ and the given block.
#
# @param timeout [Object, nil] the timeout value; when nil (or false),
#   Smith.config.smith.timeout is used instead.
# @yield handed to Timeout.new.
# @return the new Timeout.
def on_timeout(timeout=nil, &blk)
  seconds = timeout || Smith.config.smith.timeout
  @timeout = Timeout.new(seconds, &blk)
end
publish(payload, opts={}, &blk) click to toggle source

If reply queue is set the block will be called when the message recipient replies to the message and it is received.

If a block is passed to this method but the :reply_queue option is not set it will be called when the message has been safely published.

If the :reply_queue is an empty string then a random queue name will be generated.

# File lib/smith/messaging/sender.rb, line 67
# Publishes +payload+ via #_publish.
#
# When a reply queue has been set up (@reply_queue_completion is set), the
# publish waits for that queue, generates a message id, records the current
# reply proc and a freshly armed copy of @timeout in @reply_container under
# that id, and adds :reply_to / :message_id to the publish options.
# Otherwise the payload is published with just +opts+.
#
# @param payload the message to publish.
# @param opts [Hash] publish options, combined through @options.publish.
# @yield forwarded to #_publish.
def publish(payload, opts={}, &blk)
  # No reply listener registered: plain publish.
  return _publish(payload, @options.publish(opts), &blk) unless @reply_queue_completion

  @reply_queue_completion.completion do |reply_queue|
    message_id = random
    logger.verbose { "message_id: #{message_id}" }

    # TODO: if there is a timeout, delete the proc from @reply_container.
    timeout = @timeout.clone
    timeout.set_timeout(message_id)
    @reply_container[message_id] = {:reply_proc => @reply_proc, :timeout => timeout}

    reply_opts = {:reply_to => reply_queue.queue_name, :message_id => message_id}
    _publish(payload, @options.publish(opts, reply_opts), &blk)
  end
end
queue_name() click to toggle source
# File lib/smith/messaging/sender.rb, line 183
# Returns @queue_def.denormalise, i.e. the denormalised form of this sender's
# queue definition.
def queue_name
  @queue_def.denormalise
end
status(&blk) click to toggle source
# File lib/smith/messaging/sender.rb, line 152
# Asks the queue for its status once the queue is available and passes the
# result on to +blk+.
#
# Nothing happens until @queue_completion has been succeeded.
#
# @yield [num_messages, num_consumers] the two values reported by queue.status.
def status(&blk)
  @queue_completion.completion do |queue|
    queue.status { |num_messages, num_consumers| blk.call(num_messages, num_consumers) }
  end
end

Private Instance Methods

_publish(message, opts, &blk) click to toggle source
# File lib/smith/messaging/sender.rb, line 189
# Logs, counts and publishes +message+ to the exchange once it is available.
#
# The counter is incremented and the ACL type looked up immediately; the
# actual exchange.publish call happens inside the @exchange_completion callback.
#
# @param message the ACL to publish; sent as message.to_s with its type
#   (looked up from message.class) merged into the options as :type.
# @param opts [Hash] publish options.
# @yield forwarded to exchange.publish; also handed to the serialisation
#   error handler, if one is set.
# @raise [ACL::Error] when serialisation fails and no handler was registered
#   with #on_serialisation_error.
def _publish(message, opts, &blk)
  logger.verbose { "Publishing to: [queue]: #{@queue_def.denormalise}. [options]: #{opts}" }
  logger.verbose { "ACL content: [queue]: #{@queue_def.denormalise}, [metadata type]: #{message.class}, [message]: #{message.inspect}" }

  increment_counter

  acl_type = @acl_type_cache.get_by_type(message.class)

  @exchange_completion.completion do |exchange|
    begin
      exchange.publish(message.to_s, opts.merge(:type => acl_type), &blk)
    rescue Protobuf::SerializationError => error
      # Without a registered handler the failure is surfaced as an ACL::Error.
      raise ACL::Error.new(error) unless @on_serialisation_error

      @on_serialisation_error.call(error, blk)
    end
  end
end
increment_counter(value=1) click to toggle source
# File lib/smith/messaging/sender.rb, line 210
# Adds +value+ to the count kept in @message_counts for this sender's
# denormalised queue name.
#
# @param value [Numeric] amount to add (defaults to 1).
# @return the updated count.
def increment_counter(value=1)
  key = @queue_def.denormalise
  @message_counts[key] = @message_counts[key] + value
end