class Deimos::Utils::InlineConsumer

Class which can process/consume messages inline.

Constants

MAX_MESSAGE_WAIT_TIME
MAX_TOPIC_WAIT_TIME

Public Class Methods

consume(topic:, frk_consumer:, num_messages: 10) click to toggle source

Consume the last X messages from a topic. @param topic [String] @param frk_consumer [Class] @param num_messages [Integer] If this number is >= the number of messages in the topic, all messages will be consumed.
# File lib/deimos/utils/inline_consumer.rb, line 109
# Consume messages from a topic inline, using a listener with a random
# (one-off) consumer group ID. The listener is aborted by raising
# Phobos::AbortError from a heartbeat subscription once either wait
# limit (MAX_MESSAGE_WAIT_TIME / MAX_TOPIC_WAIT_TIME) is exceeded.
#
# Side effect: singleton `start_time` and `last_message_time` accessors
# are defined on `frk_consumer`.
#
# @param topic [String]
# @param frk_consumer [Class] handler class given to the listener.
# @param num_messages [Integer] assigned to the listener's `num_messages`.
# @return [Array] the notification subscriber handles, already
#   unsubscribed by the time this method returns.
def self.consume(topic:, frk_consumer:, num_messages: 10)
  listener = SeekListener.new(
    handler: frk_consumer,
    group_id: SecureRandom.hex,
    topic: topic,
    heartbeat_interval: 1
  )
  listener.num_messages = num_messages

  # Add the start_time and last_message_time attributes to the
  # consumer class so we can kill it if it's gone on too long
  class << frk_consumer
    attr_accessor :start_time, :last_message_time
  end

  subscribers = []
  begin
    # Record when the most recent message was processed.
    subscribers << ActiveSupport::Notifications.
      subscribe('phobos.listener.process_message') do
        frk_consumer.last_message_time = Time.zone.now
      end
    # Reset the timers whenever the handler is (re)started.
    subscribers << ActiveSupport::Notifications.
      subscribe('phobos.listener.start_handler') do
        frk_consumer.start_time = Time.zone.now
        frk_consumer.last_message_time = nil
      end
    # On every heartbeat, abort if we have waited too long either since
    # the last message or (when none has arrived yet) since start-up.
    subscribers << ActiveSupport::Notifications.
      subscribe('heartbeat.consumer.kafka') do
        if frk_consumer.last_message_time
          if Time.zone.now - frk_consumer.last_message_time > MAX_MESSAGE_WAIT_TIME
            raise Phobos::AbortError
          end
        elsif Time.zone.now - frk_consumer.start_time > MAX_TOPIC_WAIT_TIME
          Deimos.config.logger.error('Aborting - initial wait too long')
          raise Phobos::AbortError
        end
      end
    listener.start
  ensure
    # Notification subscriptions are process-global; always remove them,
    # even if the listener raised, so they cannot fire for later consumers.
    subscribers.each { |s| ActiveSupport::Notifications.unsubscribe(s) }
  end
  subscribers
end
get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10) click to toggle source

Get the last X messages from a topic. You can specify a subclass of Deimos::Consumer or Deimos::Producer, or provide the schema, namespace and key_config directly. @param topic [String] @param config_class [Class<Deimos::Consumer|Deimos::Producer>] @param schema [String] @param namespace [String] @param key_config [Hash] @param num_messages [Integer] @return [Array<Hash>]

# File lib/deimos/utils/inline_consumer.rb, line 82
# Fetch the trailing messages of a topic through MessageBankHandler.
# MessageBankHandler is configured either from `config_class` or from
# the explicit schema/namespace/key_config settings, then run via
# `consume`; its `total_messages` are returned, trimmed to at most the
# final `num_messages` entries.
#
# @param topic [String]
# @param schema [String]
# @param namespace [String]
# @param key_config [Hash]
# @param config_class [Class]
# @param num_messages [Integer]
# @return [Array] whatever MessageBankHandler.total_messages holds,
#   limited to the last `num_messages` items.
# @raise [RuntimeError] when no config_class is given and schema or
#   key_config is nil.
def self.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil,
                          config_class: nil, num_messages: 10)
  if config_class
    MessageBankHandler.config_class = config_class
  else
    if schema.nil? || key_config.nil?
      raise 'You must specify either a config_class or a schema, namespace and key_config!'
    end

    MessageBankHandler.class_eval do
      schema schema
      namespace namespace
      key_config key_config
      # Clear the decoder ivars so they are not reused with the old settings.
      @decoder = nil
      @key_decoder = nil
    end
  end

  consume(topic: topic, frk_consumer: MessageBankHandler, num_messages: num_messages)

  collected = MessageBankHandler.total_messages
  return collected if collected.size <= num_messages

  collected[-num_messages..-1]
end