module Chewy::Search::Scrolling

This module contains batch requests DSL via ES scroll API. All the methods are optimized on memory consumption, they are not caching anythig, so use them when you need to do some single-run stuff on a huge amount of documents. Don't forget to tune the `scroll` parameter for long-lasting actions. All the scroll methods respect the limit value if provided.

@see www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

Public Instance Methods

scroll_batches(batch_size: Request::DEFAULT_BATCH_SIZE, scroll: Request::DEFAULT_SCROLL) { |hits| ... } click to toggle source

Iterates through the documents of the scope in batches. Limit if overrided by the `batch_size`. There are 2 possible use-cases: with a block or without.

@param batch_size [Integer] batch size obviously, replaces `size` query parameter @param scroll [String] cursor expiration time

@overload scroll_batches(batch_size: 1000, scroll: '1m')

@example
  PlaceIndex.scroll_batches { |batch| batch.each { |hit| p hit['_id'] } }
@yieldparam batch [Array<Hash>] block is executed for each batch of hits

@overload scroll_batches(batch_size: 1000, scroll: '1m')

@example
  PlaceIndex.scroll_batches.flat_map { |batch| batch.map { |hit| hit['_id'] } }
@return [Enumerator] a standard ruby Enumerator
# File lib/chewy/search/scrolling.rb, line 27
def scroll_batches(batch_size: Request::DEFAULT_BATCH_SIZE, scroll: Request::DEFAULT_SCROLL)
  return enum_for(:scroll_batches, batch_size: batch_size, scroll: scroll) unless block_given?

  result = perform(size: batch_size, scroll: scroll)
  total = [raw_limit_value, result.fetch('hits', {}).fetch('total', {}).fetch('value', 0)].compact.min
  last_batch_size = total % batch_size
  fetched = 0
  scroll_id = nil

  loop do
    hits = result.fetch('hits', {}).fetch('hits', [])
    fetched += hits.size
    hits = hits.first(last_batch_size) if last_batch_size != 0 && fetched >= total
    yield(hits) if hits.present?
    scroll_id = result['_scroll_id']
    break if fetched >= total

    result = perform_scroll(scroll: scroll, scroll_id: scroll_id)
  end
ensure
  Chewy.client.clear_scroll(body: {scroll_id: scroll_id}) if scroll_id
end
scroll_documents(**options, &block)
Alias for: scroll_objects
scroll_hits(**options, &block) click to toggle source

@!method scroll_hits(batch_size: 1000, scroll: '1m') Iterates through the documents of the scope in batches. Yields each hit separately.

@param batch_size [Integer] batch size obviously, replaces `size` query parameter @param scroll [String] cursor expiration time

@overload scroll_hits(batch_size: 1000, scroll: '1m')

@example
  PlaceIndex.scroll_hits { |hit| p hit['_id'] }
@yieldparam hit [Hash] block is executed for each hit

@overload scroll_hits(batch_size: 1000, scroll: '1m')

@example
  PlaceIndex.scroll_hits.map { |hit| hit['_id'] }
@return [Enumerator] a standard ruby Enumerator
# File lib/chewy/search/scrolling.rb, line 65
def scroll_hits(**options, &block)
  return enum_for(:scroll_hits, **options) unless block_given?

  scroll_batches(**options).each do |batch|
    batch.each(&block)
  end
end
scroll_objects(**options, &block) click to toggle source

@!method scroll_objects(batch_size: 1000, scroll: '1m') Iterates through the documents of the scope in batches. Performs load operation for each batch and then yields each loaded ORM/ODM object. Uses {Chewy::Search::Request#load} passed options for loading.

@note If the record is not found it yields nil instead. @see Chewy::Search::Request#load @see Chewy::Search::Loader @param batch_size [Integer] batch size obviously, replaces `size` query parameter @param scroll [String] cursor expiration time

@overload scroll_objects(batch_size: 1000, scroll: '1m')

@example
  PlaceIndex.scroll_objects { |record| p record.id }
@yieldparam record [Object] block is executed for each record loaded

@overload scroll_objects(batch_size: 1000, scroll: '1m')

@example
  PlaceIndex.scroll_objects.map { |record| record.id }
@return [Enumerator] a standard ruby Enumerator
# File lib/chewy/search/scrolling.rb, line 117
def scroll_objects(**options, &block)
  return enum_for(:scroll_objects, **options) unless block_given?

  except(:source, :stored_fields, :script_fields, :docvalue_fields)
    .source(false).scroll_batches(**options).each do |batch|
      loader.load(batch).each(&block)
    end
end
Also aliased as: scroll_records, scroll_documents
scroll_records(**options, &block)
Alias for: scroll_objects
scroll_wrappers(**options) { |derive_index.build(hit)| ... } click to toggle source

@!method scroll_wrappers(batch_size: 1000, scroll: '1m') Iterates through the documents of the scope in batches. Yields each hit wrapped with {Chewy::Index}.

@param batch_size [Integer] batch size obviously, replaces `size` query parameter @param scroll [String] cursor expiration time

@overload scroll_wrappers(batch_size: 1000, scroll: '1m')

@example
  PlaceIndex.scroll_wrappers { |object| p object.id }
@yieldparam object [Chewy::Index] block is executed for each hit object

@overload scroll_wrappers(batch_size: 1000, scroll: '1m')

@example
  PlaceIndex.scroll_wrappers.map { |object| object.id }
@return [Enumerator] a standard ruby Enumerator
# File lib/chewy/search/scrolling.rb, line 89
def scroll_wrappers(**options)
  return enum_for(:scroll_wrappers, **options) unless block_given?

  scroll_hits(**options).each do |hit|
    yield loader.derive_index(hit['_index']).build(hit)
  end
end

Private Instance Methods

perform_scroll(body) click to toggle source
# File lib/chewy/search/scrolling.rb, line 130
def perform_scroll(body)
  ActiveSupport::Notifications.instrument 'search_query.chewy', notification_payload(request: body) do
    Chewy.client.scroll(body)
  end
end