class Avro2Kafka::KafkaPublisher

Attributes

data[R]
keys[R]
producer[R]
topic[R]

Public Class Methods

new(broker_list:, topic:, keys: [], data: {})
# File lib/avro2kafka/kafka_publisher.rb, line 8
# Sets up a publisher bound to one topic.
#
# broker_list - handed unchanged to Poseidon::Producer.new as its first argument.
# topic       - stored and later used as the destination of every message.
# keys        - record fields used to build the message key (default: []).
# data        - extra entries merged into Hash records before sending (default: {}).
#
# The producer is always created with the fixed string 'avro2kafka' as its
# second argument (presumably the Kafka client id -- confirm against Poseidon).
def initialize(broker_list:, topic:, keys: [], data: {})
  @topic = topic
  @keys = keys
  @data = data
  @producer = Poseidon::Producer.new(broker_list, 'avro2kafka')
end

Public Instance Methods

publish(records)
# File lib/avro2kafka/kafka_publisher.rb, line 15
# Sends the given records to the producer in batches.
#
# records    - an Enumerable of records; each one is converted to a message
#              with prepare_record before being sent.
# batch_size - maximum number of messages passed to a single
#              producer.send_messages call (default: 100, the value that
#              was previously hard-coded).
#
# Returns whatever records.each_slice returns.
# Raises ArgumentError (from each_slice) when batch_size is not positive.
def publish(records, batch_size: 100)
  records.each_slice(batch_size) do |batch|
    messages = batch.map { |record| prepare_record(record) }
    producer.send_messages(messages)
  end
end

Private Instance Methods

prepare_record(record)
# File lib/avro2kafka/kafka_publisher.rb, line 24
# Turns one record into a Poseidon::MessageToSend addressed to `topic`.
#
# Hash records are merged with `data` first (entries from `data` win on
# conflicting keys, as Hash#merge does); any other record is used as is.
# The message value is the record's to_json output.
#
# When `keys` is non-empty, the values found under those keys in the record
# are concatenated in order, with no separator, and passed as the message
# key. When `keys` is empty, no key argument is passed at all.
def prepare_record(record)
  payload = record.is_a?(Hash) ? record.merge(data) : record
  # An empty list splats to nothing, so the keyless case stays a two-argument call.
  key_args = keys.empty? ? [] : [keys.map { |field| payload[field] }.join]
  Poseidon::MessageToSend.new(topic, payload.to_json, *key_args)
end