class Deimos::Producer
Producer
Publishes messages to a given Kafka topic.
Constants
- MAX_BATCH_SIZE
Public Class Methods
@return [Hash]
# File lib/deimos/producer.rb, line 61
# Lazily builds and memoizes the producer's configuration hash.
# The initial hash sets `encode_key` to true and takes `namespace` from
# the global Deimos producer settings.
# @return [Hash]
def config
  return @config if @config

  @config = {
    encode_key: true,
    namespace: Deimos.config.producers.schema_namespace
  }
end
@param sync [Boolean] @param force_send [Boolean] @return [Class < Deimos::Backend]
# File lib/deimos/producer.rb, line 127
# Resolves which backend class should be used for a publish call.
# @param sync [Boolean] explicit sync override; nil leaves the backend as-is
# @param force_send [Boolean] when true, start from the :kafka backend
#   instead of the configured one
# @return [Class < Deimos::Backend]
def determine_backend_class(sync, force_send)
  configured = force_send ? :kafka : Deimos.config.producers.backend

  # An explicit sync flag only toggles between the two Kafka backends;
  # any other backend is left untouched. `sync == false` is deliberate:
  # nil means "no override".
  resolved =
    if configured == :kafka_async && sync
      :kafka
    elsif configured == :kafka && sync == false
      :kafka_async
    else
      configured
    end

  "Deimos::Backends::#{resolved.to_s.classify}".constantize
end
@return [Deimos::SchemaBackends::Base]
# File lib/deimos/producer.rb, line 149
# Memoized schema backend built from the configured `:schema` and
# `:namespace`.
# @return [Deimos::SchemaBackends::Base]
def encoder
  return @encoder if @encoder

  @encoder = Deimos.schema_backend(
    schema: config[:schema],
    namespace: config[:namespace]
  )
end
@return [Deimos::SchemaBackends::Base]
# File lib/deimos/producer.rb, line 155
# Memoized schema backend built from the configured `:key_schema` and
# `:namespace`.
# @return [Deimos::SchemaBackends::Base]
def key_encoder
  return @key_encoder if @key_encoder

  @key_encoder = Deimos.schema_backend(
    schema: config[:key_schema],
    namespace: config[:namespace]
  )
end
Override the default partition key (which is the payload key). @param _payload [Hash] the payload being passed into the produce method. Will include `payload_key` if it is part of the original payload. @return [String]
# File lib/deimos/producer.rb, line 84
# Hook for supplying a custom partition key; this base version ignores
# its argument and always returns nil.
# @param _payload [Hash] the payload being passed into the produce method.
# @return [String, nil]
def partition_key(_payload)
  # Intentionally empty: an empty method body evaluates to nil.
end
Send a batch to the backend. @param backend [Class < Deimos::Backend] @param batch [Array<Deimos::Message>]
# File lib/deimos/producer.rb, line 144
# Hands one batch of messages to the given backend, identifying this
# producer as the sender.
# @param backend [Class < Deimos::Backend]
# @param batch [Array<Deimos::Message>]
def produce_batch(backend, batch)
  backend.publish(
    producer_class: self,
    messages: batch
  )
end
Publish the payload to the topic. @param payload [Hash] with an optional payload_key hash key. @param topic [String] the topic to publish to; defaults to this producer's configured topic.
# File lib/deimos/producer.rb, line 91
# Publishes a single payload by delegating to `publish_list` with a
# one-element list.
# @param payload [Hash] with an optional payload_key hash key.
# @param topic [String] topic to publish to; defaults to `self.topic`.
def publish(payload, topic: self.topic)
  single_payload = [payload]
  publish_list(single_payload, topic: topic)
end
Publish a list of messages. @param payloads [Hash|Array<Hash>] each with an optional payload_key hash key. @param sync [Boolean] if given, override the default setting of whether to publish synchronously. @param force_send [Boolean] if true, ignore the configured backend and send immediately to Kafka. @param topic [String] the topic to publish to; defaults to this producer's configured topic.
# File lib/deimos/producer.rb, line 102
# Publish a list of messages.
#
# Does nothing (returns nil) when no seed brokers are configured, when
# producers are disabled globally, or when `Deimos.producers_disabled?`
# is true for this producer.
#
# @param payloads [Hash|Array<Hash>] with optional payload_key hash key.
#   A single Hash is treated as a one-element list.
# @param sync [Boolean] if given, override the default setting of
#   whether to publish synchronously.
# @param force_send [Boolean] if true, ignore the configured backend and
#   send immediately to Kafka.
# @param topic [String] topic to publish to; defaults to `self.topic`.
# @raise [RuntimeError] if the topic is blank.
def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
  return if Deimos.config.kafka.seed_brokers.blank? ||
            Deimos.config.producers.disabled ||
            Deimos.producers_disabled?(self)

  raise 'Topic not specified. Please specify the topic.' if topic.blank?

  backend_class = determine_backend_class(sync, force_send)
  Deimos.instrument(
    'encode_messages',
    producer: self,
    topic: topic,
    payloads: payloads
  ) do
    # Kernel#Array would explode a lone Hash into [key, value] pairs, so a
    # single payload has to be wrapped explicitly.
    payload_list = payloads.is_a?(Hash) ? [payloads] : Array(payloads)
    messages = payload_list.map { |p| Deimos::Message.new(p, self) }
    messages.each { |m| _process_message(m, topic) }
    # Send in slices of at most MAX_BATCH_SIZE; `false` disables padding
    # of the final, shorter slice.
    messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
      self.produce_batch(backend_class, batch)
    end
  end
end
Set the topic. @param topic [String] @return [String] the current topic if no argument given.
# File lib/deimos/producer.rb, line 71
# Combined setter/getter for the producer's topic.
# With an argument, stores it under `config[:topic]` and returns nil.
# Without one, returns the stored topic prefixed with the configured
# `topic_prefix`.
# @param topic [String]
# @return [String] the current topic if no argument given.
def topic(topic=nil)
  unless topic
    # accessor
    return "#{Deimos.config.producers.topic_prefix}#{config[:topic]}"
  end

  config[:topic] = topic
  nil
end
Override this in ActiveRecord producers to add non-schema fields to check for updates. @return [Array<String>] fields to check for updates
# File lib/deimos/producer.rb, line 163
# Names of every field in the encoder's schema.
# @return [Array<String>] fields to check for updates
def watched_attributes
  schema_fields = self.encoder.schema_fields
  schema_fields.map { |field| field.name }
end
Private Class Methods
@param key [Object] @return [String|Object]
# File lib/deimos/producer.rb, line 194
# Encodes a message key according to the producer's key configuration.
# @param key [Object]
# @return [String|Object] the encoded key, the key itself when neither
#   `:key_field` nor `:key_schema` is configured, or nil when the key is
#   nil and `:no_keys` is set.
# @raise [RuntimeError] if the key is nil but keys are required, or if
#   key encoding is enabled without a key field or key schema.
def _encode_key(key)
  # no key is fine when keys are disabled, otherwise it's a problem
  if key.nil? && !config[:no_keys]
    raise 'No key given but a key is required! Use `key_config none: true` to avoid using keys.'
  end
  return nil if key.nil?

  key_field = config[:key_field]
  key_schema = config[:key_schema]
  if config[:encode_key] && key_field.nil? && key_schema.nil?
    raise 'No key config given - if you are not encoding keys, please use `key_config plain: true`'
  end

  # Plain keys pass through untouched.
  return key unless key_field || key_schema

  key_topic = "#{Deimos.config.producers.topic_prefix}#{config[:topic]}-key"
  if key_field
    encoder.encode_key(key_field, key, topic: key_topic)
  else
    key_encoder.encode(key, topic: key_topic)
  end
end
@param message [Message] @param topic [String]
# File lib/deimos/producer.rb, line 171
# Fills in every derived attribute of a message (fields, keys, topic and
# encoded payload) so it is ready to hand to a backend.
# @param message [Message]
# @param topic [String]
def _process_message(message, topic)
  # this violates the Law of Demeter but it has to happen in a very
  # specific order and requires a bunch of methods on the producer
  # to work correctly.
  field_names = encoder.schema_fields.map(&:name)
  message.add_fields(field_names)
  message.partition_key = self.partition_key(message.payload)
  message.key = _retrieve_key(message.payload)

  # need to do this before _coerce_fields because that might result
  # in an empty payload which is an *error* whereas this is intended.
  message.payload = nil if message.payload.blank?
  message.coerce_fields(encoder)
  message.encoded_key = _encode_key(message.key)
  message.topic = topic

  # A nil payload is left unencoded.
  message.encoded_payload =
    unless message.payload.nil?
      encoder.encode(message.payload,
                     topic: "#{Deimos.config.producers.topic_prefix}#{config[:topic]}-value")
    end
end
@param payload [Hash] @return [String]
# File lib/deimos/producer.rb, line 216
# Works out the key for a payload. An explicit `:payload_key` entry wins
# and is removed from the payload as a side effect; otherwise the value
# of the configured `:key_field` is used, if one is configured.
# @param payload [Hash]
# @return [String]
def _retrieve_key(payload)
  explicit_key = payload.delete(:payload_key)
  return explicit_key if explicit_key

  key_field = config[:key_field]
  payload[key_field] if key_field
end