class Praxis::Mapper::IdentityMap

Attributes

config[RW]
blueprint_cache[R]
queries[R]
scope[RW]
unloaded[R]

Public Class Methods

current() click to toggle source

@return [IdentityMap] current identity map from thread-local variable

# File lib/praxis-mapper/identity_map.rb, line 32
# Fetches the identity map stored on Thread.current.
#
# @return [IdentityMap] the value kept under :_praxis_mapper_identity_map
# @raise [RuntimeError] when nothing (or a falsy value) is stored there
def self.current
  Thread.current[:_praxis_mapper_identity_map] || raise("current IdentityMap not set")
end
current=(identity_map) click to toggle source

Stores the given identity map in a thread-local variable. @param identity_map [IdentityMap] the identity map to store

# File lib/praxis-mapper/identity_map.rb, line 26
# Records +identity_map+ on Thread.current so that .current can find it.
#
# @param identity_map [IdentityMap] the map to store
def self.current=(identity_map)
  thread = Thread.current
  thread[:_praxis_mapper_identity_map] = identity_map
end
current?() click to toggle source

@return [Boolean] whether identity map thread-local variable has been set

# File lib/praxis-mapper/identity_map.rb, line 40
# Tells whether Thread.current holds an IdentityMap instance.
#
# @return [Boolean] true only when the key is present and its value is a
#   Praxis::Mapper::IdentityMap
def self.current?
  thread = Thread.current
  return false unless thread.key?(:_praxis_mapper_identity_map)

  thread[:_praxis_mapper_identity_map].is_a?(Praxis::Mapper::IdentityMap)
end
new(scope={}) click to toggle source

TODO: support multiple connections

# File lib/praxis-mapper/identity_map.rb, line 72
# Builds an empty identity map.
#
# Creates a ConnectionManager and a SelectorGenerator for this map, remembers
# the given scope and then resets all internal bookkeeping through #clear!.
#
# @param scope [Hash] named filters to remember for this map
def initialize(scope={})
  @scope = scope
  @connection_manager = ConnectionManager.new
  @selector_generator = Praxis::Mapper::SelectorGenerator.new
  clear!
end
setup!(scope={}) click to toggle source

TODO: how come scope can be set from 3 different methods?

@param scope [Hash] a set of named filters to apply in query @example {:account => [:account_id, 71], :user => [:user_id, 2]}

# File lib/praxis-mapper/identity_map.rb, line 58
# Ensures there is a current identity map carrying +scope+.
#
# When no current map exists a new one is created and stored. When one
# exists and is still clear, its scope is replaced and the same map is reused.
#
# @param scope [Hash] a set of named filters to apply in query
# @example {:account => [:account_id, 71], :user => [:user_id, 2]}
# @return [IdentityMap] the current identity map
# @raise [RuntimeError] when the existing map is no longer clear
def self.setup!(scope={})
  return self.current = self.new(scope) unless self.current?

  existing = self.current
  unless existing.clear?
    raise "Denied for a pre-existing condition: Identity map has been used."
  end

  existing.scope = scope
  existing
end

Public Instance Methods

<<(record)
Alias for: add_record
_finalize!(*models) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 214
# Repeatedly finalizes staged models until a full pass loads nothing new,
# then releases the connections.
#
# The first pass covers the given models (or every staged model when none are
# given); every following pass covers all currently staged models. A pass
# stops at the first model whose finalization returned any records.
#
# @param models [Array] models to finalize in the first pass
# @return whatever #release returns
def _finalize!(*models)
  pending = models
  progressed = true

  while progressed
    pending = @staged.keys if pending.empty?
    progressed = pending.any? { |model| finalize_model!(model).any? }
    pending = [] # later passes always start from the staged models
  end

  release
end
add_record(record) click to toggle source

Returns the record provided (if it was added to the identity map), or the corresponding existing record if one was already present.

# File lib/praxis-mapper/identity_map.rb, line 520
# Registers +record+ in the identity map.
#
# For each of the record's identities: if a record is already known under
# that identity value, that existing record is returned immediately and
# nothing further happens. Otherwise the identity value is removed from the
# staged keys and mapped to +record+. The record is then appended to every
# secondary index that exists for its model, linked back to this map and
# added to the model's row list.
#
# @param record [Object] a model instance responding to #identities and
#   #identity_map=
# @return [Object] +record+, or the previously stored record with the same
#   identity
def add_record(record)
  model = record.class
  record.identities.each do |identity, key|
    # FIXME: Should we be overwriting (possibly) a "nil" value from before?
    #        (due to that row not being found by a previous query)
    #        (That'd be odd since that means we tried to load that same identity)
    if (existing = @row_keys[model][identity][key])
      # FIXME: should merge record into existing to add any additional fields
      return existing
    end

    get_staged(model, identity).delete(key)
    @row_keys[model][identity][key] = record
  end

  @secondary_indexes[model].each do |key, indexed_values|
    val = if key.kind_of? Array
      key.collect { |k| record.send(k) }
    else
      record.send(key)
    end

    # Index hashes have no default value, so a value that has never been
    # indexed or looked up has no bucket yet; create it on demand.
    (indexed_values[val] ||= []) << record
  end

  record.identity_map = self
  @rows[model] << record
  record
end
Also aliased as: <<
add_records(records) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 478
# Adds a batch of records and stages the keys of their tracked associations.
#
# The query is taken from the first record only. For every association that
# query tracks, the query itself is remembered under :_queries for the
# associated model and the keys computed by #stage_for! are collected. The
# records are then added one by one, and finally everything collected is
# staged.
#
# @param records [Array] records that respond to #_query
# @return [Array] the identity-map versions of the records, in input order
#   (see #add_record); an empty array when +records+ is empty
def add_records(records)
  return [] if records.empty?

  pending = Hash.new do |by_model, staged_model|
    by_model[staged_model] = Hash.new { |by_key, key_name| by_key[key_name] = Set.new }
  end

  query = records.first._query
  tracked = query ? query.tracked_associations : []

  # First remember which query asked for each associated model ...
  tracked.each do |association|
    pending[association[:model]][:_queries] << query
  end

  # ... then gather the keys that need to be loaded for each association.
  tracked.each do |association|
    association_key, row_keys = stage_for!(association, records)
    bucket = pending[association[:model]]
    row_keys.each { |row_key| bucket[association_key].add(row_key) }
  end

  im_records = records.collect { |record| add_record(record) }

  pending.each { |model_to_stage, data| stage(model_to_stage, data) }

  im_records
end
add_selectors(resource, fields) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 556
# Forwards a resource/fields pair to the selector generator.
#
# @param resource [Object] passed through unchanged
# @param fields [Object] passed through unchanged
# @return whatever the generator's #add returns
def add_selectors(resource, fields)
  generator = @selector_generator
  generator.add(resource, fields)
end
all(model,conditions={}) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 357
# Looks up already-loaded rows of +model+.
#
# Without conditions every row of the model is returned. Otherwise only the
# FIRST condition is used: when its key is a known identity key the rows are
# read from the identity table (unknown or missing values are dropped),
# otherwise they come from the secondary index for that key.
#
# @param model [Object] the model whose rows are wanted
# @param conditions [Hash] key => collection of values
# @return [Array] the matching rows
def all(model,conditions={})
  return rows_for(model) if conditions.empty?

  key, values = conditions.first
  single = values.size == 1
  identity_lookup = @row_keys[model].has_key?(key)

  if identity_lookup
    known = @row_keys[model][key]
    return values.collect { |v| known[v] }.compact unless single

    # optimize the common case of a single value
    hit = known[values[0]]
    hit ? [hit] : []
  elsif single
    index(model, key, values[0])
  else
    values.each_with_object([]) { |v, results| results.push(*index(model, key, v)) }
  end
end
clear!() click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 83
# Resets every internal table to an empty, auto-vivifying hash.
#
# @return [Hash] the fresh secondary-index table (the last assignment)
def clear!
  # model => list of records
  @rows = Hash.new { |rows, model| rows[model] = [] }

  # model => key name => set of values still to be loaded, for ex:
  #   {Instance => {:id => Set.new(1,2,3), :name => Set.new("George Jr.") } }
  @staged = Hash.new do |staged, model|
    staged[model] = Hash.new { |keys, key_name| keys[key_name] = Set.new }
  end

  # model => key name => key value => record, for ex:
  #   {"instances"=>{:id=>{1=>Object.new}}}
  @row_keys = Hash.new do |row_keys, model|
    row_keys[model] = Hash.new { |by_name, key_name| by_name[key_name] = {} }
  end

  # model => set of queries
  @queries = Hash.new { |queries, model| queries[model] = Set.new }

  # see how it feels to store blueprints here, for ex:
  #   @blueprint_cache[User][some_object] = User.new(some_object)
  @blueprint_cache = Hash.new { |cache, blueprint_class| cache[blueprint_class] = {} }

  # TODO: rework this so it's a hash with default values and simplify #index
  @secondary_indexes = Hash.new { |indexes, model| indexes[model] = {} }
end
clear?() click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 45
# Tells whether the map is still unused.
#
# @return [Boolean] true when the rows, staged keys, row keys and queries
#   tables are all empty
def clear?
  [@rows, @staged, @row_keys, @queries].all?(&:empty?)
end
connection(name) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 417
# Checks out the named connection from the connection manager.
#
# @param name [Object] the name handed to the manager's #checkout
# @return whatever the manager's #checkout returns
def connection(name)
  manager = @connection_manager
  manager.checkout(name)
end
extract_keys(field, records) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 421
# Collects the distinct values of +field+ across +records+.
#
# For an Array +field+ (a composite identity) each key is the array of the
# listed attributes, and any key containing a nil is skipped. For a single
# field, nil values are dropped.
#
# @param field [Symbol, Array<Symbol>] attribute name(s) to read
# @param records [Array] objects responding to those attributes
# @return [Set] the extracted keys
def extract_keys(field, records)
  unless field.kind_of?(Array)
    return Set.new(records.collect(&field).compact)
  end

  # composite identities
  records.each_with_object(Set.new) do |record, row_keys|
    composite = field.collect { |col| record.send(col) }
    row_keys << composite unless composite.include?(nil)
  end
end
finalize!(*models_and_opts) click to toggle source

The last element of the argument list may be an options hash (currently only :instrument is read). It is implemented this way (instead of (*models, instrument: true)) because, when Sequel models are passed in, Ruby invokes “.to_hash” on them while restructuring the args, which causes a “load”.

# File lib/praxis-mapper/identity_map.rb, line 194
# Finalizes the given models, optionally wrapped in a notification.
#
# When the last argument is a Hash it is treated as options and removed from
# the model list; its :instrument entry (default true) decides whether the
# work is wrapped in the 'praxis.mapper.finalize' ActiveSupport notification.
#
# @param models_and_opts [Array] models, optionally followed by an options Hash
# @return the result of #_finalize! (as passed back by the instrumentation
#   call when instrumented)
def finalize!(*models_and_opts)
  models = models_and_opts
  instrument = true

  if models_and_opts.last.kind_of?(::Hash)
    instrument = models_and_opts.last.fetch(:instrument, true)
    models = models_and_opts[0..-2]
  end

  return _finalize!(*models) unless instrument

  ActiveSupport::Notifications.instrument('praxis.mapper.finalize') { _finalize!(*models) }
end
finalize_model!(model, query = nil) click to toggle source

don't doc. never ever use yourself! FIXME: make private and fix specs that break?

# File lib/praxis-mapper/identity_map.rb, line 234
# Loads everything currently staged for +model+ and adds it to the map.
#
# Steps, in order:
# 1. Pull the queries remembered under :_queries out of the staged data.
# 2. Return early when no staged key has any values.
# 3. Build a query for the model unless one was given.
# 4. Re-apply the track blocks of the remembered queries that target +model+.
# 5. Resolve staged non-identity keys into identity values and stage those.
# 6. For each identity with staged values, load the records, add them, and
#    mark values that were not found as nil in @row_keys.
#
# @param model [Class] the model to finalize
# @param query [Object, nil] query to reuse; a new one is built when nil
# @return [Array] the records added by this call. NOTE: the early exit in
#   step 2 returns an empty Set rather than an Array.
# @raise [RuntimeError] when a track block sets a where clause for a
#   :many_to_one or :array_to_many association
def finalize_model!(model, query = nil)
  # :_queries is bookkeeping, not a key to load, so take it out first
  staged_queries = @staged[model].delete(:_queries) || []
  staged_keys = @staged[model].keys
  non_identities = staged_keys - model.identities

  results = Set.new

  # nothing to load for this model
  return results if @staged[model].all? { |(_key, values)| values.empty? }

  if query.nil?
    query_class = @connection_manager.repository(model.repository_name)[:query]
    query = query_class.new(self, model)
  end

  # Apply any relevant blocks passed to track in the original queries
  staged_queries.each do |staged_query|
    staged_query.track.each do |(association_name, block)|
      next unless block

      spec = staged_query.model.associations[association_name]

      if spec[:model] == model
        query.instance_eval(&block)
        if (spec[:type] == :many_to_one || spec[:type] == :array_to_many) && query.where
          # put the block's own location at the top of the reported backtrace
          file, line = block.source_location
          trace = ["#{file}:#{line}"] | caller
          raise RuntimeError, "Error finalizing model #{model.name} for association #{association_name.inspect} -- using a where clause when tracking associations of type #{spec[:type].inspect} is not supported", trace
        end
      end
    end
  end


  # process non-unique staged keys
  #   select identity (any one should do) for those keys and stage blindly
  #   load and add records.

  if non_identities.any?
    to_stage = Hash.new do |hash,identity|
      hash[identity] = Set.new
    end

    non_identities.each do |key|
      # removing the key here means it is not loaded again below
      values = @staged[model].delete(key)

      rows = query.multi_get(key, values, select: model.identities, raw: true)
      rows.each do |row|
        model.identities.each do |identity|
          if identity.kind_of? Array
            # composite identity: the key is the array of its column values
            to_stage[identity] << row.values_at(*identity)
          else
            to_stage[identity] << row[identity]
          end
        end
      end
    end

    self.stage(model, to_stage)
  end

  model.identities.each do |identity_name|
    values = self.get_staged(model,identity_name)
    next if values.empty?

    query.where = nil # clear out any where clause from non-identity
    records = query.multi_get(identity_name, values)

    # TODO: refactor this to better-hide queries?
    self.queries[model].add(query)

    results.merge(add_records(records))

    # add nil records for records that were not found by the multi_get
    # (add_record removes found keys from the staged set, so whatever is
    # still staged here was not returned)
    missing_keys = self.get_staged(model,identity_name)
    missing_keys.each do |missing_key|
      @row_keys[model][identity_name][missing_key] = nil
      get_staged(model, identity_name).delete(missing_key)
    end

  end

  query.freeze

  # TODO: check whether really really did get all the records we should have....
  results.to_a
end
get(model,condition) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 390
# Fetches a single loaded row using the first key/value pair of +condition+.
#
# @param model [Object] the model to look in
# @param condition [Hash] only its first entry is used
# @return the row found by #row_by_key
def get(model,condition)
  first_pair = condition.first
  key, value = first_pair

  row_by_key(model, key, value)
end
get_staged(model, key) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 397
# Returns the staged values for +key+ of +model+.
#
# @param model [Object] the model
# @param key [Object] key name (identity or otherwise)
# @return the entry stored in the staged table for that model and key
def get_staged(model, key)
  staged_for_model = @staged[model]
  staged_for_model[key]
end
index(model, key, value) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 334
# Returns the secondary-index bucket for +value+ of +key+ on +model+.
#
# The index for a key is created and filled through #reindex! the first time
# the key is asked for; the bucket for a value is created empty on demand.
#
# @param model [Object] the model
# @param key [Object] attribute name (or array of names) being indexed
# @param value [Object] the value to look up
# @return [Array] the (mutable) list of rows indexed under that value
def index(model, key, value)
  model_indexes = @secondary_indexes[model]

  unless model_indexes.has_key?(key)
    model_indexes[key] ||= {}
    reindex!(model, key)
  end

  model_indexes[key][value] ||= []
end
load(model, &block) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 122
# Runs a query for +model+ and adds the results to the map.
#
# The block is handed to the query's constructor. When the resulting query's
# where clause is :staged, the clause is cleared and the staged keys are
# loaded through #finalize_model! instead. Otherwise the query is executed
# inside the 'praxis.mapper.load' notification, the records are added, the
# query is frozen and remembered, and the query's loads are followed through
# #subload.
#
# @param model [Class] a finalized model
# @return the identity-map records that were loaded (or the result of
#   #finalize_model! for staged loads)
# @raise [RuntimeError] when the model is not finalized
def load(model, &block)
  unless model.finalized?
    raise "Can't load unfinalized model #{model}"
  end

  repository = @connection_manager.repository(model.repository_name)
  query = repository[:query].new(self, model, &block)

  if query.where == :staged
    query.where = nil
    return finalize_model!(model, query)
  end

  ActiveSupport::Notifications.instrument('praxis.mapper.load', model: model) do
    records = query.execute

    add_records(records).tap do
      # TODO: refactor this to better-hide queries?
      query.freeze
      queries[model].add(query)

      subload(model, query, records)
    end
  end
end
persist!() click to toggle source
# File lib/praxis-mapper/support/factory_bot.rb, line 81
# Inserts every new record held by the map into its model's table.
#
# Models without a table name are skipped, as are models with no records
# flagged as new. Inserted records have their new_record flag cleared.
#
# @return [Hash] model => records that were inserted
def persist!
  inserted = {}

  @rows.each do |model, records|
    table = model.table_name
    next unless table

    db = self.connection(model.repository_name)

    pending = records.select(&:new_record)
    next if pending.empty?

    db[table.to_sym].multi_insert(pending.collect(&:_data))

    pending.each { |rec| rec.new_record = false }
    inserted[model] = pending
  end

  inserted
end
query_statistics() click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 552
# Wraps the map's recorded queries in a QueryStatistics object.
#
# @return [QueryStatistics] built from #queries
def query_statistics
  recorded = queries
  QueryStatistics.new(recorded)
end
reindex!(model, key) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 344
# Adds every loaded row of +model+ to the secondary index for +key+.
#
# For an Array +key+ the indexed value is the array of those attributes;
# otherwise it is the single attribute's value.
#
# @param model [Object] the model whose rows are indexed
# @param key [Symbol, Array<Symbol>] attribute name(s) to index by
# @return the collection returned by #rows_for
def reindex!(model, key)
  composite = key.kind_of?(Array)

  rows_for(model).each do |row|
    indexed_value = composite ? key.collect { |part| row.send(part) } : row.send(key)
    # FIXME: make this a set? or handle duplicates better
    index(model, key, indexed_value) << row
  end
end
release() click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 228
# Asks the connection manager to release.
#
# @return whatever the manager's #release returns
def release
  manager = @connection_manager
  manager.release
end
row_by_key(model,key, value) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 322
# Returns the row stored for +value+ under identity +key+ of +model+.
#
# A stored nil is returned as nil; only a value that has no entry at all
# raises.
#
# @param model [Object] the model
# @param key [Object] identity name
# @param value [Object] identity value
# @return the stored row (possibly nil)
# @raise [UnloadedRecordException] when nothing is stored for +value+
def row_by_key(model,key, value)
  known = @row_keys[model][key]

  unless known.has_key?(value)
    raise UnloadedRecordException, "Did not load #{model} with #{key} = #{value.inspect}."
  end

  known[value]
end
rows_for(model) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 329
# Returns the list of loaded rows for +model+.
#
# @param model [Object] the model
# @return the entry of the rows table for that model
def rows_for(model)
  loaded = @rows[model]
  loaded
end
selectors() click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 79
# Exposes the selectors gathered by the selector generator.
#
# @return whatever the generator's #selectors returns
def selectors
  generator = @selector_generator
  generator.selectors
end
stage(model, data) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 402
# Stages key values of +model+ to be loaded later.
#
# Non-enumerable values are wrapped in an array. For identity keys, values
# that already have an entry in the identity table are skipped. The caller's
# collections are never modified, so frozen arrays and enumerables without
# #reject! (e.g. ranges) are accepted.
#
# @param model [Object] a model responding to #identities
# @param data [Hash] key name => value or collection of values
# @return [Hash] +data+
def stage(model, data)
  data.each do |key, values|
    values = [values] unless values.kind_of? Enumerable

    # ignore rows we have already loaded... add sanity checking?
    if model.identities.include?(key)
      # non-destructive on purpose: +values+ belongs to the caller
      values = values.reject { |k| @row_keys[model][key].has_key? k }
    end

    get_staged(model,key).merge(values)
  end
end
stage_array_to_many(tracked_association, records) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 455
# Computes the keys to stage for an :array_to_many association.
#
# Each record's +key+ attribute is expected to hold a collection of ids; all
# of them are gathered into one set. Records whose attribute is nil
# contribute nothing, and nil ids (or composite ids containing nil) are
# dropped.
#
# @param tracked_association [Hash] reads :key and :primary_key (default :id)
# @param records [Array] records responding to the :key attribute
# @return [Array] a pair of the primary key name and the Set of ids
def stage_array_to_many(tracked_association, records)
  key = tracked_association[:key]
  primary_key = tracked_association[:primary_key] || :id

  row_keys = Set.new
  records.collect(&key).each do |keys|
    # Set#merge raises on nil, so skip records without any ids
    next if keys.nil?

    row_keys.merge keys
  end

  row_keys.reject! do |row_key|
    row_key.nil? || (row_key.kind_of?(Array) && row_key.include?(nil))
  end

  [primary_key, row_keys]
end
stage_for!(spec, records) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 147
# Dispatches to the staging helper that matches the association type.
#
# @param spec [Hash] association spec; its :type selects the helper
# @param records [Array] the records the association starts from
# @return [Array, nil] the helper's result (a key name and the values to
#   stage), or nil when the type is not one of the four known types
def stage_for!(spec, records)
  handler = {
    many_to_one: :stage_many_to_one,
    array_to_many: :stage_array_to_many,
    one_to_many: :stage_one_to_many,
    many_to_array: :stage_many_to_array
  }[spec[:type]]

  send(handler, spec, records) if handler
end
stage_many_to_array(tracked_association, records) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 473
# Placeholder for :many_to_array associations, which are not implemented.
#
# @raise [RuntimeError] always
def stage_many_to_array(tracked_association, records)
  raise RuntimeError, "not supported yet"
end
stage_many_to_one(tracked_association, records) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 435
# Computes the keys to stage for a :many_to_one association.
#
# The values of the records' :key attribute are staged under the associated
# model's primary key.
#
# @param tracked_association [Hash] reads :key and :primary_key (default :id)
# @param records [Array] the records holding the key values
# @return [Array] a pair of the primary key name and the extracted keys
def stage_many_to_one(tracked_association, records)
  foreign_key = tracked_association[:key]
  target_identity = tracked_association[:primary_key] || :id

  [target_identity, extract_keys(foreign_key, records)]
end
stage_one_to_many(tracked_association, records) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 445
# Computes the keys to stage for a :one_to_many association.
#
# The values of the records' primary key are staged under the association's
# :key on the associated model.
#
# @param tracked_association [Hash] reads :key and :primary_key (default :id)
# @param records [Array] the records holding the primary key values
# @return [Array] a pair of the :key name and the extracted keys
def stage_one_to_many(tracked_association, records)
  foreign_key = tracked_association[:key]
  own_identity = tracked_association[:primary_key] || :id

  [foreign_key, extract_keys(own_identity, records)]
end
subload(model, query, records) click to toggle source
# File lib/praxis-mapper/identity_map.rb, line 160
# Eagerly loads the associations requested through the query's loads,
# recursing into the loads of each associated query.
#
# For every requested association the keys to fetch are computed with
# #stage_for!. Keys that already have an entry in the identity table are not
# fetched again; their records are reused for the nested load. Entries
# recorded as nil (keys known to be missing) are skipped entirely, so that
# only real records are handed to the nested call.
#
# @param model [Class] the model the records belong to
# @param query [Object] the query whose #load entries are followed
# @param records [Array] the records to start from
# @return whatever iterating query.load returns
def subload(model, query, records)
  query.load.each do |(association_name, block)|
    spec = model.associations.fetch(association_name)

    associated_model = spec[:model]

    key, values = stage_for!(spec, records)

    existing_records = []
    values.reject! do |value|
      next false unless @row_keys[associated_model].has_key?(key) &&
          @row_keys[associated_model][key].has_key?(value)

      existing = @row_keys[associated_model][key][value]
      # a nil entry marks a row that a previous query could not find
      existing_records << existing if existing
      true
    end

    new_query_class = @connection_manager.repository(associated_model.repository_name)[:query]
    new_query = new_query_class.new(self,associated_model, &block)

    new_records = new_query.multi_get(key, values)

    self.queries[associated_model].add(new_query)

    add_records(new_records)

    subload(associated_model, new_query, new_records + existing_records)
  end
end