module LogStash::Outputs::ElasticSearch::Common

Constants

RETRYABLE_CODES
SUCCESS_CODES
VALID_HTTP_ACTIONS

To be overridden by the -java version

Attributes

client[R]
hosts[R]

Public Instance Methods

check_action_validity() click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 62
# Validates the configured @action.
#
# Passes (returning nil) when @action contains a sprintf-style placeholder
# (%{...}) or is one of valid_actions.
#
# Raises LogStash::ConfigurationError when @action is unset or unrecognised.
def check_action_validity
  if !@action
    raise LogStash::ConfigurationError, "No action specified!"
  end

  # A sprintf template can only be resolved per event, so it is accepted as-is.
  uses_interpolation = @action =~ /%{.+}/
  unless uses_interpolation || valid_actions.include?(@action)
    raise LogStash::ConfigurationError, "Action '#{@action}' is invalid! Pick one of #{valid_actions} or use a sprintf style statement"
  end

  nil
end
event_action_params(event) click to toggle source

get the action parameters for the given event

# File lib/logstash/outputs/elasticsearch/common.rb, line 127
# Builds the bulk-action parameters for a single event.
#
# event - the event being sent; must respond to sprintf (and to whatever
#         get_event_type calls on it).
#
# Returns a Hash containing :_id, :_index, :_type and :_routing, plus
# :pipeline and :parent when those options are set, and :_upsert / :_script /
# :_retry_on_conflict when the action for this event is 'update'.
def event_action_params(event)
  type = get_event_type(event)

  params = {
    :_id => @document_id ? event.sprintf(@document_id) : nil,
    :_index => event.sprintf(@index),
    :_type => type,
    :_routing => @routing ? event.sprintf(@routing) : nil
  }

  params[:pipeline] = @pipeline if @pipeline
  params[:parent] = event.sprintf(@parent) if @parent

  # @action may be a sprintf template (check_action_validity allows that), so
  # it has to be resolved against the event before deciding whether the
  # update-only parameters apply. Comparing the raw template would silently
  # drop them for templated actions that resolve to 'update'.
  if event.sprintf(@action) == 'update'
    params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
    params[:_script] = event.sprintf(@script) if @script != ""
    params[:_retry_on_conflict] = @retry_on_conflict
  end

  params
end
event_action_tuple(event) click to toggle source

Convert the event into a 3-tuple of action, params, and event

# File lib/logstash/outputs/elasticsearch/common.rb, line 34
# Packs an event into the [action, params, event] triple used by the
# bulk-submission path. The action is @action resolved against the event.
def event_action_tuple(event)
  action_params = event_action_params(event)
  [event.sprintf(@action), action_params, event]
end
flush() click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 40
# Flushes @buffer (the buffer assigned by setup_buffer_and_handler) and
# returns whatever @buffer.flush returns.
def flush
  @buffer.flush
end
get_event_type(event) click to toggle source

Determine the correct value for the 'type' field for the given event

# File lib/logstash/outputs/elasticsearch/common.rb, line 155
# Resolves the value used for the 'type' field of the given event.
#
# When @document_type is set it is resolved against the event with sprintf;
# otherwise the event's own "type" field is used, falling back to "logs".
# A warning is logged if the resolved value is neither a String nor a Numeric.
#
# Always returns a String (the resolved value passed through to_s).
def get_event_type(event)
  resolved = @document_type ? event.sprintf(@document_type) : (event.get("type") || "logs")

  unless resolved.is_a?(String) || resolved.is_a?(Numeric)
    @logger.warn("Bad event type! Non-string/integer type value set!", :type_class => resolved.class, :type_value => resolved.to_s, :event => event)
  end

  resolved.to_s
end
install_template() click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 52
# Delegates to TemplateManager.install_template, handing it this output
# instance. TemplateManager is defined outside this view; presumably it reads
# the template settings from the instance it is given.
def install_template
  TemplateManager.install_template(self)
end
multi_receive(events) click to toggle source

Receive an array of events and immediately attempt to index them (no buffering)

# File lib/logstash/outputs/elasticsearch/common.rb, line 27
# Submits an array of events right away, bypassing @buffer.
#
# The events are processed in batches of at most @flush_size; each batch is
# converted to action tuples and passed to retrying_submit.
def multi_receive(events)
  events.each_slice(@flush_size) do |batch|
    tuples = batch.map { |event| event_action_tuple(event) }
    retrying_submit(tuples)
  end
end
receive(event) click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 22
# Converts a single event to its action tuple and appends it to @buffer.
def receive(event)
  tuple = event_action_tuple(event)
  @buffer << tuple
end
register() click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 11
# Prepares the output for use. The steps run strictly in the order written:
# stopping flag, hosts, client, template, buffer, then action validation.
def register
  # Starts out false; safe_bulk consults this flag to decide whether to keep
  # retrying after a connection failure.
  @stopping = Concurrent::AtomicBoolean.new(false)
  setup_hosts # properly sets @hosts
  build_client
  install_template
  setup_buffer_and_handler
  # Raises LogStash::ConfigurationError if @action is missing or not recognised.
  check_action_validity

  @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts)
end
retrying_submit(actions) click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 78
# Submits the given actions, resubmitting until nothing is left to retry.
#
# actions - Array of action tuples (may be nil or empty, in which case
#           nothing is submitted).
#
# Each pass calls submit, which returns the subset of actions that should be
# retried (or nil when none). If submit raises, the error is logged and the
# same set of actions is submitted again. Between passes the method sleeps
# for @retry_max_interval seconds.
#
# Returns nil.
def retrying_submit(actions)
  # Initially we submit the full list of actions
  submit_actions = actions

  while submit_actions && !submit_actions.empty?
    # We retry with whatever didn't succeed
    begin
      submit_actions = submit(submit_actions)
    rescue => e
      # submit_actions is left untouched here, so the next pass resubmits
      # the same actions.
      @logger.warn("Encountered an unexpected error submitting a bulk request! Will retry.",
                   :message => e.message,
                   :class => e.class.name,
                   :backtrace => e.backtrace)
    end

    sleep @retry_max_interval if submit_actions && !submit_actions.empty?
  end
end
safe_bulk(es_actions,actions) click to toggle source

Rescue retryable errors during bulk submission

# File lib/logstash/outputs/elasticsearch/common.rb, line 171
# Sends es_actions through @client.bulk, handling failures.
#
# es_actions - the payload handed to @client.bulk.
# actions    - the original action tuples; only used for debug logging.
#
# On Manticore::SocketException / Manticore::SocketTimeout the failure is
# logged, the method sleeps for @retry_max_interval and then tries again,
# unless @stopping is true, in which case it gives up and returns nil.
# Any other StandardError is logged and re-raised.
#
# Returns the value of @client.bulk, or nil when retrying was abandoned.
def safe_bulk(es_actions, actions)
  begin
    @client.bulk(es_actions)
  rescue Manticore::SocketException, Manticore::SocketTimeout => conn_error
    # If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
    # and let the user sort it out from there
    unreachable_msg = "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}', but Elasticsearch appears to be unreachable or down!"
    @logger.error(unreachable_msg,
                  :error_message => conn_error.message,
                  :class => conn_error.class.name,
                  :client_config => @client.client_options)
    @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

    # Keep trying until the request goes through or the output is stopping.
    sleep(@retry_max_interval)
    retry unless @stopping.true?
  rescue => failure
    # For all other errors print out full connection issues
    failed_msg = "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}', but an error occurred and it failed! Are you sure you can reach elasticsearch from this machine using the configuration provided?"
    @logger.error(failed_msg,
                  :error_message => failure.message,
                  :error_class => failure.class.name,
                  :backtrace => failure.backtrace,
                  :client_config => @client.client_options)
    @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

    raise failure
  end
end
setup_buffer_and_handler() click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 56
# Creates @buffer, configured with @logger, @flush_size and @idle_flush_time.
# The block given to the buffer forwards the actions it receives to
# retrying_submit.
def setup_buffer_and_handler
  buffer_class = ::LogStash::Outputs::ElasticSearch::Buffer
  @buffer = buffer_class.new(@logger, @flush_size, @idle_flush_time) { |pending| retrying_submit(pending) }
end
setup_hosts() click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 44
# Normalises @hosts to an Array; when the result is empty, logs the fact and
# fills it with "localhost".
def setup_hosts
  @hosts = Array(@hosts)
  return unless @hosts.empty?

  @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
  @hosts.replace(["localhost"])
end
submit(actions) click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 98
# Sends one bulk request for the given actions and works out which of them
# need to be retried.
#
# actions - Array of [action, params, event] tuples; each event must respond
#           to to_hash.
#
# Returns nil when there is nothing to retry (the response reported no
# errors, or safe_bulk returned nil), otherwise an Array holding the tuples
# whose response status is in RETRYABLE_CODES. Items whose status is in
# neither SUCCESS_CODES nor RETRYABLE_CODES are logged and dropped.
def submit(actions)
  es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }

  bulk_response = safe_bulk(es_actions, actions)

  # safe_bulk returns nil when it abandons its connection-retry loop because
  # the output is stopping; indexing into nil here would raise NoMethodError.
  # Otherwise, if the response reports no errors, we're done here!
  return if bulk_response.nil? || !bulk_response["errors"]

  actions_to_retry = []
  # Response items are matched to the submitted actions by position.
  bulk_response["items"].each_with_index do |response, idx|
    _action_type, action_props = response.first
    status = action_props["status"]
    error  = action_props["error"]
    action = actions[idx]

    if SUCCESS_CODES.include?(status)
      next
    elsif RETRYABLE_CODES.include?(status)
      @logger.info "retrying failed action with response code: #{status} (#{error})"
      actions_to_retry << action
    else
      @logger.warn "Failed action. ", status: status, action: action, response: response
    end
  end

  actions_to_retry
end
valid_actions() click to toggle source
# File lib/logstash/outputs/elasticsearch/common.rb, line 74
# Returns the list of accepted action names (VALID_HTTP_ACTIONS).
# check_action_validity obtains the list through this method rather than
# reading the constant directly.
def valid_actions
  VALID_HTTP_ACTIONS
end