class Fluent::ElasticsearchOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch.rb, line 34
# Creates the plugin instance. No work is done here beyond invoking the
# superclass initializer.
def initialize
  super
end

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 46
# Returns the memoized Elasticsearch client, building it on first use (or
# again after @_es has been reset to nil).
#
# The client is built on a Faraday transport using the :patron adapter and
# the hosts returned by get_connection_options. Before it is cached, the
# cluster is pinged to verify that it is reachable.
#
# Raises ConnectionFailure when the ping returns a falsy value or when the
# ping fails with Faraday::ConnectionFailed.
def client
  @_es ||= begin
    faraday_setup = ->(builder) { builder.adapter :patron }
    transport_settings = get_connection_options.merge(
      options: {
        reload_connections: @reload_connections,
        reload_on_failure: @reload_on_failure,
        retry_on_failure: 5,
        transport_options: { request: { timeout: @request_timeout } }
      }
    )
    transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(transport_settings, &faraday_setup)
    connection = Elasticsearch::Client.new(transport: transport)

    # Only the ping itself is expected to raise Faraday::ConnectionFailed;
    # a falsy ping result is reported separately below.
    reachable = begin
      connection.ping
    rescue Faraday::ConnectionFailed => e
      raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description})! #{e.message}"
    end
    unless reachable
      raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description})!"
    end

    log.info "Connection opened to Elasticsearch cluster => #{connection_options_description}"
    connection
  end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch.rb, line 38
# Receives the plugin configuration and hands it straight to the superclass
# implementation; nothing else is done with +conf+ here.
def configure(conf)
  super
end
connection_options_description() click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 104
# Returns a log-safe, human-readable description of the configured hosts:
# each host hash from get_connection_options is rendered with #inspect and
# the results are joined with ", ". Any :password value is replaced by the
# literal 'obfuscated' so credentials never reach the log. The host hashes
# themselves are not modified.
def connection_options_description
  descriptions = get_connection_options[:hosts].map do |host_info|
    # merge returns a copy, so the real password stays in the original hash.
    printable = host_info.key?(:password) ? host_info.merge(password: 'obfuscated') : host_info
    printable.inspect
  end
  descriptions.join(', ')
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 112
# Serializes one event by packing tag, time and record into a three-element
# array and calling to_msgpack on it.
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
get_connection_options() click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 71
# Builds the connection settings used to create the Elasticsearch transport.
#
# Host entries come from @hosts (a comma-separated string) when it is set,
# otherwise from the single @host / @port / @scheme combination. Each entry
# of @hosts may be either
#   * the legacy "host" or "host:port" form (missing port falls back to
#     @port, scheme is @scheme), or
#   * a URL such as "https://john:pass@logs2.foo.com/elastic", from which
#     host, port, scheme and any non-empty user, password and path are taken.
#
# Whitespace around entries is ignored and blank entries are skipped, so
# "a:9200, b:9200" and "a:9200,,b:9200" both yield two hosts.
#
# Hosts without their own :user receive @user/@password, and hosts without
# their own :path receive @path, when those are configured.
#
# Returns a Hash of the form { hosts: [ { host:, port:, scheme:, ... }, ... ] }.
# Raises RuntimeError when @user is set without @password. URI() may raise
# for an entry that is neither legacy-format nor a valid URL.
def get_connection_options
  raise "`password` must be present if `user` is present" if @user && !@password

  hosts = if @hosts
    # Strip each entry and drop blanks: an unstripped entry would either keep
    # the whitespace inside the host name or make URI() raise.
    @hosts.split(',').map(&:strip).reject(&:empty?).map do |host_str|
      # Support legacy hosts format host:port,host:port,host:port...
      # \A/\z anchor the whole entry (^/$ would match any single line of it).
      if host_str.match(%r{\A[^:]+(:\d+)?\z})
        name, port = host_str.split(':')
        { host: name, port: (port || @port).to_i, scheme: @scheme }
      else
        # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
        uri = URI(host_str)
        base = { host: uri.host, port: uri.port, scheme: uri.scheme }
        %w(user password path).each_with_object(base) do |key, hash|
          value = uri.public_send(key)
          hash[key.to_sym] = value unless value.nil? || value == ''
        end
      end
    end
  else
    [{host: @host, port: @port, scheme: @scheme}]
  end

  # Fill in globally configured credentials/path only where the host entry
  # did not provide its own.
  hosts.each do |host|
    host.merge!(user: @user, password: @password) if !host[:user] && @user
    host.merge!(path: @path) if !host[:path] && @path
  end

  { hosts: hosts }
end
send(data) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 161
# Pushes +data+ to Elasticsearch through the client's bulk API.
#
# On Faraday::ConnectionFailed or Faraday::TimeoutError the cached client is
# discarded (so the next call to client reconnects), a warning is logged and
# the request is retried after sleeping 2 and then 4 seconds. Once two
# retries have been used up, ConnectionFailure is raised.
#
# Returns whatever client.bulk returns.
def send(data)
  attempts = 0
  begin
    client.bulk body: data
  rescue Faraday::ConnectionFailed, Faraday::TimeoutError => e
    if attempts >= 2
      raise ConnectionFailure, "Could not push logs to Elasticsearch after #{attempts} retries. #{e.message}"
    end

    attempts += 1
    @_es = nil # force client to build a fresh connection on retry
    log.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
    sleep 2**attempts
    retry
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch.rb, line 116
# Shutdown hook: delegates entirely to the superclass implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch.rb, line 42
# Start hook: delegates entirely to the superclass implementation.
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 120
# Converts every event in +chunk+ into a pair of bulk-API entries (an
# "index" action followed by the record itself) and hands the whole batch
# to send. Events whose record is not a Hash are skipped, and send is not
# called when nothing was collected.
#
# Index selection:
# * with @logstash_format the index is "<@logstash_prefix>-<date>", where
#   the date comes from the record's "@timestamp" (parsed with Time.parse)
#   or, if absent, from the event time, which is then also stored in the
#   record as "@timestamp"; @utc_index switches the date to UTC;
# * otherwise today's date is formatted with @index_name.
#
# The record is modified in place: "@timestamp" and the @tag_key field (when
# @include_tag_key is set) are added to it. @id_key / @parent_key, when set
# and present in the record, populate "_id" / "_parent" of the action.
#
# Returns the (emptied) batch array.
def write(chunk)
  bulk_message = []

  chunk.msgpack_each do |tag, time, record|
    next unless record.is_a? Hash

    target_index =
      if @logstash_format
        if record.has_key?("@timestamp")
          time = Time.parse record["@timestamp"]
        else
          record["@timestamp"] = Time.at(time).to_datetime.to_s
        end
        moment = Time.at(time)
        moment = moment.getutc if @utc_index
        "#{@logstash_prefix}-#{moment.strftime(@logstash_dateformat.to_s)}"
      else
        Date.today.strftime @index_name
      end

    record[@tag_key] = tag if @include_tag_key

    action = { "_index" => target_index, "_type" => type_name }
    action['_id'] = record[@id_key] if @id_key && record[@id_key]
    action['_parent'] = record[@parent_key] if @parent_key && record[@parent_key]

    bulk_message << { "index" => action } << record
  end

  send(bulk_message) unless bulk_message.empty?
  bulk_message.clear
end