module Karafka::Connection::BatchDelegator
Delegates processing of each received batch of messages, from the topics we listen to, to the proper processor.
Public Class Methods
call(group_id, kafka_batch)
Delegates messages to their consumer. It either schedules or runs the proper processor action for the messages.
@param group_id [String] group_id of the group from which the given messages came
@param kafka_batch [<Kafka::FetchedBatch>] raw fetched batch of messages
@note This should be called in a loop so that new messages are delegated continuously
# File lib/karafka/connection/batch_delegator.rb, line 14
def call(group_id, kafka_batch)
  topic = Persistence::Topics.fetch(group_id, kafka_batch.topic)
  consumer = Persistence::Consumers.fetch(topic, kafka_batch.partition)

  Karafka.monitor.instrument(
    'connection.batch_delegator.call',
    caller: self,
    consumer: consumer,
    kafka_batch: kafka_batch
  ) do
    # Due to how ruby-kafka is built, we have the metadata that is stored on the batch
    # level only available for batch consuming
    consumer.batch_metadata = Params::Builders::BatchMetadata.from_kafka_batch(
      kafka_batch,
      topic
    )
    kafka_messages = kafka_batch.messages

    # Depending on a case (persisted or not) we might use new consumer instance per
    # each batch, or use the same one for all of them (for implementing buffering, etc.)
    if topic.batch_consuming
      consumer.params_batch = Params::Builders::ParamsBatch.from_kafka_messages(
        kafka_messages,
        topic
      )
      consumer.call
    else
      kafka_messages.each do |kafka_message|
        consumer.params_batch = Params::Builders::ParamsBatch.from_kafka_messages(
          [kafka_message],
          topic
        )
        consumer.call
      end
    end
  end
end
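The branch on topic.batch_consuming in the source above decides how often the consumer is invoked: once for the whole batch, or once per message with a single-element batch. The following is a minimal standalone sketch of that dispatch only. RecordingConsumer and delegate are hypothetical names made up for illustration and are not part of Karafka; plain arrays of strings stand in for the built params batch, and instrumentation and batch metadata are left out.

```ruby
# Hypothetical stand-in for a Karafka consumer: it only records every
# params_batch it is asked to process.
class RecordingConsumer
  attr_accessor :params_batch
  attr_reader :calls

  def initialize
    @calls = []
  end

  def call
    @calls << params_batch
  end
end

# Simplified version of the dispatch: one consumer call for the whole
# batch, or one call per message wrapped in a single-element batch.
def delegate(consumer, messages, batch_consuming:)
  if batch_consuming
    consumer.params_batch = messages
    consumer.call
  else
    messages.each do |message|
      consumer.params_batch = [message]
      consumer.call
    end
  end
end

batched = RecordingConsumer.new
delegate(batched, %w[a b c], batch_consuming: true)

single = RecordingConsumer.new
delegate(single, %w[a b c], batch_consuming: false)

puts batched.calls.size # 1
puts single.calls.size  # 3
```

In both modes the consumer reads its input from params_batch, so, at least in this sketch, consumer code does not need to know which mode is active; only the number of messages per call differs.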