class Eventsourcer::Brokers::KafkaBroker

Public Class Methods

publish(table_name:, previous_changes:) click to toggle source
# File lib/eventsourcer/brokers/kafka_broker.rb, line 10
def self.publish(table_name:, previous_changes:)
  producer = client.async_producer(max_queue_size: Eventsourcer.configuration.kafka_max_queue_size || 10000)
  begin
    produce_to_kafka(producer, table_name, previous_changes.to_json)
  rescue Kafka::BufferOverflow
    producer.deliver_messages
    produce_to_kafka(producer, table_name, previous_changes.to_json)
  end

  producer.deliver_messages
  producer.shutdown
end

Private Class Methods

client() click to toggle source
# File lib/eventsourcer/brokers/kafka_broker.rb, line 31
def self.client
  @mutex.synchronize do
    @client ||= Kafka.new(
      seed_brokers: Eventsourcer.configuration.kafka_seed_brokers || ["localhost:9092"],
      logger: Eventsourcer.configuration.logger || Rails.logger
    )
  end
end
produce_to_kafka(producer, table_name, previous_changes_json) click to toggle source
# File lib/eventsourcer/brokers/kafka_broker.rb, line 25
def self.produce_to_kafka(producer, table_name, previous_changes_json)
  producer.produce(previous_changes_json,
           topic: table_name,
           key: SecureRandom.uuid)
end