class Avro2Kafka

Constants

VERSION

Attributes

options[R]

Public Class Methods

logger() click to toggle source
# File lib/avro2kafka.rb, line 10
# Returns the logger used by Avro2Kafka.
#
# The logger is built on the first call, with the name 'avro2kafka', and
# cached in @logger on the receiver; later calls hand back that same object.
def self.logger
  return @logger if @logger

  @logger = Logr::Logger.new('avro2kafka')
end
new(options) click to toggle source
# File lib/avro2kafka.rb, line 14
# Stores the given options and remembers the input path.
#
# options - the option collection later read through #options; the other
#           methods call +fetch+ on it, so presumably a Hash (confirm
#           against the caller).
#
# The input path is not passed in: it is taken from the first element of
# ARGV at construction time.
def initialize(options)
  @options = options
  @path = ARGV.first
end

Public Instance Methods

broker_list() click to toggle source
# File lib/avro2kafka.rb, line 45
# Returns the brokers named by the :broker_list option as an Array of Strings.
#
# The option value is a comma-separated string. Whitespace around each entry
# is stripped, in the same way kafka_options strips the entries of :key, and
# blank entries are dropped, so "host1:9092, host2:9092" yields
# ["host1:9092", "host2:9092"] rather than a second entry with a leading
# space.
#
# The :broker_list option is required: +fetch+ is called without a default,
# so a missing key raises (KeyError when options is a Hash).
def broker_list
  options.fetch(:broker_list).split(',').map(&:strip).reject(&:empty?)
end
extra_data() click to toggle source
# File lib/avro2kafka.rb, line 49
# Turns the :data option into a Hash of extra key/value pairs.
#
# Each entry of :data is expected to be a "key=value" string. The entry is
# split on the FIRST '=' only, so a value that itself contains '=' (for
# example "query=a=b") is kept whole instead of being cut at the second '='.
# An entry with no '=' maps its text to nil; an entry ending in '=' maps its
# key to the empty string.
#
# Returns an empty Hash when the :data option is absent.
def extra_data
  options.fetch(:data, []).each_with_object({}) do |pair, result|
    # Limit of 2 keeps any further '=' characters inside the value.
    key, value = pair.split('=', 2)
    result[key] = value
  end
end
filename() click to toggle source
# File lib/avro2kafka.rb, line 37
# Returns the last path component of @path (the file name without its
# directory part).
def filename
  _directory, base = File.split(@path)
  base
end
kafka_options() click to toggle source
# File lib/avro2kafka.rb, line 56
# Assembles the keyword options handed to KafkaPublisher.new in #publish.
#
# Returns a Hash with these keys, in this order:
#   :broker_list - result of #broker_list
#   :topic       - result of #topic
#   :data        - result of #extra_data
#   :keys        - the comma-separated :key option split into stripped
#                  names; an empty Array when :key is absent
def kafka_options
  brokers = broker_list
  destination = topic
  extras = extra_data
  key_names = options.fetch(:key, '').split(',').map { |name| name.strip }

  { broker_list: brokers, topic: destination, data: extras, keys: key_names }
end
publish() click to toggle source
# File lib/avro2kafka.rb, line 23
# Reads the records from the input and publishes them to Kafka, logging an
# event before and after.
#
# Steps, in order:
# 1. Log a 'started_publishing' event carrying the filename, the topic and
#    the extra data.
# 2. Read the records with AvroReader from #reader.
# 3. Publish them through a KafkaPublisher built from #kafka_options.
# 4. Log a 'finished_publishing' event with the same payload plus a
#    'lines_processed' metric set to the number of records.
def publish
  log = Avro2Kafka.logger
  name = filename
  destination = topic

  started = log.event('started_publishing', { filename: name, topic: destination }.merge(extra_data))
  started.monitored("Started publishing #{name}", "Started publishing #{name} to the #{destination} Kafka topic.")
         .info("Started publishing #{name}")

  records = AvroReader.new(reader).read
  publisher = KafkaPublisher.new(**kafka_options)
  publisher.publish(records)

  finished = log.event('finished_publishing', { filename: name, topic: destination }.merge(extra_data))
  finished.monitored("Finished publishing #{name}", "Finished publishing #{name} to the #{destination} Kafka topic.")
          .metric('lines_processed', records.count)
          .info("Finished publishing #{name}")
end
reader() click to toggle source
# File lib/avro2kafka.rb, line 19
# Returns ARGF after rewinding it, so the caller starts reading the current
# input from its beginning.
def reader
  ARGF.rewind
  ARGF
end
topic() click to toggle source
# File lib/avro2kafka.rb, line 41
# Returns the value of the :topic option.
#
# The option is required: +fetch+ is called without a default, so a missing
# key raises (KeyError when options is a Hash).
def topic
  settings = options
  settings.fetch(:topic)
end