class Tochtli::ReplyQueue

Attributes

connection[R]
logger[R]
queue[R]

Public Class Methods

new(rabbit_connection, logger=nil) click to toggle source
# File lib/tochtli/reply_queue.rb, line 5
# Sets up an empty handler registry and immediately subscribes to a fresh
# reply queue on the given connection.
#
# @param rabbit_connection connection object; must respond to +logger+,
#   +channel+ and +exchange+ (the latter two are used by #subscribe)
# @param logger optional logger; when omitted the connection's logger is used
def initialize(rabbit_connection, logger=nil)
  # Per-message bookkeeping, both keyed by message id / correlation id.
  @message_handlers        = {}
  @message_timeout_threads = {}

  @connection = rabbit_connection
  @logger     = logger || rabbit_connection.logger

  subscribe
end

Public Instance Methods

handle_reply(reply, correlation_id=nil) click to toggle source
# File lib/tochtli/reply_queue.rb, line 73
# Dispatches a reply (or an error) to the handler registered for its
# correlation id and removes that handler from the registry.
#
# @param reply the reply object; a Tochtli::ErrorMessage or an Exception is
#   routed to the handler's +on_error+, anything else to the handler's +call+
# @param correlation_id id of the original message; when not given it is taken
#   from +reply.properties.correlation_id+ if the reply is a Tochtli::Message
# @raise [ArgumentError] when no correlation id can be determined
def handle_reply(reply, correlation_id=nil)
  if reply.is_a?(Tochtli::Message) && !correlation_id
    correlation_id = reply.properties.correlation_id
  end
  raise ArgumentError, "Correlated message ID expected" unless correlation_id

  handler = @message_handlers.delete(correlation_id)
  unless handler
    # Nobody is waiting for this id (never registered, or already handled).
    return logger.error("[Tochtli::ReplyQueue] Unexpected message delivery '#{correlation_id}':\n\t#{reply.inspect})")
  end

  # The reply arrived, so the pending timeout watcher (if any) must not fire.
  watchdog = @message_timeout_threads.delete(correlation_id)
  if watchdog
    watchdog.kill
    watchdog.join # make sure timeout thread is dead
  end

  if reply.is_a?(Tochtli::ErrorMessage) || reply.is_a?(Exception)
    return handler.on_error(reply)
  end

  begin
    handler.call(reply)
  rescue Exception => error
    # A failure inside the handler itself is logged and reported back to it.
    logger.error error
    logger.error error.backtrace.join("\n")
    handler.on_error(error)
  end
end
handle_timeout(original_message) click to toggle source
# File lib/tochtli/reply_queue.rb, line 103
# Notifies the handler registered for +original_message+ that no reply
# arrived in time, removing both the handler and its timeout-thread entry.
#
# @param original_message the message that timed out; its +id+ is the registry key
# @raise [RuntimeError] when no handler is registered for the message id
def handle_timeout(original_message)
  message_id = original_message.id
  handler    = @message_handlers.delete(message_id)

  unless handler
    raise "Internal error, timeout handler not found for message: #{original_message.id}, #{original_message.inspect}"
  end

  @message_timeout_threads.delete(message_id)
  handler.on_timeout(original_message)
end
name() click to toggle source
# File lib/tochtli/reply_queue.rb, line 14
# Returns the name of the underlying reply queue (delegates to +@queue.name+).
def name
  @queue.name
end
on_delivery(delivery_info, metadata, payload) click to toggle source
# File lib/tochtli/reply_queue.rb, line 57
# Delivery callback: rebuilds a reply object from the raw delivery and passes
# it to #handle_reply. Any error raised along the way is logged, not re-raised.
#
# @param delivery_info delivery details supplied by the consumer (unused here)
# @param metadata message metadata; +metadata.type+ names the reply class and
#   the metadata object is passed to the reply's constructor
# @param payload [String] JSON document holding the reply attributes
def on_delivery(delivery_info, metadata, payload)
  class_name = metadata.type.camelize.gsub(/[^a-zA-Z0-9\:]/, '_') # basic sanity

  # SECURITY: the type string comes off the wire and is passed to eval below.
  # The character filter above still lets through lower-case segments such as
  # "Kernel::exit" or "Foo::some_method", which eval would execute as method
  # calls. Only a pure constant path (Foo, Foo::Bar, ::Foo::Bar) is accepted.
  unless class_name =~ /\A(?:::)?[A-Z][A-Za-z0-9_]*(?:::[A-Z][A-Za-z0-9_]*)*\z/
    raise ArgumentError, "Invalid reply message type: #{metadata.type.inspect}"
  end

  # eval is kept (rather than const_get) so constants resolve in exactly the
  # same scope as before; the check above restricts it to a constant lookup.
  reply_class      = eval(class_name)
  reply            = reply_class.new({}, metadata)
  attributes       = JSON.parse(payload)
  reply.attributes = attributes

  logger.debug "[#{Time.now} AMQP] Replay for #{reply.properties.correlation_id}: #{reply.inspect}"

  handle_reply reply

# NOTE(review): rescuing Exception also traps SystemExit/Interrupt; kept to
# preserve existing behaviour -- consider narrowing to StandardError.
rescue Exception => e
  logger.error e
  logger.error e.backtrace.join("\n")
end
reconnect(channel) click to toggle source
# File lib/tochtli/reply_queue.rb, line 32
# Re-binds the reply queue to the exchange using its current name and records
# that name as the new "original" name. Does nothing when no queue exists yet.
#
# @param channel channel used to log (via +channel.connection.logger+) and to
#   obtain the exchange through +@connection.create_exchange+
# @return the current queue name, or nil when there is no queue
def reconnect(channel)
  # Previously only the re-bind was guarded; the trailing @queue.name was not,
  # so a missing queue raised NoMethodError. The guard now covers everything.
  return unless @queue

  channel.connection.logger.debug "Recovering reply queue binding (original: #{@original_queue_name}, current: #{@queue.name})"

  # Re-bind queue after name change (auto-generated new on server has been re-generated)
  exchange = @connection.create_exchange(channel)
  @queue.unbind exchange, routing_key: @original_queue_name
  @queue.bind exchange, routing_key: @queue.name

  @original_queue_name = @queue.name
end
register_message_handler(message, handler=nil, timeout=nil, &block) click to toggle source
# File lib/tochtli/reply_queue.rb, line 45
# Registers the object to be notified when a reply to +message+ arrives.
#
# @param message the outgoing message; its +id+ is the registry key
# @param handler handler object; when nil the given block is stored instead
# @param timeout seconds to wait before #handle_timeout is invoked for the
#   message; no timeout thread is started when nil
# @return [Thread, nil] the timeout thread, or nil when no timeout was given
def register_message_handler(message, handler=nil, timeout=nil, &block)
  message_id = message.id
  @message_handlers[message_id] = handler || block
  return unless timeout

  # Background watcher: sleeps for the timeout, then reports the message as
  # timed out. #handle_reply kills this thread when a reply arrives first.
  watchdog = Thread.start {
    sleep timeout
    logger.warn "[#{Time.now} AMQP] TIMEOUT on message '#{message.id}' timeout: #{timeout}"
    handle_timeout message
  }
  @message_timeout_threads[message_id] = watchdog
end
subscribe() click to toggle source
# File lib/tochtli/reply_queue.rb, line 18
# Declares a server-named, exclusive, auto-delete queue, binds it to the
# connection's exchange under its own name, and attaches a Consumer that
# forwards deliveries to #on_delivery.
def subscribe
  amqp_channel   = @connection.channel
  reply_exchange = @connection.exchange

  # An empty name is passed for the queue; the resulting name is remembered
  # so that #reconnect can later unbind the old routing key.
  @queue               = amqp_channel.queue('', exclusive: true, auto_delete: true)
  @original_queue_name = @queue.name
  @queue.bind(reply_exchange, routing_key: @queue.name)

  @consumer = Consumer.new(self, amqp_channel, @queue)
  @consumer.on_delivery(&method(:on_delivery))

  @queue.subscribe_with(@consumer)
end