class Fluent::Plugin::ElasticsearchOutput

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_ELASTICSEARCH_VERSION
DEFAULT_RELOAD_AFTER
DEFAULT_TYPE_NAME
DEFAULT_TYPE_NAME_ES_7x
RequestInfo
TARGET_BULK_BYTES

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch2.rb, line 157
def initialize
  super
end

Public Instance Methods

append_record_to_messages(op, meta, header, record, msgs)

append_record_to_messages adds a record to the bulk message payload to be submitted to Elasticsearch. Records that do not include the '_id' field are skipped when 'write_operation' is configured as 'update', 'upsert', or 'create'.

Returns 'true' if the record was appended to the bulk message and 'false' otherwise.
# File lib/fluent/plugin/out_elasticsearch2.rb, line 510
def append_record_to_messages(op, meta, header, record, msgs)
  case op
  when UPDATE_OP, UPSERT_OP
    if meta.has_key?(ID_FIELD)
      header[UPDATE_OP] = meta
      msgs << @dump_proc.call(header) << BODY_DELIMITER
      msgs << @dump_proc.call(update_body(record, op)) << BODY_DELIMITER
      return true
    end
  when CREATE_OP
    if meta.has_key?(ID_FIELD)
      header[CREATE_OP] = meta
      msgs << @dump_proc.call(header) << BODY_DELIMITER
      msgs << @dump_proc.call(record) << BODY_DELIMITER
      return true
    end
  when INDEX_OP
    header[INDEX_OP] = meta
    msgs << @dump_proc.call(header) << BODY_DELIMITER
    msgs << @dump_proc.call(record) << BODY_DELIMITER
    return true
  end
  return false
end
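
For illustration, a minimal standalone sketch of the NDJSON payload this builds for an 'index' operation; JSON.dump stands in for @dump_proc, the literal "index" key for INDEX_OP, "\n" for BODY_DELIMITER, and the meta/record values are made up:

require 'json'

msgs   = +''
meta   = { "_index" => "app-logs-2024.03.01", "_type" => "_doc" }
record = { "message" => "hello", "level" => "info" }

# Header line, then the document body, each newline-terminated.
msgs << JSON.dump("index" => meta) << "\n"
msgs << JSON.dump(record) << "\n"

puts msgs
# {"index":{"_index":"app-logs-2024.03.01","_type":"_doc"}}
# {"message":"hello","level":"info"}
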
backend_options()
# File lib/fluent/plugin/out_elasticsearch2.rb, line 328
def backend_options
  case @http_backend
  when :excon
    { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
  when :typhoeus
    require 'typhoeus'
    { sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
  end
rescue LoadError => ex
  log.error_backtrace(ex.backtrace)
  raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{ex}"
end
client(host = nil)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 415
def client(host = nil)
  # check here to see if we already have a client connection for the given host
  connection_options = get_connection_options(host)

  @_es = nil unless is_existing_connection(connection_options[:hosts])

  @_es ||= begin
    @current_config = connection_options[:hosts].clone
    adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
    local_reload_connections = @reload_connections
    if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
      local_reload_connections = @reload_after
    end
    headers = { 'Content-Type' => @content_type.to_s }.merge(@custom_headers)
    transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge(
                                                                        options: {
                                                                          reload_connections: local_reload_connections,
                                                                          reload_on_failure: @reload_on_failure,
                                                                          resurrect_after: @resurrect_after,
                                                                          logger: @transport_logger,
                                                                          transport_options: {
                                                                            headers: headers,
                                                                            request: { timeout: @request_timeout },
                                                                            ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
                                                                          },
                                                                          http: {
                                                                            user: @user,
                                                                            password: @password
                                                                          },
                                                                          sniffer_class: @sniffer_class,
                                                                          serializer_class: @serializer_class,
                                                                        }), &adapter_conf)
    Elasticsearch::Client.new transport: transport
  end
end
client_library_version()
# File lib/fluent/plugin/out_elasticsearch2.rb, line 346
def client_library_version
  Elasticsearch::VERSION
end
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch2.rb, line 161
    def configure(conf)
      compat_parameters_convert(conf, :buffer)

      super
      raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag

      @time_parser = create_time_parser
      @backend_options = backend_options

      if @remove_keys
        @remove_keys = @remove_keys.split(/\s*,\s*/)
      end

      if @target_index_key && @target_index_key.is_a?(String)
        @target_index_key = @target_index_key.split '.'
      end

      if @target_type_key && @target_type_key.is_a?(String)
        @target_type_key = @target_type_key.split '.'
      end

      if @remove_keys_on_update && @remove_keys_on_update.is_a?(String)
        @remove_keys_on_update = @remove_keys_on_update.split ','
      end

      raise Fluent::ConfigError, "'max_retry_putting_template' must be greater than or equal to zero." if @max_retry_putting_template < 0
      raise Fluent::ConfigError, "'max_retry_get_es_version' must be greater than or equal to zero." if @max_retry_get_es_version < 0

      # Raise error when using host placeholders and template features at same time.
      valid_host_placeholder = placeholder?(:host_placeholder, @host)
      if valid_host_placeholder && (@template_name && @template_file || @templates)
        raise Fluent::ConfigError, "host placeholder and template installation are exclusive features."
      end

      if !Fluent::Engine.dry_run_mode
        if @template_name && @template_file
          retry_operate(@max_retry_putting_template, @fail_on_putting_template_retry_exceed) do
            if @customize_template
              if @rollover_index
                raise Fluent::ConfigError, "'deflector_alias' must be provided if 'rollover_index' is set true ." if not @deflector_alias
              end
              template_custom_install(@template_name, @template_file, @template_overwrite, @customize_template, @index_prefix, @rollover_index, @deflector_alias, @application_name, @index_date_pattern)
            else
              template_install(@template_name, @template_file, @template_overwrite)
            end
          end
        elsif @templates
          retry_operate(@max_retry_putting_template, @fail_on_putting_template_retry_exceed) do
            templates_hash_install(@templates, @template_overwrite)
          end
        end
      end

      @serializer_class = nil
      begin
        require 'oj'
        @dump_proc = Oj.method(:dump)
        if @prefer_oj_serializer
          @serializer_class = Fluent::Plugin::Serializer::Oj
          Elasticsearch::API.settings[:serializer] = Fluent::Plugin::Serializer::Oj
        end
      rescue LoadError
        @dump_proc = Yajl.method(:dump)
      end

      raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

      if @user && m = @user.match(/%{(?<user>.*)}/)
        @user = URI.encode_www_form_component(m["user"])
      end
      if @password && m = @password.match(/%{(?<password>.*)}/)
        @password = URI.encode_www_form_component(m["password"])
      end

      @transport_logger = nil
      if @with_transporter_log
        @transport_logger = log
        log_level = conf['@log_level'] || conf['log_level']
        log.warn "Consider to specify log_level with @log_level." unless log_level
      end
      # Specify @sniffer_class before calling #client.
      # #detect_es_major_version uses #client.
      @sniffer_class = nil
      begin
        @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
      rescue Exception => ex
        raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
      end

      @last_seen_major_version =
        if @verify_es_version_at_startup && !Fluent::Engine.dry_run_mode
          retry_operate(@max_retry_get_es_version) do
            detect_es_major_version
          end
        else
          @default_elasticsearch_version
        end
      if @last_seen_major_version == 6 && @type_name != DEFAULT_TYPE_NAME_ES_7x
        log.info "Detected ES 6.x: ES 7.x will only accept `_doc` in type_name."
      end
      if @last_seen_major_version >= 7 && @type_name != DEFAULT_TYPE_NAME_ES_7x
        log.warn "Detected ES 7.x or above: `_doc` will be used as the document `_type`."
        @type_name = '_doc'.freeze
      end

      if @validate_client_version && !Fluent::Engine.dry_run_mode
        if @last_seen_major_version != client_library_version.to_i
          raise Fluent::ConfigError, <<-EOC
            Detected ES #{@last_seen_major_version} but you use ES client #{client_library_version}.
            Please consider to use #{@last_seen_major_version}.x series ES client.
          EOC
        end
      end

      if @last_seen_major_version >= 6
        case @ssl_version
        when :SSLv23, :TLSv1, :TLSv1_1
          if @scheme == :https
            log.warn "Detected ES 6.x or above and enabled insecure security:
                      You might have to specify `ssl_version TLSv1_2` in configuration."
          end
        end
      end

      if @buffer_config.flush_thread_count < 2
        log.warn "To prevent events traffic jam, you should specify 2 or more 'flush_thread_count'."
      end

      # Consider missing the prefix of "$." in nested key specifiers.
      @id_key = convert_compat_id_key(@id_key) if @id_key
      @parent_key = convert_compat_id_key(@parent_key) if @parent_key
      @routing_key = convert_compat_id_key(@routing_key) if @routing_key

      @routing_key_name = configure_routing_key_name
      @meta_config_map = create_meta_config_map
      @current_config = nil

      @ignore_exception_classes = @ignore_exceptions.map do |exception|
        unless Object.const_defined?(exception)
          log.warn "Cannot find class #{exception}. Will ignore it."

          nil
        else
          Object.const_get(exception)
        end
      end.compact

      if @bulk_message_request_threshold < 0
        class << self
          alias_method :split_request?, :split_request_size_uncheck?
        end
      else
        class << self
          alias_method :split_request?, :split_request_size_check?
        end
      end
    end
configure_routing_key_name()
# File lib/fluent/plugin/out_elasticsearch2.rb, line 350
def configure_routing_key_name
  if @last_seen_major_version >= 7
    'routing'
  else
    '_routing'
  end
end
connection_options_description(con_host=nil)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 495
def connection_options_description(con_host=nil)
  get_connection_options(con_host)[:hosts].map do |host_info|
    attributes = host_info.dup
    attributes[:password] = 'obfuscated' if attributes.has_key?(:password)
    attributes.inspect
  end.join(', ')
end
convert_compat_id_key(key)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 358
def convert_compat_id_key(key)
  if key.include?('.') && !key.start_with?('$[')
    key = "$.#{key}" unless key.start_with?('$.')
  end
  key
end
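
A standalone sketch of the same normalization, showing how dotted keys gain the "$." record_accessor prefix while already-prefixed, bracket-style, and plain keys pass through unchanged:

# Mirrors convert_compat_id_key: prepend "$." to dotted keys that are not
# already record_accessor expressions.
def compat_id_key(key)
  if key.include?('.') && !key.start_with?('$[')
    key = "$.#{key}" unless key.start_with?('$.')
  end
  key
end

compat_id_key('request_id')        # => "request_id"   (no dot, unchanged)
compat_id_key('log.request_id')    # => "$.log.request_id"
compat_id_key('$.log.request_id')  # => "$.log.request_id" (already prefixed)
compat_id_key("$['log.field']")    # => "$['log.field']" (bracket syntax, unchanged)
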
convert_numeric_time_into_string(numeric_time, time_key_format = "%Y-%m-%d %H:%M:%S.%N%z")
# File lib/fluent/plugin/out_elasticsearch2.rb, line 403
def convert_numeric_time_into_string(numeric_time, time_key_format = "%Y-%m-%d %H:%M:%S.%N%z")
  numeric_time_parser = Fluent::NumericTimeParser.new(:float)
  Time.at(numeric_time_parser.parse(numeric_time).to_r).strftime(time_key_format)
end
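
Fluent::NumericTimeParser accepts integer or float epoch values; the result is roughly what this plain-Ruby sketch produces (using Time.at directly, with a made-up epoch):

epoch = 1_577_836_800.123456
Time.at(epoch.to_r).strftime("%Y-%m-%d %H:%M:%S.%N%z")
# => "2020-01-01 00:00:00.123456000+0000" in a UTC environment
#    (the zone offset follows the local timezone)
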
create_meta_config_map()
# File lib/fluent/plugin/out_elasticsearch2.rb, line 365
def create_meta_config_map
  result = []
  result << [record_accessor_create(@id_key), '_id'] if @id_key
  result << [record_accessor_create(@parent_key), '_parent'] if @parent_key
  result << [record_accessor_create(@routing_key), @routing_key_name] if @routing_key
  result
end
create_time_parser()

once fluent v0.14 is released we might be able to use Fluent::Parser::TimeParser, but it doesn't quite do what we want - it gives

sec,nsec

whereas we want something we can call `strftime` on…

# File lib/fluent/plugin/out_elasticsearch2.rb, line 376
def create_time_parser
  if @time_key_format
    begin
      # Strptime doesn't support all formats, but for those it does it's
      # blazingly fast.
      strptime = Strptime.new(@time_key_format)
      Proc.new { |value|
        value = convert_numeric_time_into_string(value, @time_key_format) if value.is_a?(Numeric)
        strptime.exec(value).to_datetime
      }
    rescue
      # Can happen if Strptime doesn't recognize the format; or
      # if strptime couldn't be required (because it's not installed -- it's
      # ruby 2 only)
      Proc.new { |value|
        value = convert_numeric_time_into_string(value, @time_key_format) if value.is_a?(Numeric)
        DateTime.strptime(value, @time_key_format)
      }
    end
  else
    Proc.new { |value|
      value = convert_numeric_time_into_string(value) if value.is_a?(Numeric)
      DateTime.parse(value)
    }
  end
end
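
A minimal sketch of the fallback branch, assuming the strptime gem is not installed so DateTime.strptime does the parsing (the format string and timestamp are made up):

require 'date'

time_key_format = "%Y-%m-%dT%H:%M:%S.%L%z"
parser = Proc.new { |value| DateTime.strptime(value, time_key_format) }

dt = parser.call("2024-03-01T12:34:56.789+0000")
dt.iso8601(3)  # => "2024-03-01T12:34:56.789+00:00"
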
detect_es_major_version()
# File lib/fluent/plugin/out_elasticsearch2.rb, line 341
def detect_es_major_version
  @_es_info ||= client.info
  @_es_info["version"]["number"].to_i
end
expand_placeholders(chunk)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 575
def expand_placeholders(chunk)
  logstash_prefix = extract_placeholders(@logstash_prefix, chunk)
  index_name = extract_placeholders(@index_name, chunk)
  type_name = extract_placeholders(@type_name, chunk)
  return logstash_prefix, index_name, type_name
end
flatten_record(record, prefix=[])
# File lib/fluent/plugin/out_elasticsearch2.rb, line 560
def flatten_record(record, prefix=[])
  ret = {}
  if record.is_a? Hash
    record.each { |key, value|
      ret.merge! flatten_record(value, prefix + [key.to_s])
    }
  elsif record.is_a? Array
    # Don't mess with arrays, leave them unprocessed
    ret.merge!({prefix.join(@flatten_hashes_separator) => record})
  else
    return {prefix.join(@flatten_hashes_separator) => record}
  end
  ret
end
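
For example, with flatten_hashes_separator set to "_" (a hypothetical setting), nesting collapses as in this standalone sketch:

# Standalone version of flatten_record with the separator fixed to "_".
def flatten_hash(record, prefix = [])
  ret = {}
  if record.is_a?(Hash)
    record.each { |key, value| ret.merge!(flatten_hash(value, prefix + [key.to_s])) }
  else
    # Scalars and arrays are stored under the joined key; arrays are not expanded.
    ret[prefix.join('_')] = record
  end
  ret
end

flatten_hash({ "http" => { "method" => "GET", "codes" => [200, 301] }, "host" => "web1" })
# => {"http_method"=>"GET", "http_codes"=>[200, 301], "host"=>"web1"}
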
get_connection_options(con_host=nil)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 463
def get_connection_options(con_host=nil)

  hosts = if con_host || @hosts
    (con_host || @hosts).split(',').map do |host_str|
      # Support legacy hosts format host:port,host:port,host:port...
      if host_str.match(%r{^[^:]+(\:\d+)?$})
        {
          host:   host_str.split(':')[0],
          port:   (host_str.split(':')[1] || @port).to_i,
          scheme: @scheme.to_s
        }
      else
        # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
        uri = URI(get_escaped_userinfo(host_str))
        %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
          hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
          hash
        end
      end
    end.compact
  else
    [{host: @host, port: @port, scheme: @scheme.to_s}]
  end.each do |host|
    host.merge!(user: @user, password: @password) if !host[:user] && @user
    host.merge!(path: @path) if !host[:path] && @path
  end

  {
    hosts: hosts
  }
end
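
A rough sketch of the two hosts formats the method accepts and the host hashes each produces; the hostnames and credentials are made up, and 9200/'http' stand in for the @port and @scheme settings:

require 'uri'

# Legacy format: "host:port,host:port" -- split on ':' and fall back to the port setting.
'es1.local:9200,es2.local'.split(',').map do |host_str|
  { host: host_str.split(':')[0], port: (host_str.split(':')[1] || 9200).to_i, scheme: 'http' }
end
# => [{:host=>"es1.local", :port=>9200, :scheme=>"http"},
#     {:host=>"es2.local", :port=>9200, :scheme=>"http"}]

# URL format: user, password, and path are picked up from the URI when present.
uri = URI('https://john:secret@logs.example.com/elastic')
{ host: uri.host, port: uri.port, scheme: uri.scheme,
  user: uri.user, password: uri.password, path: uri.path }
# => {:host=>"logs.example.com", :port=>443, :scheme=>"https",
#     :user=>"john", :password=>"secret", :path=>"/elastic"}
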
get_escaped_userinfo(host_str)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 451
def get_escaped_userinfo(host_str)
  if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
    m["scheme"] +
      URI.encode_www_form_component(m["user"]) +
      ':' +
      URI.encode_www_form_component(m["password"]) +
      m["path"]
  else
    host_str
  end
end
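
For example, credentials wrapped in %{} are form-encoded so characters such as '@' or '/' survive URI parsing (the host and credentials below are made up):

require 'uri'

host_str = 'https://%{j+hn}:%{p@ss/word}@logs.example.com/elastic'
if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
  escaped = m["scheme"] +
    URI.encode_www_form_component(m["user"]) +
    ':' +
    URI.encode_www_form_component(m["password"]) +
    m["path"]
end
escaped  # => "https://j%2Bhn:p%40ss%2Fword@logs.example.com/elastic"
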
get_parent_of(record, path)

Returns [parent, child_key] of the child described by the path array in the record's tree. Returns [nil, child_key] if the path doesn't exist in the record.

# File lib/fluent/plugin/out_elasticsearch2.rb, line 733
def get_parent_of(record, path)
  parent_object = path[0..-2].reduce(record) { |a, e| a.is_a?(Hash) ? a[e] : nil }
  [parent_object, path[-1]]
end
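
For example, with a target_index_key already split on '.' into a path array, the reduce walks down to the hash that owns the last path element (record below is hypothetical):

record = { "kubernetes" => { "labels" => { "index" => "app-logs" } }, "msg" => "ok" }

path   = ["kubernetes", "labels", "index"]
parent = path[0..-2].reduce(record) { |a, e| a.is_a?(Hash) ? a[e] : nil }
[parent, path[-1]]
# => [{"index"=>"app-logs"}, "index"]

# A missing path yields [nil, child_key], so the caller falls back to the default index.
path   = ["kubernetes", "namespace", "index"]
parent = path[0..-2].reduce(record) { |a, e| a.is_a?(Hash) ? a[e] : nil }
[parent, path[-1]]
# => [nil, "index"]
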
gzip(string)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 738
def gzip(string)
  wio = StringIO.new("w")
  w_gz = Zlib::GzipWriter.new(wio, strategy = Zlib::BEST_SPEED)
  w_gz.write(string)
  w_gz.close
  wio.string
end
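
A minimal standalone round trip through the same Zlib calls, showing that the compressed bulk body decompresses back to the original NDJSON (the payload is made up):

require 'zlib'
require 'stringio'

body = %({"index":{"_index":"app-logs"}}\n{"message":"hello"}\n)

# Compress with a GzipWriter over a StringIO, as #gzip does.
wio = StringIO.new
gz  = Zlib::GzipWriter.new(wio, Zlib::BEST_SPEED)
gz.write(body)
gz.close
compressed = wio.string

Zlib::GzipReader.new(StringIO.new(compressed)).read == body  # => true
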
is_existing_connection(host)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 781
def is_existing_connection(host)
  # check if the host provided match the current connection
  return false if @_es.nil?
  return false if @current_config.nil?
  return false if host.length != @current_config.length

  for i in 0...host.length
    if !host[i][:host].eql? @current_config[i][:host] || host[i][:port] != @current_config[i][:port]
      return false
    end
  end

  return true
end
multi_workers_ready?()
# File lib/fluent/plugin/out_elasticsearch2.rb, line 582
def multi_workers_ready?
  true
end
parse_time(value, event_time, tag)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 408
def parse_time(value, event_time, tag)
  @time_parser.call(value)
rescue => e
  router.emit_error_event(@time_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @time_key_format, 'value' => value}, e)
  return Time.at(event_time).to_datetime
end
placeholder?(name, param)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 319
def placeholder?(name, param)
  begin
    placeholder_validate!(name, param)
    true
  rescue Fluent::ConfigError
    false
  end
end
process_message(tag, meta, header, time, record, extracted_values)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 652
def process_message(tag, meta, header, time, record, extracted_values)
  logstash_prefix, index_name, type_name = extracted_values

  if @flatten_hashes
    record = flatten_record(record)
  end

  dt = nil
  if @logstash_format || @include_timestamp
    if record.has_key?(TIMESTAMP_FIELD)
      rts = record[TIMESTAMP_FIELD]
      dt = parse_time(rts, time, tag)
    elsif record.has_key?(@time_key)
      rts = record[@time_key]
      dt = parse_time(rts, time, tag)
      record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision) unless @time_key_exclude_timestamp
    else
      dt = Time.at(time).to_datetime
      record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision)
    end
  end

  target_index_parent, target_index_child_key = @target_index_key ? get_parent_of(record, @target_index_key) : nil
  if target_index_parent && target_index_parent[target_index_child_key]
    target_index = target_index_parent.delete(target_index_child_key)
  elsif @logstash_format
    dt = dt.new_offset(0) if @utc_index
    target_index = "#{logstash_prefix}#{@logstash_prefix_separator}#{dt.strftime(@logstash_dateformat)}"
  else
    target_index = index_name
  end

  # Change target_index to lower-case since Elasticsearch doesn't
  # allow upper-case characters in index names.
  target_index = target_index.downcase
  if @include_tag_key
    record[@tag_key] = tag
  end

  target_type_parent, target_type_child_key = @target_type_key ? get_parent_of(record, @target_type_key) : nil
  if target_type_parent && target_type_parent[target_type_child_key]
    target_type = target_type_parent.delete(target_type_child_key)
    if @last_seen_major_version == 6
      log.warn "Detected ES 6.x: `@type_name` will be used as the document `_type`."
      target_type = type_name
    elsif @last_seen_major_version >= 7
      log.warn "Detected ES 7.x or above: `_doc` will be used as the document `_type`."
      target_type = '_doc'.freeze
    end
  else
    if @last_seen_major_version >= 7 && @type_name != DEFAULT_TYPE_NAME_ES_7x
      log.warn "Detected ES 7.x or above: `_doc` will be used as the document `_type`."
      target_type = '_doc'.freeze
    else
      target_type = type_name
    end
  end

  meta.clear
  meta["_index".freeze] = target_index
  meta["_type".freeze] = target_type

  if @pipeline
    meta["pipeline".freeze] = @pipeline
  end

  @meta_config_map.each do |record_accessor, meta_key|
    if raw_value = record_accessor.call(record)
      meta[meta_key] = raw_value
    end
  end

  if @remove_keys
    @remove_keys.each { |key| record.delete(key) }
  end

  return [meta, header, record]
end
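
In the logstash_format branch, the target index is just the prefix, separator, and formatted timestamp, lower-cased; a sketch with illustrative values (the actual prefix, separator, and date format come from configuration):

require 'date'

logstash_prefix     = 'logstash'
prefix_separator    = '-'
logstash_dateformat = '%Y.%m.%d'

dt = DateTime.parse("2024-03-01T12:34:56+09:00").new_offset(0)  # utc_index: shift to UTC
"#{logstash_prefix}#{prefix_separator}#{dt.strftime(logstash_dateformat)}".downcase
# => "logstash-2024.03.01"
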
remove_keys(record)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 551
def remove_keys(record)
  keys = record[@remove_keys_on_update_key] || @remove_keys_on_update || []
  record.delete(@remove_keys_on_update_key)
  return record unless keys.any?
  record = record.dup
  keys.each { |key| record.delete(key) }
  record
end
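
For example, assuming remove_keys_on_update was configured as "updated_at" (split into an array in #configure), the update body is built from a stripped copy while the original record stays intact:

remove_keys_on_update = ["updated_at"]

record   = { "user" => "jane", "updated_at" => "2024-03-01" }
stripped = record.dup
remove_keys_on_update.each { |key| stripped.delete(key) }

stripped  # => {"user"=>"jane"}
record    # => {"user"=>"jane", "updated_at"=>"2024-03-01"}
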
send_bulk(data, tag, chunk, bulk_message_count, extracted_values, info)

send_bulk sends a specific bulk request to Elasticsearch, given the original tag, chunk, and bulk_message_count.

# File lib/fluent/plugin/out_elasticsearch2.rb, line 748
def send_bulk(data, tag, chunk, bulk_message_count, extracted_values, info)
  begin
    log.on_trace { log.trace "bulk request: #{data}" }
    if @compression
      prepared_data = gzip(data)
    else
      prepared_data = data
    end
    response = client(info.host).bulk body: prepared_data, index: info.index
    log.on_trace { log.trace "bulk response: #{response}" }

    if response['errors']
      error = Fluent::Plugin::ElasticsearchErrorHandler.new(self)
      error.handle_error(response, tag, chunk, bulk_message_count, extracted_values)
    end
  rescue RetryStreamError => e
    emit_tag = @retry_tag ? @retry_tag : tag
    router.emit_stream(emit_tag, e.retry_stream)
  rescue => e
    ignore = @ignore_exception_classes.any? { |clazz| e.class <= clazz }

    log.warn "Exception ignored in tag #{tag}: #{e.class.name} #{e.message}" if ignore

    @_es = nil if @reconnect_on_error
    @_es_info = nil if @reconnect_on_error

    raise UnrecoverableRequestFailure if ignore && @exception_backup

    # FIXME: identify unrecoverable errors and raise UnrecoverableRequestFailure instead
    raise RecoverableRequestFailure, "could not push logs to Elasticsearch cluster (#{connection_options_description(info.host)}): #{e.message}" unless ignore
  end
end
split_request?(bulk_message, info)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 640
def split_request?(bulk_message, info)
  # For safety: #configure aliases this method to either
  # split_request_size_check? or split_request_size_uncheck?,
  # depending on bulk_message_request_threshold.
end
split_request_size_check?(bulk_message, info)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 644
def split_request_size_check?(bulk_message, info)
  bulk_message[info].size > @bulk_message_request_threshold
end
split_request_size_uncheck?(bulk_message, info)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 648
def split_request_size_uncheck?(bulk_message, info)
  false
end
update_body(record, op)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 535
def update_body(record, op)
  update = remove_keys(record)
  if @suppress_doc_wrap
    return update
  end
  body = {"doc".freeze => update}
  if op == UPSERT_OP
    if update == record
      body["doc_as_upsert".freeze] = true
    else
      body[UPSERT_OP] = record
    end
  end
  body
end
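
A sketch of the three body shapes this can produce (field names are made up):

record = { "user" => "jane", "visits" => 3 }

# update: the (possibly stripped) record is wrapped in a "doc" envelope.
{ "doc" => record }

# upsert where nothing was removed: the update equals the record,
# so "doc_as_upsert" is set instead of repeating the document.
{ "doc" => record, "doc_as_upsert" => true }

# upsert where remove_keys stripped "visits": the full record is kept
# under "upsert" for the case where the document does not exist yet.
{ "doc" => { "user" => "jane" }, "upsert" => record }
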
write(chunk)
# File lib/fluent/plugin/out_elasticsearch2.rb, line 586
def write(chunk)
  bulk_message_count = Hash.new { |h,k| h[k] = 0 }
  bulk_message = Hash.new { |h,k| h[k] = '' }
  header = {}
  meta = {}

  tag = chunk.metadata.tag
  extracted_values = expand_placeholders(chunk)
  host = if @hosts
           extract_placeholders(@hosts, chunk)
         else
           extract_placeholders(@host, chunk)
         end

  chunk.msgpack_each do |time, record|
    next unless record.is_a? Hash
    begin
      meta, header, record = process_message(tag, meta, header, time, record, extracted_values)
      info = if @include_index_in_url
               RequestInfo.new(host, meta.delete("_index".freeze))
             else
               RequestInfo.new(host, nil)
             end

      if split_request?(bulk_message, info)
        bulk_message.each do |info, msgs|
          send_bulk(msgs, tag, chunk, bulk_message_count[info], extracted_values, info) unless msgs.empty?
          msgs.clear
          # Clear bulk_message_count for this info.
          bulk_message_count[info] = 0;
          next
        end
      end

      if append_record_to_messages(@write_operation, meta, header, record, bulk_message[info])
        bulk_message_count[info] += 1;
      else
        if @emit_error_for_missing_id
          raise MissingIdFieldError, "Missing '_id' field. Write operation is #{@write_operation}"
        else
          log.on_debug { log.debug("Dropping record because its missing an '_id' field and write_operation is #{@write_operation}: #{record}") }
        end
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end

  bulk_message.each do |info, msgs|
    send_bulk(msgs, tag, chunk, bulk_message_count[info], extracted_values, info) unless msgs.empty?
    msgs.clear
  end
end
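
When include_index_in_url is enabled, messages are grouped per (host, index) pair, so a single chunk may fan out into several bulk requests. A sketch of the grouping, assuming RequestInfo behaves like a simple value object of host and index (see the constants list above):

RequestInfo = Struct.new(:host, :index)   # assumed shape, for illustration only

bulk_message = Hash.new { |h, k| h[k] = +'' }
bulk_message[RequestInfo.new(nil, 'app-logs')]   << %({"index":{}}\n{"m":1}\n)
bulk_message[RequestInfo.new(nil, 'audit-logs')] << %({"index":{}}\n{"m":2}\n)

bulk_message.keys.map(&:index)  # => ["app-logs", "audit-logs"]
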