class LogStash::Outputs::AmazonES
This output plugin emits data to Amazon Elasticsearch, with support for signing requests using AWS Signature Version 4 (SigV4).
The configuration and experience are similar to the logstash-output-elasticsearch plugin, with SigV4 signing added. Some default settings, such as connection timeouts, have been tuned for optimal performance with Amazon Elasticsearch.
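SigV4 signing derives a per-day, per-region, per-service key by chaining HMAC-SHA256 operations. It then uses that key to sign a "string to sign" built from the canonical request. The plugin's own signing presumably lives in LogStash::Outputs::AES::HttpClient, which receives the region and credentials in #register.

The sketch below is not that code. It only illustrates the key derivation and final signature step:

- SigV4Sketch and its method names are hypothetical.
- The service name `es` (the SigV4 service name for Amazon Elasticsearch) is used as the default.
- Building the canonical request and the string to sign is omitted.

```ruby
require 'openssl'

# Hypothetical helpers sketching AWS Signature Version 4 key derivation.
module SigV4Sketch
  module_function

  def hmac(key, data)
    OpenSSL::HMAC.digest(OpenSSL::Digest.new('SHA256'), key, data)
  end

  # date_stamp is YYYYMMDD; service is "es" for Amazon Elasticsearch.
  def signing_key(secret_access_key, date_stamp, region, service = 'es')
    k_date    = hmac('AWS4' + secret_access_key, date_stamp)
    k_region  = hmac(k_date, region)
    k_service = hmac(k_region, service)
    hmac(k_service, 'aws4_request')
  end

  # string_to_sign is assumed to be built elsewhere from the canonical request.
  def signature(signing_key, string_to_sign)
    OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('SHA256'), signing_key, string_to_sign)
  end
end
```

The resulting 64-character hex signature is what goes into the request's Authorization header, alongside the credential scope (date, region, service, `aws4_request`).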
Retry Policy
This plugin uses the same retry policy as logstash-output-elasticsearch. It uses the bulk API to optimize its imports into Elasticsearch. These requests may experience either partial or total failures. Events are retried if they fail due to either a network error or one of the status codes 429 (the server is busy), 409 (version conflict), or 503 (temporary overloading/maintenance).
The retry policy's logic can be described as follows:
- Block and retry all events in the bulk response that experience transient network exceptions until a successful submission is received by Elasticsearch.
- Retry the subset of sent events which resulted in ES errors of a retryable nature.
- Events which returned retryable error codes will be pushed onto a separate queue for retrying events. Events in this queue will be retried a maximum of 5 times by default (configurable through :max_retries). The size of this queue is capped by the value set in :retry_max_items.
- Events from the retry queue are submitted again when the queue reaches its max size or when the max interval time is reached. The max interval time is configurable via :retry_max_interval.
- Events which are not retryable or have reached their max retry count are logged to stderr.
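The queueing rules above can be modeled sequentially, without threads or the back-pressure applied in #receive.

This minimal sketch makes several assumptions:

- RetryBufferSketch and its methods are hypothetical.
- Events are plain Hashes carrying a :retry_count.
- max_retries defaults to 5 as described above.
- retry_max_items is passed in.

push reports when the queue has reached its cap, which is the point at which the plugin signals an early flush. flush hands each queued event back to a caller-supplied block after incrementing its retry count, and drops any event whose count exceeds max_retries.

```ruby
# Hypothetical, single-threaded model of the retry queue described above.
class RetryBufferSketch
  attr_reader :dropped

  def initialize(retry_max_items, max_retries = 5)
    @retry_max_items = retry_max_items
    @max_retries = max_retries
    @queue = []
    @dropped = []
  end

  # Queue an Array of failed events; returns true once the queue has reached
  # its cap, which is when the plugin would signal an early flush.
  def push(events)
    @queue.concat(events)
    @queue.size >= @retry_max_items
  end

  # Drain the queue once. Each event's retry count is incremented; events over
  # the limit are dropped (the plugin logs them instead). The rest go to the
  # block, which must return the Array of events that failed again.
  def flush
    batch = @queue.shift(@queue.size)
    ready = batch.each_with_object([]) do |event, acc|
      event[:retry_count] += 1
      if event[:retry_count] > @max_retries
        @dropped << event
      else
        acc << event
      end
    end
    push(yield(ready)) unless ready.empty?
  end

  def size
    @queue.size
  end
end
```

Under this model an event that always fails is resubmitted exactly max_retries times before it is dropped, matching the "retried a maximum of 5 times by default" rule.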
Constants
- RETRYABLE_CODES
- SUCCESS_CODES
Attributes
Public Instance Methods
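Submits the buffered actions. If submission raises an error other than Manticore::SocketException, the error is logged and re-raised so that Stud::Buffer will retry the flush. A Manticore::SocketException (Elasticsearch unreachable) is logged but not re-raised.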
# File lib/logstash/outputs/amazon_es.rb, line 349
def flush(actions, teardown = false)
  begin
    submit(actions)
  rescue Manticore::SocketException => e
    # If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
    # and let the user sort it out from there
    @logger.error(
      "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}'," +
        " but Elasticsearch appears to be unreachable or down!",
      :client_config => @client.client_options,
      :error_message => e.message
    )
    @logger.debug("Failed actions for last bad bulk request!", :actions => actions)
  rescue => e
    # For all other errors print out full connection issues
    @logger.error(
      "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}'," +
        " but an error occurred and it failed! Are you sure you can reach elasticsearch from this machine using " +
        "the configuration provided?",
      :client_config => @client.client_options,
      :error_message => e.message,
      :error_class => e.class.name,
      :backtrace => e.backtrace
    )
    @logger.debug("Failed actions for last bad bulk request!", :actions => actions)
    raise e
  end
end
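The two rescue branches in #flush behave differently: connection failures are logged and swallowed, while every other error is logged and re-raised to the buffer.

The sketch below isolates that control flow under two simplifying assumptions:

- FakeSocketError is a hypothetical stand-in for Manticore::SocketException.
- submit is supplied as a block, and log messages are appended to an Array instead of going to @logger.

```ruby
# Hypothetical stand-in for Manticore::SocketException.
class FakeSocketError < StandardError; end

# Mirrors the rescue structure of #flush; `submit` is supplied as a block.
def flush_sketch(actions, errors)
  yield(actions)
rescue FakeSocketError => e
  # Server unreachable: log and swallow, so the error does not propagate.
  errors << "unreachable: #{e.message}"
  nil
rescue => e
  # Any other failure: log and re-raise so the caller (Stud::Buffer) retries.
  errors << "failed: #{e.class}: #{e.message}"
  raise
end
```

Because only the second branch re-raises, only non-socket errors propagate out of the method to the buffer's retry logic.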
# File lib/logstash/outputs/amazon_es.rb, line 267
def get_template
  if @template.nil?
    @template = ::File.expand_path('amazon_es/elasticsearch-template.json', ::File.dirname(__FILE__))
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2
    if !File.exist?(@template)
      raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
    end
  end
  template_json = IO.read(@template).gsub(/\n/,'')
  template = LogStash::Json.load(template_json)
  @logger.info("Using mapping template", :template => template)
  return template
end
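The template lookup in #get_template can be reproduced outside Logstash. Its rule: fall back to a bundled default path only when no template is configured, and fail loudly if that default is missing.

This standalone sketch makes three substitutions:

- Ruby's json library stands in for LogStash::Json.
- load_template is a hypothetical helper.
- The default path is passed in rather than computed from `__FILE__`.

The newline-stripping `gsub` is dropped because JSON.parse tolerates whitespace.

```ruby
require 'json'

# Hypothetical helper mirroring #get_template: only the defaulted path is
# checked for existence; a missing user-supplied path fails in File.read.
def load_template(template_path, default_path)
  path = template_path
  if path.nil?
    path = default_path
    raise "You must specify 'template => ...' (I looked for '#{path}')" unless File.exist?(path)
  end
  JSON.parse(File.read(path))
end
```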
# File lib/logstash/outputs/amazon_es.rb, line 281
def receive(event)
  return unless output?(event)

  # block until we have not maxed out our
  # retry queue. This is applying back-pressure
  # to slow down the receive-rate
  @retry_flush_mutex.synchronize {
    @retry_queue_not_full.wait(@retry_flush_mutex) while @retry_queue.size > @retry_max_items
  }

  event.set('[@metadata][retry_count]', 0)

  # Set the 'type' value for the index.
  type = if @document_type
           event.sprintf(@document_type)
         elsif @index_type # deprecated
           event.sprintf(@index_type)
         else
           event.get('type') || 'logs'
         end

  params = {
    :_id => @document_id ? event.sprintf(@document_id) : nil,
    :_index => event.sprintf(@index),
    :_type => type,
    :_routing => @routing ? event.sprintf(@routing) : nil
  }

  params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @action == 'update' && @upsert != ""

  buffer_receive([event.sprintf(@action), params, event])
end
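The document type chosen in #receive follows a fixed precedence: document_type first, then the deprecated index_type, then the event's own type field, and finally 'logs'.

This sketch makes the precedence testable under a few simplifications:

- It uses a plain Hash for the event.
- It skips event.sprintf interpolation.
- It takes options as a Hash.

resolve_type and build_params are hypothetical names.

```ruby
# Hypothetical sketch of the type precedence and bulk params built in #receive.
# `event` is a plain Hash; sprintf-style field interpolation is omitted.
def resolve_type(event, opts = {})
  opts[:document_type] || opts[:index_type] || event['type'] || 'logs'
end

def build_params(event, opts)
  {
    _id: opts[:document_id],          # nil lets Elasticsearch generate an id
    _index: opts.fetch(:index),
    _type: resolve_type(event, opts),
    _routing: opts[:routing]
  }
end
```

For example, an event with `'type' => 'apache'` and no document_type or index_type configured is indexed with `_type` 'apache'.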
# File lib/logstash/outputs/amazon_es.rb, line 194
def register
  @hosts = Array(@hosts)
  # retry-specific variables
  @retry_flush_mutex = Mutex.new
  @retry_teardown_requested = Concurrent::AtomicBoolean.new(false)
  # needs flushing when interval
  @retry_queue_needs_flushing = ConditionVariable.new
  @retry_queue_not_full = ConditionVariable.new
  @retry_queue = Queue.new
  @submit_mutex = Mutex.new

  client_settings = {}
  common_options = {
    :client_settings => client_settings
  }

  client_settings[:path] = "/#{@path}/".gsub(/\/+/, "/") # Normalize slashes
  @logger.debug? && @logger.debug("Normalizing http path", :path => @path, :normalized => client_settings[:path])

  if @hosts.nil? || @hosts.empty?
    @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
    @hosts = ["localhost"]
  end

  client_settings.merge! setup_proxy()
  common_options.merge! setup_basic_auth()

  # Update API setup
  update_options = {
    :upsert => @upsert,
    :doc_as_upsert => @doc_as_upsert
  }
  common_options.merge! update_options if @action == 'update'

  @client = LogStash::Outputs::AES::HttpClient.new(
    common_options.merge(:hosts => @hosts, :port => @port, :region => @region,
                         :aws_access_key_id => @aws_access_key_id,
                         :aws_secret_access_key => @aws_secret_access_key,
                         :protocol => @protocol)
  )

  if @manage_template
    begin
      @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
      @client.template_install(@template_name, get_template, @template_overwrite)
    rescue => e
      @logger.error("Failed to install template: #{e.message}")
    end
  end

  @logger.info("New Elasticsearch output", :hosts => @hosts, :port => @port)

  @client_idx = 0

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )

  @retry_timer_thread = Thread.new do
    loop do
      sleep(@retry_max_interval)
      @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
    end
  end

  @retry_thread = Thread.new do
    while @retry_teardown_requested.false?
      @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.wait(@retry_flush_mutex) }
      retry_flush
    end
  end
end
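Among the steps in #register, the configured HTTP path is normalized so that it always starts and ends with exactly one slash. Here is the same expression, pulled out into a hypothetical helper:

```ruby
# Hypothetical helper extracted from #register: wrap the configured path in
# slashes, then collapse every run of slashes into a single one.
def normalize_path(path)
  "/#{path}/".gsub(%r{/+}, '/')
end
```

For instance, an empty or unset path becomes '/', and 'es//proxy' becomes '/es/proxy/'.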
The submit method can be called both from the Stud::Buffer flush thread and from our own retry thread, so it serializes bulk requests with @submit_mutex.
# File lib/logstash/outputs/amazon_es.rb, line 317
def submit(actions)
  @submit_mutex.synchronize do
    es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }

    bulk_response = @client.bulk(es_actions)

    if bulk_response["errors"] && bulk_response["items"]
      actions_to_retry = []

      bulk_response['items'].each_with_index do |item,idx|
        action = es_actions[idx]
        action_type, props = item.first # These are all hashes with one value, so we destructure them here
        status = props['status']
        error = props['error']

        if RETRYABLE_CODES.include?(status)
          @logger.warn "retrying failed action with response code: #{status}"
          # Queue the original [action, params, event] tuple: #retry_flush calls
          # event.get/event.set, which the Hash from event.to_hash does not support.
          actions_to_retry << actions[idx]
        elsif not SUCCESS_CODES.include?(status)
          @logger.warn "failed action", status: status, error: error, action: action
        end
      end

      retry_push(actions_to_retry) unless actions_to_retry.empty?
    end
  end
end
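The per-item handling in #submit can be exercised without a cluster. The response Hash below mimics the shape of an Elasticsearch bulk response, where each item is a one-entry Hash keyed by the action type.

This sketch assumes the following constant values:

- RETRYABLE_CODES = [409, 429, 503], the codes named in the retry policy above.
- SUCCESS_CODES = [200, 201]. These values are an assumption, because this page lists only the constant names.

partition_bulk_response is a hypothetical helper.

```ruby
RETRYABLE_CODES = [409, 429, 503].freeze
SUCCESS_CODES = [200, 201].freeze # assumed values

# Hypothetical helper: split the actions of one bulk request into those to
# retry and those that failed permanently, following the logic of #submit.
def partition_bulk_response(actions, bulk_response)
  to_retry = []
  failed = []
  return [to_retry, failed] unless bulk_response['errors'] && bulk_response['items']

  bulk_response['items'].each_with_index do |item, idx|
    _action_type, props = item.first # each item is a one-entry Hash
    status = props['status']
    if RETRYABLE_CODES.include?(status)
      to_retry << actions[idx]
    elsif !SUCCESS_CODES.include?(status)
      failed << [actions[idx], status, props['error']]
    end
  end
  [to_retry, failed]
end
```

Items are matched to actions by position, which relies on Elasticsearch returning bulk items in request order.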
# File lib/logstash/outputs/amazon_es.rb, line 381
def teardown
  @retry_teardown_requested.make_true
  # First, make sure retry_timer_thread is stopped
  # to ensure we do not signal a retry based on
  # the retry interval.
  Thread.kill(@retry_timer_thread)
  @retry_timer_thread.join
  # Signal flushing in the case that #retry_flush is in
  # the process of waiting for a signal.
  @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
  # Now, #retry_flush is ensured to not be in a state of
  # waiting and can be safely joined into the main thread
  # for further final execution of an in-process remaining call.
  @retry_thread.join

  # execute any final actions along with a proceeding retry for any
  # final actions that did not succeed.
  buffer_flush(:final => true)
  retry_flush
end
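#register and #teardown coordinate two background threads:

- A timer thread periodically signals a condition variable.
- A retry thread waits on that condition variable and flushes.

Teardown kills the timer, wakes and joins the worker, and then does a final flush in the calling thread.

The sketch below reproduces that pattern with hypothetical names (PeriodicFlusher, stop, alive?) and an interval in seconds. It is a simplification, not the plugin's code. Unlike the code above, the worker here checks its stop flag and a "pending" flag while holding the mutex, so a signal sent just before the worker starts waiting cannot be lost.

```ruby
# Hypothetical sketch of the timer/worker pattern in #register and #teardown.
class PeriodicFlusher
  def initialize(interval, &on_flush)
    @interval = interval
    @on_flush = on_flush
    @mutex = Mutex.new
    @needs_flush = ConditionVariable.new
    @pending = false
    @stopping = false

    # Timer thread: requests a flush every @interval seconds.
    @timer = Thread.new do
      loop do
        sleep(@interval)
        @mutex.synchronize do
          @pending = true
          @needs_flush.signal
        end
      end
    end

    # Worker thread: waits for a request (or a stop), then flushes outside the lock.
    @worker = Thread.new do
      loop do
        stop = @mutex.synchronize do
          @needs_flush.wait(@mutex) until @pending || @stopping
          @pending = false
          @stopping
        end
        break if stop
        @on_flush.call
      end
    end
  end

  # Mirrors #teardown: stop the timer, wake and join the worker, then run a
  # final flush in the calling thread.
  def stop
    Thread.kill(@timer)
    @timer.join
    @mutex.synchronize do
      @stopping = true
      @needs_flush.signal
    end
    @worker.join
    @on_flush.call
  end

  def alive?
    @timer.alive? || @worker.alive?
  end
end
```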
Private Instance Methods
In charge of submitting any actions in @retry_queue that need to be retried.
This method is not called concurrently. It is only called by @retry_thread; once that thread has ended during the teardown process, a final call to this method is made from the main thread.
# File lib/logstash/outputs/amazon_es.rb, line 437
def retry_flush()
  unless @retry_queue.empty?
    buffer = @retry_queue.size.times.map do
      next_action, next_doc, next_event = @retry_queue.pop
      next_event.set('[@metadata][retry_count]', next_event.get('[@metadata][retry_count]') + 1)

      if next_event.get('[@metadata][retry_count]') > @max_retries
        @logger.error "too many attempts at sending event. dropping: #{next_event}"
        nil
      else
        [next_action, next_doc, next_event]
      end
    end.compact

    submit(buffer) unless buffer.empty?
  end

  @retry_flush_mutex.synchronize {
    @retry_queue_not_full.signal if @retry_queue.size < @retry_max_items
  }
end
# File lib/logstash/outputs/amazon_es.rb, line 460
def retry_push(actions)
  Array(actions).each { |action| @retry_queue << action }
  @retry_flush_mutex.synchronize {
    @retry_queue_needs_flushing.signal if @retry_queue.size >= @retry_max_items
  }
end
# File lib/logstash/outputs/amazon_es.rb, line 420
def setup_basic_auth
  return {} unless @user && @password

  {
    :user => ::URI.escape(@user, "@:"),
    :password => ::URI.escape(@password.value, "@:")
  }
end
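#setup_basic_auth percent-encodes only '@' and ':' in the user name and password, because those characters delimit the userinfo part of a URL (user:password@host). URI.escape, used above, was removed in Ruby 3.0.

Here is a hypothetical stdlib-only equivalent that escapes just those two characters, as `URI.escape(value, "@:")` did:

```ruby
# Hypothetical replacement for URI.escape(value, "@:"): percent-encode only the
# characters that would break a user:password@host URL.
def escape_userinfo(value)
  value.to_s.gsub(/[@:]/) { |c| format('%%%02X', c.ord) }
end
```

For example, a password of 'a@b:c' is sent as 'a%40b%3Ac'.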
# File lib/logstash/outputs/amazon_es.rb, line 404
def setup_proxy
  return {} unless @proxy

  # Symbolize keys
  proxy = if @proxy.is_a?(Hash)
            Hash[@proxy.map { |k, v| [k.to_sym, v] }]
          elsif @proxy.is_a?(String)
            @proxy
          else
            raise LogStash::ConfigurationError, "Expected 'proxy' to be a string or hash, not '#{@proxy}'!"
          end

  return {:proxy => proxy}
end
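#setup_proxy accepts either a proxy URL string or a Hash of proxy settings. It symbolizes Hash keys before the result is merged into the client settings.

The standalone version below makes two substitutions:

- Hash#transform_keys (Ruby 2.5+) replaces the `Hash[...map...]` idiom.
- ArgumentError stands in for LogStash::ConfigurationError.

proxy_options is a hypothetical name.

```ruby
# Hypothetical standalone version of #setup_proxy.
def proxy_options(proxy)
  return {} unless proxy

  normalized =
    case proxy
    when Hash then proxy.transform_keys(&:to_sym)
    when String then proxy
    else raise ArgumentError, "Expected 'proxy' to be a string or hash, not '#{proxy}'!"
    end
  { proxy: normalized }
end
```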