class Chewy::Index::Import::BulkRequest

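Adds extra features on top of the elasticsearch-api `bulk` method:

* optionally splits the request body into several requests that each fit a byte-size limit (`bulk_size`);
* collects the errored items reported in the responses and returns them.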

@see github.com/elastic/elasticsearch-ruby/blob/master/elasticsearch-api/lib/elasticsearch/api/actions/bulk.rb

Public Class Methods

new(index, suffix: nil, bulk_size: nil, **bulk_options) click to toggle source

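@param index [Chewy::Index] the index the request is performed against
@param suffix [String] an optional suffix appended to the index name
@param bulk_size [Integer] maximum bulk request size in bytes; 1 kilobyte of it is reserved for the request header and newlines, so it must be greater than 1 kilobyte
@param bulk_options [Hash] options passed to the elasticsearch-api bulk method
@raise [ArgumentError] if `bulk_size` is not greater than 1 kilobyte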

# File lib/chewy/index/import/bulk_request.rb, line 17
# Stores the request configuration.
#
# @param index [Chewy::Index] index the bulk requests are sent to
# @param suffix [String, nil] optional suffix for the index name
# @param bulk_size [Integer, nil] maximum request size in bytes, or nil to
#   send the whole body in a single request
# @param bulk_options [Hash] extra options forwarded to the client's bulk call
# @raise [ArgumentError] when bulk_size leaves no room after the 1 kilobyte
#   reserved for the request header and newlines
def initialize(index, suffix: nil, bulk_size: nil, **bulk_options)
  @index = index
  @suffix = suffix
  @bulk_options = bulk_options
  return unless bulk_size

  # Keep 1 kilobyte of headroom for the request header and the newlines
  # that are not counted while packing entries into chunks.
  @bulk_size = bulk_size - 1.kilobyte
  raise ArgumentError, '`bulk_size` can\'t be less than 1 kilobyte' if @bulk_size <= 0
end

Public Instance Methods

perform(body) click to toggle source

Performs a bulk request with the passed body, splitting it into several requests when `bulk_size` is set. Returns an empty array if everything succeeded, or an array of the errored document entries if something went wrong.

@param body [Array<Hash>] a standard bulk request body
@return [Array<Hash>] an array of bulk errors

# File lib/chewy/index/import/bulk_request.rb, line 32
# Sends +body+ through the index client's bulk API and gathers the items the
# cluster reported as failed.
#
# The body may be split into several requests by +request_bodies+; a blank
# chunk is skipped without issuing a request.
#
# @param body [Array<Hash>] a standard bulk request body
# @return [Array<Hash>] errored response items; empty when everything succeeded
def perform(body)
  return [] if body.blank?

  failed_items = []
  request_bodies(body).each do |request_body|
    next if request_body.blank?

    response = @index.client.bulk(request_base.merge(body: request_body))
    # Only responses flagged with 'errors' contain failed items worth scanning.
    next unless response.try(:[], 'errors')

    items = response.try(:[], 'items') || []
    failed_items.concat(items.select { |item| item.values.first['error'] })
  end
  failed_items
end

Private Instance Methods

request_base() click to toggle source
# File lib/chewy/index/import/bulk_request.rb, line 48
# Base parameters shared by every bulk call: the (optionally suffixed) index
# name, overlaid with the user-supplied bulk options. Computed once and cached.
#
# @return [Hash] parameters to merge the request +body+ into
def request_base
  return @request_base if @request_base

  defaults = { index: @index.index_name(suffix: @suffix) }
  @request_base = defaults.merge(@bulk_options)
end
request_bodies(body) click to toggle source
# File lib/chewy/index/import/bulk_request.rb, line 54
# Turns +body+ into the payload(s) for one or more bulk calls.
#
# Without a configured +bulk_size+ the body is returned untouched as a
# single-element array, and serialization is left to elasticsearch-api.
#
# Otherwise each entry is processed as follows:
# * it is dumped as its action line, followed by its +:data+ document line
#   when present;
# * entries are packed greedily into newline-joined strings of at most
#   +@bulk_size+ bytes;
# * an entry that alone exceeds the limit still gets a chunk of its own.
#
# Every chunk is terminated with a newline. When the very first entry
# overflows, the seed chunk is left as a lone "\n"; +perform+ skips blank
# chunks.
#
# The caller's +body+ is not modified.
#
# @param body [Array<Hash>] bulk entries such as +{index: {_id: 1, data: {...}}}+
# @return [Array<String>, Array<Array<Hash>>] serialized chunks, or +[body]+
#   when no size limit is configured
def request_bodies(body)
  return [body] unless @bulk_size

  serializer = ::Elasticsearch::API.serializer
  # Unfrozen seed so the trailing "\n" can be appended even if it is never replaced.
  chunks = body.each_with_object([+'']) do |entry, result|
    operation, meta = entry.to_a.first
    data = meta[:data]
    # Serialize a copy without :data instead of deleting it from +meta+, so the
    # caller's entries stay intact (elasticsearch-api clones meta for the same reason).
    serialized = serializer.dump(operation => meta.reject { |key, _| key == :data })
    serialized << "\n" << serializer.dump(data) if data.present?

    if result.last.bytesize + serialized.bytesize > @bulk_size
      result.push(serialized)
    elsif result.last.blank?
      result[-1] = serialized
    else
      result.last << "\n" << serialized
    end
  end
  chunks.each { |chunk| chunk << "\n" }
end