class KafkaReplicator::TopicsReplicator

Constants

SKIP_TOPICS

Attributes

destination_kafka[R]
destination_producer[R]
logger[R]
replicated_topics[R]
skip_topics[R]
source_consumer[R]
source_kafka[R]
stopped[R]

Public Class Methods

new(source_brokers:, destination_brokers:, skip_topics: []) click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 14
# Stores the connection settings for a replicator; no connection is opened here.
#
# @param source_brokers [Object] broker list of the cluster to read from (stored as given)
# @param destination_brokers [Object] broker list of the cluster to write to (stored as given)
# @param skip_topics [Array] additional topic names to exclude, combined with SKIP_TOPICS
def initialize(source_brokers:, destination_brokers:, skip_topics: [])
  @logger = Logger.new(STDOUT)

  # Built-in exclusions are always applied; caller-supplied ones are merged in.
  @skip_topics = SKIP_TOPICS | skip_topics

  @destination_brokers = destination_brokers
  @source_brokers = source_brokers
end

Public Instance Methods

replicate() click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 65
# Runs the replication step and logs, rather than propagates, any
# StandardError raised while doing so.
#
# NOTE(review): as written, the body calls `replicate` with an implicit
# receiver, which resolves to this very method, so there is no terminating
# condition. Unbounded recursion ends in SystemStackError, which is not a
# StandardError and therefore is not handled by the bare `rescue` below.
# Presumably the actual replication loop was lost or lives elsewhere —
# confirm against the original source file before relying on this method.
def replicate
  replicate
rescue => e
  # Bare rescue: only StandardError and its subclasses end up here.
  logger.error "Exception: #{e}"
  logger.error "Exception.cause: #{e.cause.inspect}"
end
setup() click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 21
# Resets the per-run state: clears the stop flag, forgets which topics were
# already subscribed, and drops the consumer/producer references.
def setup
  @destination_producer = nil
  @source_consumer = nil
  @replicated_topics = Set.new
  @stopped = false
end
start() click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 50
# Runs setup -> subscribe -> replicate cycles until the stop flag is set.
#
# The flag is checked at the top of every cycle, so a cycle that has begun
# always runs all three steps before the flag is looked at again.
#
# @return [nil]
def start
  loop do
    return if stopped

    logger.info 'Setting up configuration...'
    setup

    logger.info 'Adding topics for replication...'
    subscribe_to_source_topics

    logger.info 'Starting replication...'
    replicate
  end
end
stop() click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 72
# Requests that replication end.
#
# The stop flag is raised *before* the consumer is told to stop, so that by
# the time the consumer winds down the loop in `start` already sees
# `stopped == true` and does not begin another cycle.
#
# Safe to call when no consumer exists (source_consumer is nil): the flag is
# still set and no NoMethodError is raised.
#
# @return [true]
def stop
  logger.info 'Stopping replication...'
  @stopped = true

  consumer = source_consumer
  consumer.stop if consumer

  true
end

Private Instance Methods

parse_message(value) click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 117
# Decodes a JSON payload with symbolized keys.
#
# On a parse failure nothing is raised: the underlying cause is logged at
# error level and the MultiJson::ParseError object itself is returned, so
# callers must be prepared to receive an exception instance as the result.
#
# @param value [String] raw JSON text
# @return [Object, MultiJson::ParseError] decoded data, or the parse error
def parse_message(value)
  begin
    MultiJson.load(value, symbolize_keys: true)
  rescue MultiJson::ParseError => parse_error
    logger.error parse_error.cause
    parse_error
  end
end
source_topics() click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 125
# Topics reported by the source cluster, minus every name listed in
# skip_topics.
#
# @return [Set] topic names eligible for replication
def source_topics
  eligible = source_kafka.topics.reject do |name|
    skip_topics.include?(name)
  end

  eligible.to_set
end
subscribe_to_source_topics() click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 133
# Subscribes the source consumer to every topic that is not yet tracked in
# replicated_topics and makes sure a matching topic exists on the destination.
#
# For each such topic, in order: subscribe from the beginning, record it as
# replicated, create it on the destination if it is absent there (using the
# source topic's partition count), then log that it was added.
def subscribe_to_source_topics
  # Fetched once up front; topics created below are not added to this snapshot.
  existing_on_destination = destination_kafka.topics

  unreplicated_topics.each do |name|
    source_consumer.subscribe(name, start_from_beginning: true)
    replicated_topics << name

    absent = !existing_on_destination.include?(name)
    if absent
      destination_kafka.create_topic(
        name,
        num_partitions: source_kafka.partitions_for(name),
        # Must be given explicitly; otherwise the ruby-kafka driver defaults it to 1.
        replication_factor: 3
      )
    end

    logger.info "Topic added: #{name}"
  end
end
unreplicated_topics() click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 129
# Source topics that have not yet been recorded in replicated_topics.
#
# @return [Object] result of subtracting replicated_topics from source_topics
def unreplicated_topics
  candidates = source_topics
  candidates - replicated_topics
end