module Karafka::Params::Builders::Params

Builder for params

Public Class Methods

from_kafka_message(kafka_message, topic)

@param kafka_message [Kafka::FetchedMessage] message fetched from Kafka
@param topic [Karafka::Routing::Topic] topic for which this message was fetched
@return [Karafka::Params::Params] params object with payload and message metadata

# File lib/karafka/params/builders/params.rb, line 16
# Wraps a fetched Kafka message into a params object: the raw message value
# paired with a frozen metadata object describing where it came from.
#
# @param kafka_message [Kafka::FetchedMessage] message fetched from Kafka
# @param topic [Karafka::Routing::Topic] topic for which this message was fetched
# @return [Karafka::Params::Params] params object with payload and message metadata
def from_kafka_message(kafka_message, topic)
  # Attributes are read in a fixed order; message-level data comes from the
  # fetched message, while the deserializer and topic name come from routing.
  attributes = {
    create_time: kafka_message.create_time,
    # A message without headers still exposes a hash to consumers
    headers: kafka_message.headers || {},
    is_control_record: kafka_message.is_control_record,
    key: kafka_message.key,
    offset: kafka_message.offset,
    deserializer: topic.deserializer,
    partition: kafka_message.partition,
    # Stamped at build time, not taken from the message itself
    receive_time: Time.now,
    topic: topic.name
  }

  frozen_metadata = Karafka::Params::Metadata.new(**attributes).freeze

  Karafka::Params::Params.new(kafka_message.value, frozen_metadata)
end