class Chewy::Index::Syncer

This class finds missing and outdated documents in Elasticsearch by comparing ids from the data source with ids in the ES index. If `outdated_sync_field` is defined in the index, it also compares the value of this field for each source object and its corresponding ES document. Usually this field is `updated_at`: if its value in the source differs from the value in the index, the document is outdated and should be reindexed.

To fetch the necessary data from the source, it uses the adapter method {Chewy::Index::Adapter::Base#import_fields}. When the Object adapter is used, refer to that adapter's documentation for how this method behaves.

If the `parallel` option is passed to the initializer, source and index data are fetched in parallel, and the outdated-object calculation is split across parallel processes. The subsequent import (if required) is performed in parallel as well.

@note

In Rails 4.0, times are converted to JSON with second precision
(milliseconds are dropped), so the outdated check is less precise there.

ATTENTION: synchronization may be slow if the synchronized tables
are missing a compound index on the primary key and `outdated_sync_field`.

@see Chewy::Index::Actions::ClassMethods#sync

Constants

DEFAULT_SYNC_BATCH_SIZE
ISO_DATETIME
OUTDATED_IDS_WORKER
SOURCE_OR_INDEX_DATA_WORKER

Public Class Methods

dates_equal(one, two)

Compares two times with millisecond precision: the Unix seconds must match, and sub-millisecond differences are ignored.

# File lib/chewy/index/syncer.rb, line 70
def self.dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end
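The millisecond cutoff is easiest to see on plain `Time` objects. The following is a standalone copy of the comparison for illustration only; the sample timestamps are made up.

```ruby
# Standalone copy of the comparison used by Syncer.dates_equal:
# whole seconds must match exactly, and the sub-second part is truncated to milliseconds.
def dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end

base    = Time.at(1_600_000_000, 123_456) # 123.456 ms past the second
same_ms = Time.at(1_600_000_000, 123_999) # still inside millisecond 123
next_ms = Time.at(1_600_000_000, 124_000) # millisecond 124

puts dates_equal(base, same_ms) # => true
puts dates_equal(base, next_ms) # => false
```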
new(index, parallel: nil)

@param index [Chewy::Index] chewy index
@param parallel [true, Integer, Hash] options for parallel execution (`true` for default options), or the number of processes; `nil` (the default) disables parallel execution

# File lib/chewy/index/syncer.rb, line 76
def initialize(index, parallel: nil)
  @index = index
  @parallel = if !parallel || parallel.is_a?(Hash)
    parallel
  elsif parallel.is_a?(Integer)
    {in_processes: parallel}
  else
    {}
  end
end
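To make the accepted forms of `parallel:` concrete, here is a sketch that mirrors the branching in the initializer as a free-standing helper. The name `normalize_parallel` is hypothetical and not part of Chewy.

```ruby
# Mirrors the branching in Syncer#initialize (hypothetical helper name).
def normalize_parallel(parallel)
  if !parallel || parallel.is_a?(Hash)
    parallel                   # nil/false disables parallelism; a Hash is passed through as-is
  elsif parallel.is_a?(Integer)
    { in_processes: parallel } # an Integer is treated as a process count
  else
    {}                         # e.g. `true`: rely on the Parallel gem's defaults
  end
end

normalize_parallel(nil)                # returns nil
normalize_parallel(4)                  # returns { in_processes: 4 }
normalize_parallel(true)               # returns {}
normalize_parallel({ in_threads: 2 })  # returns { in_threads: 2 }
```

Note that `false` also passes through unchanged, so it disables parallel execution just like `nil`.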
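typecast_date(string)

Converts an ISO 8601 datetime string matching `ISO_DATETIME` into a `Time` with microsecond precision, interpreting the date and time parts as UTC (`+00:00`). Any other value is returned unchanged.
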
# File lib/chewy/index/syncer.rb, line 56
def self.typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME.match(string))
    microsec = (match[7].to_r * 1_000_000).to_i
    day = "#{match[1]}-#{match[2]}-#{match[3]}"
    time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
    microseconds = format('%06d', microsec)
    date = "#{day}T#{time_with_seconds}.#{microseconds}+00:00"
    Time.iso8601(date)
  else
    string
  end
end
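The value of `ISO_DATETIME` is not shown on this page, so the sketch below substitutes a hypothetical regex with the same group layout the method relies on (groups 1 to 6 for date and time parts, group 7 for the fractional seconds). It assumes group 7 holds only the fraction digits and parses them through an exact `Rational`, which differs slightly from calling `match[7].to_r` directly.

```ruby
require 'time'

# Hypothetical stand-in for ISO_DATETIME (the real constant is not shown here).
ISO_DATETIME_SKETCH = /\A(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?/

def typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME_SKETCH.match(string))
    # "0.<digits>" parses to an exact Rational, so no float rounding creeps in.
    microsec = ("0.#{match[7] || 0}".to_r * 1_000_000).to_i
    day = "#{match[1]}-#{match[2]}-#{match[3]}"
    time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
    # The parts are always read as UTC, whatever zone suffix the input had.
    Time.iso8601("#{day}T#{time_with_seconds}.#{format('%06d', microsec)}+00:00")
  else
    string # non-strings and non-matching strings pass through unchanged
  end
end

t = typecast_date('2021-03-04T05:06:07.123456Z')
puts t.usec         # => 123456
puts t.utc_offset   # => 0
p typecast_date(42) # => 42
```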

Public Instance Methods

missing_ids()

Finds the ids of all objects that are not indexed yet or have already been deleted from the source.

@return [Array<String>] an array of missing ids from both sides

# File lib/chewy/index/syncer.rb, line 101
def missing_ids
  return [] if source_data.blank?

  @missing_ids ||= begin
    source_data_ids = data_ids(source_data)
    index_data_ids = data_ids(index_data)

    (source_data_ids - index_data_ids).concat(index_data_ids - source_data_ids)
  end
end
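The core of `missing_ids` is a symmetric difference of two id lists. This standalone sketch works on plain string arrays (ids are strings on both sides because the fetch helpers call `#to_s` on them); the sample ids are made up.

```ruby
# Standalone sketch of the symmetric difference computed by #missing_ids.
def missing_ids(source_ids, index_ids)
  not_indexed = source_ids - index_ids # in the source but absent from the index
  deleted     = index_ids - source_ids # in the index but gone from the source
  not_indexed.concat(deleted)
end

p missing_ids(%w[1 2 3], %w[2 3 4]) # => ["1", "4"]
```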
outdated_ids()

If the index supports outdated sync, compares the `outdated_sync_field` values of each source object and its corresponding index document, and returns the ids of entities whose values differ.

@see Chewy::Index::Mapping::ClassMethods#supports_outdated_sync?
@return [Array<String>] an array of outdated ids

# File lib/chewy/index/syncer.rb, line 118
def outdated_ids
  return [] if source_data.blank? || index_data.blank? || !@index.supports_outdated_sync?

  @outdated_ids ||= if @parallel
    parallel_outdated_ids
  else
    linear_outdated_ids
  end
end
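The body of `OUTDATED_IDS_WORKER` is not shown here, so the following is only a hypothetical sketch of the comparison it performs. It assumes that ids absent from the source are skipped (they are already covered by `missing_ids`), that `date` fields are compared at millisecond precision like `dates_equal`, and that other field types use plain equality. The method name and sample data are illustrative.

```ruby
require 'time'

# Hypothetical sketch of the outdated check (not the real OUTDATED_IDS_WORKER).
# source: { id => value from the data source }, index_data: [[id, value from ES], ...]
def outdated_ids(source, index_data, field_type)
  index_data.each_with_object([]) do |(id, index_value), result|
    next unless source.key?(id) # ids missing from the source are #missing_ids' job

    source_value = source[id]
    outdated =
      if field_type == 'date'
        # Millisecond-precision comparison, as in Syncer.dates_equal.
        other = Time.iso8601(index_value)
        [source_value.to_i, source_value.usec / 1000] != [other.to_i, other.usec / 1000]
      else
        source_value != index_value
      end
    result << id if outdated
  end
end

source = {
  '1' => Time.utc(2024, 1, 1, 12, 0, 0),
  '2' => Time.utc(2024, 1, 2, 12, 0, 0)
}
index_data = [
  ['1', '2024-01-01T12:00:00.000Z'], # same timestamp: up to date
  ['2', '2024-01-01T12:00:00.000Z'], # older timestamp in ES: outdated
  ['3', '2024-01-01T12:00:00.000Z']  # not in the source: skipped here
]
p outdated_ids(source, index_data, 'date') # => ["2"]
```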
perform()

Finds all missing and outdated ids and imports the corresponding objects.

@return [Integer, nil] the number of missing and outdated documents reindexed, or a falsy value if the import reports errors

# File lib/chewy/index/syncer.rb, line 90
def perform
  ids = missing_ids | outdated_ids
  return 0 if ids.blank?

  @index.import(ids, parallel: @parallel) && ids.count
end
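The control flow of `perform` can be exercised without Elasticsearch by stubbing the import step. In this sketch `import` is a hypothetical callable standing in for `@index.import`, assumed to return `true` on success; `empty?` replaces ActiveSupport's `blank?` so the snippet runs in plain Ruby.

```ruby
# Sketch of #perform's control flow with a stubbed import step.
def perform(missing, outdated, import)
  ids = missing | outdated # set union: an id that is both missing and outdated is imported once
  return 0 if ids.empty?

  import.call(ids) && ids.count # a failed import short-circuits to its falsy return value
end

imported = []
ok_import = ->(ids) { imported.concat(ids); true }
p perform(%w[1 2], %w[2 3], ok_import)   # => 3
p imported                               # => ["1", "2", "3"]
p perform(%w[1], [], ->(_ids) { false }) # => false
p perform([], [], ok_import)             # => 0
```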

Private Instance Methods

data_ids(data)
# File lib/chewy/index/syncer.rb, line 178
def data_ids(data)
  return data unless @index.supports_outdated_sync?

  data.map(&:first)
end
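`data_ids` has to cope with two data shapes: `[id, sync_value]` pairs when outdated sync is supported, and a flat id list otherwise. In this sketch the boolean `supports_outdated_sync` argument stands in for `@index.supports_outdated_sync?`; the data are made up.

```ruby
# Sketch of #data_ids with the index check passed in as a boolean.
def data_ids(data, supports_outdated_sync)
  return data unless supports_outdated_sync # already a flat list of ids

  data.map(&:first) # keep only the id from each [id, sync_value] pair
end

p data_ids(%w[1 2], false)                                   # => ["1", "2"]
p data_ids([['1', '2024-01-01'], ['2', '2024-01-02']], true) # => ["1", "2"]
```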
fetch_index_data()
# File lib/chewy/index/syncer.rb, line 168
def fetch_index_data
  if @index.supports_outdated_sync?
    @index.pluck(:_id, @index.outdated_sync_field).each do |data|
      data[0] = data[0].to_s
    end
  else
    @index.pluck(:_id).map(&:to_s)
  end
end
fetch_source_data()
# File lib/chewy/index/syncer.rb, line 153
def fetch_source_data
  if @index.supports_outdated_sync?
    import_fields_args = {
      fields: [@index.outdated_sync_field],
      batch_size: DEFAULT_SYNC_BATCH_SIZE,
      typecast: false
    }
    @index.adapter.import_fields(import_fields_args).to_a.flatten(1).each do |data|
      data[0] = data[0].to_s
    end
  else
    @index.adapter.import_fields(batch_size: DEFAULT_SYNC_BATCH_SIZE, typecast: false).to_a.flatten(1).map(&:to_s)
  end
end
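The post-processing in `fetch_source_data` (`to_a.flatten(1)` followed by id stringification) is easier to follow on literal data. The sketch assumes, based on the `flatten(1)` call, that `import_fields` yields one array per batch; the batches below are made up.

```ruby
# With outdated sync, each batch is assumed to be a list of [id, sync_value] pairs;
# flatten(1) joins the batches into a single list of pairs.
batches = [
  [[1, '2024-01-01'], [2, '2024-01-02']],
  [[3, '2024-01-03']]
]
pairs = batches.flatten(1).each { |pair| pair[0] = pair[0].to_s } # ids become strings, as in ES
p pairs # => [["1", "2024-01-01"], ["2", "2024-01-02"], ["3", "2024-01-03"]]

# Without outdated sync, each batch is assumed to be a plain list of ids.
ids = [[1, 2], [3]].flatten(1).map(&:to_s)
p ids # => ["1", "2", "3"]
```

Stringifying the ids here matters because `missing_ids` compares them with the `_id` values plucked from the index, which `fetch_index_data` also converts to strings.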
index_data()
# File lib/chewy/index/syncer.rb, line 134
def index_data
  @index_data ||= source_and_index_data.second
end
linear_outdated_ids()
# File lib/chewy/index/syncer.rb, line 184
def linear_outdated_ids
  OUTDATED_IDS_WORKER.call(outdated_sync_field_type, source_data.to_h, nil, nil, index_data)
end
outdated_sync_field_type()
# File lib/chewy/index/syncer.rb, line 207
def outdated_sync_field_type
  return @outdated_sync_field_type if instance_variable_defined?(:@outdated_sync_field_type)
  return unless @index.outdated_sync_field

  mappings = @index.client.indices.get_mapping(index: @index.index_name).values.first.fetch('mappings', {})

  @outdated_sync_field_type = mappings
    .fetch('properties', {})
    .fetch(@index.outdated_sync_field.to_s, {})['type']
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
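The mapping lookup in `outdated_sync_field_type` can be tried on a hand-written response hash. The shape below (`{ index_name => { 'mappings' => { 'properties' => ... } } }`) is assumed for illustration, and the index and field names are hypothetical. Because the response is keyed by the concrete index name, taking `values.first` avoids depending on that name.

```ruby
# Sketch of the lookup in #outdated_sync_field_type on a hand-written response.
def field_type(response, field)
  mappings = response.values.first.fetch('mappings', {})
  mappings.fetch('properties', {}).fetch(field.to_s, {})['type']
end

response = {
  'products_2024' => {
    'mappings' => {
      'properties' => {
        'name' => { 'type' => 'text' },
        'updated_at' => { 'type' => 'date' }
      }
    }
  }
}

p field_type(response, :updated_at) # => "date"
p field_type(response, :missing)    # => nil
```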
parallel_outdated_ids()
# File lib/chewy/index/syncer.rb, line 188
def parallel_outdated_ids
  size = processor_count.zero? ? index_data.size : (index_data.size / processor_count.to_f).ceil
  batches = index_data.each_slice(size)

  ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
  curried_outdated_ids_worker = OUTDATED_IDS_WORKER.curry[outdated_sync_field_type, source_data.to_h, @index, batches.size]
  result = ::Parallel.map(
    batches,
    @parallel,
    &curried_outdated_ids_worker
  ).flatten(1)
  ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
  result
end
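`parallel_outdated_ids` splits the index data into roughly one slice per worker before handing the slices to `Parallel.map`. This sketch reproduces only the slicing; `workers` stands in for `processor_count`, and the data are made up.

```ruby
# Sketch of the batching in #parallel_outdated_ids.
def batches_for(index_data, workers)
  size = workers.zero? ? index_data.size : (index_data.size / workers.to_f).ceil
  index_data.each_slice(size).to_a
end

data = (1..10).map { |i| [i.to_s, "2024-01-#{format('%02d', i)}"] }
p batches_for(data, 3).map(&:size) # => [4, 4, 2]
p batches_for(data, 0).map(&:size) # => [10]
```

Since `0` is truthy in Ruby, passing `in_processes: 0` makes `processor_count` return zero, and the guard then produces a single batch. Empty index data never reaches this code, because `outdated_ids` returns early when `index_data` is blank (and `each_slice(0)` would raise).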
processor_count()
# File lib/chewy/index/syncer.rb, line 203
def processor_count
  @processor_count ||= @parallel[:in_processes] || @parallel[:in_threads] || ::Parallel.processor_count
end
source_and_index_data()
# File lib/chewy/index/syncer.rb, line 138
def source_and_index_data
  @source_and_index_data ||= if @parallel
    ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
    result = ::Parallel.map(%i[source index], @parallel, &SOURCE_OR_INDEX_DATA_WORKER.curry[self, @index])
    ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
    if result.first.keys.first == :source
      [result.first.values.first, result.second.values.first]
    else
      [result.second.values.first, result.first.values.first]
    end
  else
    [fetch_source_data, fetch_index_data]
  end
end
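In the parallel branch, each worker result is inspected by key so the method always returns `[source, index]`. The sketch assumes, as the code implies, that each worker returns a one-entry hash keyed by `:source` or `:index`; it uses `last` instead of ActiveSupport's `second`, which is equivalent for a two-element array.

```ruby
# Sketch of how #source_and_index_data orders the two worker results.
def order_results(result)
  if result.first.keys.first == :source
    [result.first.values.first, result.last.values.first]
  else
    [result.last.values.first, result.first.values.first]
  end
end

source = { source: %w[1 2 3] }
index  = { index: %w[2 3 4] }
p order_results([source, index]) # => [["1", "2", "3"], ["2", "3", "4"]]
p order_results([index, source]) # => [["1", "2", "3"], ["2", "3", "4"]]
```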
source_data()
# File lib/chewy/index/syncer.rb, line 130
def source_data
  @source_data ||= source_and_index_data.first
end