class Karafka::Routing::Topic

Topic stores all the details on how we should interact with a given Kafka topic. It belongs to a consumer group; as of 0.6, all topics can work in the same consumer group. It is a part of Karafka's DSL.

Attributes

consumer[RW]
consumer_group[R]
id[R]

Public Class Methods

new(name, consumer_group)

@param name [String, Symbol] name of a topic on which we want to listen
@param consumer_group [Karafka::Routing::ConsumerGroup] owning consumer group of this topic

# File lib/karafka/routing/topic.rb, line 19
def initialize(name, consumer_group)
  @name = name.to_s
  @consumer_group = consumer_group
  @attributes = {}
  # @note We use identifier related to the consumer group that owns a topic, because from
  #   Karafka 0.6 we can handle multiple Kafka instances with the same process and we can
  #   have same topic name across multiple Kafkas
  @id = "#{consumer_group.id}_#{@name}"
end
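
The id scoping described in the note above can be illustrated with a minimal sketch. It assumes hypothetical stand-ins (`SketchConsumerGroup`, `SketchTopic`) rather than the real Karafka classes, and only mirrors the interpolation shown in the constructor:

```ruby
# Hypothetical stand-in for Karafka::Routing::ConsumerGroup; only #id is needed here.
SketchConsumerGroup = Struct.new(:id)

# Hypothetical stand-in that copies only the id logic from the constructor above.
class SketchTopic
  attr_reader :id, :name, :consumer_group

  def initialize(name, consumer_group)
    @name = name.to_s
    @consumer_group = consumer_group
    # The owning group's id is used as a prefix, so equal topic names stay distinct.
    @id = "#{consumer_group.id}_#{@name}"
  end
end

first = SketchTopic.new(:events, SketchConsumerGroup.new('group_a'))
second = SketchTopic.new('events', SketchConsumerGroup.new('group_b'))

puts first.id
puts second.id
```

Both topics share the name `events`, yet their ids differ because each one is prefixed with the id of its owning consumer group.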

Public Instance Methods

build()

Initializes default values for all the options that support defaults if their values are not yet specified. This needs to be done eagerly (it cannot be lazy loaded on first use) because everywhere except the Karafka server command, those values would not be initialized in time, for example for Sidekiq.

# File lib/karafka/routing/topic.rb, line 33
def build
  Karafka::AttributesMap.topic.each { |attr| send(attr) }
  self
end
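
As a rough illustration of why calling every reader once forces the defaults, here is a minimal sketch. The class `EagerDefaultsSketch`, its `ATTRIBUTES` list, and the default values are all assumptions made up for this example; the real attribute names come from `Karafka::AttributesMap.topic`:

```ruby
# Hypothetical class; the attribute names and defaults below are invented.
class EagerDefaultsSketch
  # Stands in for Karafka::AttributesMap.topic in this sketch.
  ATTRIBUTES = %i[max_bytes_per_partition backend].freeze

  # Each reader memoizes its default on first call (lazy by itself).
  def max_bytes_per_partition
    @max_bytes_per_partition ||= 1_048_576
  end

  def backend
    @backend ||= :inline
  end

  # Same shape as build above: call every reader once, then return self.
  def build
    ATTRIBUTES.each { |attr| send(attr) }
    self
  end
end

topic = EagerDefaultsSketch.new
puts topic.instance_variable_defined?(:@backend) # false until a reader runs
topic.build
puts topic.instance_variable_defined?(:@backend) # true once build has run
```

Returning `self` from `build` lets the call be chained directly onto object creation.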
responder()

@return [Class, nil] Class (not an instance) of a responder that should respond from consumer back to Kafka (useful for piping data flows)

# File lib/karafka/routing/topic.rb, line 40
def responder
  @responder ||= Karafka::Responders::Builder.new(consumer).build
end
to_h()

@return [Hash] hash with all the topic attributes
@note This is being used when we validate the consumer_group and its topics

# File lib/karafka/routing/topic.rb, line 50
def to_h
  map = Karafka::AttributesMap.topic.map do |attribute|
    [attribute, public_send(attribute)]
  end

  Hash[map].merge!(
    id: id,
    consumer: consumer
  )
end