class InventoryRefresh::SaveCollection::Saver::ConcurrentSafeBatch

Private Instance Methods

ar_record_key(record, key)

Attribute accessor to ApplicationRecord object

@param record [ApplicationRecord] record
@param key [String] key pointing to an attribute of the record
@return [Object] value of the record for the key

# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 36
def ar_record_key(record, key)
  record.public_send(key)
end
changed?(record)
# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 217
def changed?(record)
  return true unless inventory_collection.check_changed?

  # If object was archived before, pass it to update so it can be unarchived
  return true if record.respond_to?(:archived_at) && record.archived_at
  # Skip if nothing changed
  return false if record.changed_attributes.empty?
  # Skip if we only changed the resource_timestamp, but data stays the same
  return false if record.changed_attributes.keys == ["resource_timestamp"]

  true
end
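The decision order above matters: the archival check runs before the dirty-tracking checks. A minimal stand-alone sketch of that logic, using a hypothetical Struct stub in place of an ApplicationRecord:

RecordStub = Struct.new(:archived_at, :changed_attributes)

archived  = RecordStub.new(Time.now.utc, {})
untouched = RecordStub.new(nil, {})
ts_only   = RecordStub.new(nil, { "resource_timestamp" => Time.now.utc })

# Mirrors the branches of changed? with check_changed? enabled
def worth_updating?(record)
  return true  if record.archived_at                # archived: update to unarchive
  return false if record.changed_attributes.empty?  # nothing changed
  return false if record.changed_attributes.keys == ["resource_timestamp"]

  true
end

worth_updating?(archived)  # => true
worth_updating?(untouched) # => false
worth_updating?(ts_only)   # => false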
create_records!(all_attribute_keys, batch, attributes_index, on_conflict: nil)

Batch-inserts records using the attributes_index data. With the on_conflict option set to :do_update, this method performs an atomic upsert.

@param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row
@param batch [Array<InventoryRefresh::InventoryObject>] Array of InventoryObject objects we will be inserting into the DB
@param attributes_index [Hash{String => Hash}] Hash of data hashes with only keys that are column names of the model's table
@param on_conflict [Symbol, NilClass] defines behavior on conflict with the unique index constraint; allowed values are :do_update, :do_nothing, nil
# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 285
def create_records!(all_attribute_keys, batch, attributes_index, on_conflict: nil)
  indexed_inventory_objects = {}
  hashes                    = []
  create_time               = time_now
  batch.each do |index, inventory_object|
    hash = if serializable_keys?
             values_for_database!(all_attribute_keys,
                                  attributes_index[index])
           else
             attributes_index[index]
           end

    assign_attributes_for_create!(hash, create_time)

    next unless assert_referential_integrity(hash)

    hashes << hash
    # Index on Unique Columns values, so we can easily fill in the :id later
    indexed_inventory_objects[unique_index_columns.map { |x| hash[x] }] = inventory_object
  end

  return if hashes.blank?

  result = get_connection.execute(
    build_insert_query(all_attribute_keys, hashes, :on_conflict => on_conflict, :mode => :full)
  )

  # We've done an upsert, so records were either created or updated. We can tell which by
  # checking whether the created and updated timestamps are equal.
  created_attr = "created_on" if inventory_collection.supports_column?(:created_on)
  created_attr ||= "created_at" if inventory_collection.supports_column?(:created_at)
  updated_attr = "updated_on" if inventory_collection.supports_column?(:updated_on)
  updated_attr ||= "updated_at" if inventory_collection.supports_column?(:updated_at)

  if created_attr && updated_attr
    created, updated = result.to_a.partition { |x| x[created_attr] == x[updated_attr] }
    inventory_collection.store_created_records(created)
    inventory_collection.store_updated_records(updated)
  else
    # The record doesn't have both created and updated attrs, so we'll take all as created
    inventory_collection.store_created_records(result)
  end

  if inventory_collection.dependees.present?
    # We need to get primary keys of the created objects, but only if there are dependees that would use them
    map_ids_to_inventory_objects(indexed_inventory_objects,
                                 all_attribute_keys,
                                 hashes,
                                 result,
                                 :on_conflict => on_conflict)
  end

  skeletonize_ignored_records!(indexed_inventory_objects, result, :all_unique_columns => true)
end
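A note on the created/updated split above: an atomic upsert returns both inserted and updated rows in one result, and the two are told apart by comparing timestamps. A self-contained sketch of the partition with fabricated RETURNING rows:

rows = [
  { "id" => 1, "created_at" => "2024-01-01 10:00:00", "updated_at" => "2024-01-01 10:00:00" },
  { "id" => 2, "created_at" => "2023-12-01 09:00:00", "updated_at" => "2024-01-01 10:00:00" }
]

# Freshly inserted rows get identical created/updated timestamps; updated rows don't.
created, updated = rows.partition { |row| row["created_at"] == row["updated_at"] }
created.map { |row| row["id"] } # => [1]
updated.map { |row| row["id"] } # => [2]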
db_columns_index(record, pure_sql: false)
# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 230
def db_columns_index(record, pure_sql: false)
  # Incoming values are in SQL string form.
  # TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection
  # with streaming refresh? Maybe just metrics and events will not be, but those should be upsert only
  unique_index_keys_to_s.map do |attribute|
    value = if pure_sql
              record[attribute]
            else
              record_key(record, attribute)
            end

    format_value(attribute, value)
  end.join("__")
end
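A sketch of the index string this produces, assuming hypothetical unique index keys ems_ref and type; each formatted value is joined with a double underscore:

record = { "ems_ref" => "vm-42", "type" => "CloudVm" }

index = %w[ems_ref type].map { |attribute| record[attribute].to_s }.join("__")
index # => "vm-42__CloudVm"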
expand_all_attribute_keys!(all_attribute_keys)
# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 105
def expand_all_attribute_keys!(all_attribute_keys)
  %i(created_at updated_at created_on updated_on).each do |col|
    all_attribute_keys << col if supports_column?(col)
  end
  all_attribute_keys << :type if supports_sti?
  all_attribute_keys << :archived_at if supports_column?(:archived_at)
end
format_value(attribute, value)
# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 245
def format_value(attribute, value)
  if attribute == "timestamp"
    # TODO: can this be covered by @deserializable_keys?
    type = model_class.type_for_attribute(attribute)
    type.cast(value).utc.iso8601.to_s
  elsif (type = deserializable_keys[attribute.to_sym])
    type.deserialize(value).to_s
  else
    value.to_s
  end
end
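The timestamp branch exists because the same instant can arrive in different textual forms, and the index strings must compare equal. A self-contained illustration using Time directly (format_value itself goes through the model's attribute type):

require "time"

# Two spellings of the same instant normalize to one canonical string
Time.parse("2024-01-01 12:00:00 +0100").utc.iso8601 # => "2024-01-01T11:00:00Z"
Time.parse("2024-01-01T11:00:00Z").utc.iso8601      # => "2024-01-01T11:00:00Z"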
load_and_update_records!(records_batch_iterator, inventory_objects_index, attributes_index, all_attribute_keys)

Batch-updates existing records that are in the DB, using attributes_index.

@param records_batch_iterator [ActiveRecord::Relation, InventoryRefresh::ApplicationRecordIterator] iterator or relation, both responding to the :find_in_batches method
@param inventory_objects_index [Hash{String => InventoryRefresh::InventoryObject}] Hash of InventoryObject objects
@param attributes_index [Hash{String => Hash}] Hash of data hashes with only keys that are column names of the model's table
@param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row

# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 135
def load_and_update_records!(records_batch_iterator, inventory_objects_index, attributes_index, all_attribute_keys)
  hashes_for_update         = []
  indexed_inventory_objects = {}

  records_batch_iterator.find_in_batches(:batch_size => batch_size, :attributes_index => attributes_index) do |batch|
    update_time = time_now

    batch.each do |record|
      primary_key_value = record_key(record, primary_key)

      next unless assert_distinct_relation(primary_key_value)

      index = db_columns_index(record)

      inventory_object = inventory_objects_index.delete(index)
      hash             = attributes_index[index]

      if inventory_object
        # Record was found in the DB and sent for saving, we will be updating the DB.
        inventory_object.id = primary_key_value
        next unless assert_referential_integrity(hash)

        record_version = nil
        record_version_max = nil
        if supports_remote_data_timestamp?(all_attribute_keys) || supports_remote_data_version?(all_attribute_keys)

          version_attr, max_version_attr = if supports_remote_data_timestamp?(all_attribute_keys)
                                             [:resource_timestamp, :resource_timestamps_max]
                                           elsif supports_remote_data_version?(all_attribute_keys)
                                             [:resource_counter, :resource_counters_max]
                                           end

          record_version = record_key(record, version_attr.to_s)
          record_version_max = record_key(record, max_version_attr.to_s)
        end

        hash_for_update = if inventory_collection.use_ar_object?
                            record.assign_attributes(hash.except(:id))
                            next unless changed?(record)

                            values_for_database!(all_attribute_keys,
                                                 hash)
                          elsif serializable_keys?
                            # TODO(lsmola) hash data with current DB data to allow subset of data being sent,
                            # otherwise we would nullify the not sent attributes. Test e.g. on disks in cloud
                            values_for_database!(all_attribute_keys,
                                                 hash)
                          else
                            # TODO(lsmola) hash data with current DB data to allow subset of data being sent,
                            # otherwise we would nullify the not sent attributes. Test e.g. on disks in cloud
                            hash
                          end

        if supports_remote_data_timestamp?(all_attribute_keys) || supports_remote_data_version?(all_attribute_keys)
          next if skeletonize_or_skip_record(record_version,
                                             hash[version_attr],
                                             record_version_max,
                                             inventory_object)
        end

        assign_attributes_for_update!(hash_for_update, update_time)

        hash_for_update[:id] = primary_key_value
        indexed_inventory_objects[index] = inventory_object
        hashes_for_update << hash_for_update
      end
    end

    # Update in batches
    if hashes_for_update.size >= batch_size_for_persisting
      update_records!(all_attribute_keys, hashes_for_update, indexed_inventory_objects)

      hashes_for_update = []
      indexed_inventory_objects = {}
    end
  end

  # Update the last batch
  update_records!(all_attribute_keys, hashes_for_update, indexed_inventory_objects)
  hashes_for_update = [] # Cleanup so GC can release it sooner
end
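The flushing above is a plain accumulate-and-flush pattern: the buffers are written out whenever they reach batch_size_for_persisting, with one final flush for the remainder. A minimal stand-alone sketch (the size 4 is an arbitrary stand-in):

buffer = []
flush  = ->(rows) { puts "flushing #{rows.size} rows" unless rows.empty? }

(1..10).each do |row|
  buffer << row

  if buffer.size >= 4 # stand-in for batch_size_for_persisting
    flush.call(buffer)
    buffer = []
  end
end

flush.call(buffer) # the last, partial batch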
map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, hashes, result, on_conflict:)

Stores primary_key values of created records into the associated InventoryObject objects.

@param indexed_inventory_objects [Hash{String => InventoryRefresh::InventoryObject}] inventory objects indexed by a stringified value made from db_columns
@param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row
@param hashes [Array<Hash>] Array of hashes that were used for inserting the data
@param result [Array<Hash>] Array of hashes that are the result of the batch insert query; each result contains the primary key value plus all columns that are part of the unique index
@param on_conflict [Symbol, NilClass] defines behavior on conflict with the unique index constraint; allowed values are :do_update, :do_nothing, nil
# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 350
def map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, hashes, result, on_conflict:)
  if on_conflict == :do_nothing
    # TODO(lsmola) is the comment below still accurate? We will update some partial rows, the actual skeletal
    # precreate will still do nothing.
    # For ON CONFLICT DO NOTHING, we need to always fetch the records plus the attribute_references. This path
    # applies only for skeletal precreate.
    inventory_collection.model_class.where(
      build_multi_selection_query(hashes)
    ).select(unique_index_columns + [:id] + attribute_references.to_a).each do |record|
      key              = unique_index_columns.map { |x| record.public_send(x) }
      inventory_object = indexed_inventory_objects[key]

      # Load also attribute_references, so lazy_find with :key pointing to skeletal reference works
      attributes = record.attributes.symbolize_keys
      attribute_references.each do |ref|
        inventory_object[ref] = attributes[ref]

        next unless (foreign_key = association_to_foreign_key_mapping[ref])
        base_class_name       = attributes[association_to_foreign_type_mapping[ref].try(:to_sym)] || association_to_base_class_mapping[ref]
        id                    = attributes[foreign_key.to_sym]
        inventory_object[ref] = InventoryRefresh::ApplicationRecordReference.new(base_class_name, id)
      end

      inventory_object.id = record.id if inventory_object
    end
  elsif !supports_remote_data_timestamp?(all_attribute_keys) || result.count == batch_size_for_persisting
    # We can use the insert query result to fetch all primary_key values, which makes this the most effective
    # path.
    result.each do |inserted_record|
      key = unique_index_columns.map do |x|
        value = inserted_record[x.to_s]
        type = deserializable_keys[x]
        type ? type.deserialize(value) : value
      end
      inventory_object    = indexed_inventory_objects[key]
      inventory_object.id = inserted_record[primary_key] if inventory_object
    end
  else
    # The remote_data_timestamp adds a WHERE condition to ON CONFLICT UPDATE. As a result, the RETURNING
    # clause is not guaranteed to return the ids of all inserted/updated records in the result. We therefore
    # test whether the number of results matches the expected batch size. If the counts do not match, the only
    # safe option is to query all the data from the DB using the unique indexes. Note that the batch size will
    # also not match for every remainder (the last batch in a stream of batches).
    inventory_collection.model_class.where(
      build_multi_selection_query(hashes)
    ).select(unique_index_columns + [:id]).each do |inserted_record|
      key                 = unique_index_columns.map { |x| inserted_record.public_send(x) }
      inventory_object    = indexed_inventory_objects[key]
      inventory_object.id = inserted_record.id if inventory_object
    end
  end
end
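All three branches resolve the same mapping: take a returned row, assemble its unique-column values into an Array key, and use that key to find the in-memory InventoryObject that should receive the id. A self-contained sketch with hypothetical unique columns:

InventoryObjectStub = Struct.new(:id)

object  = InventoryObjectStub.new
indexed = { ["vm-42", "CloudVm"] => object } # built while preparing the insert

row = { "id" => 7, "ems_ref" => "vm-42", "type" => "CloudVm" } # a returned row
key = %w[ems_ref type].map { |column| row[column] }

indexed[key].id = row["id"] if indexed[key]
object.id # => 7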
mark_last_seen_at(attributes_index)
# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 113
def mark_last_seen_at(attributes_index)
  return unless supports_column?(:last_seen_at)
  return if attributes_index.blank?

  all_attribute_keys = [:last_seen_at]

  last_seen_at = Time.now.utc
  attributes_index.each_value { |v| v[:last_seen_at] = last_seen_at }

  query = build_partial_update_query(all_attribute_keys, attributes_index.values)

  get_connection.execute(query)
end
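The stamping step above is a uniform merge of one timestamp into every data hash before the partial update query is built; a tiny sketch:

attributes_index = {
  "vm-1" => { :name => "a" },
  "vm-2" => { :name => "b" }
}

last_seen_at = Time.now.utc
attributes_index.each_value { |value| value[:last_seen_at] = last_seen_at }
# every data hash now carries the same :last_seen_at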
pure_sql_record_key(record, key)

Attribute accessor to Hash object

@param record [Hash] hash
@param key [String] key pointing to an attribute of the record
@return [Object] value of the record for the key

# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 45
def pure_sql_record_key(record, key)
  record[select_keys_indexes[key]]
end
record_key(record, key)

Attribute accessor to ApplicationRecord object or Hash

@param record [Hash, ApplicationRecord] record or hash
@param key [String] key pointing to an attribute of the record
@return [Object] value of the record for the key

# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 27
def record_key(record, key)
  send(record_key_method, record, key)
end
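The two accessor styles record_key dispatches between, shown on stand-ins: an OpenStruct for the ActiveRecord case, and a raw row Array with a hypothetical select_keys_indexes map for the pure-SQL case:

require "ostruct"

# ar_record_key path: read the attribute via public_send
ar_record = OpenStruct.new(:ems_ref => "vm-1")
ar_record.public_send("ems_ref") # => "vm-1"

# pure_sql_record_key path: positional lookup into a raw result row
row                 = ["vm-1", "CloudVm"]
select_keys_indexes = { "ems_ref" => 0, "type" => 1 }
row[select_keys_indexes["ems_ref"]] # => "vm-1"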
save!(association)

Saves the InventoryCollection

@param association [Symbol] An existing association on the manager

# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 52
def save!(association)
  attributes_index        = {}
  inventory_objects_index = {}
  all_attribute_keys      = Set.new + inventory_collection.batch_extra_attributes

  inventory_collection.each do |inventory_object|
    attributes = inventory_object.class.attributes_with_keys(inventory_object.data, inventory_collection, all_attribute_keys, inventory_object)
    index      = build_stringified_reference(attributes, unique_index_keys)

    # Interesting fact: not building attributes_index and using only inventory_objects_index doesn't make much
    # of a difference, since most of the objects inside are shared.
    attributes_index[index]        = attributes
    inventory_objects_index[index] = inventory_object
  end

  expand_all_attribute_keys!(all_attribute_keys)

  logger.debug("Processing #{inventory_collection} of size #{inventory_collection.size}...")

  unless inventory_collection.create_only?
    load_and_update_records!(association, inventory_objects_index, attributes_index, all_attribute_keys)
  end

  unless inventory_collection.create_only?
    inventory_collection.custom_reconnect_block&.call(inventory_collection, inventory_objects_index, attributes_index)
  end

  # Records that were not found in the DB but sent for saving, we will be creating these in the DB.
  if inventory_collection.create_allowed?
    inventory_objects_index.each_slice(batch_size_for_persisting) do |batch|
      create_records!(all_attribute_keys, batch, attributes_index, :on_conflict => :do_update)
    end

    create_or_update_partial_records(all_attribute_keys)
  end

  logger.debug("Marking :last_seen_at of #{inventory_collection} of size #{inventory_collection.size}...")

  mark_last_seen_at(attributes_index)

  # Let the GC clean this up
  inventory_objects_index = nil
  attributes_index = nil

  logger.debug("Processing #{inventory_collection}, "\
               "created=#{inventory_collection.created_records.count}, "\
               "updated=#{inventory_collection.updated_records.count}, "\
               "deleted=#{inventory_collection.deleted_records.count}...Complete")
rescue => e
  logger.error("Error when saving #{inventory_collection} with #{inventory_collection_details}. Message: #{e.message}")
  raise e
end
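The heart of save! is a pair of indexes keyed by the same stringified unique reference, so a DB row found during the update pass can be matched to both its data hash and its InventoryObject. A minimal sketch, with a simplified stand-in for build_stringified_reference:

attributes_index        = {}
inventory_objects_index = {}

data = [
  { :ems_ref => "vm-1", :name => "a" },
  { :ems_ref => "vm-2", :name => "b" }
]

data.each do |attributes|
  index = attributes[:ems_ref].to_s # stand-in for build_stringified_reference

  attributes_index[index]        = attributes
  inventory_objects_index[index] = Object.new # stand-in for the InventoryObject
end

inventory_objects_index.keys # => ["vm-1", "vm-2"]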
update_records!(all_attribute_keys, hashes, indexed_inventory_objects)

Batch-updates existing records.

@param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row
@param hashes [Array<Hash>] data used for building a batch update SQL query

# File lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb, line 261
def update_records!(all_attribute_keys, hashes, indexed_inventory_objects)
  return if hashes.blank?

  query = build_update_query(all_attribute_keys, hashes)
  result = get_connection.execute(query)

  # We will check for timestamp clashes of full row update and we will fallback to skeletal update
  inventory_collection.store_updated_records(result)

  skeletonize_ignored_records!(indexed_inventory_objects, result)

  result
end