module Deimos::KafkaSource

Represents an object which needs to inform Kafka when it is saved or bulk imported.

Constants

DEPRECATION_WARNING

Public Instance Methods

deletion_payload()

Payload to send after this record is destroyed. Returns a Hash whose :payload_key is the record's primary key value.

# File lib/deimos/kafka_source.rb, line 52
def deletion_payload
  { payload_key: self[self.class.primary_key] }
end
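
To make the shape of the returned Hash concrete, here is a minimal sketch that runs without ActiveRecord or Deimos. FakeRecord is a hypothetical stand-in that only imitates the two things the method relies on (a class-level primary_key and attribute lookup via []); it is not part of the library.

```ruby
# FakeRecord is a hypothetical stand-in for an ActiveRecord model.
class FakeRecord
  # ActiveRecord models expose the primary key column name as a String.
  def self.primary_key
    'id'
  end

  def initialize(attributes)
    @attributes = attributes
  end

  # Imitates ActiveRecord's attribute lookup by name.
  def [](name)
    @attributes[name.to_s]
  end

  # Same body as the documented method.
  def deletion_payload
    { payload_key: self[self.class.primary_key] }
  end
end

record = FakeRecord.new('id' => 42, 'name' => 'widget')
payload = record.deletion_payload
```

Only the primary key value ends up in the Hash; no other attribute of the record is included.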

send_kafka_event_on_create()

Send the newly created model to Kafka. Does nothing unless the record is persisted and kafka_config[:create] is set.

# File lib/deimos/kafka_source.rb, line 19
def send_kafka_event_on_create
  return unless self.persisted?
  return unless self.class.kafka_config[:create]

  self.class.kafka_producers.each { |p| p.send_event(self) }
end
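
The two guard clauses can be exercised with a small sketch. RecordingProducer and FakeModel are hypothetical stand-ins written for this illustration, not Deimos classes; they assume only what the method body shows, namely that each producer responds to send_event and that the model class exposes kafka_config and kafka_producers.

```ruby
# Hypothetical producer that records what it is asked to send.
class RecordingProducer
  class << self
    def events
      @events ||= []
    end

    def send_event(record)
      events << record
    end
  end
end

# Hypothetical model exposing the class-level hooks the method reads.
class FakeModel
  class << self
    attr_accessor :kafka_config

    def kafka_producers
      [RecordingProducer]
    end
  end

  def initialize(persisted)
    @persisted = persisted
  end

  def persisted?
    @persisted
  end

  # Same logic as the documented method.
  def send_kafka_event_on_create
    return unless persisted?
    return unless self.class.kafka_config[:create]

    self.class.kafka_producers.each { |p| p.send_event(self) }
  end
end

FakeModel.kafka_config = { create: true }
FakeModel.new(false).send_kafka_event_on_create # skipped: not persisted
saved = FakeModel.new(true)
saved.send_kafka_event_on_create                # delivered to the producer

FakeModel.kafka_config = { create: false }
FakeModel.new(true).send_kafka_event_on_create  # skipped: :create disabled
```

Of the three calls, only the persisted record created while :create is enabled reaches the producer.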

send_kafka_event_on_destroy()

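Send a deletion (null payload) event to Kafka. Does nothing unless kafka_config[:delete] is set; otherwise each producer publishes this record's deletion_payload.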

# File lib/deimos/kafka_source.rb, line 44
def send_kafka_event_on_destroy
  return unless self.class.kafka_config[:delete]

  self.class.kafka_producers.each { |p| p.publish_list([self.deletion_payload]) }
end

send_kafka_event_on_update()

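Send the newly updated model to Kafka. Does nothing unless kafka_config[:update] is set and at least one attribute watched by a producer, other than updated_at, actually changed in the last save.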

# File lib/deimos/kafka_source.rb, line 27
def send_kafka_event_on_update
  return unless self.class.kafka_config[:update]

  producers = self.class.kafka_producers
  fields = producers.flat_map(&:watched_attributes).uniq
  fields -= ['updated_at']
  # Only send an event if a field we care about was changed.
  any_changes = fields.any? do |field|
    field_change = self.previous_changes[field]
    field_change.present? && field_change[0] != field_change[1]
  end
  return unless any_changes

  producers.each { |p| p.send_event(self) }
end
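
The change check can be tried in isolation with plain hashes. This is a sketch under stated assumptions: watched_change? is a hypothetical helper, not a Deimos method, and it replaces ActiveSupport's present? with explicit nil and empty checks so that it runs in plain Ruby. The input mimics ActiveRecord's previous_changes, which maps each attribute name to an [old, new] pair.

```ruby
# Hypothetical helper mirroring the check in send_kafka_event_on_update.
# previous_changes: Hash of attribute name => [old_value, new_value]
# watched_fields:   attribute names collected from all producers
def watched_change?(previous_changes, watched_fields)
  # A bump of updated_at alone never counts as a change.
  fields = watched_fields.uniq - ['updated_at']
  fields.any? do |field|
    change = previous_changes[field]
    # Plain-Ruby replacement for ActiveSupport's present?.
    !change.nil? && !change.empty? && change[0] != change[1]
  end
end

watched = %w[name price updated_at]

renamed      = watched_change?({ 'name' => %w[old new], 'updated_at' => [1, 2] }, watched)
touched_only = watched_change?({ 'updated_at' => [1, 2] }, watched)
same_value   = watched_change?({ 'name' => %w[same same] }, watched)
unwatched    = watched_change?({ 'color' => %w[red blue] }, watched)
```

Here only renamed is true. A save that touches just updated_at, rewrites a field with the same value, or changes an attribute no producer watches would not trigger an event.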