class Deimos::Producer

Producer to publish messages to a given kafka topic.

Constants

MAX_BATCH_SIZE

Public Class Methods

config() click to toggle source

@return [Hash]

# File lib/deimos/producer.rb, line 61
# Producer-level settings, built on first access and memoized afterwards.
# Starts with key encoding enabled and the globally configured schema
# namespace; the same hash instance is returned on every later call.
# @return [Hash]
def config
  return @config if @config

  default_namespace = Deimos.config.producers.schema_namespace
  @config = { encode_key: true, namespace: default_namespace }
end
determine_backend_class(sync, force_send) click to toggle source

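@param sync [Boolean, nil] true forces the synchronous Kafka backend and false forces the asynchronous one; nil leaves the chosen backend unchanged. @param force_send [Boolean] if true, start from the Kafka backend instead of the configured one. @return [Class < Deimos::Backend]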

# File lib/deimos/producer.rb, line 127
# Works out which backend class should be used for publishing.
# Starts from :kafka when force_send is set, otherwise from the configured
# producer backend, then lets an explicit sync flag switch between the
# :kafka and :kafka_async backends. Other backends are left alone.
# @param sync [Boolean, nil] nil means "no override"
# @param force_send [Boolean]
# @return [Class < Deimos::Backend]
def determine_backend_class(sync, force_send)
  chosen = force_send ? :kafka : Deimos.config.producers.backend
  chosen =
    if chosen == :kafka_async && sync
      :kafka
    elsif chosen == :kafka && sync == false
      # Compare against false on purpose: a nil sync must not switch backends.
      :kafka_async
    else
      chosen
    end
  "Deimos::Backends::#{chosen.to_s.classify}".constantize
end
encoder() click to toggle source

@return [Deimos::SchemaBackends::Base]

# File lib/deimos/producer.rb, line 149
# Schema backend for message payloads, created on first use from the
# producer's :schema and :namespace settings and memoized afterwards.
# @return [Deimos::SchemaBackends::Base]
def encoder
  return @encoder if @encoder

  schema, namespace = config.values_at(:schema, :namespace)
  @encoder = Deimos.schema_backend(schema: schema, namespace: namespace)
end
key_encoder() click to toggle source

@return [Deimos::SchemaBackends::Base]

# File lib/deimos/producer.rb, line 155
# Schema backend for message keys, created on first use from the
# producer's :key_schema and :namespace settings and memoized afterwards.
# @return [Deimos::SchemaBackends::Base]
def key_encoder
  return @key_encoder if @key_encoder

  key_schema, namespace = config.values_at(:key_schema, :namespace)
  @key_encoder = Deimos.schema_backend(schema: key_schema, namespace: namespace)
end
partition_key(_payload) click to toggle source

Override the default partition key (which is the payload key). @param _payload [Hash] the payload being passed into the produce method. Will include `payload_key` if it is part of the original payload. @return [String]

# File lib/deimos/producer.rb, line 84
# Hook for supplying a custom partition key for a payload.
# This base implementation ignores its argument and always returns nil;
# it exists to be overridden.
# @param _payload [Hash] the payload being produced (unused here)
# @return [nil] in this default implementation
def partition_key(_payload)
  nil
end
produce_batch(backend, batch) click to toggle source

Send a batch to the backend. @param backend [Class < Deimos::Backend] @param batch [Array<Deimos::Message>]

# File lib/deimos/producer.rb, line 144
# Hands one batch of messages to the given backend.
# Delegates to `backend.publish`, passing this producer as `producer_class`,
# and returns whatever that call returns.
# @param backend [Class < Deimos::Backend]
# @param batch [Array<Deimos::Message>]
def produce_batch(backend, batch)
  backend.publish(producer_class: self, messages: batch)
end
publish(payload, topic: self.topic) click to toggle source

Publish the payload to the topic. @param payload [Hash] with an optional payload_key hash key. @param topic [String] the topic to publish to; defaults to the producer's configured topic.

# File lib/deimos/producer.rb, line 91
# Publishes a single payload.
# Convenience wrapper: wraps the payload in a one-element array and
# delegates to publish_list with the same topic.
# @param payload [Hash] with an optional payload_key hash key.
# @param topic [String] defaults to the value returned by `topic`
def publish(payload, topic: self.topic)
  publish_list([payload], topic: topic)
end
publish_list(payloads, sync: nil, force_send: false, topic: self.topic) click to toggle source

Publish a list of messages. @param payloads [Hash|Array<Hash>] with optional payload_key hash key. @param sync [Boolean] if given, override the default setting of whether to publish synchronously. @param force_send [Boolean] if true, ignore the configured backend and send immediately to Kafka. @param topic [String] the topic to publish to; defaults to the producer's configured topic.

# File lib/deimos/producer.rb, line 102
# Publish a list of messages.
# Does nothing (returns nil) when no seed brokers are configured or when
# producers are disabled, globally or for this producer. Otherwise every
# payload is wrapped in a Deimos::Message, processed, and sent to the
# backend in batches of at most MAX_BATCH_SIZE messages.
# @param payloads [Hash|Array<Hash>] with optional payload_key hash key.
#   A single Hash is treated as one message.
# @param sync [Boolean] if given, override the default setting of
#   whether to publish synchronously.
# @param force_send [Boolean] if true, ignore the configured backend
#   and send immediately to Kafka.
# @param topic [String] the topic to publish to
# @raise [RuntimeError] if the topic is blank
def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
  return if Deimos.config.kafka.seed_brokers.blank? ||
            Deimos.config.producers.disabled ||
            Deimos.producers_disabled?(self)

  raise 'Topic not specified. Please specify the topic.' if topic.blank?

  backend_class = determine_backend_class(sync, force_send)
  Deimos.instrument(
    'encode_messages',
    producer: self,
    topic: topic,
    payloads: payloads
  ) do
    # Kernel#Array turns a Hash into [key, value] pairs, so a lone payload
    # hash has to be wrapped by hand to be treated as a single message.
    payload_list = payloads.is_a?(Hash) ? [payloads] : Array(payloads)
    messages = payload_list.map { |p| Deimos::Message.new(p, self) }
    messages.each { |m| _process_message(m, topic) }
    messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
      self.produce_batch(backend_class, batch)
    end
  end
end
topic(topic=nil) click to toggle source

Set the topic. @param topic [String] @return [String] the current topic if no argument given.

# File lib/deimos/producer.rb, line 71
# Combined setter/getter for the producer's topic.
# With an argument, stores it in the config and returns nil.
# Without one, returns the configured topic prefixed with the global
# producer topic prefix.
# @param topic [String]
# @return [String] the current topic if no argument given.
def topic(topic=nil)
  unless topic
    # accessor
    return "#{Deimos.config.producers.topic_prefix}#{config[:topic]}"
  end

  config[:topic] = topic
  nil
end
watched_attributes() click to toggle source

Override this in ActiveRecord producers to add non-schema fields to check for updates. @return [Array<String>] fields to check for updates

# File lib/deimos/producer.rb, line 163
# Names of the fields in the payload schema, as reported by the encoder.
# @return [Array<String>] fields to check for updates
def watched_attributes
  schema_fields = self.encoder.schema_fields
  schema_fields.map { |field| field.name }
end

Private Class Methods

_encode_key(key) click to toggle source

@param key [Object] @return [String|Object]

# File lib/deimos/producer.rb, line 194
# Encodes a message key according to the producer's key configuration.
# - A nil key is only accepted when config[:no_keys] is set.
# - With :key_field set, the payload encoder encodes the key as that field.
# - Otherwise, with :key_schema set, the dedicated key encoder is used.
# - With neither, the key is returned untouched.
# @param key [Object]
# @return [String|Object]
# @raise [RuntimeError] if the key is nil but keys are required, or if key
#   encoding is enabled without a key field or key schema
def _encode_key(key)
  if key.nil?
    unless config[:no_keys]
      raise 'No key given but a key is required! Use `key_config none: true` to avoid using keys.'
    end

    return nil # no key is fine when keys are switched off
  end

  key_field = config[:key_field]
  key_schema = config[:key_schema]
  if config[:encode_key] && key_field.nil? && key_schema.nil?
    raise 'No key config given - if you are not encoding keys, please use `key_config plain: true`'
  end
  return key unless key_field || key_schema

  key_topic = "#{Deimos.config.producers.topic_prefix}#{config[:topic]}-key"
  if key_field
    encoder.encode_key(key_field, key, topic: key_topic)
  else
    key_encoder.encode(key, topic: key_topic)
  end
end
_process_message(message, topic) click to toggle source

@param message [Message] @param topic [String]

# File lib/deimos/producer.rb, line 171
# Fills in a message in place so it is ready to be handed to a backend:
# sets its partition key, key, encoded key, topic and encoded payload.
# The statements below are order-dependent; do not reorder them.
# @param message [Message]
# @param topic [String] stored on the message as its topic
def _process_message(message, topic)
  # this violates the Law of Demeter but it has to happen in a very
  # specific order and requires a bunch of methods on the producer
  # to work correctly.
  message.add_fields(encoder.schema_fields.map(&:name))
  message.partition_key = self.partition_key(message.payload)
  # _retrieve_key removes :payload_key from the payload as a side effect.
  message.key = _retrieve_key(message.payload)
  # need to do this before coerce_fields because that might result
  # in an empty payload which is an *error* whereas this is intended.
  message.payload = nil if message.payload.blank?
  message.coerce_fields(encoder)
  message.encoded_key = _encode_key(message.key)
  message.topic = topic
  # A nil payload is passed through unencoded.
  # NOTE(review): the `topic:` given to the encoder is built from
  # config[:topic], not from the `topic` argument - confirm this is
  # intended when a caller overrides the topic.
  message.encoded_payload = if message.payload.nil?
                              nil
                            else
                              encoder.encode(message.payload,
                                             topic: "#{Deimos.config.producers.topic_prefix}#{config[:topic]}-value")
                            end
end
_retrieve_key(payload) click to toggle source

@param payload [Hash] @return [String]

# File lib/deimos/producer.rb, line 216
# Picks the key for a payload.
# An explicit :payload_key entry wins and is always removed from the
# payload. Otherwise the value of the configured key field is used, or
# nil when no key field is configured.
# @param payload [Hash] mutated: :payload_key is deleted
# @return [String]
def _retrieve_key(payload)
  explicit_key = payload.delete(:payload_key)
  return explicit_key if explicit_key

  key_field = config[:key_field]
  payload[key_field] if key_field
end