class TableSync::Receiving::Model::ActiveRecord

Attributes

raw_model[R]
schema[R]
table[R]

Public Class Methods

new(table_name) click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 23
# Builds an anonymous ActiveRecord model bound to +table_name+ and stores the
# table and schema names reported by the naming resolver.
#
# @param table_name [String, Symbol] table name handed to both the anonymous
#   model and TableSync::NamingResolver::ActiveRecord
def initialize(table_name)
  anonymous_model = Class.new(::ActiveRecord::Base)
  anonymous_model.table_name = table_name
  # A nil inheritance column is assigned so no column is treated as the STI
  # discriminator by the anonymous model.
  anonymous_model.inheritance_column = nil
  @raw_model = anonymous_model

  resolver = ::TableSync::NamingResolver::ActiveRecord.new(table_name: table_name)

  @table = resolver.table.to_sym
  @schema = resolver.schema.to_sym
end

Public Instance Methods

after_commit(&block) click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 112
# Wraps the given block in an AfterCommitWrap and registers it as a
# transaction record on the model's connection.
#
# NOTE(review): presumably the block runs once the surrounding transaction
# commits; AfterCommitWrap is defined outside this view - confirm.
#
# @yield the callback handed to AfterCommitWrap
# @return whatever the connection's add_transaction_record returns
def after_commit(&block)
  callback_record = AfterCommitWrap.new(&block)
  db.add_transaction_record(callback_record)
end
columns() click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 35
# Lists the column names of the underlying model as symbols.
#
# @return [Array<Symbol>]
def columns
  raw_model.column_names.map { |column_name| column_name.to_sym }
end
destroy(data:, target_keys:, version_key:) click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 87
# Destroys every row that matches one of the given entries on +target_keys+
# and returns the destroyed rows as symbol-keyed hashes.
#
# Each entry of +data+ is reduced to its +target_keys+ attributes; the
# resulting conditions are OR-ed into a single locking ("FOR UPDATE") query.
#
# @param data [Array<Hash>] incoming rows; only keys listed in +target_keys+ are used
# @param target_keys [Array] attribute names that identify a row
# @param version_key [Object] accepted for interface parity; not used here
# @return [Array<Hash>] destroyed rows (empty when +data+ is empty)
# @raise [TableSync::DestroyError] when more rows were destroyed than entries were given
def destroy(data:, target_keys:, version_key:)
  sanitized_data = data.map { |attrs| attrs.select { |key, _value| target_keys.include?(key) } }

  # With no entries there is no query to build; previously this path called
  # destroy_all on nil and raised NoMethodError.
  return [] if sanitized_data.empty?

  query = sanitized_data
    .map { |conditions| raw_model.lock("FOR UPDATE").where(conditions) }
    .reduce { |combined, scope| combined.or(scope) }

  result = query.destroy_all.map { |record| row_to_hash(record) }

  # More destroyed rows than entries means the target keys matched
  # more than one row for some entry.
  if result.size > data.size
    raise TableSync::DestroyError.new(data: data, target_keys: target_keys, result: result)
  end

  result
end
primary_keys() click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 39
    # Looks up the primary key columns of the configured table through
    # INFORMATION_SCHEMA, ordered by their position within the key.
    #
    # The query LEFT JOINs the constraint tables, so a table without a
    # primary key yields a row whose column_name is NULL; such values are
    # dropped before symbol conversion instead of raising NoMethodError.
    #
    # NOTE(review): schema and table are interpolated directly into the SQL
    # text - confirm they only ever come from trusted configuration.
    #
    # @return [Array<Symbol>] primary key column names (empty when none are found)
    def primary_keys
      db.execute(<<~SQL).column_values(0).compact.map(&:to_sym)
        SELECT kcu.column_name
        FROM INFORMATION_SCHEMA.TABLES t
        LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
          ON tc.table_catalog = t.table_catalog
          AND tc.table_schema = t.table_schema
          AND tc.table_name = t.table_name
          AND tc.constraint_type = 'PRIMARY KEY'
        LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
          ON kcu.table_catalog = tc.table_catalog
          AND kcu.table_schema = tc.table_schema
          AND kcu.table_name = tc.table_name
          AND kcu.constraint_name = tc.constraint_name
        WHERE
          t.table_schema NOT IN ('pg_catalog', 'information_schema')
          AND t.table_schema = '#{schema}'
          AND t.table_name = '#{table}'
        ORDER BY
          kcu.ordinal_position
      SQL
    end
transaction(&block) click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 108
# Runs the given block through ::ActiveRecord::Base.transaction and returns
# that call's result.
#
# @yield the work forwarded to ::ActiveRecord::Base.transaction
def transaction(&block)
  base_class = ::ActiveRecord::Base
  base_class.transaction(&block)
end
upsert(data:, target_keys:, version_key:, default_values:) click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 62
# Creates or updates one row per entry of +data+, matching existing rows on
# +target_keys+ under a "FOR NO KEY UPDATE" lock.
#
# An existing row is updated only when the incoming value under
# +version_key+ is greater than the stored one; otherwise the entry is
# skipped and left out of the result. New rows are created from
# +default_values+ merged with the entry (the entry wins on conflicts).
#
# @param data [Array<Hash>] incoming rows
# @param target_keys [Array] attribute names that identify a row
# @param version_key [Object] attribute name holding the row version
# @param default_values [Hash] attributes applied only when creating a row
# @return [Array<Hash>] symbol-keyed hashes of the rows created or updated
# @raise [TableSync::UpsertError] when the target keys match more than one row
def upsert(data:, target_keys:, version_key:, default_values:)
  upserted = data.map do |attributes|
    lookup = attributes.select { |key, _value| target_keys.include?(key) }
    scope = raw_model.lock("FOR NO KEY UPDATE").where(lookup)

    if scope.to_a.size > 1
      raise TableSync::UpsertError.new(data: attributes, target_keys: target_keys, result: scope)
    end

    existing = scope.first

    if existing
      # Stale or repeated message: keep the stored row untouched.
      next if attributes[version_key] <= existing[version_key]

      existing.update!(attributes)
      next row_to_hash(existing)
    end

    row_to_hash(raw_model.create!(default_values.merge(attributes)))
  end

  # Skipped entries produced nil above.
  upserted.compact
end

Private Instance Methods

db() click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 120
# Connection object of the underlying anonymous model.
def db
  model = raw_model
  model.connection
end
row_to_hash(row) click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 124
# Converts a record into a hash of its attributes with symbol keys.
#
# @param row [#attributes] object whose +attributes+ returns a Hash
# @return [Hash{Symbol => Object}]
def row_to_hash(row)
  row.attributes.transform_keys { |attribute_name| attribute_name.to_sym }
end