module Deimos::KafkaSource::ClassMethods

Public Instance Methods

import_without_validations_or_callbacks(column_names, array_of_attributes, options={}) click to toggle source

This is an internal method, part of the activerecord_import gem. It's the one that actually does the importing, having already normalized the inputs (arrays, hashes, records etc.) Basically we want to first do the import, then reload the records and send them to Kafka.

Calls superclass method
# File lib/deimos/kafka_source.rb, line 83
def import_without_validations_or_callbacks(column_names,
                                            array_of_attributes,
                                            options={})
  results = super
  if !self.kafka_config[:import] || array_of_attributes.empty?
    return results
  end

  # This will contain an array of hashes, where each hash is the actual
  # attribute hash that created the object.
  array_of_hashes = []
  array_of_attributes.each do |array|
    array_of_hashes << column_names.zip(array).to_h.with_indifferent_access
  end
  hashes_with_id, hashes_without_id = array_of_hashes.partition { |arr| arr[:id].present? }

  self.kafka_producers.each { |p| p.send_events(hashes_with_id) }

  if hashes_without_id.any?
    if options[:on_duplicate_key_update].present? &&
       options[:on_duplicate_key_update] != [:updated_at]
      unique_columns = column_names.map(&:to_s) -
                       options[:on_duplicate_key_update].map(&:to_s) - %w(id created_at)
      records = hashes_without_id.map do |hash|
        self.where(unique_columns.map { |c| [c, hash[c]] }.to_h).first
      end
      self.kafka_producers.each { |p| p.send_events(records) }
    else
      # re-fill IDs based on what was just entered into the DB.
      last_id = if self.connection.adapter_name.downcase =~ /sqlite/
                  self.connection.select_value('select last_insert_rowid()') -
                    hashes_without_id.size + 1
                else # mysql
                  self.connection.select_value('select LAST_INSERT_ID()')
                end
      hashes_without_id.each_with_index do |attrs, i|
        attrs[:id] = last_id + i
      end
      self.kafka_producers.each { |p| p.send_events(hashes_without_id) }
    end
  end
  results
end
kafka_config() click to toggle source

@return [Hash]

# File lib/deimos/kafka_source.rb, line 59
def kafka_config
  {
    update: true,
    delete: true,
    import: true,
    create: true
  }
end
kafka_producers() click to toggle source

@return [Array<Deimos::ActiveRecordProducer>] the producers to run.

# File lib/deimos/kafka_source.rb, line 69
def kafka_producers
  if self.respond_to?(:kafka_producer)
    Deimos.config.logger.warn(message: DEPRECATION_WARNING)
    return [self.kafka_producer]
  end

  raise NotImplementedError
end