class KafkaReplicator::TopicsReplicator
Constants
- SKIP_TOPICS
Attributes
destination_kafka[R]
destination_producer[R]
logger[R]
replicated_topics[R]
skip_topics[R]
source_consumer[R]
source_kafka[R]
stopped[R]
Public Class Methods
new(source_brokers:, destination_brokers:, skip_topics: [])
click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 14
# Builds a replicator for the given source and destination clusters.
#
# Only stores configuration and creates a logger that writes to STDOUT;
# nothing visible in this method opens a connection.
#
# @param source_brokers broker list for the source cluster
#   (presumably an array of "host:port" strings -- confirm against caller)
# @param destination_brokers broker list for the destination cluster
#   (same assumption as above)
# @param skip_topics extra topic names to exclude; combined with SKIP_TOPICS using `|`
def initialize(source_brokers:, destination_brokers:, skip_topics: [])
  @logger = Logger.new(STDOUT)
  @skip_topics = SKIP_TOPICS | skip_topics
  @destination_brokers = destination_brokers
  @source_brokers = source_brokers
end
Public Instance Methods
replicate()
click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 65 def replicate replicate rescue => e logger.error "Exception: #{e}" logger.error "Exception.cause: #{e.cause.inspect}" end
setup()
click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 21
# Resets the per-run state: empties the set of replicated topics, clears the
# stopped flag, and drops the references to the consumer and producer.
#
# @return [nil]
def setup
  @replicated_topics = Set.new
  @stopped = false
  # Kept last so the method still evaluates to nil.
  @source_consumer = @destination_producer = nil
end
start()
click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 50
# Main loop: until `stopped` is truthy, repeatedly resets state, subscribes to
# the source topics and runs replication, logging a message before each step.
#
# @return [nil]
def start
  # Ordered log-message => step pairs; Hash iteration preserves insertion order.
  steps = {
    'Setting up configuration...' => :setup,
    'Adding topics for replication...' => :subscribe_to_source_topics,
    'Starting replication...' => :replicate
  }

  loop do
    break if stopped

    steps.each do |message, step|
      logger.info message
      send(step)
    end
  end
end
stop()
click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 72
# Requests shutdown: marks the replicator as stopped and stops the source
# consumer if one exists.
#
# The flag is set BEFORE the consumer is stopped so that the `start` loop,
# which checks `stopped` right after replication returns, cannot see the
# consumer finished while the flag is still unset. The consumer is also
# allowed to be nil (as it is right after `setup`), so calling `stop` early
# no longer raises NoMethodError.
#
# @return [true]
def stop
  logger.info 'Stopping replication...'
  @stopped = true

  consumer = source_consumer
  consumer.stop if consumer

  true # same return value as before, when `@stopped = true` was the last expression
end
Private Instance Methods
parse_message(value)
click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 117
# Decodes a JSON payload with MultiJson, symbolizing keys.
#
# On MultiJson::ParseError the error's `cause` is logged and the error object
# itself is RETURNED (not raised), so callers must check the result type.
#
# @param value the raw JSON text
# @return the decoded value, or the MultiJson::ParseError that occurred
def parse_message(value)
  MultiJson.load(value, symbolize_keys: true)
rescue MultiJson::ParseError => parse_error
  logger.error(parse_error.cause)
  parse_error
end
source_topics()
click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 125
# Collects the topic names reported by `source_kafka.topics`, leaving out any
# name contained in `skip_topics`.
#
# @return [Set] the source topics eligible for replication
def source_topics
  source_kafka.topics.each_with_object(Set.new) do |topic_name, eligible|
    eligible << topic_name unless skip_topics.include?(topic_name)
  end
end
subscribe_to_source_topics()
click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 133
# For every topic returned by `unreplicated_topics`: subscribes the source
# consumer to it from the beginning, records it in `replicated_topics`, creates
# it on the destination cluster when it is not already listed there, and logs
# that it was added.
#
# The destination topic list is fetched once, before the loop.
def subscribe_to_source_topics
  existing_topics = destination_kafka.topics

  unreplicated_topics.each do |topic_name|
    source_consumer.subscribe(topic_name, start_from_beginning: true)
    replicated_topics << topic_name

    missing_on_destination = !existing_topics.include?(topic_name)
    if missing_on_destination
      partition_count = source_kafka.partitions_for(topic_name)
      # replication_factor needs to be specified because otherwise the
      # ruby-kafka driver will make it equal to 1.
      destination_kafka.create_topic(topic_name, num_partitions: partition_count, replication_factor: 3)
    end

    logger.info "Topic added: #{topic_name}"
  end
end
unreplicated_topics()
click to toggle source
# File lib/kafka_replicator/topics_replicator.rb, line 129
# Source topics that have not yet been recorded in `replicated_topics`.
#
# @return the difference `source_topics - replicated_topics`
def unreplicated_topics
  source_topics.difference(replicated_topics)
end