class Avro2Kafka
Constants
- VERSION
Attributes
options[R]
Public Class Methods
logger()
click to toggle source
# File lib/avro2kafka.rb, line 10 def self.logger @logger ||= Logr::Logger.new('avro2kafka') end
new(options)
click to toggle source
# File lib/avro2kafka.rb, line 14 def initialize(options) @path = ARGV.first @options = options end
Public Instance Methods
broker_list()
click to toggle source
# File lib/avro2kafka.rb, line 45 def broker_list options.fetch(:broker_list).split(',') end
extra_data()
click to toggle source
# File lib/avro2kafka.rb, line 49 def extra_data options.fetch(:data, []).each_with_object({}) do |data, hash| key, value = data.split('=') hash[key] = value end end
filename()
click to toggle source
# File lib/avro2kafka.rb, line 37 def filename File.basename(@path) end
kafka_options()
click to toggle source
# File lib/avro2kafka.rb, line 56 def kafka_options { broker_list: broker_list, topic: topic, data: extra_data, keys: options.fetch(:key, '').split(',').map(&:strip), } end
publish()
click to toggle source
# File lib/avro2kafka.rb, line 23 def publish Avro2Kafka.logger.event('started_publishing', { filename: filename, topic: topic }.merge(extra_data)) .monitored("Started publishing #{filename}", "Started publishing #{filename} to the #{topic} Kafka topic.") .info("Started publishing #{filename}") records = AvroReader.new(reader).read KafkaPublisher.new(**kafka_options).publish(records) Avro2Kafka.logger.event('finished_publishing', { filename: filename, topic: topic }.merge(extra_data)) .monitored("Finished publishing #{filename}", "Finished publishing #{filename} to the #{topic} Kafka topic.") .metric('lines_processed', records.count) .info("Finished publishing #{filename}") end
reader()
click to toggle source
# File lib/avro2kafka.rb, line 19 def reader ARGF.tap { |argf| argf.rewind } end
topic()
click to toggle source
# File lib/avro2kafka.rb, line 41 def topic options.fetch(:topic) end