class Fluent::Plugin::ElasticsearchErrorHandler

Attributes

bulk_message_count[RW]

Public Class Methods

new(plugin) click to toggle source
# File lib/fluent/plugin/elasticsearch_error_handler.rb, line 14
def initialize(plugin)
  @plugin = plugin
end

Public Instance Methods

handle_error(response, tag, chunk, bulk_message_count, extracted_values) click to toggle source
# File lib/fluent/plugin/elasticsearch_error_handler.rb, line 34
def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
  items = response['items']
  if items.nil? || !items.is_a?(Array)
    raise ElasticsearchVersionMismatch, "The response format was unrecognized: #{response}"
  end
  if bulk_message_count != items.length
    raise ElasticsearchSubmitMismatch, "The number of records submitted #{bulk_message_count} do not match the number returned #{items.length}. Unable to process bulk response."
  end
  retry_stream = Fluent::MultiEventStream.new
  stats = Hash.new(0)
  meta = {}
  header = {}
  chunk.msgpack_each do |time, rawrecord|
    bulk_message = ''
    next unless rawrecord.is_a? Hash
    begin
      # we need a deep copy for process_message to alter
      processrecord = Marshal.load(Marshal.dump(rawrecord))
      meta, header, record = @plugin.process_message(tag, meta, header, time, processrecord, extracted_values)
      next unless @plugin.append_record_to_messages(@plugin.write_operation, meta, header, record, bulk_message)
    rescue => e
      stats[:bad_chunk_record] += 1
      next
    end
    item = items.shift
    if item.is_a?(Hash) && item.has_key?(@plugin.write_operation)
      write_operation = @plugin.write_operation
    elsif INDEX_OP == @plugin.write_operation && item.is_a?(Hash) && item.has_key?(CREATE_OP)
      write_operation = CREATE_OP
    elsif item.nil?
      stats[:errors_nil_resp] += 1
      next
    else
      # When we don't have an expected ops field, something changed in the API
      # expected return values (ES 2.x)
      stats[:errors_bad_resp] += 1
      next
    end
    if item[write_operation].has_key?('status')
      status = item[write_operation]['status']
    else
      # When we don't have a status field, something changed in the API
      # expected return values (ES 2.x)
      stats[:errors_bad_resp] += 1
      next
    end
    case
    when [200, 201].include?(status)
      stats[:successes] += 1
    when CREATE_OP == write_operation && 409 == status
      stats[:duplicates] += 1
    when 400 == status
      stats[:bad_argument] += 1
      reason = ""
      log_es_400_reason do
        if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
          reason = " [error type]: #{item[write_operation]['error']['type']}"
        end
        if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('reason')
          reason += " [reason]: \'#{item[write_operation]['error']['reason']}\'"
        end
      end
      @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new("400 - Rejected by Elasticsearch#{reason}"))
    else
      if item[write_operation]['error'].is_a?(String)
        reason = item[write_operation]['error']
        stats[:errors_block_resp] += 1
        @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new("#{status} - #{reason}"))
        next
      elsif item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
        type = item[write_operation]['error']['type']
        stats[type] += 1
        retry_stream.add(time, rawrecord)
        if unrecoverable_error?(type)
          raise ElasticsearchRequestAbortError, "Rejected Elasticsearch due to #{type}"
        end
      else
        # When we don't have a type field, something changed in the API
        # expected return values (ES 2.x)
        stats[:errors_bad_resp] += 1
        @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new("#{status} - No error type provided in the response"))
        next
      end
      stats[type] += 1
    end
  end
  @plugin.log.on_debug do
    msg = ["Indexed (op = #{@plugin.write_operation})"]
    stats.each_pair { |key, value| msg << "#{value} #{key}" }
    @plugin.log.debug msg.join(', ')
  end
  raise Fluent::Plugin::ElasticsearchOutput::RetryStreamError.new(retry_stream) unless retry_stream.empty?
end
log_es_400_reason(&block) click to toggle source
# File lib/fluent/plugin/elasticsearch_error_handler.rb, line 26
def log_es_400_reason(&block)
  if @plugin.log_es_400_reason
    block.call
  else
    @plugin.log.on_debug(&block)
  end
end
unrecoverable_error?(type) click to toggle source
# File lib/fluent/plugin/elasticsearch_error_handler.rb, line 22
def unrecoverable_error?(type)
  unrecoverable_error_types.include?(type)
end
unrecoverable_error_types() click to toggle source
# File lib/fluent/plugin/elasticsearch_error_handler.rb, line 18
def unrecoverable_error_types
  @plugin.unrecoverable_error_types
end