class LogStash::Outputs::ElasticSearchJava
This output lets you store logs in Elasticsearch using the native 'node' and 'transport' protocols. It is highly recommended to use the regular 'logstash-output-elasticsearch' output, which uses HTTP, instead. This output is in fact sometimes slower, and never faster, than the HTTP output. Additionally, upgrading your Elasticsearch cluster may require you to simultaneously update this plugin to pick up any protocol-level changes. The HTTP client may also be easier to work with due to wider familiarity with HTTP.
*VERSION NOTE*: Your Elasticsearch cluster must be running Elasticsearch 1.0.0 or later.
If you want to set other Elasticsearch options that are not exposed directly as configuration options, there are two methods:
- Create an `elasticsearch.yml` file in the $PWD of the Logstash process
- Pass in es.* java properties (`java -Des.node.foo=` or `ruby -J-Des.node.foo=`)
With the default `protocol` setting (“node”), this plugin will join your Elasticsearch cluster as a client node, so it will show up in Elasticsearch's cluster status.
You can learn more about Elasticsearch at <www.elastic.co/products/elasticsearch>
Operational Notes
If using the default `protocol` setting (“node”), your firewalls might need to permit port 9300 in both directions (from Logstash to Elasticsearch, and Elasticsearch to Logstash)
Retry Policy
By default all bulk requests to ES are synchronous, but not every event in a bulk request always makes it in successfully. For example, an event may not be formatted correctly for the index it is targeting (a type mismatch in the mapping). To minimize the loss of events, we have a specific retry policy in place. We retry all events which fail to reach Elasticsearch due to network-related issues. We retry specific events under a separate policy, described below: these are events which experience ES error codes described as retryable errors.
*Retryable Errors:*
- 429, Too Many Requests (RFC 6585)
- 503, The server is currently unable to handle the request due to a temporary overloading or maintenance of the server.
Here are the rules of what is retried when:
- Block and retry all events in a bulk response that experience transient network exceptions, until a successful submission is received by Elasticsearch.
- Retry the subset of sent events which resulted in ES errors of a retryable nature; those codes can be found in RETRYABLE_CODES (see the sketch after this list).
- Events which returned retryable error codes are pushed onto a separate queue for retrying. Events in this queue are retried a maximum of 5 times by default (configurable through :max_retries). The size of this queue is capped by the value set in :retry_max_items.
- Events from the retry queue are submitted again either when the queue reaches its max size or when the max interval time is reached, which is set in :retry_max_interval.
- Events which are not retryable or have reached their max retry count are logged to stderr.
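As a rough illustration of the policy above, the per-event decision looks something like the following sketch. The constant values and the handle_bulk_item helper are assumptions for illustration, not code from the plugin:

# A minimal sketch of the per-event retry decision, assuming
# RETRYABLE_CODES is [429, 503] and SUCCESS_CODES includes 200/201
# (illustrative values; handle_bulk_item is a hypothetical helper).
RETRYABLE_CODES = [429, 503]
SUCCESS_CODES   = [200, 201]

def handle_bulk_item(event, status, attempt, max_retries = 5)
  return :ok if SUCCESS_CODES.include?(status)

  if RETRYABLE_CODES.include?(status) && attempt < max_retries
    :retry # re-queued; the queue is flushed on :retry_max_items or :retry_max_interval
  else
    :drop  # not retryable, or max retry count reached -- logged to stderr
  end
end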
Constants
- RETRYABLE_CODES
- SUCCESS_CODES
Public Instance Methods
# File lib/logstash/outputs/elasticsearch_java.rb, line 168
def build_client
  @client = client_class.new(client_options)
end
# File lib/logstash/outputs/elasticsearch_java.rb, line 184
def client_class
  case @protocol
  when "transport"
    LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::TransportClient
  when "node"
    LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient
  end
end
# File lib/logstash/outputs/elasticsearch_java.rb, line 139
def client_options
  client_settings = {}
  client_settings["cluster.name"] = @cluster if @cluster
  client_settings["network.host"] = @network_host if @network_host
  client_settings["transport.tcp.port"] = @transport_tcp_port if @transport_tcp_port
  client_settings["client.transport.sniff"] = @sniffing

  if @node_name
    client_settings["node.name"] = @node_name
  else
    client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
  end

  options = {
    :protocol => @protocol,
    :client_settings => client_settings,
    :hosts => @hosts
  }

  # Update API setup
  update_options = {
    :upsert => @upsert,
    :doc_as_upsert => @doc_as_upsert
  }
  options.merge! update_options if @action == 'update'
  options
end
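As an illustration of the hash this method builds (with made-up configuration values, not taken from the source), a plugin configured with a cluster name, the transport protocol, and a single host would produce options along these lines:

# Hypothetical return value of client_options for an example configuration
# (cluster => "es-prod", protocol => "transport", hosts => ["10.0.0.5"]);
# the generated node.name shown is only an example of the pattern above.
{
  :protocol => "transport",
  :client_settings => {
    "cluster.name"           => "es-prod",
    "client.transport.sniff" => false,
    "node.name"              => "logstash-myhost-12345-70123456789"
  },
  :hosts => ["10.0.0.5"]
}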
# File lib/logstash/outputs/elasticsearch_java.rb, line 172
def close
  @stopping.make_true
  @buffer.stop
end
# File lib/logstash/outputs/elasticsearch_java.rb, line 177
def get_plugin_options
  @@plugins.each do |plugin|
    name = plugin.name.split('-')[-1]
    client_settings.merge!(LogStash::Outputs::ElasticSearchJava.const_get(name.capitalize).create_client_config(self))
  end
end
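The method above implies a small extension contract: each registered companion plugin contributes extra client settings via a create_client_config hook on a constant named after the last dash-separated segment of its plugin name. A minimal sketch of that contract, with the plugin name, class, and setting invented for illustration:

# For a hypothetical companion plugin named "logstash-output-foo",
# get_plugin_options would look up the Foo constant and merge in
# whatever client settings its create_client_config returns.
module LogStash
  module Outputs
    class ElasticSearchJava
      class Foo
        def self.create_client_config(plugin)
          { "some.client.setting" => "example-value" } # hypothetical setting name
        end
      end
    end
  end
end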