class Chewy::Index::Import::Routine
This class performs the import routine for the options and objects given.
- Create target and journal indexes if needed.
- Iterate over all the passed objects in batches.
- For each batch, the {#process} method is called. It:
  * creates a bulk request body;
  * appends journal entries for the current batch to the request body;
  * prepends a leftovers bulk to the request body, which is calculated from the errors of the previous iteration;
  * performs the bulk request;
  * composes a new leftovers bulk for the next iteration from the response errors if `update_failover` is true;
  * appends the remaining unfixable errors to the instance-level errors array.
- Perform the request for the last leftovers bulk if present using {#extract_leftovers}.
- Return the resulting errors array.
At the moment, it only tries to recover from partial document update errors caused by a missing document, and only if the `update_failover` option is true. To recover, it indexes such objects completely on the next iteration.
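The flow described above can be sketched in plain Ruby, without Chewy or Elasticsearch. Everything in this sketch is hypothetical: `ToyRoutine`, `run_toy_import`, the `[action, id]` entry format and the fake `send_bulk` store are illustrative stand-ins, not part of the library. It only models the idea that a failed partial update becomes a full index entry in the next request.

```ruby
# Hypothetical, simplified model of the routine; not Chewy's implementation.
# A bulk entry is [action, id]. The fake store rejects an :update for an id
# it does not contain, which mimics a "document missing" error.
class ToyRoutine
  attr_reader :errors, :store

  def initialize(store, update_failover: true)
    @store = store
    @update_failover = update_failover
    @errors = []
    @leftovers = []
  end

  # Processes one batch of ids as partial updates.
  def process(ids)
    body = ids.map { |id| [:update, id] }
    # Prepend the entries left over from the previous iteration.
    pending = @leftovers
    @leftovers = []
    body.unshift(*pending)
    failed = send_bulk(body)
    if @update_failover
      # Failed updates become full :index entries for the next iteration.
      @leftovers = failed.map { |(_action, id)| [:index, id] }
      failed = []
    end
    @errors.concat(failed)
    failed.empty?
  end

  # Sends whatever is still pending after the last batch.
  def finish
    pending = @leftovers
    @leftovers = []
    @errors.concat(send_bulk(pending))
    @errors
  end

  private

  # Fake bulk endpoint: returns the entries that failed.
  def send_bulk(body)
    body.reject do |(action, id)|
      case action
      when :index then @store[id] = true
      when :update then @store.key?(id)
      end
    end
  end
end

# Runs the toy routine over `ids` in batches; `known_ids` already exist in the store.
def run_toy_import(ids, known_ids, batch_size: 2, update_failover: true)
  store = known_ids.to_h { |id| [id, true] }
  routine = ToyRoutine.new(store, update_failover: update_failover)
  ids.each_slice(batch_size) { |batch| routine.process(batch) }
  routine.finish
  routine
end
```

With failover enabled, ids missing from the store end up indexed one request later and the errors array stays empty. With failover disabled, the same failed updates are reported as errors.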
Constants
- BULK_OPTIONS
- DEFAULT_OPTIONS
Attributes
Public Class Methods
Processes the passed options, extracting the options specific to the bulk request.
@param index [Chewy::Index] chewy index
@param options [Hash] import options, see {Chewy::Index::Import::ClassMethods#import}
# File lib/chewy/index/import/routine.rb, line 44
def initialize(index, **options)
  @index = index
  @options = options
  @options.reverse_merge!(@index._default_import_options)
  @options.reverse_merge!(journal: Chewy.configuration[:journal])
  @options.reverse_merge!(DEFAULT_OPTIONS)
  @bulk_options = @options.slice(*BULK_OPTIONS)
  @parallel_options = @options.delete(:parallel)
  if @parallel_options && !@parallel_options.is_a?(Hash)
    @parallel_options = if @parallel_options.is_a?(Integer)
      {in_processes: @parallel_options}
    else
      {}
    end
  end
  @errors = []
  @stats = {}
  @leftovers = []
end
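The constructor does two things that are easy to misread: it layers defaults so that explicitly passed options always win, and it normalizes the `:parallel` option. The following is a minimal plain-Ruby sketch of both, assuming `Hash#merge` as a stand-in for ActiveSupport's `reverse_merge!`. The helper names `layer_options` and `normalize_parallel` are hypothetical and do not exist in Chewy.

```ruby
# Plain-Ruby equivalent of chained reverse_merge! calls: the explicit
# options win, then each default layer in the order given.
def layer_options(explicit, *default_layers)
  default_layers.reduce(explicit) { |acc, defaults| defaults.merge(acc) }
end

# Mirrors the :parallel normalization in the constructor above:
# nil, false and hashes pass through, an Integer becomes a process count,
# any other truthy value becomes an empty options hash.
def normalize_parallel(value)
  return value if value.nil? || value == false || value.is_a?(Hash)

  value.is_a?(Integer) ? {in_processes: value} : {}
end
```

For example, `layer_options({refresh: false}, {refresh: true, journal: true})` keeps `refresh: false` from the explicit options and only picks up `journal: true` from the defaults.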
Public Instance Methods
Creates the journal index and the corresponding index if necessary.
@return [Object] whatever
# File lib/chewy/index/import/routine.rb, line 66
def create_indexes!
  Chewy::Stash::Journal.create if @options[:journal] && !Chewy.configuration[:skip_journal_creation_on_import]
  return if Chewy.configuration[:skip_index_creation_on_import]
  @index.create!(**@bulk_options.slice(:suffix)) unless @index.exists?
end
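The conditions in `create_indexes!` can be read as a small decision table. The sketch below restates them as a pure function so the combinations are easy to check. `creation_plan` and its keyword arguments are hypothetical names that stand in for the `journal` option, the two `skip_*` configuration flags and the result of `@index.exists?`.

```ruby
# Hypothetical restatement of the conditions in create_indexes!.
# Returns the list of indexes that would be created.
def creation_plan(journal:, skip_journal:, skip_index:, index_exists:)
  plan = []
  # The journal index is created only when journaling is on and not skipped.
  plan << :journal if journal && !skip_journal
  # Skipping index creation returns early, after the journal decision.
  return plan if skip_index

  plan << :index unless index_exists
  plan
end
```

Note that the journal decision does not depend on the index-creation skip flag: with `skip_index: true` the journal index can still be created.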
Performs a bulk request for the passed body.
@param body [Array<Hash>] a standard bulk request body
@return [true, false] the result of the request, true if no errors
# File lib/chewy/index/import/routine.rb, line 102
def perform_bulk(body)
  response = bulk.perform(body)
  yield response if block_given?
  Chewy.wait_for_status
  @errors.concat(response)
  response.blank?
end
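One detail of `perform_bulk` is worth spelling out: the block receives the response before it is appended to the errors array, so a block that removes entries from the response also keeps them out of the errors and can change the return value. A minimal sketch of that ordering, where `perform_with_hook` is a hypothetical stand-in and the response is a plain array of symbols:

```ruby
# Hypothetical stand-in for perform_bulk: `response` plays the list of
# bulk errors, `errors` plays the instance-level errors array.
def perform_with_hook(response, errors)
  # The block runs first and may mutate the response in place.
  yield response if block_given?
  # Only what is left after the block is recorded as an error.
  errors.concat(response)
  response.empty?
end
```

If the block removes every entry, the call reports success even though the original response contained errors.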
The main process method. Converts the passed objects to a bulk request body, appends journal entries, performs the request and handles errors, performing failover procedures where applicable.
@param index [Array<Object>] any acceptable objects for indexing
@param delete [Array<Object>] any acceptable objects for deleting
@return [true, false] the result of the request, true if no errors
# File lib/chewy/index/import/routine.rb, line 80
def process(index: [], delete: [])
  bulk_builder = BulkBuilder.new(@index, to_index: index, delete: delete, fields: @options[:update_fields])
  bulk_body = bulk_builder.bulk_body
  if @options[:journal]
    journal_builder = JournalBuilder.new(@index, to_index: index, delete: delete)
    bulk_body.concat(journal_builder.bulk_body)
  end
  bulk_body.unshift(*flush_leftovers)
  perform_bulk(bulk_body) do |response|
    @leftovers = extract_leftovers(response, bulk_builder.index_objects_by_id)
    @stats[:index] = @stats[:index].to_i + index.count if index.present?
    @stats[:delete] = @stats[:delete].to_i + delete.count if delete.present?
  end
end
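The order in which `process` assembles the request body, and the way it accumulates stats from a possibly missing counter, can be illustrated with plain arrays and hashes. `assemble_body` and `bump` are hypothetical helpers written for this sketch. The symbols stand in for real bulk entries.

```ruby
# Hypothetical helper: orders the parts of one bulk body the way process does.
def assemble_body(batch_entries, journal_entries, leftovers)
  body = batch_entries.dup      # entries built for the current batch
  body.concat(journal_entries)  # journal entries are appended
  body.unshift(*leftovers)      # leftovers from the previous batch go first
  body
end

# Adds `count` to a counter that may not exist yet (nil.to_i is 0).
# Nothing is recorded for an empty batch.
def bump(stats, key, count)
  stats[key] = stats[key].to_i + count if count.positive?
  stats
end
```

The leftovers are sent ahead of the current batch, so a document that is re-indexed as a leftover and also appears in the current batch receives its full index entry first.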
Private Instance Methods
# File lib/chewy/index/import/routine.rb, line 133
def bulk
  @bulk ||= BulkRequest.new(@index, **@bulk_options)
end
# File lib/chewy/index/import/routine.rb, line 118
def extract_leftovers(errors, index_objects_by_id)
  return [] unless @options[:update_fields].present? && @options[:update_failover] && errors.present?
  failed_partial_updates = errors.select do |item|
    item.keys.first == 'update' && item.values.first['error']['type'] == 'document_missing_exception'
  end
  failed_ids_hash = failed_partial_updates.index_by { |item| item.values.first['_id'].to_s }
  failed_ids_for_reimport = failed_ids_hash.keys & index_objects_by_id.keys
  errors_to_cleanup = failed_ids_hash.values_at(*failed_ids_for_reimport)
  errors_to_cleanup.each { |error| errors.delete(error) }
  failed_objects = index_objects_by_id.values_at(*failed_ids_for_reimport)
  BulkBuilder.new(@index, to_index: failed_objects).bulk_body
end
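The selection logic in `extract_leftovers` can be tried out on hand-written error items. The sketch below is an approximation under stated assumptions: `recoverable_objects` is a hypothetical name, `to_h` with a block replaces ActiveSupport's `index_by`, `dig` is used so that an item without an `'error'` key is skipped rather than raising, and the function returns the objects themselves instead of building a bulk body. The error item shape is inferred from the keys the method above reads.

```ruby
# Returns the objects to re-import and removes their errors from `errors`
# in place. `objects_by_id` maps string ids to the objects of the batch.
def recoverable_objects(errors, objects_by_id)
  # Keep only partial updates that failed because the document is missing.
  missing = errors.select do |item|
    action, details = item.first
    action == 'update' && details.dig('error', 'type') == 'document_missing_exception'
  end
  by_id = missing.to_h { |item| [item.values.first['_id'].to_s, item] }
  # Only ids that belong to the current batch can be re-imported.
  ids = by_id.keys & objects_by_id.keys
  by_id.values_at(*ids).each { |error| errors.delete(error) }
  objects_by_id.values_at(*ids)
end
```

Errors that are not recoverable (a different action, a different error type, or an id outside the current batch) stay in `errors` and are reported to the caller.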
# File lib/chewy/index/import/routine.rb, line 112
def flush_leftovers
  leftovers = @leftovers
  @leftovers = []
  leftovers
end