class Chewy::Index::Syncer
This class is able to find missing and outdated documents in ES by comparing ids from the data source with ids from the ES index. Also, if `outdated_sync_field` exists in the index definition, it compares the values of this field for each source object and the corresponding ES document. Usually this field is `updated_at`; if its value in the source is not equal to the value in the index, the document is outdated and should be reindexed.
To fetch the necessary data from the source it uses the adapter method {Chewy::Index::Adapter::Base#import_fields}. When the Object
adapter is used, it makes sense to read the corresponding documentation.
If the `parallel` option is passed to the initializer, it fetches source and index data in parallel and then calculates the outdated objects in parallel processes. Any further import (if required) is performed in parallel as well.
@note
In Rails 4.0, time is converted to JSON with second precision (milliseconds are dropped), so the outdated check is less precise there. ATTENTION: synchronization may be slow when the synchronized tables are missing a compound index on the primary key and `outdated_sync_field`.
Constants
- DEFAULT_SYNC_BATCH_SIZE
- ISO_DATETIME
- OUTDATED_IDS_WORKER
- SOURCE_OR_INDEX_DATA_WORKER
Public Class Methods
Compares times with ms precision.
# File lib/chewy/index/syncer.rb, line 70
# Tells whether two time values coincide once each is reduced to its whole
# seconds plus whole milliseconds (microseconds are truncated, not rounded).
#
# @param one [#to_i, #usec] first time value
# @param two [#to_i, #usec] second time value
# @return [Boolean] true when both fall into the same millisecond
def self.dates_equal(one, two)
  truncated = [one, two].map { |time| [time.to_i, time.usec / 1000] }
  truncated.first == truncated.last
end
@param index [Chewy::Index] chewy index @param parallel [true, Integer, Hash] options for parallel execution or the number of processes
# File lib/chewy/index/syncer.rb, line 76
# Stores the index and normalizes the `parallel` option before keeping it in
# `@parallel`: a falsy value or a Hash is kept as given, an Integer becomes
# `{in_processes: <integer>}`, and any other value (e.g. `true`) becomes `{}`.
#
# @param index [Chewy::Index] chewy index
# @param parallel [true, Integer, Hash] options for parallel execution or the number of processes
def initialize(index, parallel: nil)
  @index = index
  @parallel =
    case parallel
    when nil, false, Hash then parallel
    when Integer then {in_processes: parallel}
    else {}
    end
end
# File lib/chewy/index/syncer.rb, line 56
# Converts a string matched by ISO_DATETIME into a Time. The captured date and
# time parts are reassembled into an ISO 8601 string with a six-digit
# fractional part and a fixed `+00:00` offset, then parsed with Time.iso8601.
# Anything that is not a String, or does not match ISO_DATETIME, is returned
# untouched.
#
# @param string [Object] value to typecast
# @return [Time, Object] parsed time, or the argument itself when it cannot be typecast
def self.typecast_date(string)
  return string unless string.is_a?(String)

  match = ISO_DATETIME.match(string)
  return string unless match

  # Capture 7 holds the optional fractional seconds; nil.to_r is 0.
  microsec = (match[7].to_r * 1_000_000).to_i
  date = format('%s-%s-%sT%s:%s:%s.%06d+00:00', *match.captures.first(6), microsec)
  Time.iso8601(date)
end
Public Instance Methods
Finds ids of all the objects that are not indexed yet or deleted from the source already.
@return [Array<String>] an array of missing ids from both sides
# File lib/chewy/index/syncer.rb, line 101
# Collects the ids present on only one side: first those found in the source
# but not in the index, then those found in the index but not in the source.
# Returns an empty array (without memoizing) when the source data is blank.
#
# @return [Array<String>] an array of missing ids from both sides
def missing_ids
  return [] if source_data.blank?

  @missing_ids ||= begin
    from_source = data_ids(source_data)
    from_index = data_ids(index_data)
    not_indexed = from_source - from_index
    not_in_source = from_index - from_source
    not_indexed.concat(not_in_source)
  end
end
If index supports outdated sync, it compares the values of the `outdated_sync_field` for each object and document in the source and index and returns the ids of entities which differ.
@see Chewy::Index::Mapping::ClassMethods#supports_outdated_sync?
@return [Array<String>] an array of outdated ids
# File lib/chewy/index/syncer.rb, line 118
# Returns the ids whose sync field differs between source and index. The
# result is memoized and computed by the parallel or the linear strategy
# depending on whether `@parallel` is set. Returns an empty array when either
# data set is blank or the index does not support outdated sync.
#
# @return [Array<String>] an array of outdated ids
def outdated_ids
  return [] if source_data.blank? || index_data.blank?
  return [] unless @index.supports_outdated_sync?

  @outdated_ids ||= @parallel ? parallel_outdated_ids : linear_outdated_ids
end
Finds all the missing and outdated ids and performs import for them.
@return [Integer, nil] the number of missing and outdated documents reindexed, nil in case of errors
# File lib/chewy/index/syncer.rb, line 90
# Imports every id that is either missing or outdated.
#
# @return [Integer, nil, false] 0 when there is nothing to sync; the number of
#   ids passed to the import when `@index.import` returns a truthy value;
#   otherwise the falsy value returned by `@index.import`
def perform
  stale_ids = missing_ids | outdated_ids
  return 0 if stale_ids.blank?

  imported = @index.import(stale_ids, parallel: @parallel)
  imported && stale_ids.count
end
Private Instance Methods
# File lib/chewy/index/syncer.rb, line 178
# Extracts the ids from fetched data. When the index supports outdated sync
# each entry is reduced to its first element; otherwise the data is returned
# as is.
#
# @param data [Enumerable] fetched source or index data
# @return [Array, Object] the ids, or `data` itself when outdated sync is unsupported
def data_ids(data)
  if @index.supports_outdated_sync?
    data.map { |entry| entry.first }
  else
    data
  end
end
# File lib/chewy/index/syncer.rb, line 168
# Plucks the data needed for comparison from the index. With outdated sync
# support it plucks `_id` together with the sync field and stringifies the
# first element of every row in place; otherwise it plucks `_id` only and
# returns the stringified ids.
#
# @return [Array] rows of `[id, sync_value]`, or a flat array of string ids
def fetch_index_data
  return @index.pluck(:_id).map(&:to_s) unless @index.supports_outdated_sync?

  rows = @index.pluck(:_id, @index.outdated_sync_field)
  rows.each { |row| row[0] = row[0].to_s }
end
# File lib/chewy/index/syncer.rb, line 153
# Fetches the data needed for comparison from the source through the adapter's
# `import_fields`, flattening the result by one level. With outdated sync
# support the sync field is requested as well and the first element of every
# row is stringified in place; otherwise every fetched value is stringified.
#
# @return [Array] rows of `[id, sync_value]`, or a flat array of string ids
def fetch_source_data
  unless @index.supports_outdated_sync?
    fetched = @index.adapter.import_fields(batch_size: DEFAULT_SYNC_BATCH_SIZE, typecast: false)
    return fetched.to_a.flatten(1).map(&:to_s)
  end

  import_fields_args = {
    fields: [@index.outdated_sync_field],
    batch_size: DEFAULT_SYNC_BATCH_SIZE,
    typecast: false
  }
  rows = @index.adapter.import_fields(import_fields_args).to_a.flatten(1)
  rows.each { |row| row[0] = row[0].to_s }
end
# File lib/chewy/index/syncer.rb, line 134
# Memoized second element of `source_and_index_data`.
#
# @return [Array] data fetched from the index
def index_data
  return @index_data if @index_data

  @index_data = source_and_index_data.second
end
# File lib/chewy/index/syncer.rb, line 184
# Runs OUTDATED_IDS_WORKER once over the whole `index_data`, passing nil for
# the worker's third and fourth arguments.
#
# @return [Object] whatever OUTDATED_IDS_WORKER returns (presumably the outdated ids — confirm against the constant)
def linear_outdated_ids
  field_type = outdated_sync_field_type
  source_lookup = source_data.to_h
  OUTDATED_IDS_WORKER.call(field_type, source_lookup, nil, nil, index_data)
end
# File lib/chewy/index/syncer.rb, line 207
# Looks up the mapping type of the index's `outdated_sync_field` in the mapping
# returned by the client and memoizes it. Returns nil without memoizing when
# the index has no sync field or when the mapping request raises NotFound.
#
# @return [String, nil] the `type` entry of the sync field mapping, if any
def outdated_sync_field_type
  return @outdated_sync_field_type if instance_variable_defined?(:@outdated_sync_field_type)
  return unless @index.outdated_sync_field

  response = @index.client.indices.get_mapping(index: @index.index_name)
  mappings = response.values.first.fetch('mappings', {})
  properties = mappings.fetch('properties', {})
  field_definition = properties.fetch(@index.outdated_sync_field.to_s, {})
  @outdated_sync_field_type = field_definition['type']
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
# File lib/chewy/index/syncer.rb, line 188
# Computes outdated ids by slicing `index_data` into batches and mapping
# OUTDATED_IDS_WORKER over them with `::Parallel.map`, then flattening the
# per-batch results by one level.
#
# The batch size is `index_data.size / processor_count` rounded up, or the
# whole data set when `processor_count` is zero. Note that `each_slice` raises
# on a zero size, so `index_data` must not be empty here.
#
# When ActiveRecord is loaded, its connection is closed before the parallel
# step and reconnected afterwards. The reconnect runs in an `ensure` so that it
# also happens when the worker setup or `::Parallel.map` raises.
#
# @return [Array] flattened results of all workers
def parallel_outdated_ids
  size = processor_count.zero? ? index_data.size : (index_data.size / processor_count.to_f).ceil
  batches = index_data.each_slice(size)

  ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
  begin
    curried_outdated_ids_worker = OUTDATED_IDS_WORKER.curry[
      outdated_sync_field_type, source_data.to_h, @index, batches.size
    ]
    ::Parallel.map(batches, @parallel, &curried_outdated_ids_worker).flatten(1)
  ensure
    # Restore the connection on failure too; otherwise it would stay closed.
    ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
  end
end
# File lib/chewy/index/syncer.rb, line 203
# Memoized number of workers: the `:in_processes` option, else the
# `:in_threads` option, else `::Parallel.processor_count`.
#
# @return [Integer] number of parallel workers to plan for
def processor_count
  @processor_count ||= begin
    configured = @parallel[:in_processes] || @parallel[:in_threads]
    configured || ::Parallel.processor_count
  end
end
# File lib/chewy/index/syncer.rb, line 138
# Fetches and memoizes the pair `[source_data, index_data]`.
#
# Without `@parallel` both sides are fetched one after another. With
# `@parallel`, SOURCE_OR_INDEX_DATA_WORKER is mapped over `:source` and
# `:index` through `::Parallel.map`. Each result is expected to be a
# single-entry hash, and the key of the first result decides which value is
# the source data.
#
# When ActiveRecord is loaded, its connection is closed before the parallel
# step and reconnected afterwards. The reconnect runs in an `ensure` so that it
# also happens when `::Parallel.map` raises.
#
# @return [Array(Object, Object)] source data followed by index data
def source_and_index_data
  @source_and_index_data ||=
    if @parallel
      ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
      result =
        begin
          ::Parallel.map(%i[source index], @parallel, &SOURCE_OR_INDEX_DATA_WORKER.curry[self, @index])
        ensure
          # Restore the connection on failure too; otherwise it would stay closed.
          ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
        end

      first_result, second_result = result
      if first_result.keys.first == :source
        [first_result.values.first, second_result.values.first]
      else
        [second_result.values.first, first_result.values.first]
      end
    else
      [fetch_source_data, fetch_index_data]
    end
end
# File lib/chewy/index/syncer.rb, line 130
# Memoized first element of `source_and_index_data`.
#
# @return [Array] data fetched from the source
def source_data
  return @source_data if @source_data

  @source_data = source_and_index_data.first
end