class Kril::Producer
High-level abstraction for producing records to topics.
Public Class Methods
new(avro: nil, kafka: nil, config: {})
avro - Avro instance for serializing records [AvroTurf::Messaging]
kafka - Kafka instance for creating producers [Kafka]
config - producer configuration (optional) [Hash]
# File lib/kril/producer.rb, line 9
def initialize(avro: nil, kafka: nil, config: {})
  config[:required_acks] ||= 1
  config[:delivery_threshold] ||= 1
  sync_config = config.dup
  @avro = avro
  @async = kafka.async_producer(config)
  sync_config.delete(:delivery_threshold)
  @sync = kafka.producer(sync_config)
end
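The configuration handling in the constructor above can be tried in isolation. The following is a minimal sketch, not part of Kril: `producer_configs` is a hypothetical helper that repeats the same defaulting steps on a plain Hash and returns the two configurations that would be handed to the asynchronous and synchronous producers.

```ruby
# Hypothetical helper (not part of Kril) mirroring the defaulting logic
# in Kril::Producer#initialize.
def producer_configs(config = {})
  # Fill in defaults only when the caller has not set a value.
  config[:required_acks] ||= 1
  config[:delivery_threshold] ||= 1

  # The synchronous producer gets the same settings minus :delivery_threshold.
  sync_config = config.dup
  sync_config.delete(:delivery_threshold)

  [config, sync_config]
end

async_defaults, sync_defaults = producer_configs({})
# async_defaults => { required_acks: 1, delivery_threshold: 1 }
# sync_defaults  => { required_acks: 1 }

async_custom, sync_custom =
  producer_configs({ required_acks: :all, delivery_threshold: 10 })
# async_custom => { required_acks: :all, delivery_threshold: 10 }
# sync_custom  => { required_acks: :all }
```

As in the listed source, the defaults are written into the Hash the caller passes in, so a caller who reuses that Hash elsewhere will see the added keys.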
Public Instance Methods
send(record:, schema_name:, namespace: nil, topic: nil, syncronous: false)
Commit a record to a topic.
record - record to serialize and commit [String]
schema_name - name of schema to encode record from [String]
namespace - namespace of schema (optional) [String]
topic - name of topic; defaults to schema_name if nil (optional) [String]
syncronous - blocks until commit if true (optional) [Boolean]
# File lib/kril/producer.rb, line 26
def send(record:, schema_name:, namespace: nil, topic: nil, syncronous: false)
  topic ||= schema_name
  encoded = @avro.encode(record, schema_name: schema_name, namespace: namespace)
  if syncronous
    @sync.produce(encoded, topic: topic)
    @sync.deliver_messages
  else
    @async.produce(encoded, topic: topic)
  end
ensure
  @async.shutdown
  @sync.shutdown
end
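To see which producer calls each path of `send` makes, the sketch below runs a copy of the listed logic against recording stand-ins, so no broker or schema registry is needed. Everything here is an assumption made for illustration: `RecordingProducer`, `RecordingKafka`, `RecordingAvro` and `SketchProducer` are hypothetical names, and the fake encoder only tags the record rather than Avro-encoding it.

```ruby
# Hypothetical stand-in for a Kafka producer: logs each call it receives.
class RecordingProducer
  def initialize(name, calls)
    @name = name
    @calls = calls
  end

  def produce(value, topic:)
    @calls << [@name, :produce, value, topic]
  end

  def deliver_messages
    @calls << [@name, :deliver_messages]
  end

  def shutdown
    @calls << [@name, :shutdown]
  end
end

# Hypothetical stand-in for the Kafka client passed as `kafka:`.
class RecordingKafka
  attr_reader :calls

  def initialize
    @calls = []
  end

  def async_producer(_config)
    RecordingProducer.new(:async, @calls)
  end

  def producer(_config)
    RecordingProducer.new(:sync, @calls)
  end
end

# Hypothetical stand-in for the Avro encoder: tags the record with its schema.
class RecordingAvro
  def encode(record, schema_name:, namespace:)
    "#{schema_name}:#{record}"
  end
end

# Copy of the logic listed above, renamed so it cannot clash with the gem.
class SketchProducer
  def initialize(avro: nil, kafka: nil, config: {})
    config[:required_acks] ||= 1
    config[:delivery_threshold] ||= 1
    sync_config = config.dup
    @avro = avro
    @async = kafka.async_producer(config)
    sync_config.delete(:delivery_threshold)
    @sync = kafka.producer(sync_config)
  end

  def send(record:, schema_name:, namespace: nil, topic: nil, syncronous: false)
    topic ||= schema_name
    encoded = @avro.encode(record, schema_name: schema_name, namespace: namespace)
    if syncronous
      @sync.produce(encoded, topic: topic)
      @sync.deliver_messages
    else
      @async.produce(encoded, topic: topic)
    end
  ensure
    @async.shutdown
    @sync.shutdown
  end
end

kafka = RecordingKafka.new
producer = SketchProducer.new(avro: RecordingAvro.new, kafka: kafka)

# Synchronous path: topic falls back to schema_name, delivery is explicit.
producer.send(record: '{"id":1}', schema_name: 'user', syncronous: true)
sync_calls = kafka.calls.dup
kafka.calls.clear

# Asynchronous path with an explicit topic: only produce is called.
producer.send(record: '{"id":2}', schema_name: 'user', topic: 'events')
async_calls = kafka.calls.dup
```

In both runs the log ends with a `shutdown` call on each producer, because the `ensure` clause runs after every `send`. Note also that the keyword is spelled `syncronous` in the signature, so that spelling is the one callers must use.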