class LogStash::Inputs::Elasticsearch
.Compatibility Note

NOTE: Starting with Elasticsearch 5.3, there's an {ref}modules-http.html[HTTP setting] called `http.content_type.required`. If this option is set to `true`, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.
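For reference, this is a cluster-side setting configured in `elasticsearch.yml`; a minimal sketch (the setting name comes from the note above, the file layout is illustrative):

[source,yaml]
----
# elasticsearch.yml (Elasticsearch 5.3+): reject requests without a Content-Type header
http.content_type.required: true
----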
Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. It also supports periodically scheduling lookup enrichments using a cron syntax (see the `schedule` setting).

Example:
[source,ruby]
----
input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
  }
}
----
This would create an Elasticsearch query with the following format:

[source,json]
----
curl 'localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
  "query": { "match": { "statuscode": 200 } },
  "sort": [ "_doc" ]
}'
----
Scheduling
Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by github.com/jmettraux/rufus-scheduler[rufus-scheduler]. The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).
Examples:
|==========================================================
| `* 5 * 1-3 *`               | will execute every minute of 5am every day of January through March.
| `0 * * * *`                 | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
|==========================================================
Further documentation describing this syntax can be found github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
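For example, a pipeline that re-runs the query on the 0th minute of every hour, per the cron table above (hosts and query are illustrative):

[source,ruby]
----
input {
  elasticsearch {
    hosts    => "localhost"
    query    => '{ "query": { "match_all": {} }, "sort": [ "_doc" ] }'
    schedule => "0 * * * *"
  }
}
----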
Attributes
@private used by unit specs
Public Class Methods
# File lib/logstash/inputs/elasticsearch.rb, line 195
def initialize(params={})
  super(params)

  if docinfo_target.nil?
    @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
  end
end
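In other words, when `docinfo_target` is not set explicitly, the default depends on the pipeline's ECS compatibility mode. Roughly, as an illustrative sketch (not the mixin's actual implementation):

[source,ruby]
----
# illustrative only: ecs_select picks the value keyed by the active ecs_compatibility mode
docinfo_target =
  if ecs_compatibility == :disabled
    '@metadata'                          # classic behaviour
  else
    '[@metadata][input][elasticsearch]'  # ECS v1 and later
  end
----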
Public Instance Methods
# File lib/logstash/inputs/elasticsearch.rb, line 203
def register
  require "rufus/scheduler"

  # request defaults reused by every search/scroll call
  @options = {
    :index => @index,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = LogStash::Json.load(@query)
  if @slices
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  # auth options are mutually exclusive; cloud_auth/cloud_id fill user/password/hosts
  validate_authentication
  fill_user_password_from_cloud_auth
  fill_hosts_from_cloud_id

  transport_options = {:headers => {}}
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!(setup_api_key(api_key))
  transport_options[:headers].merge!({'user-agent' => prepare_user_agent()})
  transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
  transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
  transport_options[:socket_timeout]  = @socket_timeout_seconds  unless @socket_timeout_seconds.nil?

  hosts = setup_hosts
  ssl_options = setup_ssl

  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')
  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  @client = Elasticsearch::Client.new(
    :hosts => hosts,
    :transport_options => transport_options,
    :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
    :ssl => ssl_options
  )
  test_connection!
  @client
end
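A minimal sketch of exercising `register` outside a pipeline (hypothetical host and query; assumes a Logstash test environment is loaded and a cluster is reachable, since `test_connection!` pings it):

[source,ruby]
----
plugin = LogStash::Inputs::Elasticsearch.new(
  "hosts" => ["localhost:9200"],
  "query" => '{ "sort": [ "_doc" ] }'
)
plugin.register # builds @client, sets auth headers, and pings the cluster
----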
# File lib/logstash/inputs/elasticsearch.rb, line 248
def run(output_queue)
  if @schedule
    @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
    @scheduler.cron @schedule do
      do_run(output_queue)
    end

    @scheduler.join
  else
    do_run(output_queue)
  end
end
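The scheduling itself is plain rufus-scheduler; a standalone sketch of the same pattern (the cron string and body are illustrative):

[source,ruby]
----
require 'rufus/scheduler'

scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
scheduler.cron '*/5 * * * *' do
  # the plugin calls do_run(output_queue) here
  puts "query would run now"
end
scheduler.join # block, as run does, until the scheduler is stopped
----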
# File lib/logstash/inputs/elasticsearch.rb, line 261
def stop
  @scheduler.stop if @scheduler
end
Private Instance Methods
# File lib/logstash/inputs/elasticsearch.rb, line 349
def clear_scroll(scroll_id)
  @client.clear_scroll(:body => { :scroll_id => scroll_id }) if scroll_id
rescue => e
  # ignore & log any clear_scroll errors
  logger.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class)
end
# File lib/logstash/inputs/elasticsearch.rb, line 267
def do_run(output_queue)
  # if configured to run a single slice, don't bother spinning up threads
  return do_run_slice(output_queue) if @slices.nil? || @slices <= 1

  logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8

  @slices.times.map do |slice_id|
    Thread.new do
      LogStash::Util::set_thread_name("#{@id}_slice_#{slice_id}")
      do_run_slice(output_queue, slice_id)
    end
  end.map(&:join)
end
# File lib/logstash/inputs/elasticsearch.rb, line 281
def do_run_slice(output_queue, slice_id=nil)
  slice_query = @base_query
  slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @slices}) unless slice_id.nil?

  slice_options = @options.merge(:body => LogStash::Json.dump(slice_query))

  logger.info("Slice starting", slice_id: slice_id, slices: @slices) unless slice_id.nil?

  begin
    r = search_request(slice_options)

    r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
    logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil?

    has_hits = r['hits']['hits'].any?
    scroll_id = r['_scroll_id']

    while has_hits && scroll_id && !stop?
      has_hits, scroll_id = process_next_scroll(output_queue, scroll_id)
      logger.debug("Slice progress", slice_id: slice_id, slices: @slices) if logger.debug? && slice_id
    end
    logger.info("Slice complete", slice_id: slice_id, slices: @slices) unless slice_id.nil?
  ensure
    clear_scroll(scroll_id)
  end
end
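With `slices => 2`, the two workers send the same query with different `slice` stanzas, along these lines (query body illustrative):

[source,json]
----
{ "query": { "match_all": {} }, "sort": [ "_doc" ], "slice": { "id": 0, "max": 2 } }
{ "query": { "match_all": {} }, "sort": [ "_doc" ], "slice": { "id": 1, "max": 2 } }
----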
# File lib/logstash/inputs/elasticsearch.rb, line 432
def fill_hosts_from_cloud_id
  return unless @cloud_id

  if @hosts && !hosts_default?(@hosts)
    raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.'
  end
  @hosts = parse_host_uri_from_cloud_id(@cloud_id)
end
# File lib/logstash/inputs/elasticsearch.rb, line 425
def fill_user_password_from_cloud_auth
  return unless @cloud_auth

  @user, @password = parse_user_password_from_cloud_auth(@cloud_auth)
  params['user'], params['password'] = @user, @password
end
# File lib/logstash/inputs/elasticsearch.rb, line 364
def hosts_default?(hosts)
  hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? )
end
# File lib/logstash/inputs/elasticsearch.rb, line 441
def parse_host_uri_from_cloud_id(cloud_id)
  begin # might not be available on older LS
    require 'logstash/util/cloud_setting_id'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_id setting is not supported by your version of Logstash, ' +
        'please upgrade your installation (or set hosts instead).'
  end

  begin
    cloud_id = LogStash::Util::CloudSettingId.new(cloud_id) # already does append ':{port}' to host
  rescue ArgumentError => e
    raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Id/i, 'cloud_id')
  end
  cloud_uri = "#{cloud_id.elasticsearch_scheme}://#{cloud_id.elasticsearch_host}"
  LogStash::Util::SafeURI.new(cloud_uri)
end
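For intuition, a Cloud ID is a label plus a Base64-encoded `host$es-uuid$kibana-uuid` triple; a rough sketch of the decoding that `CloudSettingId` performs (hypothetical values, and the real class also handles ports and validation):

[source,ruby]
----
require 'base64'

cloud_id = 'my-deployment:' + Base64.strict_encode64('us-east-1.aws.found.io$abc123$def456')
_label, encoded = cloud_id.split(':', 2)
host, es_segment, _kibana_segment = Base64.decode64(encoded).split('$')
es_uri = "https://#{es_segment}.#{host}" # => "https://abc123.us-east-1.aws.found.io"
----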
# File lib/logstash/inputs/elasticsearch.rb, line 458
def parse_user_password_from_cloud_auth(cloud_auth)
  begin # might not be available on older LS
    require 'logstash/util/cloud_setting_auth'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_auth setting is not supported by your version of Logstash, ' +
        'please upgrade your installation (or set user/password instead).'
  end

  cloud_auth = cloud_auth.value if cloud_auth.is_a?(LogStash::Util::Password)
  begin
    cloud_auth = LogStash::Util::CloudSettingAuth.new(cloud_auth)
  rescue ArgumentError => e
    raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth')
  end
  [ cloud_auth.username, cloud_auth.password ]
end
# File lib/logstash/inputs/elasticsearch.rb, line 413
def prepare_user_agent
  os_name = java.lang.System.getProperty('os.name')
  os_version = java.lang.System.getProperty('os.version')
  os_arch = java.lang.System.getProperty('os.arch')
  jvm_vendor = java.lang.System.getProperty('java.vendor')
  jvm_version = java.lang.System.getProperty('java.version')

  plugin_version = Gem.loaded_specs["logstash-input-elasticsearch"].version
  # example: logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-input-elasticsearch/4.10.0
  "logstash/#{LOGSTASH_VERSION} (OS=#{os_name}-#{os_version}-#{os_arch}; JVM=#{jvm_vendor}-#{jvm_version}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version}"
end
@param output_queue [#<<]
@param scroll_id [String]: a scroll id to resume
@return [Array(Boolean,String)]: a tuple representing whether the response had hits, and the scroll id for the next request
# File lib/logstash/inputs/elasticsearch.rb, line 313
def process_next_scroll(output_queue, scroll_id)
  r = scroll_request(scroll_id)
  r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
  [r['hits']['hits'].any?, r['_scroll_id']]
rescue => e
  # this will typically be triggered by a scroll timeout
  logger.error("Scroll request error, aborting scroll", message: e.message, exception: e.class)
  # return no hits and original scroll_id so we can try to clear it
  [false, scroll_id]
end
# File lib/logstash/inputs/elasticsearch.rb, line 324
def push_hit(hit, output_queue)
  event = targeted_event_factory.new_event hit['_source']
  set_docinfo_fields(hit, event) if @docinfo
  decorate(event)
  output_queue << event
end
# File lib/logstash/inputs/elasticsearch.rb, line 356
def scroll_request scroll_id
  @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end
# File lib/logstash/inputs/elasticsearch.rb, line 360
def search_request(options)
  @client.search(options)
end
# File lib/logstash/inputs/elasticsearch.rb, line 331
def set_docinfo_fields(hit, event)
  # do not assume event[@docinfo_target] to be in-place updatable. first get it,
  # update it, then at the end set it in the event.
  docinfo_target = event.get(@docinfo_target) || {}

  unless docinfo_target.is_a?(Hash)
    @logger.error("Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event.to_hash_with_metadata)
    # TODO: (colin) I am not sure raising is a good strategy here?
    raise Exception.new("Elasticsearch input: incompatible event")
  end

  @docinfo_fields.each do |field|
    docinfo_target[field] = hit[field]
  end

  event.set(@docinfo_target, docinfo_target)
end
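With the default `docinfo_fields` (`_index`, `_type`, `_id`), the default target, and ECS compatibility disabled, a hit ends up annotated roughly like this (values illustrative):

[source,ruby]
----
event.get("[@metadata][_index]") # => "logstash-2021.09.01"
event.get("[@metadata][_id]")    # => "C5b2xLQwTZa76jBmHIbxHQ"
----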
# File lib/logstash/inputs/elasticsearch.rb, line 406
def setup_api_key(api_key)
  return {} unless (api_key && api_key.value)

  token = ::Base64.strict_encode64(api_key.value)
  { 'Authorization' => "ApiKey #{token}" }
end
# File lib/logstash/inputs/elasticsearch.rb, line 399
def setup_basic_auth(user, password)
  return {} unless user && password && password.value

  token = ::Base64.strict_encode64("#{user}:#{password.value}")
  { 'Authorization' => "Basic #{token}" }
end
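For example, `user => "user"` with `password => "password"` yields the standard HTTP Basic header:

[source,ruby]
----
Base64.strict_encode64("user:password") # => "dXNlcjpwYXNzd29yZA=="
# header sent: Authorization: Basic dXNlcjpwYXNzd29yZA==
----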
# File lib/logstash/inputs/elasticsearch.rb, line 387
def setup_hosts
  @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s
  if @ssl
    @hosts.map do |h|
      host, port = h.split(":")
      { :host => host, :scheme => 'https', :port => port }
    end
  else
    @hosts
  end
end
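For instance, with `ssl => true`, `hosts => ["es1.example.com:9200"]` becomes (note the port stays a String, since it comes from `String#split`):

[source,ruby]
----
[{ :host => "es1.example.com", :scheme => "https", :port => "9200" }]
----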
# File lib/logstash/inputs/elasticsearch.rb, line 383
def setup_ssl
  @ssl && @ca_file ? { :ssl => true, :ca_file => @ca_file } : {}
end
# File lib/logstash/inputs/elasticsearch.rb, line 478
def test_connection!
  @client.ping
rescue Elasticsearch::UnsupportedProductError
  raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end
# File lib/logstash/inputs/elasticsearch.rb, line 368
def validate_authentication
  authn_options = 0
  authn_options += 1 if @cloud_auth
  authn_options += 1 if (@api_key && @api_key.value)
  authn_options += 1 if (@user || (@password && @password.value))

  if authn_options > 1
    raise LogStash::ConfigurationError, 'Multiple authentication options are specified, please only use one of user/password, cloud_auth or api_key'
  end

  if @api_key && @api_key.value && @ssl != true
    raise(LogStash::ConfigurationError, "Using api_key authentication requires SSL/TLS secured communication using the `ssl => true` option")
  end
end
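A valid `api_key` configuration therefore also enables SSL; a minimal sketch (host and key are illustrative):

[source,ruby]
----
input {
  elasticsearch {
    hosts   => ["es.example.com:9200"]
    ssl     => true
    api_key => "my_api_key_id:my_api_key_secret" # "id:value" pair, Base64-encoded by setup_api_key
  }
}
----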