class Avro2Kafka::KafkaPublisher
Attributes
data[R]
keys[R]
producer[R]
topic[R]
Public Class Methods
new(broker_list:, topic:, keys: [], data: {})
click to toggle source
# File lib/avro2kafka/kafka_publisher.rb, line 8 def initialize(broker_list:, topic:, keys: [], data: {}) @producer = Poseidon::Producer.new(broker_list, 'avro2kafka') @topic = topic @keys = keys @data = data end
Public Instance Methods
publish(records)
click to toggle source
# File lib/avro2kafka/kafka_publisher.rb, line 15 def publish(records) records.each_slice(100) do |batch| messages = batch.map { |record| prepare_record(record) } producer.send_messages(messages) end end
Private Instance Methods
prepare_record(record)
click to toggle source
# File lib/avro2kafka/kafka_publisher.rb, line 24 def prepare_record(record) record = record.merge(data) if record.is_a?(Hash) if keys.empty? Poseidon::MessageToSend.new(topic, record.to_json) else message_key = keys.map { |key| record[key] }.join Poseidon::MessageToSend.new(topic, record.to_json, message_key) end end