# File lib/table_sync/receiving/model/active_record.rb, line 39
# Returns the names of the columns that make up the table's primary key,
# ordered by their position inside the key.
#
# The lookup goes through INFORMATION_SCHEMA, restricted to the schema and
# table this model was built for.
#
# @return [Array<Symbol>] primary-key column names; empty when the table has
#   no primary key (or no matching table is found)
def primary_keys
  # Quote through the adapter rather than interpolating raw text, so a schema
  # or table name containing a quote character cannot break out of the literal.
  quoted_schema = db.quote(schema.to_s)
  quoted_table = db.quote(table.to_s)

  # NOTE(review): the result object is expected to respond to
  # `column_values` (as PG::Result does) -- confirm for other adapters.
  column_names = db.execute(<<~SQL).column_values(0)
    SELECT kcu.column_name
    FROM INFORMATION_SCHEMA.TABLES t
    LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
      ON tc.table_catalog = t.table_catalog
      AND tc.table_schema = t.table_schema
      AND tc.table_name = t.table_name
      AND tc.constraint_type = 'PRIMARY KEY'
    LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
      ON kcu.table_catalog = tc.table_catalog
      AND kcu.table_schema = tc.table_schema
      AND kcu.table_name = tc.table_name
      AND kcu.constraint_name = tc.constraint_name
    WHERE
      t.table_schema NOT IN ('pg_catalog', 'information_schema')
      AND t.table_schema = #{quoted_schema}
      AND t.table_name = #{quoted_table}
    ORDER BY
      kcu.ordinal_position
  SQL

  # Because of the LEFT JOINs, a table without a primary key still yields one
  # row whose column_name is NULL; drop it instead of calling to_sym on nil.
  column_names.compact.map(&:to_sym)
end
class TableSync::Receiving::Model::ActiveRecord
Attributes
raw_model[R]
schema[R]
table[R]
Public Class Methods
new(table_name)
click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 23
# Builds an anonymous ActiveRecord model bound to +table_name+ and records
# the table and schema names resolved from it.
#
# @param table_name [String, Symbol] name of the table to bind the model to
#   (presumably may be schema-qualified; the naming resolver decides -- confirm)
def initialize(table_name)
  anonymous_model = Class.new(::ActiveRecord::Base)
  anonymous_model.table_name = table_name
  # NOTE(review): clearing the inheritance column presumably keeps a `type`
  # column from triggering single-table inheritance -- confirm.
  anonymous_model.inheritance_column = nil
  @raw_model = anonymous_model

  resolver = ::TableSync::NamingResolver::ActiveRecord.new(table_name: table_name)
  @table = resolver.table.to_sym
  @schema = resolver.schema.to_sym
end
Public Instance Methods
after_commit(&block)
click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 112
# Wraps +block+ in an AfterCommitWrap and registers the wrapper as a
# transaction record on the model's connection.
#
# NOTE(review): presumably the connection invokes the wrapper once the
# current transaction commits -- confirm against AfterCommitWrap.
def after_commit(&block)
  connection = db
  callback = AfterCommitWrap.new(&block)
  connection.add_transaction_record(callback)
end
columns()
click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 35
# Lists the model's column names as symbols.
#
# @return [Array<Symbol>]
def columns
  names = raw_model.column_names
  names.map { |name| name.to_sym }
end
destroy(data:, target_keys:, version_key:)
click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 87
# Deletes the rows identified by +data+ and returns the deleted rows.
#
# Each entry of +data+ is reduced to its +target_keys+ attributes; the
# resulting conditions are OR-ed together into a single locking query whose
# matches are destroyed.
#
# @param data [Array<Hash>] attribute hashes describing the rows to delete
# @param target_keys [Array] attribute names used to identify a row
# @param version_key [Object] part of the method's signature; not used here
# @return [Array<Hash>] destroyed rows as symbol-keyed hashes
# @raise [TableSync::DestroyError] when more rows were destroyed than entries
#   were supplied in +data+
def destroy(data:, target_keys:, version_key:)
  # With no input there is no query to build; previously this fell through
  # to calling destroy_all on nil.
  return [] if data.empty?

  scopes = data.map do |attrs|
    conditions = attrs.select { |key, _value| target_keys.include?(key) }
    raw_model.lock("FOR UPDATE").where(conditions)
  end
  query = scopes.reduce { |combined, scope| combined.or(scope) }

  result = query.destroy_all.map { |record| row_to_hash(record) }

  if result.size > data.size
    raise TableSync::DestroyError.new(data: data, target_keys: target_keys, result: result)
  end

  result
end
primary_keys()
click to toggle source
transaction(&block)
click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 108
# Runs +block+ inside ::ActiveRecord::Base.transaction and returns whatever
# that call returns.
def transaction(&block)
  ::ActiveRecord::Base.transaction(&block)
end
upsert(data:, target_keys:, version_key:, default_values:)
click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 62
# Inserts or updates one row per entry of +data+.
#
# For every entry the existing row is looked up (and locked) by the entry's
# +target_keys+ attributes:
# * no match      -> a row is created from +default_values+ merged with the entry;
# * one match     -> the row is updated, unless the entry's version is not
#                    greater than the stored one, in which case it is skipped;
# * several match -> TableSync::UpsertError is raised.
#
# @param data [Array<Hash>] attribute hashes to write
# @param target_keys [Array] attribute names used to identify a row
# @param version_key [Object] attribute name holding the row version
# @param default_values [Hash] attributes applied only when creating a row
# @return [Array<Hash>] created/updated rows as symbol-keyed hashes
#   (skipped entries are omitted)
# @raise [TableSync::UpsertError] when an entry matches more than one row
def upsert(data:, target_keys:, version_key:, default_values:)
  written = data.map do |datum|
    lookup = datum.select { |key| target_keys.include?(key) }
    matches = raw_model.lock("FOR NO KEY UPDATE").where(lookup)

    if matches.to_a.size > 1
      raise TableSync::UpsertError.new(data: datum, target_keys: target_keys, result: matches)
    end

    record = matches.first

    if record.nil?
      record = raw_model.create!(default_values.merge(datum))
    else
      # Stale or same-version data must not overwrite the stored row.
      next if datum[version_key] <= record[version_key]

      record.update!(datum)
    end

    row_to_hash(record)
  end

  written.compact
end
Private Instance Methods
db()
click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 120
# Connection object of the underlying anonymous model.
def db
  raw_model.connection
end
row_to_hash(row)
click to toggle source
# File lib/table_sync/receiving/model/active_record.rb, line 124
# Converts a record into a plain hash of its attributes with symbol keys.
#
# @param row [#attributes] record whose +attributes+ hash is converted
# @return [Hash{Symbol => Object}]
def row_to_hash(row)
  attributes = row.attributes
  attributes.transform_keys { |name| name.to_sym }
end