class Chewy::Index::Import::Routine

This class performs the import routine for the options and objects given.

  1. Create target and journal indexes if needed.

  2. Iterate over all the passed objects in batches.

  3. For each batch, the {#process} method is called, which:

* creates a bulk request body;
* appends journal entries for the current batch to the request body if the `journal` option is enabled;
* prepends the leftovers bulk, calculated from the previous iteration's errors, to the request body;
* performs the bulk request;
* composes a new leftovers bulk for the next iteration from the response errors if `update_failover` is true (and `update_fields` are given);
* appends the remaining unfixable errors to the instance-level errors array.

  4. Perform a final request for the last leftovers bulk, if present (the leftovers are computed by {#extract_leftovers}).

  5. Return the resulting errors array.
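The steps above can be modelled in plain Ruby. The sketch below is a hypothetical in-memory toy: `ToyRoutine`, `retry_once`, `broken`, `simulate_request` and `perform_final` are invented for illustration and are not Chewy's API. It only mirrors the control flow: recoverable failures become leftovers that are prepended to the next request, and a final request flushes whatever is still left after the last batch.

```ruby
# Hypothetical in-memory model of the import routine's control flow.
# Nothing here talks to Elasticsearch; "bulk requests" are simulated.
class ToyRoutine
  attr_reader :errors, :leftovers, :stats

  # retry_once: ids that fail once but succeed when re-sent (recoverable)
  # broken:     ids that always fail (unfixable)
  def initialize(retry_once: [], broken: [])
    @retry_once = retry_once.dup
    @broken = broken
    @errors = []
    @leftovers = []
    @stats = {}
  end

  # Step 1: stands in for creating the target and journal indexes.
  def create_indexes!
    @indexes_created = true
  end

  # Step 3: one batch. Leftovers from the previous batch are sent first.
  def process(index: [])
    failed = simulate_request(flush_leftovers + index)
    # Recoverable failures are retried with the next request; the rest are kept.
    @leftovers, unfixable = failed.partition { |id| @retry_once.delete(id) }
    @errors.concat(unfixable)
    @stats[:index] = @stats[:index].to_i + index.count
  end

  # Step 4: a final request for leftovers remaining after the last batch.
  def perform_final
    @errors.concat(simulate_request(flush_leftovers))
  end

  private

  # Returns the ids that "failed" in this simulated bulk request.
  def simulate_request(ids)
    ids.select { |id| @broken.include?(id) || @retry_once.include?(id) }
  end

  # Hands over the current leftovers and resets them, like #flush_leftovers.
  def flush_leftovers
    leftovers = @leftovers
    @leftovers = []
    leftovers
  end
end

routine = ToyRoutine.new(retry_once: [2, 5], broken: [4])
routine.create_indexes!                                        # step 1
(1..5).each_slice(2) { |batch| routine.process(index: batch) } # steps 2 and 3
routine.perform_final                                          # step 4
p routine.errors                                               # step 5; prints [4]
```

Batch `[3, 4]` is sent with the leftover `2` prepended, while id `5` fails in the last batch and is only retried by the final request, which is why step 4 is needed.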

Currently, the routine can recover only from partial document update errors caused by a missing document (`document_missing_exception`), and only when the `update_failover` option is true. To recover, it indexes such objects completely on the next iteration.
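As a rough illustration of that recovery rule, the plain-Ruby sketch below picks out `update` errors of type `document_missing_exception` and returns the matching source objects for full re-indexing. `split_recoverable` is a hypothetical helper; the error item shape follows what {#extract_leftovers} inspects, whereas the real method builds a bulk body with `BulkBuilder` instead of returning the objects, and runs only when `update_failover` is true and `update_fields` are given.

```ruby
# Hypothetical helper mirroring the selection logic of #extract_leftovers.
# Error items look like { action => { '_id' => ..., 'error' => { 'type' => ... } } }.
def split_recoverable(errors, objects_by_id)
  missing = errors.select do |item|
    action, details = item.first
    action == 'update' && details.dig('error', 'type') == 'document_missing_exception'
  end
  # Key the recoverable errors by stringified id, as the routine does.
  by_id = missing.to_h { |item| [item.values.first['_id'].to_s, item] }
  # Only ids belonging to objects of the current batch can be re-indexed.
  ids = by_id.keys & objects_by_id.keys
  remaining = errors - by_id.values_at(*ids)
  [objects_by_id.values_at(*ids), remaining]
end

errors = [
  { 'update' => { '_id' => 1, 'error' => { 'type' => 'document_missing_exception' } } },
  { 'update' => { '_id' => 2, 'error' => { 'type' => 'mapper_parsing_exception' } } },
  { 'index'  => { '_id' => 3, 'error' => { 'type' => 'document_missing_exception' } } }
]
objects = { '1' => 'user 1', '2' => 'user 2', '3' => 'user 3' }
to_reindex, remaining = split_recoverable(errors, objects)
p to_reindex       # prints ["user 1"]
p remaining.length # prints 2
```

Only the first error is recovered: the second has a different type, and the third comes from an `index` action rather than a partial `update`.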

@see Chewy::Index::Import::ClassMethods#import

Constants

BULK_OPTIONS
DEFAULT_OPTIONS

Attributes

errors[R]
leftovers[R]
options[R]
parallel_options[R]
stats[R]

Public Class Methods

new(index, **options)

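Processes the passed options: merges in the index's default import options, the global `journal` setting and `DEFAULT_OPTIONS` (explicitly passed options take precedence), extracts the bulk request specific options and normalizes the `parallel` option.

@param index [Chewy::Index] chewy index
@param options [Hash] import options, see {Chewy::Index::Import::ClassMethods#import}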

# File lib/chewy/index/import/routine.rb, line 44
def initialize(index, **options)
  @index = index
  @options = options
  @options.reverse_merge!(@index._default_import_options)
  @options.reverse_merge!(journal: Chewy.configuration[:journal])
  @options.reverse_merge!(DEFAULT_OPTIONS)
  @bulk_options = @options.slice(*BULK_OPTIONS)
  @parallel_options = @options.delete(:parallel)
  if @parallel_options && !@parallel_options.is_a?(Hash)
    @parallel_options = if @parallel_options.is_a?(Integer)
      {in_processes: @parallel_options}
    else
      {}
    end
  end
  @errors = []
  @stats = {}
  @leftovers = []
end
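The option handling in the constructor can be mirrored without ActiveSupport. In this hypothetical sketch, `normalize_options` and its keyword arguments are invented names, the sample defaults are made up (they are not the real `DEFAULT_OPTIONS`), and chained `Hash#merge` stands in for `reverse_merge!`: passed options win over index defaults, which win over the journal setting, which wins over the defaults. The `parallel` option is then normalized: an Integer becomes `{in_processes: n}`, any other non-Hash truthy value becomes `{}`, and a Hash is kept as-is.

```ruby
# Hypothetical mirror of Routine#initialize's option handling.
# Each later merge wins, which reproduces the chain of reverse_merge! calls.
def normalize_options(options, index_defaults: {}, journal: false, defaults: {})
  opts = defaults.merge({ journal: journal }).merge(index_defaults).merge(options)
  parallel = opts.delete(:parallel)
  if parallel && !parallel.is_a?(Hash)
    # An Integer becomes a process count; other values (e.g. true) become {}.
    parallel = parallel.is_a?(Integer) ? { in_processes: parallel } : {}
  end
  [opts, parallel]
end

opts, parallel = normalize_options({ batch_size: 10, parallel: 4 },
                                   defaults: { batch_size: 1000, refresh: true })
p opts[:batch_size]      # prints 10
p parallel[:in_processes] # prints 4
```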

Public Instance Methods

create_indexes!()

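Creates the journal index (if the `journal` option is enabled) and the target index (unless it already exists or `skip_index_creation_on_import` is configured).

@return [Object] unspecified; the return value is not meaningful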

# File lib/chewy/index/import/routine.rb, line 66
def create_indexes!
  Chewy::Stash::Journal.create if @options[:journal]
  return if Chewy.configuration[:skip_index_creation_on_import]

  @index.create!(**@bulk_options.slice(:suffix)) unless @index.exists?
end

perform_bulk(body) { |response| ... }

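Performs a bulk request for the passed body. If a block is given, the response (a list of error items) is yielded to it before the errors are appended to the instance-level errors array.

@param body [Array<Hash>] a standard bulk request body
@return [true, false] the result of the request, true if no errors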

# File lib/chewy/index/import/routine.rb, line 102
def perform_bulk(body)
  response = bulk.perform(body)
  yield response if block_given?
  Chewy.wait_for_status
  @errors.concat(response)
  response.blank?
end
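
Because the response is yielded before it is appended to the errors array, the block can remove errors it knows how to recover from, and the return value reflects only what is left. The sketch below illustrates this contract with hypothetical stand-ins (`FakeBulk`, `BulkPerformer`) in place of Chewy's `BulkRequest`, and it omits `Chewy.wait_for_status`.

```ruby
# Hypothetical stand-in for BulkRequest: returns canned error items
# instead of sending the body to Elasticsearch.
FakeBulk = Struct.new(:canned_errors) do
  def perform(_body)
    canned_errors
  end
end

class BulkPerformer
  attr_reader :errors

  def initialize(bulk)
    @bulk = bulk
    @errors = []
  end

  # Same order of operations as #perform_bulk, minus Chewy.wait_for_status.
  def perform_bulk(body)
    response = @bulk.perform(body)
    yield response if block_given? # the block may drop recoverable errors in place
    @errors.concat(response)       # whatever is left is recorded
    response.empty?                # true only if no errors remain
  end
end

failures = [{ 'update' => { '_id' => '1' } }, { 'index' => { '_id' => '2' } }]
performer = BulkPerformer.new(FakeBulk.new(failures))
ok = performer.perform_bulk([]) { |response| response.reject! { |item| item.key?('update') } }
p ok                      # prints false
p performer.errors.length # prints 1
```

This is the mechanism {#process} relies on: its block calls {#extract_leftovers}, which deletes recovered errors from the response so they never reach the errors array.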

process(index: [], delete: [])

The main process method. Converts the passed objects to a bulk request body, appends journal entries, prepends the leftovers from the previous iteration, performs the request and handles errors, performing failover procedures if applicable.

@param index [Array<Object>] any acceptable objects for indexing
@param delete [Array<Object>] any acceptable objects for deleting
@return [true, false] the result of the request, true if no errors

# File lib/chewy/index/import/routine.rb, line 80
def process(index: [], delete: [])
  bulk_builder = BulkBuilder.new(@index, to_index: index, delete: delete, fields: @options[:update_fields])
  bulk_body = bulk_builder.bulk_body

  if @options[:journal]
    journal_builder = JournalBuilder.new(@index, to_index: index, delete: delete)
    bulk_body.concat(journal_builder.bulk_body)
  end

  bulk_body.unshift(*flush_leftovers)

  perform_bulk(bulk_body) do |response|
    @leftovers = extract_leftovers(response, bulk_builder.index_objects_by_id)
    @stats[:index] = @stats[:index].to_i + index.count if index.present?
    @stats[:delete] = @stats[:delete].to_i + delete.count if delete.present?
  end
end
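
The resulting request body therefore has a fixed order: leftovers from the previous batch first, then the entries for the current objects, then the journal entries. Below is a hypothetical sketch of just that ordering and of the stats counting, with symbols standing in for real bulk entries; `compose_body` and `bump_stats` are invented names, not part of Chewy.

```ruby
# Hypothetical sketch of how #process assembles its request body.
# Symbols stand in for real bulk entries (action lines and documents).
def compose_body(main_body, journal_body: nil, leftovers: [])
  body = main_body.dup
  body.concat(journal_body) if journal_body # journal entries are appended
  body.unshift(*leftovers)                  # previous leftovers are prepended
  body
end

# Stats accumulate across batches; nil.to_i is 0, so the first batch needs no setup.
def bump_stats(stats, key, objects)
  stats[key] = stats[key].to_i + objects.count unless objects.empty?
  stats
end

p compose_body([:doc3], journal_body: [:journal3], leftovers: [:doc1])
# prints [:doc1, :doc3, :journal3]

stats = {}
bump_stats(stats, :index, [3, 4])
bump_stats(stats, :index, [5])
bump_stats(stats, :delete, [])
p stats[:index]       # prints 3
p stats.key?(:delete) # prints false
```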

Private Instance Methods

bulk()
# File lib/chewy/index/import/routine.rb, line 133
def bulk
  @bulk ||= BulkRequest.new(@index, **@bulk_options)
end

extract_leftovers(errors, index_objects_by_id)
# File lib/chewy/index/import/routine.rb, line 118
def extract_leftovers(errors, index_objects_by_id)
  return [] unless @options[:update_fields].present? && @options[:update_failover] && errors.present?

  failed_partial_updates = errors.select do |item|
    item.keys.first == 'update' && item.values.first['error']['type'] == 'document_missing_exception'
  end
  failed_ids_hash = failed_partial_updates.index_by { |item| item.values.first['_id'].to_s }
  failed_ids_for_reimport = failed_ids_hash.keys & index_objects_by_id.keys
  errors_to_cleanup = failed_ids_hash.values_at(*failed_ids_for_reimport)
  errors_to_cleanup.each { |error| errors.delete(error) }

  failed_objects = index_objects_by_id.values_at(*failed_ids_for_reimport)
  BulkBuilder.new(@index, to_index: failed_objects).bulk_body
end

flush_leftovers()
# File lib/chewy/index/import/routine.rb, line 112
def flush_leftovers
  leftovers = @leftovers
  @leftovers = []
  leftovers
end