class Deimos::Utils::InlineConsumer
Class which can process/consume messages inline.
Constants
- MAX_MESSAGE_WAIT_TIME
- MAX_TOPIC_WAIT_TIME
Public Class Methods
consume(topic:, frk_consumer:, num_messages: 10)
click to toggle source
Consume the last X messages from a topic.
@param topic [String]
@param frk_consumer [Class]
@param num_messages [Integer] If this number is >= the number
of messages in the topic, all messages will be consumed.
# File lib/deimos/utils/inline_consumer.rb, line 109
# Consume the last `num_messages` messages from a topic using the given
# consumer class, aborting if the topic stays silent for too long.
#
# The notification subscribers installed here are always removed again,
# even when the listener (or a later subscription) raises, so repeated
# calls do not accumulate stale subscribers.
#
# @param topic [String]
# @param frk_consumer [Class] handler class passed to the listener
# @param num_messages [Integer] If this number is >= the number
#   of messages in the topic, all messages will be consumed.
def self.consume(topic:, frk_consumer:, num_messages: 10)
  # Initialized first so the `ensure` clause can always iterate it.
  subscribers = []

  # A random group ID is generated for every call.
  listener = SeekListener.new(
    handler: frk_consumer,
    group_id: SecureRandom.hex,
    topic: topic,
    heartbeat_interval: 1
  )
  listener.num_messages = num_messages

  # Add the start_time and last_message_time attributes to the
  # consumer class so we can kill it if it's gone on too long
  class << frk_consumer
    attr_accessor :start_time, :last_message_time
  end

  # Record when the most recent message was processed.
  subscribers << ActiveSupport::Notifications.
    subscribe('phobos.listener.process_message') do
      frk_consumer.last_message_time = Time.zone.now
    end

  # Reset the timers whenever the handler starts.
  subscribers << ActiveSupport::Notifications.
    subscribe('phobos.listener.start_handler') do
      frk_consumer.start_time = Time.zone.now
      frk_consumer.last_message_time = nil
    end

  # On every heartbeat, abort if we have waited too long - either since
  # the last message, or since start when no message has arrived yet.
  subscribers << ActiveSupport::Notifications.
    subscribe('heartbeat.consumer.kafka') do
      if frk_consumer.last_message_time
        if Time.zone.now - frk_consumer.last_message_time > MAX_MESSAGE_WAIT_TIME
          raise Phobos::AbortError
        end
      elsif Time.zone.now - frk_consumer.start_time > MAX_TOPIC_WAIT_TIME
        Deimos.config.logger.error('Aborting - initial wait too long')
        raise Phobos::AbortError
      end
    end

  listener.start
ensure
  # Always clean up, even if the listener raised.
  subscribers.each { |s| ActiveSupport::Notifications.unsubscribe(s) }
end
get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10)
click to toggle source
Get the last X messages from a topic. You can specify a subclass of Deimos::Consumer
or Deimos::Producer, or provide the schema, namespace and key_config directly.
@param topic [String]
@param config_class [Class < Deimos::Consumer|Deimos::Producer>]
@param schema [String]
@param namespace [String]
@param key_config [Hash]
@param num_messages [Integer]
@return [Array<Hash>]
# File lib/deimos/utils/inline_consumer.rb, line 82
# Get the last `num_messages` messages from a topic. Either pass a
# `config_class`, or provide the schema, namespace and key_config directly.
#
# @param topic [String]
# @param config_class [Class] takes precedence over the explicit settings
# @param schema [String]
# @param namespace [String]
# @param key_config [Hash]
# @param num_messages [Integer] maximum number of messages to return
# @return [Array] at most `num_messages` entries from the end of
#   `MessageBankHandler.total_messages` (empty when `num_messages` is 0)
# @raise [RuntimeError] if neither a config_class nor schema + key_config
#   are given
def self.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10)
  if config_class
    MessageBankHandler.config_class = config_class
  elsif schema.nil? || key_config.nil?
    # NOTE(review): the message lists namespace as required but it is not
    # checked here - confirm whether a nil namespace is ever valid.
    raise 'You must specify either a config_class or a schema, namespace and key_config!'
  else
    MessageBankHandler.class_eval do
      schema schema
      namespace namespace
      key_config key_config
      # Presumably clears cached decoders so they are rebuilt for the new
      # settings - verify against MessageBankHandler.
      @decoder = nil
      @key_decoder = nil
    end
  end
  self.consume(topic: topic,
               frk_consumer: MessageBankHandler,
               num_messages: num_messages)
  # `last` returns the whole list when fewer messages exist, and - unlike
  # slicing with `-num_messages..-1` - an empty list for a count of 0.
  MessageBankHandler.total_messages.last(num_messages)
end