module Chewy::Index::Import::ClassMethods
Public Instance Methods
Wraps elasticsearch API bulk method, adds additional features like `bulk_size` and `suffix`.
@see github.com/elastic/elasticsearch-ruby/blob/master/elasticsearch-api/lib/elasticsearch/api/actions/bulk.rb @see Chewy::Index::Import::BulkRequest
@param options [Hash{Symbol => Object}] besides specific import options, it accepts all the options suitable for the bulk API call like `refresh` or `timeout` @option options [String] suffix an optional index name suffix, empty by default @option options [Integer] bulk_size bulk API chunk size in bytes; if passed, the request is performed several times for each chunk, empty by default @option options [Array<Hash>] body elasticsearch API bulk method body @return [Hash] tricky transposed errors hash (action => error => list of document ids), empty if everything is fine
# File lib/chewy/index/import.rb, line 103
# Performs a bulk request built from the passed options and returns the
# failed items regrouped by `payload_errors`.
#
# @param options [Hash{Symbol => Object}] forwarded as keyword arguments to
#   `BulkRequest.new`; `options[:body]` is what gets performed
# @return [Hash] the result of `payload_errors` for the failed items
def bulk(**options)
  request = BulkRequest.new(self, **options)
  failed_items = request.perform(options[:body])
  # Runs after the request is performed and before the errors are regrouped.
  Chewy.wait_for_status

  payload_errors(failed_items)
end
Composes a single document from the passed object. Uses either witchcraft or normal composing under the hood.
@param object [Object] a data source object @param crutches [Object] optional crutches object; if omitted - a crutch for the single passed object is created as a fallback @param fields [Array<Symbol>] an array of fields to restrict the generated document @return [Hash] a JSON-ready hash
# File lib/chewy/index/import.rb, line 117
# Builds a single document from the passed object, going through the
# witchcraft cauldron when it is enabled and the root has children, and
# through the regular root composing otherwise.
#
# @param object [Object] the object to compose a document from
# @param crutches [Object, nil] crutches to use; when falsy, a crutches
#   object is created for just this one object
# @param fields [Array<Symbol>] fields passed down to the composer
# @return the value returned by the chosen composer
def compose(object, crutches = nil, fields: [])
  crutches ||= Chewy::Index::Crutch::Crutches.new(self, [object])

  use_witchcraft = witchcraft? && root.children.present?
  return cauldron(fields: fields).brew(object, crutches) if use_witchcraft

  root.compose(object, crutches, fields: fields)
end
# File lib/chewy/index/import.rb, line 75
# Runs the import routine with the given arguments and reports whether it
# came back without errors.
#
# @param args arguments forwarded untouched to `import_routine`
# @return [true, false] true when `import_routine` returned a blank result
def import(*args)
  outcome = import_routine(*args)
  outcome.blank?
end
# File lib/chewy/index/import.rb, line 86
# Runs the import routine with the given arguments and raises when it
# reports any errors.
#
# @param args arguments forwarded untouched to `import_routine`
# @raise [Chewy::ImportFailed] when `import_routine` returns present errors
# @return [true] when there were no errors
def import!(*args)
  failures = import_routine(*args)
  return true unless failures.present?

  raise Chewy::ImportFailed.new(self, failures)
end
Private Instance Methods
# File lib/chewy/index/import.rb, line 142
# Tells whether the passed objects/scope argument has nothing in it.
# Prefers the argument's own `empty?` when it has one and falls back to
# `blank?` for everything else.
#
# @param objects_or_scope [Object] the value to check
# @return [true, false] result of `empty?` or `blank?` on the value
def empty_objects_or_scope?(objects_or_scope)
  return objects_or_scope.empty? if objects_or_scope.respond_to?(:empty?)

  objects_or_scope.blank?
end
# File lib/chewy/index/import.rb, line 150
# Imports the objects sequentially inside an `import_objects.chewy`
# notification: every group yielded by the adapter is processed by the
# routine, then the routine's leftovers are sent in bulk, and finally the
# notification payload is filled with the stats and (if any) the errors.
#
# @param objects [Array] arguments splatted into `adapter.import`
# @param routine [Routine] the routine that processes and tracks the import
# @return the value of `payload[:errors]`, which is the last expression of
#   the instrumented block
def import_linear(objects, routine)
  ActiveSupport::Notifications.instrument('import_objects.chewy', index: self) do |payload|
    adapter.import(*objects, routine.options) { |action_objects| routine.process(**action_objects) }
    routine.perform_bulk(routine.leftovers)

    payload[:import] = routine.stats
    # :errors is only written when the routine actually collected some.
    payload[:errors] = payload_errors(routine.errors) if routine.errors.present?
    payload[:errors]
  end
end
# File lib/chewy/index/import.rb, line 162
# Imports the objects through the `parallel` gem inside an
# `import_objects.chewy` notification. Batches of references are mapped with
# IMPORT_WORKER; any leftovers reported by the workers are sliced into
# batches again and mapped with LEFTOVERS_WORKER, whose results are appended
# to the collected errors.
#
# @param objects [Array] arguments splatted into `adapter.import_references`
# @param routine [Routine] supplies `options` and `parallel_options`
# @raise [RuntimeError] when the `::Parallel` constant cannot be resolved
# @return the value of `payload[:errors]`, which is the last expression of
#   the instrumented block
def import_parallel(objects, routine)
  unless '::Parallel'.safe_constantize
    raise "The `parallel` gem is required for parallel import, please add `gem 'parallel'` to your Gemfile"
  end

  ActiveSupport::Notifications.instrument('import_objects.chewy', index: self) do |payload|
    reference_batches = adapter.import_references(*objects, routine.options.slice(:batch_size)).to_a

    # The ActiveRecord connection is closed before the parallel map and
    # re-established right after it (only when ActiveRecord is loaded).
    ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
    worker_results = ::Parallel.map_with_index(
      reference_batches,
      routine.parallel_options,
      &IMPORT_WORKER.curry[self, routine.options, reference_batches.size]
    )
    ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)

    failures, stats, remaining = process_parallel_import_results(worker_results)

    if remaining.present?
      leftover_batches = remaining.each_slice(routine.options[:batch_size])
      leftover_results = ::Parallel.map_with_index(
        leftover_batches,
        routine.parallel_options,
        &LEFTOVERS_WORKER.curry[self, routine.options, leftover_batches.size]
      )
      failures.concat(leftover_results.flatten(1))
    end

    payload[:import] = stats
    # :errors is only written when something actually failed.
    payload[:errors] = payload_errors(failures) if failures.present?
    payload[:errors]
  end
end
# File lib/chewy/index/import.rb, line 129
# Shared implementation behind `import` and `import!`: builds a routine from
# the trailing options, creates the indexes and dispatches to the parallel
# or the linear import depending on the routine's `parallel_options`.
#
# @param args the objects/scope to import, optionally followed by an
#   options hash (extracted with `extract_options!`)
# @return [nil] when the first argument is non-nil but empty
# @return the result of `import_parallel` / `import_linear` otherwise
def import_routine(*args)
  subject = args.first
  # An explicitly passed but empty collection means there is nothing to do;
  # a nil first argument does not short-circuit.
  return if !subject.nil? && empty_objects_or_scope?(subject)

  routine = Routine.new(self, **args.extract_options!)
  routine.create_indexes!

  routine.parallel_options ? import_parallel(args, routine) : import_linear(args, routine)
end
# File lib/chewy/index/import.rb, line 201
# Regroups a flat list of failed bulk items into a nested hash keyed first
# by action (as a symbol) and then by the item's 'error' value, each leaf
# being the list of '_id' values that failed that way.
#
# @param errors [Enumerable<Hash>] single-pair hashes of the form
#   `{ action => { '_id' => ..., 'error' => ... } }`
# @return [Hash] `{ action => { error => [ids] } }`, empty for empty input
def payload_errors(errors)
  grouped = {}

  errors.each do |entry|
    action = entry.keys.first.to_sym
    item = entry.values.first
    message = item['error']
    id = item['_id']

    ((grouped[action] ||= {})[message] ||= []) << id
  end

  grouped
end
# File lib/chewy/index/import.rb, line 193
# Folds the per-worker result hashes into one triple: all errors
# concatenated, the import stats merged (values for the same key are added
# after `to_i`), and all leftovers concatenated.
#
# @param results [Enumerable<Hash>] hashes with :errors, :import and
#   :leftovers keys
# @return [Array(Array, Hash, Array)] `[errors, import, leftovers]`
def process_parallel_import_results(results)
  all_errors = []
  merged_import = {}
  all_leftovers = []

  results.each do |result|
    all_errors.concat(result[:errors])
    merged_import.merge!(result[:import]) { |_key, current, incoming| current.to_i + incoming.to_i }
    all_leftovers.concat(result[:leftovers])
  end

  [all_errors, merged_import, all_leftovers]
end