module Chewy::Index::Import::ClassMethods

Public Instance Methods

bulk(**options)

Wraps the Elasticsearch API bulk method, adding extra features such as `bulk_size` and `suffix`.

@see github.com/elastic/elasticsearch-ruby/blob/master/elasticsearch-api/lib/elasticsearch/api/actions/bulk.rb @see Chewy::Index::Import::BulkRequest @param options [Hash{Symbol => Object}] besides specific import options, it accepts all the options suitable for the bulk API call like `refresh` or `timeout` @option options [String] suffix suffix of the index name the bulk request is sent to, empty by default @option options [Integer] bulk_size bulk API chunk size in bytes; if passed, the request is performed several times for each chunk, empty by default @option options [Array<Hash>] body elasticsearch API bulk method body @return [Hash] tricky transposed errors hash of the form `{action => {error => [ids]}}`, empty if everything is fine

# File lib/chewy/index/import.rb, line 103
# Sends a bulk request for this index through BulkRequest, then waits on
# Chewy.wait_for_status before reporting the failures.
#
# @param options [Hash{Symbol => Object}] forwarded to BulkRequest; `:body`
#   is the bulk body handed to BulkRequest#perform
# @return [Hash] failed items transposed by #payload_errors into
#   `{action => {error => [ids]}}`; empty when no failed items are returned
def bulk(**options)
  request = BulkRequest.new(self, **options)
  failed_items = request.perform(options[:body])
  Chewy.wait_for_status

  payload_errors(failed_items)
end
compose(object, crutches = nil, fields: [])

Composes a single document from the passed object. Uses either witchcraft or normal composing under the hood.

@param object [Object] a data source object @param crutches [Object] optional crutches object; if omitted - a crutch for the single passed object is created as a fallback @param fields [Array<Symbol>] an array of fields to restrict the generated document @return [Hash] a JSON-ready hash

# File lib/chewy/index/import.rb, line 117
# Builds a single JSON-ready document for +object+.
#
# When witchcraft is enabled and the root has children, the document is brewed
# by a cauldron; otherwise the root field composes it directly.
#
# @param object [Object] data source object
# @param crutches [Object, nil] crutches to use; when nil, crutches are built
#   for +object+ alone
# @param fields [Array<Symbol>] restricts the generated document to these fields
# @return [Hash] a JSON-ready hash
def compose(object, crutches = nil, fields: [])
  crutches ||= Chewy::Index::Crutch::Crutches.new(self, [object])
  use_cauldron = witchcraft? && root.children.present?
  return cauldron(fields: fields).brew(object, crutches) if use_cauldron

  root.compose(object, crutches, fields: fields)
end
import(*args)
# File lib/chewy/index/import.rb, line 75
# Runs the import routine and reports whether it finished cleanly.
#
# @return [Boolean] true when the routine returned no errors (nil or empty)
def import(*args)
  errors = import_routine(*args)
  errors.blank?
end
import!(*args)
# File lib/chewy/index/import.rb, line 86
# Like #import, but raises instead of returning false.
#
# @return [true] when the routine returned no errors
# @raise [Chewy::ImportFailed] built from this index and the collected errors
def import!(*args)
  errors = import_routine(*args)
  return true if errors.blank?

  raise Chewy::ImportFailed.new(self, errors)
end

Private Instance Methods

empty_objects_or_scope?(objects_or_scope)
# File lib/chewy/index/import.rb, line 142
# Tells whether the first import argument carries nothing to import.
# Values answering #empty? are asked directly; anything else falls back
# to #blank?.
def empty_objects_or_scope?(objects_or_scope)
  return objects_or_scope.empty? if objects_or_scope.respond_to?(:empty?)

  objects_or_scope.blank?
end
import_linear(objects, routine)
# File lib/chewy/index/import.rb, line 150
# Imports +objects+ in the current process inside an
# `import_objects.chewy` instrumentation block.
#
# The adapter yields action objects, which the routine processes; the
# routine's leftovers are then passed to perform_bulk. Stats and (when
# present) transposed errors are written to the instrumentation payload.
#
# @param objects [Array] import arguments
# @param routine [Routine] import routine driving the run
# @return [Hash, nil] the block's result, i.e. payload[:errors], which is
#   set only when the routine reported errors
def import_linear(objects, routine)
  ActiveSupport::Notifications.instrument('import_objects.chewy', index: self) do |payload|
    adapter.import(*objects, routine.options) { |action_objects| routine.process(**action_objects) }
    routine.perform_bulk(routine.leftovers)

    payload[:import] = routine.stats
    payload[:errors] = payload_errors(routine.errors) if routine.errors.present?
    payload[:errors]
  end
end
import_parallel(objects, routine)
# File lib/chewy/index/import.rb, line 162
# Imports +objects+ using the `parallel` gem inside an
# `import_objects.chewy` instrumentation block.
#
# Object references are fetched and split into batches up front, and each
# batch is handed to IMPORT_WORKER. Leftovers returned by the workers are then
# re-sliced by `batch_size` and sent through LEFTOVERS_WORKER. Their results
# are added to the collected errors.
#
# @param objects [Array] import arguments
# @param routine [Routine] import routine holding options and parallel settings
# @return [Hash, nil] the block's result, i.e. payload[:errors], which is set
#   only when errors were collected
# @raise [RuntimeError] when the `parallel` gem is not loaded
def import_parallel(objects, routine)
  raise "The `parallel` gem is required for parallel import, please add `gem 'parallel'` to your Gemfile" unless '::Parallel'.safe_constantize

  ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload|
    batches = adapter.import_references(*objects, routine.options.slice(:batch_size)).to_a

    # The DB connection is released before the workers start and
    # re-established afterwards (presumably so worker processes do not share
    # the parent's socket). `ensure` makes the reconnect happen even when a
    # worker raises, instead of leaving the parent on the released connection.
    ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
    begin
      results = ::Parallel.map_with_index(
        batches,
        routine.parallel_options,
        &IMPORT_WORKER.curry[self, routine.options, batches.size]
      )
    ensure
      ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
    end
    errors, import, leftovers = process_parallel_import_results(results)

    if leftovers.present?
      batches = leftovers.each_slice(routine.options[:batch_size])
      results = ::Parallel.map_with_index(
        batches,
        routine.parallel_options,
        &LEFTOVERS_WORKER.curry[self, routine.options, batches.size]
      )
      errors.concat(results.flatten(1))
    end

    payload[:import] = import
    payload[:errors] = payload_errors(errors) if errors.present?
    payload[:errors]
  end
end
import_routine(*args)
# File lib/chewy/index/import.rb, line 129
# Shared implementation behind #import and #import!.
#
# Returns nil without importing when a first argument is given and it is
# empty. Otherwise it builds a Routine from the trailing options hash and
# calls Routine#create_indexes!. It then dispatches to #import_parallel when
# the routine has parallel options, or to #import_linear otherwise.
#
# @return [Hash, nil] transposed errors from the chosen importer, or nil
def import_routine(*args)
  subject = args.first
  return if !subject.nil? && empty_objects_or_scope?(subject)

  routine = Routine.new(self, **args.extract_options!)
  routine.create_indexes!
  return import_parallel(args, routine) if routine.parallel_options

  import_linear(args, routine)
end
payload_errors(errors)
# File lib/chewy/index/import.rb, line 201
# Transposes bulk error items into `{action => {error => [ids]}}`.
#
# Each item is a single-pair hash such as
# `{'index' => {'_id' => 1, 'error' => ...}}`. Ids that share the same
# action and error are collected in order.
#
# @param errors [Array<Hash>] failed bulk items
# @return [Hash{Symbol => Hash}] grouped ids; empty for an empty input
def payload_errors(errors)
  errors.each_with_object({}) do |item, grouped|
    action_name, details = item.first
    by_error = (grouped[action_name.to_sym] ||= {})
    (by_error[details['error']] ||= []) << details['_id']
  end
end
process_parallel_import_results(results)
# File lib/chewy/index/import.rb, line 193
# Merges per-worker results into a single `[errors, import, leftovers]` triple.
#
# Errors and leftovers are concatenated in worker order. Import stats are
# merged by key, and colliding values are summed after #to_i.
#
# @param results [Array<Hash>] worker results with `:errors`, `:import` and
#   `:leftovers` entries
# @return [Array(Array, Hash, Array)]
def process_parallel_import_results(results)
  errors = []
  import = {}
  leftovers = []

  results.each do |result|
    errors.concat(result[:errors])
    import.merge!(result[:import]) { |_key, left, right| left.to_i + right.to_i }
    leftovers.concat(result[:leftovers])
  end

  [errors, import, leftovers]
end