class LogStash::Outputs::AmazonES

This output plugin emits data to Amazon Elasticsearch, with support for signing requests using AWS Signature Version 4.

The configuration and experience are similar to the logstash-output-elasticsearch plugin, with Signature V4 support added on top of it. Some of the default configurations, such as connection timeouts, have been tuned for optimal performance with Amazon Elasticsearch.

Retry Policy

This plugin uses the same retry policy as logstash-output-elasticsearch. It uses the bulk API to optimize its imports into Elasticsearch. These requests may experience either partial or total failures. Events are retried if they fail due to either a network error or the status codes 429 (the server is busy), 409 (Version Conflict), or 503 (temporary overloading/maintenance).

The retry policy's logic can be described as follows:

Constants

RETRYABLE_CODES
SUCCESS_CODES

Attributes

client[R]

Public Instance Methods

flush(actions, teardown = false) click to toggle source

When there are exceptions raised upon submission, we raise an exception so that Stud::Buffer will retry to flush

# File lib/logstash/outputs/amazon_es.rb, line 349
# Flush callback: hands a batch of buffered actions to #submit.
#
# Every exception raised by #submit is logged and then re-raised, so that the
# caller (Stud::Buffer, per this plugin's documentation) retries the flush
# instead of silently dropping the batch. Connection failures get a shorter
# log entry; everything else is logged with class and backtrace.
#
# @param actions [Array] batch of [action, params, event] entries to send
# @param teardown [Boolean] part of the flush callback signature; unused here
# @raise [Manticore::SocketException] when the connection to Elasticsearch fails
# @raise [StandardError] any other error raised while submitting
def flush(actions, teardown = false)
  submit(actions)
rescue Manticore::SocketException => e
  # If we can't even connect to the server let's just print out the URL
  # (:hosts is actually a URL) and let the user sort it out from there.
  @logger.error(
    "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}',"+
      " but Elasticsearch appears to be unreachable or down!",
    :client_config => @client.client_options,
    :error_message => e.message
  )
  @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

  # Re-raise so the batch is retried rather than lost while the server is down.
  raise
rescue => e
  # For all other errors print out full connection issues.
  @logger.error(
    "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}'," +
        " but an error occurred and it failed! Are you sure you can reach elasticsearch from this machine using " +
      "the configuration provided?",
    :client_config => @client.client_options,
    :error_message => e.message,
    :error_class => e.class.name,
    :backtrace => e.backtrace
  )

  @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

  raise
end
get_template() click to toggle source
# File lib/logstash/outputs/amazon_es.rb, line 267
# Loads and parses the index mapping template.
#
# When no template path is configured (@template is nil) the path defaults to
# 'amazon_es/elasticsearch-template.json' next to this source file, and
# @template is set to that path.
#
# @return [Object] the template parsed by LogStash::Json.load
# @raise [RuntimeError] if no template is configured and the default file is missing
def get_template
  if @template.nil?
    @template = ::File.expand_path('amazon_es/elasticsearch-template.json', ::File.dirname(__FILE__))
    # File.exists? is deprecated and was removed in Ruby 3.2; File.exist? works everywhere.
    unless ::File.exist?(@template)
      raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
    end
  end

  # File.read (rather than IO.read) never treats a leading "|" as a command to run.
  template_json = ::File.read(@template).delete("\n")
  template = LogStash::Json.load(template_json)
  @logger.info("Using mapping template", :template => template)
  template
end
receive(event) click to toggle source
# File lib/logstash/outputs/amazon_es.rb, line 281
# Accepts one event, builds its bulk-action tuple and hands it to the buffer.
#
# The tuple is [action, params, event], where params carries the document id,
# index, type and routing (plus the upsert document for 'update' actions).
#
# @param event [Object] the event; must respond to sprintf, get and set
def receive(event)
  return unless output?(event)

  # Back-pressure: hold the caller while the retry queue holds more than
  # @retry_max_items entries, until #retry_flush signals that it has drained.
  @retry_flush_mutex.synchronize do
    while @retry_queue.size > @retry_max_items
      @retry_queue_not_full.wait(@retry_flush_mutex)
    end
  end

  event.set('[@metadata][retry_count]', 0)

  # Document type: explicit setting first, then the deprecated index_type,
  # finally the event's own 'type' field with 'logs' as the fallback.
  type_pattern = @document_type || @index_type
  doc_type = type_pattern ? event.sprintf(type_pattern) : (event.get('type') || 'logs')

  doc_id = event.sprintf(@document_id) if @document_id
  index_name = event.sprintf(@index)
  routing = event.sprintf(@routing) if @routing

  params = {
    :_id => doc_id,
    :_index => index_name,
    :_type => doc_type,
    :_routing => routing
  }

  if @action == 'update' && @upsert != ""
    params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert))
  end

  buffer_receive([event.sprintf(@action), params, event])
end
register() click to toggle source
# File lib/logstash/outputs/amazon_es.rb, line 194
# Plugin setup: initialises retry state, builds the HTTP client, optionally
# installs the mapping template, initialises the buffer and starts the two
# background threads that drive retries.
def register
  # Coerce a single host or nil into an array (Array(nil) is []).
  @hosts = Array(@hosts)
  # retry-specific variables
  @retry_flush_mutex = Mutex.new
  @retry_teardown_requested = Concurrent::AtomicBoolean.new(false)
  # Signalled when the retry queue should be flushed: by the interval timer
  # thread below, and wherever else this condition variable is signalled.
  @retry_queue_needs_flushing = ConditionVariable.new
  @retry_queue_not_full = ConditionVariable.new
  @retry_queue = Queue.new
  @submit_mutex = Mutex.new

  # client_settings is referenced from common_options, so the entries added
  # to it below are visible through common_options[:client_settings].
  client_settings = {}
  common_options = {
    :client_settings => client_settings
  }

  client_settings[:path] = "/#{@path}/".gsub(/\/+/, "/") # Normalize slashes
  @logger.debug? && @logger.debug("Normalizing http path", :path => @path, :normalized => client_settings[:path])

  # After the Array() coercion above @hosts is never nil, so in practice only
  # the empty? half of this check can trigger the localhost default.
  if @hosts.nil? || @hosts.empty?
    @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
    @hosts = ["localhost"]
  end

  # Proxy settings go into the client settings; basic-auth credentials go
  # into the top-level options. Both helpers return {} when not configured.
  client_settings.merge! setup_proxy()
  common_options.merge! setup_basic_auth()

  # Update API setup
  update_options = {
    :upsert => @upsert,
    :doc_as_upsert => @doc_as_upsert
  }
  common_options.merge! update_options if @action == 'update'

  @client = LogStash::Outputs::AES::HttpClient.new(
    common_options.merge(:hosts => @hosts, :port => @port, :region => @region, :aws_access_key_id => @aws_access_key_id, :aws_secret_access_key => @aws_secret_access_key,:protocol => @protocol)
  )

  if @manage_template
    begin
      @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
      @client.template_install(@template_name, get_template, @template_overwrite)
    rescue => e
      # Best effort: a failed template install is logged and registration continues.
      @logger.error("Failed to install template: #{e.message}")
    end
  end

  @logger.info("New Elasticsearch output", :hosts => @hosts, :port => @port)

  @client_idx = 0

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )

  # Timer thread: every @retry_max_interval seconds, signal that the retry
  # queue should be flushed.
  @retry_timer_thread = Thread.new do
    loop do
      sleep(@retry_max_interval)
      @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
    end
  end

  # Retry thread: until teardown is requested, wait for a flush signal and
  # then run retry_flush.
  @retry_thread = Thread.new do
    while @retry_teardown_requested.false?
      @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.wait(@retry_flush_mutex) }
      retry_flush
    end
  end
end
submit(actions) click to toggle source

The submit method can be called from both the Stud::Buffer flush thread and from our own retry thread.

# File lib/logstash/outputs/amazon_es.rb, line 317
# Sends a batch of actions through the bulk API and queues retryable
# failures for another attempt.
#
# Calls are serialised through @submit_mutex. Each event is converted with
# to_hash only for the request itself; the entries pushed onto the retry queue
# are the ORIGINAL [action, params, event] tuples, because #retry_flush calls
# get/set on the event to track its retry count.
#
# @param actions [Array<Array>] [action, params, event] tuples
def submit(actions)
  @submit_mutex.synchronize do
    es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }

    bulk_response = @client.bulk(es_actions)

    if bulk_response["errors"] && bulk_response["items"]
      actions_to_retry = []

      bulk_response['items'].each_with_index do |item, idx|
        # Each item is a single-entry hash (action type => properties); only
        # the properties are needed here.
        _action_type, props = item.first

        status = props['status']
        error = props['error']

        if RETRYABLE_CODES.include?(status)
          @logger.warn "retrying failed action with response code: #{status}"
          # Response items are matched to the submitted actions by position.
          actions_to_retry << actions[idx]
        elsif !SUCCESS_CODES.include?(status)
          @logger.warn "failed action", status: status, error: error, action: es_actions[idx]
        end
      end

      retry_push(actions_to_retry) unless actions_to_retry.empty?
    end
  end
end
teardown() click to toggle source
# File lib/logstash/outputs/amazon_es.rb, line 381
# Shuts the retry machinery down and flushes whatever is still pending.
#
# Order matters: flag teardown, stop the timer, wake and join the retry
# thread, then perform the final buffer flush and one last retry pass.
def teardown
  @retry_teardown_requested.make_true

  # Stop the interval timer first so it can no longer trigger a retry pass.
  timer = @retry_timer_thread
  timer.kill
  timer.join

  # Wake the retry thread in case it is parked waiting for a flush signal.
  @retry_flush_mutex.synchronize do
    @retry_queue_needs_flushing.signal
  end

  # With the teardown flag set, the retry thread finishes any pass in
  # progress and exits, so it can be joined here.
  @retry_thread.join

  # Final flush of buffered actions, followed by one more retry pass for
  # anything that final flush failed to deliver.
  buffer_flush(:final => true)
  retry_flush
end

Private Instance Methods

retry_flush() click to toggle source

Submits any actions in @retry_queue that need to be retried.

This method is not called concurrently. It is only called by @retry_thread and once that thread is ended during the teardown process, a final call to this method is done upon teardown in the main thread.

# File lib/logstash/outputs/amazon_es.rb, line 437
# Drains the retry queue, bumps each event's retry counter, drops events that
# exceeded @max_retries and resubmits the remainder in a single batch.
# Afterwards signals @retry_queue_not_full if the queue is below
# @retry_max_items.
def retry_flush()
  retry_key = '[@metadata][retry_count]'
  survivors = []

  # Only drain what is queued right now; nothing is popped when it is empty.
  @retry_queue.size.times do
    queued_action, queued_doc, queued_event = @retry_queue.pop
    queued_event.set(retry_key, queued_event.get(retry_key) + 1)

    if queued_event.get(retry_key) > @max_retries
      @logger.error "too many attempts at sending event. dropping: #{queued_event}"
      next
    end

    survivors << [queued_action, queued_doc, queued_event]
  end

  submit(survivors) unless survivors.empty?

  @retry_flush_mutex.synchronize do
    @retry_queue_not_full.signal if @retry_queue.size < @retry_max_items
  end
end
retry_push(actions) click to toggle source
# File lib/logstash/outputs/amazon_es.rb, line 460
# Appends the given actions to the retry queue and, once the queue holds at
# least @retry_max_items entries, signals that it needs flushing.
#
# @param actions [Array] action tuples to retry (coerced with Array())
def retry_push(actions)
  Array(actions).each do |entry|
    @retry_queue.push(entry)
  end

  @retry_flush_mutex.synchronize do
    queue_full = @retry_queue.size >= @retry_max_items
    @retry_queue_needs_flushing.signal if queue_full
  end
end
setup_basic_auth() click to toggle source
# File lib/logstash/outputs/amazon_es.rb, line 420
# Builds the basic-auth options for the HTTP client.
#
# Only "@" and ":" are percent-encoded in the user name and password; every
# other character is passed through unchanged. This reproduces
# URI.escape(value, "@:"), which is obsolete and was removed in Ruby 3.0.
#
# @return [Hash] {} unless both @user and @password are set, otherwise a hash
#   with :user and :password
def setup_basic_auth
  return {} unless @user && @password

  reserved = { "@" => "%40", ":" => "%3A" }

  {
    :user => @user.gsub(/[@:]/, reserved),
    :password => @password.value.gsub(/[@:]/, reserved)
  }
end
setup_proxy() click to toggle source
# File lib/logstash/outputs/amazon_es.rb, line 404
# Builds the proxy option for the HTTP client settings.
#
# @return [Hash] {} when no proxy is configured, otherwise { :proxy => value }
#   where a Hash proxy has its keys converted to symbols and a String proxy
#   is passed through unchanged
# @raise [LogStash::ConfigurationError] if @proxy is neither a String nor a Hash
def setup_proxy
  return {} unless @proxy

  normalized =
    case @proxy
    when Hash
      # Symbolize keys
      @proxy.each_with_object({}) { |(key, value), acc| acc[key.to_sym] = value }
    when String
      @proxy
    else
      raise LogStash::ConfigurationError, "Expected 'proxy' to be a string or hash, not '#{@proxy}''!"
    end

  { :proxy => normalized }
end