class Kril::Consumer

High level abstraction for consuming records from topics.

Public Class Methods

new(avro: nil, kafka: nil, config: {}) click to toggle source

avro - Avro instance for deserializing records [AvroTurf::Messaging] kafka - Kafka instance for creating consumers [Kafka] config - consumer configuration (optional) [Hash]

# File lib/kril/consumer.rb, line 9
def initialize(avro: nil, kafka: nil, config: {})
  config[:group_id] ||= 'kril-consumer'
  @avro = avro
  @kafka = kafka
  @config = config
end

Public Instance Methods

consume_all(topic) { |decode(message), consumer| ... } click to toggle source

Consume all records from a topic. Each record will be yielded to block along with consumer instance. Will listen to topic after all records have been consumed.

topic - topic to consume from [String] yields - record, consumer [String, Kafka::Consumer] return - [nil]

# File lib/kril/consumer.rb, line 42
def consume_all(topic)
  config = @config.clone
  config[:group_id] = SecureRandom.uuid
  consumer = build_consumer(topic, true, config)
  consumer.each_message do |message|
    yield decode(message), consumer
  end
ensure
  consumer.stop
end
consume_one(topic) click to toggle source

Consume a single record from any partition. Will block indefinitely if no record present.

topic - topic to consume from [String] return - deserialized record [String]

# File lib/kril/consumer.rb, line 21
def consume_one(topic)
  consumer = build_consumer(topic, true, @config)
  msg = nil
  consumer.each_message do |message|
    msg = decode(message)
    consumer.mark_message_as_processed(message)
    consumer.commit_offsets
    consumer.stop
  end
  msg
ensure
  consumer.stop
end

Private Instance Methods

build_consumer(topic, start_from_beginning, config) click to toggle source
# File lib/kril/consumer.rb, line 55
def build_consumer(topic, start_from_beginning, config)
  consumer = @kafka.consumer(config)
  consumer.subscribe(topic, start_from_beginning: start_from_beginning)
  consumer
end
decode(message) click to toggle source
# File lib/kril/consumer.rb, line 61
def decode(message)
  {
    key: message.key,
    value: @avro.decode(message.value),
    offset: message.offset,
    create_time: message.create_time,
    topic: message.topic,
    partition: message.partition
  }
end