class Kril::Consumer
High-level abstraction for consuming records from topics.
Public Class Methods
new(avro: nil, kafka: nil, config: {})
avro - Avro instance for deserializing records [AvroTurf::Messaging]
kafka - Kafka instance for creating consumers [Kafka]
config - consumer configuration (optional) [Hash]
# File lib/kril/consumer.rb, line 9
def initialize(avro: nil, kafka: nil, config: {})
  config[:group_id] ||= 'kril-consumer'
  @avro = avro
  @kafka = kafka
  @config = config
end
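The `||=` line in the initializer only fills in `:group_id` when the caller has not set one, and it writes into the hash that was passed in. A minimal sketch of that behavior follows; `apply_default_group_id` is a hypothetical helper written for this illustration and is not part of Kril.

```ruby
# Hypothetical helper that mirrors the defaulting line in the initializer.
def apply_default_group_id(config = {})
  config[:group_id] ||= 'kril-consumer'
  config
end

# No group id supplied: the default is applied.
defaulted = apply_default_group_id

# Group id supplied: the caller's value is kept.
custom = apply_default_group_id({ group_id: 'billing' })

# The caller's own hash is modified in place, not copied.
caller_hash = {}
apply_default_group_id(caller_hash)
```

Because the hash is modified in place, a caller that reuses one config hash across several consumers may want to pass a copy.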
Public Instance Methods
consume_all(topic) { |decode(message), consumer| ... }
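Consume all records from a topic. Each record is yielded to the block along with the consumer instance. Each call subscribes under a freshly generated group id, so consumption starts from the beginning of the topic. After all existing records have been consumed, the consumer keeps listening for new ones.
topic - topic to consume from [String]
yields - record, consumer [Hash, Kafka::Consumer]
return - [nil]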
# File lib/kril/consumer.rb, line 42
def consume_all(topic)
  config = @config.clone
  config[:group_id] = SecureRandom.uuid
  consumer = build_consumer(topic, true, config)
  consumer.each_message do |message|
    yield decode(message), consumer
  end
ensure
  consumer.stop
end
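The control flow of consume_all can be sketched without a broker. `FakeKafkaConsumer` and `consume_all_sketch` are hypothetical stand-ins, not part of Kril or ruby-kafka. The fake assumes that `stop` ends iteration before the next message, which may differ from how the real consumer winds down. The sketch also returns the cloned config, purely so it can be inspected afterwards.

```ruby
require 'securerandom'

# Hypothetical stand-in for Kafka::Consumer; not the real ruby-kafka class.
class FakeKafkaConsumer
  def initialize(messages)
    @messages = messages
    @stopped = false
  end

  # Yields messages until the list is exhausted or stop has been called.
  def each_message
    @messages.each do |message|
      break if @stopped
      yield message
    end
  end

  def stop
    @stopped = true
  end
end

# Simplified mirror of consume_all: clone the config, swap in a throwaway
# group id, yield each message with the consumer, and always stop at the end.
def consume_all_sketch(base_config, messages)
  config = base_config.clone
  config[:group_id] = SecureRandom.uuid
  consumer = FakeKafkaConsumer.new(messages)
  consumer.each_message { |message| yield message, consumer }
  config # returned for inspection only; the documented method returns nil
ensure
  consumer&.stop
end

base_config = { group_id: 'kril-consumer' }
seen = []
used_config = consume_all_sketch(base_config, %w[a b c d]) do |record, consumer|
  seen << record
  consumer.stop if seen.size == 2 # the caller decides when to stop listening
end
```

Since the method keeps listening once the topic is drained, calling `stop` on the yielded consumer from inside the block is the caller's way to end the loop.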
consume_one(topic)
Consume a single record from any partition. Blocks indefinitely if no record is present.
topic - topic to consume from [String]
return - deserialized record [Hash]
# File lib/kril/consumer.rb, line 21
def consume_one(topic)
  consumer = build_consumer(topic, true, @config)
  msg = nil
  consumer.each_message do |message|
    msg = decode(message)
    consumer.mark_message_as_processed(message)
    consumer.commit_offsets
    consumer.stop
  end
  msg
ensure
  consumer.stop
end
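Because consume_one blocks while the topic is empty, a caller may want to bound the wait. One possible approach, sketched here with Ruby's standard `Timeout` module, is to wrap the call in a deadline. `with_deadline` is a hypothetical helper, and the blocks below are stand-ins for a real `consume_one(topic)` call. Interrupting a live Kafka consumer this way has not been verified against Kril.

```ruby
require 'timeout'

# Hypothetical helper: runs the block, returning nil if it exceeds `seconds`.
def with_deadline(seconds)
  Timeout.timeout(seconds) { yield }
rescue Timeout::Error
  nil
end

# Stand-in for a consume_one call that finds a record immediately.
fast = with_deadline(1) { { key: 'k', value: 'v' } }

# Stand-in for a consume_one call that blocks on an empty topic.
slow = with_deadline(0.05) do
  sleep 5
  :never_returned
end
```

A `nil` result here means the deadline passed. It does not distinguish an empty topic from a slow broker.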
Private Instance Methods
build_consumer(topic, start_from_beginning, config)
# File lib/kril/consumer.rb, line 55
def build_consumer(topic, start_from_beginning, config)
  consumer = @kafka.consumer(config)
  consumer.subscribe(topic, start_from_beginning: start_from_beginning)
  consumer
end
decode(message)
# File lib/kril/consumer.rb, line 61
def decode(message)
  {
    key: message.key,
    value: @avro.decode(message.value),
    offset: message.offset,
    create_time: message.create_time,
    topic: message.topic,
    partition: message.partition
  }
end
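The record handed to callers is therefore a Hash with six keys, of which only `:value` passes through the Avro decoder. The sketch below shows that shape. `FakeMessage`, `FakeAvro`, and `decode_sketch` are hypothetical stand-ins for illustration, and the string reversal only simulates decoding.

```ruby
# Hypothetical stand-in for a fetched Kafka message; only the six fields
# that decode reads are modeled.
FakeMessage = Struct.new(:key, :value, :offset, :create_time, :topic, :partition,
                         keyword_init: true)

# Hypothetical stand-in for AvroTurf::Messaging. Real Avro decoding is replaced
# by a string reversal so the example runs without a schema registry.
class FakeAvro
  def decode(bytes)
    bytes.reverse
  end
end

# Same field mapping as the decode method shown above.
def decode_sketch(avro, message)
  {
    key: message.key,
    value: avro.decode(message.value),
    offset: message.offset,
    create_time: message.create_time,
    topic: message.topic,
    partition: message.partition
  }
end

message = FakeMessage.new(key: 'user-1', value: 'olleh', offset: 42,
                          create_time: Time.at(0), topic: 'greetings', partition: 3)
record = decode_sketch(FakeAvro.new, message)
```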