class LogStash::Inputs::Elasticsearch

.Compatibility Note
[NOTE]
====
Starting with Elasticsearch 5.3, there's an {ref}modules-http.html[HTTP setting] called `http.content_type.required`. If this option is set to `true`, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.
====

Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. The query can also be run periodically using a cron-like syntax (see the `schedule` setting).

Example:

[source,ruby]
----
    input {
      # Read all documents from Elasticsearch matching the given query
      elasticsearch {
        hosts => "localhost"
        query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
      }
    }
----

This would create an Elasticsearch query with the following format:

[source,json]
----
    curl 'localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
      "query": {
        "match": {
          "statuscode": 200
        }
      },
      "sort": [ "_doc" ]
    }'
----

Scheduling

Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by https://github.com/jmettraux/rufus-scheduler[rufus-scheduler]. The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).

Examples:

|==========================================================
| `* 5 * 1-3 *`               | will execute every minute of 5am every day of January through March.
| `0 * * * *`                 | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
|==========================================================

Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
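
For example, a pipeline that re-runs the query every five minutes could be configured as follows (a minimal sketch; the index, query, and cron expression are illustrative):

[source,ruby]
----
    input {
      elasticsearch {
        hosts    => "localhost"
        index    => "logstash-*"
        query    => '{ "query": { "match_all": {} } }'
        # re-run the query at the top of every 5th minute
        schedule => "*/5 * * * *"
      }
    }
----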

Attributes

client[R]

@private used by unit specs

Public Class Methods

new(params={})
Calls superclass method
# File lib/logstash/inputs/elasticsearch.rb, line 195
def initialize(params={})
  super(params)

  if docinfo_target.nil?
    @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
  end
end
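
When `docinfo` is enabled and no `docinfo_target` is set, document metadata is written under `@metadata` (ECS compatibility disabled) or `[@metadata][input][elasticsearch]` (ECS v1), as selected above. A minimal sketch of referencing that metadata downstream, assuming ECS v1 and the commonly used `_index`/`_id` docinfo fields (values illustrative):

  input {
    elasticsearch {
      hosts   => "localhost"
      docinfo => true
    }
  }
  output {
    elasticsearch {
      index       => "copy-%{[@metadata][input][elasticsearch][_index]}"
      document_id => "%{[@metadata][input][elasticsearch][_id]}"
    }
  }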

Public Instance Methods

register()
# File lib/logstash/inputs/elasticsearch.rb, line 203
def register
  require "rufus/scheduler"

  @options = {
    :index => @index,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = LogStash::Json.load(@query)
  if @slices
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  validate_authentication
  fill_user_password_from_cloud_auth
  fill_hosts_from_cloud_id


  transport_options = {:headers => {}}
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!(setup_api_key(api_key))
  transport_options[:headers].merge!({'user-agent' => prepare_user_agent()})
  transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
  transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
  transport_options[:socket_timeout]  = @socket_timeout_seconds  unless @socket_timeout_seconds.nil?

  hosts = setup_hosts
  ssl_options = setup_ssl

  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')

  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  @client = Elasticsearch::Client.new(
    :hosts => hosts,
    :transport_options => transport_options,
    :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
    :ssl => ssl_options
  )
  test_connection!
  @client
end
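
The `@options` hash assembled here becomes the scroll search request issued later by `search_request`. Assuming the defaults shown in the curl example above (`index => "logstash-*"`, `scroll => "1m"`, `size => 1000`), the initial call is roughly equivalent to this sketch (not the literal call site; `do_run_slice` merges the query body in):

  # illustrative only: how the prepared options drive the first search
  @client.search(
    :index  => "logstash-*",
    :scroll => "1m",
    :size   => 1000,
    :body   => LogStash::Json.dump(@base_query)
  )
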
run(output_queue)
# File lib/logstash/inputs/elasticsearch.rb, line 248
def run(output_queue)
  if @schedule
    @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
    @scheduler.cron @schedule do
      do_run(output_queue)
    end

    @scheduler.join
  else
    do_run(output_queue)
  end
end
stop()
# File lib/logstash/inputs/elasticsearch.rb, line 261
def stop
  @scheduler.stop if @scheduler
end

Private Instance Methods

clear_scroll(scroll_id)
# File lib/logstash/inputs/elasticsearch.rb, line 349
def clear_scroll(scroll_id)
  @client.clear_scroll(:body => { :scroll_id => scroll_id }) if scroll_id
rescue => e
  # ignore & log any clear_scroll errors
  logger.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class)
end
do_run(output_queue)
# File lib/logstash/inputs/elasticsearch.rb, line 267
def do_run(output_queue)
  # if configured to run a single slice, don't bother spinning up threads
  return do_run_slice(output_queue) if @slices.nil? || @slices <= 1

  logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8

  @slices.times.map do |slice_id|
    Thread.new do
      LogStash::Util::set_thread_name("#{@id}_slice_#{slice_id}")
      do_run_slice(output_queue, slice_id)
    end
  end.map(&:join)
end
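
Sliced scrolling is enabled purely through configuration; a minimal sketch (the slice count of 4 is illustrative):

  input {
    elasticsearch {
      hosts  => "localhost"
      query  => '{ "query": { "match_all": {} }, "sort": [ "_doc" ] }'
      # spawns 4 threads, each consuming one slice of the scroll
      slices => 4
    }
  }
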
do_run_slice(output_queue, slice_id=nil)
# File lib/logstash/inputs/elasticsearch.rb, line 281
def do_run_slice(output_queue, slice_id=nil)
  slice_query = @base_query
  slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @slices}) unless slice_id.nil?

  slice_options = @options.merge(:body => LogStash::Json.dump(slice_query) )

  logger.info("Slice starting", slice_id: slice_id, slices: @slices) unless slice_id.nil?

  begin
    r = search_request(slice_options)

    r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
    logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil?

    has_hits = r['hits']['hits'].any?
    scroll_id = r['_scroll_id']

    while has_hits && scroll_id && !stop?
      has_hits, scroll_id = process_next_scroll(output_queue, scroll_id)
      logger.debug("Slice progress", slice_id: slice_id, slices: @slices) if logger.debug? && slice_id
    end
    logger.info("Slice complete", slice_id: slice_id, slices: @slices) unless slice_id.nil?
  ensure
    clear_scroll(scroll_id)
  end
end
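
The merge above simply folds a `slice` clause into the user's query before serialization, for example (query and slice values illustrative):

  base_query  = { "query" => { "match_all" => {} }, "sort" => [ "_doc" ] }
  slice_query = base_query.merge('slice' => { 'id' => 0, 'max' => 4 })
  # => {"query"=>{"match_all"=>{}}, "sort"=>["_doc"], "slice"=>{"id"=>0, "max"=>4}}
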
fill_hosts_from_cloud_id()
# File lib/logstash/inputs/elasticsearch.rb, line 432
def fill_hosts_from_cloud_id
  return unless @cloud_id

  if @hosts && !hosts_default?(@hosts)
    raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.'
  end
  @hosts = parse_host_uri_from_cloud_id(@cloud_id)
end
fill_user_password_from_cloud_auth()
# File lib/logstash/inputs/elasticsearch.rb, line 425
def fill_user_password_from_cloud_auth
  return unless @cloud_auth

  @user, @password = parse_user_password_from_cloud_auth(@cloud_auth)
  params['user'], params['password'] = @user, @password
end
hosts_default?(hosts)
# File lib/logstash/inputs/elasticsearch.rb, line 364
def hosts_default?(hosts)
  hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? )
end
parse_host_uri_from_cloud_id(cloud_id)
# File lib/logstash/inputs/elasticsearch.rb, line 441
def parse_host_uri_from_cloud_id(cloud_id)
  begin # might not be available on older LS
    require 'logstash/util/cloud_setting_id'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_id setting is not supported by your version of Logstash, ' +
        'please upgrade your installation (or set hosts instead).'
  end

  begin
    cloud_id = LogStash::Util::CloudSettingId.new(cloud_id) # already does append ':{port}' to host
  rescue ArgumentError => e
    raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Id/i, 'cloud_id')
  end
  cloud_uri = "#{cloud_id.elasticsearch_scheme}://#{cloud_id.elasticsearch_host}"
  LogStash::Util::SafeURI.new(cloud_uri)
end
parse_user_password_from_cloud_auth(cloud_auth)
# File lib/logstash/inputs/elasticsearch.rb, line 458
def parse_user_password_from_cloud_auth(cloud_auth)
  begin # might not be available on older LS
    require 'logstash/util/cloud_setting_auth'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_auth setting is not supported by your version of Logstash, ' +
        'please upgrade your installation (or set user/password instead).'
  end

  cloud_auth = cloud_auth.value if cloud_auth.is_a?(LogStash::Util::Password)
  begin
    cloud_auth = LogStash::Util::CloudSettingAuth.new(cloud_auth)
  rescue ArgumentError => e
    raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth')
  end
  [ cloud_auth.username, cloud_auth.password ]
end
prepare_user_agent()
# File lib/logstash/inputs/elasticsearch.rb, line 413
def prepare_user_agent
  os_name = java.lang.System.getProperty('os.name')
  os_version = java.lang.System.getProperty('os.version')
  os_arch = java.lang.System.getProperty('os.arch')
  jvm_vendor = java.lang.System.getProperty('java.vendor')
  jvm_version = java.lang.System.getProperty('java.version')

  plugin_version = Gem.loaded_specs["logstash-input-elasticsearch"].version
  # example: logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-input-elasticsearch/4.10.0
  "logstash/#{LOGSTASH_VERSION} (OS=#{os_name}-#{os_version}-#{os_arch}; JVM=#{jvm_vendor}-#{jvm_version}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version}"
end
process_next_scroll(output_queue, scroll_id)

@param output_queue [#<<]
@param scroll_id [String]: a scroll id to resume
@return [Array(Boolean,String)]: a tuple of whether the response had any hits, and the scroll id to resume from

# File lib/logstash/inputs/elasticsearch.rb, line 313
def process_next_scroll(output_queue, scroll_id)
  r = scroll_request(scroll_id)
  r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
  [r['hits']['hits'].any?, r['_scroll_id']]
rescue => e
  # this will typically be triggered by a scroll timeout
  logger.error("Scroll request error, aborting scroll", message: e.message, exception: e.class)
  # return no hits and original scroll_id so we can try to clear it
  [false, scroll_id]
end
push_hit(hit, output_queue)
# File lib/logstash/inputs/elasticsearch.rb, line 324
def push_hit(hit, output_queue)
  event = targeted_event_factory.new_event hit['_source']
  set_docinfo_fields(hit, event) if @docinfo
  decorate(event)
  output_queue << event
end
scroll_request(scroll_id)
# File lib/logstash/inputs/elasticsearch.rb, line 356
def scroll_request scroll_id
  @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end
search_request(options)
# File lib/logstash/inputs/elasticsearch.rb, line 360
def search_request(options)
  @client.search(options)
end
set_docinfo_fields(hit, event)
# File lib/logstash/inputs/elasticsearch.rb, line 331
def set_docinfo_fields(hit, event)
  # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
  docinfo_target = event.get(@docinfo_target) || {}

  unless docinfo_target.is_a?(Hash)
    @logger.error("Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event.to_hash_with_metadata)

    # TODO: (colin) I am not sure raising is a good strategy here?
    raise Exception.new("Elasticsearch input: incompatible event")
  end

  @docinfo_fields.each do |field|
    docinfo_target[field] = hit[field]
  end

  event.set(@docinfo_target, docinfo_target)
end
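
To make the copying concrete, here is a hedged example of a hit and the metadata it produces, assuming the commonly used `_index`, `_type` and `_id` docinfo fields and the ECS v1 target (values illustrative):

  hit = {
    "_index"  => "logstash-2024.01.01",
    "_type"   => "_doc",
    "_id"     => "AbC123",
    "_source" => { "message" => "hello" }
  }
  # after set_docinfo_fields(hit, event):
  # event.get("[@metadata][input][elasticsearch]")
  # => { "_index" => "logstash-2024.01.01", "_type" => "_doc", "_id" => "AbC123" }
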
setup_api_key(api_key)
# File lib/logstash/inputs/elasticsearch.rb, line 406
def setup_api_key(api_key)
  return {} unless (api_key && api_key.value)

  token = ::Base64.strict_encode64(api_key.value)
  { 'Authorization' => "ApiKey #{token}" }
end
setup_basic_auth(user, password)
# File lib/logstash/inputs/elasticsearch.rb, line 399
def setup_basic_auth(user, password)
  return {} unless user && password && password.value

  token = ::Base64.strict_encode64("#{user}:#{password.value}")
  { 'Authorization' => "Basic #{token}" }
end
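
Both helpers compute a static Authorization header once at register time. A minimal sketch of the encoded values (credentials are illustrative; the api_key option is expected in "id:api_key" form):

  require 'base64'

  Base64.strict_encode64("elastic:changeme")
  # => "ZWxhc3RpYzpjaGFuZ2VtZQ==", sent as { 'Authorization' => 'Basic ZWxhc3RpYzpjaGFuZ2VtZQ==' }

  Base64.strict_encode64("my_key_id:my_key_secret")
  # => token sent as { 'Authorization' => 'ApiKey <token>' }
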
setup_hosts()
# File lib/logstash/inputs/elasticsearch.rb, line 387
def setup_hosts
  @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s
  if @ssl
    @hosts.map do |h|
      host, port = h.split(":")
      { :host => host, :scheme => 'https', :port => port }
    end
  else
    @hosts
  end
end
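
For example, with `ssl => true` a plain "host:port" string is expanded into a Hash so the client talks HTTPS (values illustrative):

  host, port = "localhost:9200".split(":")
  # => host = "localhost", port = "9200"
  # the entry passed to the client becomes:
  # { :host => "localhost", :scheme => "https", :port => "9200" }
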
setup_ssl()
# File lib/logstash/inputs/elasticsearch.rb, line 383
def setup_ssl
  @ssl && @ca_file ? { :ssl  => true, :ca_file => @ca_file } : {}
end
test_connection!()
# File lib/logstash/inputs/elasticsearch.rb, line 478
def test_connection!
  @client.ping
rescue Elasticsearch::UnsupportedProductError
  raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end
validate_authentication()
# File lib/logstash/inputs/elasticsearch.rb, line 368
def validate_authentication
  authn_options = 0
  authn_options += 1 if @cloud_auth
  authn_options += 1 if (@api_key && @api_key.value)
  authn_options += 1 if (@user || (@password && @password.value))

  if authn_options > 1
    raise LogStash::ConfigurationError, 'Multiple authentication options are specified, please only use one of user/password, cloud_auth or api_key'
  end

  if @api_key && @api_key.value && @ssl != true
    raise(LogStash::ConfigurationError, "Using api_key authentication requires SSL/TLS secured communication using the `ssl => true` option")
  end
end
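
In configuration terms, the final check means `api_key` only works over TLS, for example (host and key are illustrative):

  input {
    elasticsearch {
      hosts   => ["es.example.com:9200"]
      api_key => "my_key_id:my_key_secret"
      ssl     => true
    }
  }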