class Karafka::Persistence::Consumers
Provides a persistent cache across batch requests for a given topic and partition. The cache stores additional details when persistent mode is turned on for that topic.
Constants
- PERSISTENCE_SCOPE
Thread.current scope under which we store consumers data
Public Class Methods
Removes all persisted instances of consumers from the consumer cache.
@note This is used to reload consumer instances when code reloading is present in development mode. This should not be used in production.
# File lib/karafka/persistence/consumers.rb, line 36
def clear
  Thread
    .list
    .select { |thread| thread[PERSISTENCE_SCOPE] }
    .each { |thread| thread[PERSISTENCE_SCOPE].clear }
end
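As a rough illustration of how clearing reaches caches owned by other threads, here is a minimal sketch. The scope key `CLEAR_SCOPE`, the worker thread, and the cached values are hypothetical and are not part of Karafka; a plain Hash stands in for the real cache.

```ruby
# Hypothetical scope key; the real PERSISTENCE_SCOPE value is not shown in this page.
CLEAR_SCOPE = :sketch_clear_scope

ready = Queue.new

# A worker thread that stores a cache under the scope key and then idles.
worker = Thread.new do
  Thread.current[CLEAR_SCOPE] = { "events" => { 0 => :consumer } }
  ready << true
  sleep
end

# Wait until the worker has populated its cache.
ready.pop

# Same shape as the clear method: visit every live thread that has a cache and empty it.
Thread
  .list
  .select { |thread| thread[CLEAR_SCOPE] }
  .each { |thread| thread[CLEAR_SCOPE].clear }

cleared = worker[CLEAR_SCOPE]
worker.kill
worker.join
```

Threads that never stored anything under the key (here, the main thread) are skipped by the `select` step, so nothing is created for them.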
@return [Hash] current thread's persistence scope hash with all the consumers
# File lib/karafka/persistence/consumers.rb, line 18
def current
  Thread.current[PERSISTENCE_SCOPE] ||= Concurrent::Hash.new do |hash, key|
    hash[key] = Concurrent::Hash.new
  end
end
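The pattern above combines thread-scoped storage with a hash whose default block creates the inner hash on first access. A minimal sketch of that idea follows, assuming a hypothetical key `SCOPE` and a helper named `current_cache`; a plain Hash is used in place of Concurrent::Hash only to keep the sketch free of dependencies.

```ruby
# Hypothetical stand-in for PERSISTENCE_SCOPE.
SCOPE = :sketch_consumers

# Returns the calling thread's cache, creating it on first access.
# The default block builds an inner hash for any key that is missing.
def current_cache
  Thread.current[SCOPE] ||= Hash.new do |hash, key|
    hash[key] = {}
  end
end

# The inner hash for "events" is created automatically by the default block.
current_cache["events"][0] = :consumer_a

# Another thread gets its own, initially empty cache.
other_thread_keys = Thread.new { current_cache.keys }.value
```

Because each thread lazily builds its own hash, one thread never observes entries written by another through this accessor.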
Used to build (if block given) and/or fetch a current consumer instance that will be
used to process messages from a given topic and partition
@param topic [Karafka::Routing::Topic] topic instance for which we might cache
@param partition [Integer] number of partition for which we want to cache
@return [Karafka::BaseConsumer] base consumer descendant
# File lib/karafka/persistence/consumers.rb, line 29
def fetch(topic, partition)
  current[topic][partition] ||= topic.consumer.new(topic)
end
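The effect of the `||=` memoization can be sketched as follows: the first call for a topic and partition pair builds a consumer, and later calls return the same instance. `SketchTopic`, `SketchConsumer`, `CACHE`, and `fetch_consumer` are hypothetical stand-ins, not Karafka classes or methods.

```ruby
# Hypothetical stand-ins for a routing topic and a consumer class.
SketchConsumer = Struct.new(:topic)
SketchTopic = Struct.new(:name, :consumer)

# Nested cache keyed by topic, then by partition number.
CACHE = Hash.new { |hash, key| hash[key] = {} }

# Builds a consumer on the first request and reuses it afterwards.
def fetch_consumer(topic, partition)
  CACHE[topic][partition] ||= topic.consumer.new(topic)
end

topic = SketchTopic.new("events", SketchConsumer)

first = fetch_consumer(topic, 0)
second = fetch_consumer(topic, 0) # same partition: cached instance is reused
other = fetch_consumer(topic, 1)  # different partition: a new instance is built
```

Keying by partition means each partition keeps its own consumer instance, so state held on one instance is not shared with another partition.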