class Kril::Producer

High level abstraction for producing records to topics.

Public Class Methods

new(avro: nil, kafka: nil, config: {}) click to toggle source

avro - Avro instance for deserializing records [AvroTurf::Messaging] kafka - Kafka instance for creating producers [Kafka] config - producer configuration (optional) [Hash]

# File lib/kril/producer.rb, line 9
def initialize(avro: nil, kafka: nil, config: {})
  config[:required_acks] ||= 1
  config[:delivery_threshold] ||= 1
  sync_config = config.dup
  @avro = avro
  @async = kafka.async_producer(config)
  sync_config.delete(:delivery_threshold)
  @sync = kafka.producer(sync_config)
end

Public Instance Methods

send(record:, schema_name:, namespace: nil, topic: nil, syncronous: false) click to toggle source

Commit a record to a topic.

record - record to serialize and commit [String] schema_name - name of schema to encode record from [String] namespace - namespace of schema (optional) [String] topic - name of topic. Will be schema_name if nil (optional) [String] synchronous - blocks until commit if true (optional) [Boolean]

# File lib/kril/producer.rb, line 26
def send(record:,
         schema_name:,
         namespace: nil,
         topic: nil,
         syncronous: false)
  topic ||= schema_name
  encoded = @avro.encode(record,
                         schema_name: schema_name,
                         namespace: namespace)
  if syncronous
    @sync.produce(encoded, topic: topic)
    @sync.deliver_messages
  else
    @async.produce(encoded, topic: topic)
  end
ensure
  @async.shutdown
  @sync.shutdown
end