module Chewy::Index::Import::ClassMethods
Public Instance Methods
Wraps the Elasticsearch API bulk method and adds features such as `bulk_size` and `suffix`.
@see github.com/elastic/elasticsearch-ruby/blob/master/elasticsearch-api/lib/elasticsearch/api/actions/bulk.rb
@see Chewy::Index::Import::BulkRequest
@param options [Hash{Symbol => Object}] besides specific import options, it accepts all the options suitable for the bulk API call, like `refresh` or `timeout`
@option options [String] suffix an index name suffix, used for zero-downtime reset mostly, no suffix by default
@option options [Integer] bulk_size bulk API chunk size in bytes; if passed, the request is performed several times for each chunk, empty by default
@option options [Array<Hash>] body elasticsearch API bulk method body
@return [Hash] transposed errors hash (action, then error message, then ids), empty if everything is fine
# File lib/chewy/index/import.rb, line 104
def bulk(**options)
  error_items = BulkRequest.new(self, **options).perform(options[:body])
  Chewy.wait_for_status
  payload_errors(error_items)
end
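The `bulk_size` option is described as a chunk size in bytes, with one request per chunk. A minimal sketch of that idea follows. It is not the `BulkRequest` implementation: `chunk_by_bytes` is a hypothetical helper, each entry is treated as a single serialized line, and the sample entries are illustrative.

```ruby
require 'json'

# Hypothetical helper, not part of Chewy: groups serialized lines into chunks
# whose total byte size stays at or below +bulk_size+.
def chunk_by_bytes(entries, bulk_size)
  chunks = [[]]
  current_size = 0

  entries.each do |entry|
    line = JSON.generate(entry) + "\n"
    # Start a new chunk when this line would push the current one over the
    # limit. An oversized line still gets a chunk of its own.
    if current_size + line.bytesize > bulk_size && !chunks.last.empty?
      chunks << []
      current_size = 0
    end
    chunks.last << line
    current_size += line.bytesize
  end

  chunks.reject(&:empty?)
end

# Illustrative entries; each serializes to 20 bytes including the newline.
entries = [
  { index: { _id: 1 } },
  { index: { _id: 2 } },
  { index: { _id: 3 } }
]
chunks = chunk_by_bytes(entries, 40)
puts chunks.map(&:size).inspect
```

With a 40-byte limit the first two lines share a chunk and the third goes into a second one, so two requests would be issued.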
Composes a single document from the passed object. Uses either witchcraft or normal composing under the hood.
@param object [Object] a data source object
@param crutches [Object] optional crutches object; if omitted, a crutch for the single passed object is created as a fallback
@param fields [Array<Symbol>] an array of fields to restrict the generated document
@return [Hash] a JSON-ready hash
# File lib/chewy/index/import.rb, line 118
def compose(object, crutches = nil, fields: [])
  crutches ||= Chewy::Index::Crutch::Crutches.new self, [object]
  if witchcraft? && root.children.present?
    cauldron(fields: fields).brew(object, crutches)
  else
    root.compose(object, crutches, fields: fields)
  end
end
@!method import(*collection, **options)
One of the main methods of an index. Imports any kind of objects into the index and handles all the object processing routines. Documents are imported through the bulk API; bulk size and object batch size are controlled by the corresponding options.
It accepts ORM/ODM objects, POROs, hashes, or ids; the adapter in use fetches the objects from the source by these ids. Passed objects are deleted from the index if they are not in the default scope or are marked for destruction.
It handles parent-child relationships defined with a join field, reindexing children when the parent is reindexed.
Performs journaling if enabled: it stores the ids of all imported objects in a specialized index. A particular import can be replayed later to restore data consistency.
Performs a partial index update using the `update` bulk action if any `update_fields` are specified. Note that if a document doesn't exist yet, ES raises an error, but import catches such errors and performs full indexing for the corresponding documents. This feature can be disabled by setting `update_failover` to `false`.
Utilizes `ActiveSupport::Notifications`, so it is possible to get imported objects later by listening to the `import_objects.chewy` event. It is also possible to get the list of errors that occurred from the payload if something went wrong.
Import can also be run in parallel using the Parallel gem functionality.
@example
  UsersIndex.import(parallel: true) # imports everything in parallel with automatic workers number
  UsersIndex.import(parallel: 3) # using 3 workers
  UsersIndex.import(parallel: {in_threads: 10}) # in 10 threads
@see github.com/elastic/elasticsearch-ruby/blob/master/elasticsearch-api/lib/elasticsearch/api/actions/bulk.rb
@param collection [Array<Object>] an array or anything to import
@param options [Hash{Symbol => Object}] besides specific import options, it accepts all the options suitable for the bulk API call, like `refresh` or `timeout`
@option options [String] suffix an index name suffix, used for zero-downtime reset mostly, no suffix by default
@option options [Integer] bulk_size bulk API chunk size in bytes; if passed, the request is performed several times for each chunk, empty by default
@option options [Integer] batch_size passed to the adapter import method, used to split imported objects in chunks, 1000 by default
@option options [Boolean] direct_import skips object reloading in ORM adapter, `false` by default
@option options [true, false] journal enables imported objects journaling, `false` by default
@option options [Array<Symbol, String>] update_fields list of fields for the partial import, empty by default
@option options [true, false] update_failover enables full objects reimport in cases of partial update errors, `true` by default
@option options [true, Integer, Hash] parallel enables parallel import processing with the Parallel gem, accepts the number of workers or any Parallel gem acceptable options
@return [true, false] false in case of errors
# File lib/chewy/index/import.rb, line 75
def import(*args)
  intercept_import_using_strategy(*args).blank?
end
@!method import!(*collection, **options)
(see import)
The only difference from {#import} is that it raises an exception in case of any import errors.
@raise [Chewy::ImportFailed] in case of errors
# File lib/chewy/index/import.rb, line 86
def import!(*args)
  errors = intercept_import_using_strategy(*args)
  raise Chewy::ImportFailed.new(self, errors) if errors.present?
  true
end
Private Instance Methods
# File lib/chewy/index/import.rb, line 169
def empty_objects_or_scope?(objects_or_scope)
  if objects_or_scope.respond_to?(:empty?)
    objects_or_scope.empty?
  else
    objects_or_scope.blank?
  end
end
# File lib/chewy/index/import.rb, line 177
def import_linear(objects, routine)
  ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload|
    adapter.import(*objects, routine.options) do |action_objects|
      routine.process(**action_objects)
    end
    routine.perform_bulk(routine.leftovers)
    payload[:import] = routine.stats
    payload[:errors] = payload_errors(routine.errors) if routine.errors.present?
    payload[:errors]
  end
end
# File lib/chewy/index/import.rb, line 189
def import_parallel(objects, routine)
  raise "The `parallel` gem is required for parallel import, please add `gem 'parallel'` to your Gemfile" unless '::Parallel'.safe_constantize
  ActiveSupport::Notifications.instrument 'import_objects.chewy', index: self do |payload|
    batches = adapter.import_references(*objects, routine.options.slice(:batch_size)).to_a
    ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
    results = ::Parallel.map_with_index(
      batches,
      routine.parallel_options,
      &IMPORT_WORKER.curry[self, routine.options, batches.size]
    )
    ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
    errors, import, leftovers = process_parallel_import_results(results)
    if leftovers.present?
      batches = leftovers.each_slice(routine.options[:batch_size])
      results = ::Parallel.map_with_index(
        batches,
        routine.parallel_options,
        &LEFTOVERS_WORKER.curry[self, routine.options, batches.size]
      )
      errors.concat(results.flatten(1))
    end
    payload[:import] = import
    payload[:errors] = payload_errors(errors) if errors.present?
    payload[:errors]
  end
end
# File lib/chewy/index/import.rb, line 156
def import_routine(*args)
  return if !args.first.nil? && empty_objects_or_scope?(args.first)
  routine = Routine.new(self, **args.extract_options!)
  routine.create_indexes!
  if routine.parallel_options
    import_parallel(args, routine)
  else
    import_linear(args, routine)
  end
end
# File lib/chewy/index/import.rb, line 130
def intercept_import_using_strategy(*args)
  args_clone = args.deep_dup
  options = args_clone.extract_options!
  strategy = options.delete(:strategy)
  return import_routine(*args) if strategy.blank?
  ids = args_clone.flatten
  return {} if ids.blank?
  return {argument: {"#{strategy} supports ids only!" => ids}} unless ids.all? do |id|
    id.respond_to?(:to_i)
  end
  case strategy
  when :delayed_sidekiq
    begin
      Chewy::Strategy::DelayedSidekiq::Scheduler.new(self, ids, options).postpone
      {} # success. errors handling convention
    rescue StandardError => e
      {scheduler: {e.message => ids}}
    end
  else
    {argument: {"unsupported strategy: '#{strategy}'" => ids}}
  end
end
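When a strategy is passed, the arguments are accepted only if every element responds to `to_i`. The check can be illustrated in plain Ruby; `ids_only?` is a hypothetical name used here for illustration and does not exist in Chewy.

```ruby
# Standalone illustration of the "ids only" check, not Chewy code:
# every flattened element must respond to +to_i+.
def ids_only?(values)
  values.flatten.all? { |value| value.respond_to?(:to_i) }
end

integers = ids_only?([1, 2, 3])       # integers respond to to_i
mixed    = ids_only?(['42', 7.0])     # so do strings and floats
hashes   = ids_only?([1, { id: 2 }])  # a hash does not
puts [integers, mixed, hashes].inspect
```

Because the check is duck-typed, a non-numeric string such as `'abc'` also passes it; only objects without `to_i`, such as hashes or symbols, are rejected.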
# File lib/chewy/index/import.rb, line 228
def payload_errors(errors)
  errors.each_with_object({}) do |error, result|
    action = error.keys.first.to_sym
    item = error.values.first
    error = item['error']
    id = item['_id']
    result[action] ||= {}
    result[action][error] ||= []
    result[action][error].push(id)
  end
end
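The transposition performed by `payload_errors` is easier to follow on sample data. The sketch below copies the same logic into a standalone method so it runs without Chewy; `transpose_errors` is a hypothetical name, and the error items are made up for illustration.

```ruby
# Standalone copy of the transposition logic, for illustration only.
def transpose_errors(errors)
  errors.each_with_object({}) do |error, result|
    action = error.keys.first.to_sym
    item = error.values.first
    message = item['error']
    id = item['_id']
    result[action] ||= {}
    result[action][message] ||= []
    result[action][message].push(id)
  end
end

# Illustrative error items, keyed by bulk action name.
error_items = [
  { 'index'  => { '_id' => '1', 'error' => 'mapping failure' } },
  { 'index'  => { '_id' => '2', 'error' => 'mapping failure' } },
  { 'delete' => { '_id' => '3', 'error' => 'not found' } }
]

transposed = transpose_errors(error_items)
puts transposed.inspect
```

The per-item list becomes a hash keyed by action, then by error message, with the affected ids collected in an array, so both `index` failures end up under a single message.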
# File lib/chewy/index/import.rb, line 220
def process_parallel_import_results(results)
  results.each_with_object([[], {}, []]) do |r, (e, i, l)|
    e.concat(r[:errors])
    i.merge!(r[:import]) { |_k, v1, v2| v1.to_i + v2.to_i }
    l.concat(r[:leftovers])
  end
end
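The reduction in `process_parallel_import_results` can be tried on sample worker results. This sketch repeats the same logic in a standalone method; `merge_results` is a hypothetical name, and the shape of the worker results is assumed from the keys the method reads (`:errors`, `:import`, `:leftovers`).

```ruby
# Standalone copy of the reduction, for illustration only.
def merge_results(results)
  results.each_with_object([[], {}, []]) do |r, (e, i, l)|
    e.concat(r[:errors])                                     # collect all error items
    i.merge!(r[:import]) { |_k, v1, v2| v1.to_i + v2.to_i }  # sum counters per action
    l.concat(r[:leftovers])                                  # collect all leftovers
  end
end

# Illustrative results from two workers.
results = [
  { errors: [{ 'index' => { '_id' => '1', 'error' => 'boom' } }],
    import: { index: 2 }, leftovers: [] },
  { errors: [], import: { index: 3, delete: 1 }, leftovers: ['leftover-body'] }
]

errors, import, leftovers = merge_results(results)
puts import.inspect
```

Counters that appear in several worker results are added together, while errors and leftovers are concatenated in worker order.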