class Karafka::Routing::Builder

Builder used as a DSL layer for building consumers and telling them which topics to consume @example Build a simple (most common) route

consumers do
  topic :new_videos do
    consumer NewVideosConsumer
  end
end

Constants

CONTRACT

Consumer group consistency checking contract

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/karafka/routing/builder.rb, line 18
def initialize
  super
  @draws = Concurrent::Array.new
end

Public Instance Methods

active() click to toggle source

@return [Array<Karafka::Routing::ConsumerGroup>] only active consumer groups that

we want to use. Since Karafka supports multi-process setup, we need to be able
to pick only those consumer groups that should be active in our given process context
# File lib/karafka/routing/builder.rb, line 52
def active
  select(&:active?)
end
clear() click to toggle source

Clears the builder and the draws memory

Calls superclass method
# File lib/karafka/routing/builder.rb, line 57
def clear
  @draws.clear
  super
end
draw(&block) click to toggle source

Used to draw routes for Karafka @param block [Proc] block we will evaluate within the builder context @yield Evaluates provided block in a builder context so we can describe routes @raise [Karafka::Errors::InvalidConfigurationError] raised when configuration

doesn't match with the config contract

@note After it is done drawing it will store and validate all the routes to make sure that

they are correct and that there are no topic/group duplications (this is forbidden)

@example

draw do
  topic :xyz do
  end
end
# File lib/karafka/routing/builder.rb, line 35
def draw(&block)
  @draws << block

  instance_eval(&block)

  each do |consumer_group|
    hashed_group = consumer_group.to_h
    validation_result = CONTRACT.call(hashed_group)
    next if validation_result.success?

    raise Errors::InvalidConfigurationError, validation_result.errors.to_h
  end
end
reload() click to toggle source

Redraws all the routes for the in-process code reloading. @note This won't allow registration of new topics without process restart but will trigger

cache invalidation so all the classes, etc are re-fetched after code reload
# File lib/karafka/routing/builder.rb, line 65
def reload
  draws = @draws.dup
  clear
  draws.each { |block| draw(&block) }
end

Private Instance Methods

consumer_group(group_id, &block) click to toggle source

Builds and saves given consumer group @param group_id [String, Symbol] name for consumer group @param block [Proc] proc that should be executed in the proxy context

# File lib/karafka/routing/builder.rb, line 76
def consumer_group(group_id, &block)
  consumer_group = ConsumerGroup.new(group_id.to_s)
  self << Proxy.new(consumer_group, &block).target
end
topic(topic_name, &block) click to toggle source

@param topic_name [String, Symbol] name of a topic from which we want to consumer @param block [Proc] proc we want to evaluate in the topic context

# File lib/karafka/routing/builder.rb, line 83
def topic(topic_name, &block)
  consumer_group(topic_name) do
    topic(topic_name, &block).tap(&:build)
  end
end