class LogStash::Outputs::Influx

Constants

RETRYABLE_MANTICORE_EXCEPTIONS

Attributes

is_batch[RW]

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/influx.rb, line 264
def close
  @timer.cancel
  client.close
end
log_error_response(response, url, event) click to toggle source
# File lib/logstash/outputs/influx.rb, line 137
def log_error_response(response, url, event)
  log_failure(
            "Encountered non-2xx HTTP code #{response.code}",
            :response_code => response.code,
            :url => url,
            :event => event
          )
end
log_retryable_response(response) click to toggle source
# File lib/logstash/outputs/influx.rb, line 129
def log_retryable_response(response)
  if (response.code == 429)
    @logger.debug? && @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP")
  else
    @logger.warn("Encountered a retryable HTTP request in HTTP output, will retry", :code => response.code, :body => response.body)
  end
end
multi_receive(events) click to toggle source
# File lib/logstash/outputs/influx.rb, line 111
def multi_receive(events)
  return if events.empty?
  send_events(events)
end
register() click to toggle source
# File lib/logstash/outputs/influx.rb, line 82
def register
  # We count outstanding requests with this queue
  # This queue tracks the requests to create backpressure
  # When this queue is empty no new requests may be sent,
  # tokens must be added back by the client on success
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times {|t| @request_tokens << true }

  @requests = Array.new

  if @content_type.nil?
    case @format
      when "form" ; @content_type = "application/x-www-form-urlencoded"
      when "json" ; @content_type = "application/json"
      when "json_batch" ; @content_type = "application/json"
      when "message" ; @content_type = "text/plain"
    end
  end

  @is_batch = @is_batch

  @headers["Content-Type"] = @content_type

  validate_format!

  # Run named Timer as daemon thread
  @timer = java.util.Timer.new("Influx Output #{self.params['id']}", true)
end
send_event(event, attempt) click to toggle source
# File lib/logstash/outputs/influx.rb, line 214
def send_event(event, attempt)
  body = event_body(event)

  # Send the request

  url = @url + 'write?db=' + @db

  headers = @is_batch ? @headers : event_headers(event)

  # Compress the body and add appropriate header
  if @http_compression == true
    headers["Content-Encoding"] = "gzip"
    body = gzip(body)
  end

  # Create an async request
  response = client.send(:post, url, :body => body, :headers => headers).call

  if !response_success?(response)
    if retryable_response?(response)
      log_retryable_response(response)
      return :retry, event, attempt
    else
      log_error_response(response, url, event)
      return :failure, event, attempt
    end
  else
    return :success, event, attempt
  end

rescue => exception
  will_retry = retryable_exception?(exception)
  log_failure("Could not fetch URL",
              :url => url,
              :method => @http_method,
              :body => body,
              :headers => headers,
              :message => exception.message,
              :class => exception.class.name,
              :backtrace => exception.backtrace,
              :will_retry => will_retry
  )

  if will_retry
    return :retry, event, attempt
  else
    return :failure, event, attempt
  end
end
send_events(events) click to toggle source
# File lib/logstash/outputs/influx.rb, line 146
def send_events(events)
  successes = java.util.concurrent.atomic.AtomicInteger.new(0)
  failures  = java.util.concurrent.atomic.AtomicInteger.new(0)
  retries = java.util.concurrent.atomic.AtomicInteger.new(0)
  event_count = @is_batch ? 1 : events.size

  pending = Queue.new
  if @is_batch
    pending << [events, 0]
  else
    events.each {|e| pending << [e, 0]}
  end

  while popped = pending.pop
    break if popped == :done

    event, attempt = popped

    action, event, attempt = send_event(event, attempt)
    begin
      action = :failure if action == :retry && !@retry_failed

      case action
      when :success
        successes.incrementAndGet
      when :retry
        retries.incrementAndGet

        next_attempt = attempt+1
        sleep_for = sleep_for_attempt(next_attempt)
        @logger.info("Retrying http request, will sleep for #{sleep_for} seconds")
        timer_task = RetryTimerTask.new(pending, event, next_attempt)
        @timer.schedule(timer_task, sleep_for*1000)
      when :failure
        failures.incrementAndGet
      else
        raise "Unknown action #{action}"
      end

      if action == :success || action == :failure
        if successes.get+failures.get == event_count
          pending << :done
        end
      end
    rescue => e
      # This should never happen unless there's a flat out bug in the code
      @logger.error("Error sending HTTP Request",
        :class => e.class.name,
        :message => e.message,
        :backtrace => e.backtrace)
      failures.incrementAndGet
      raise e
    end
  end
rescue => e
  @logger.error("Error in http output loop",
          :class => e.class.name,
          :message => e.message,
          :backtrace => e.backtrace)
  raise e
end
sleep_for_attempt(attempt) click to toggle source
# File lib/logstash/outputs/influx.rb, line 208
def sleep_for_attempt(attempt)
  sleep_for = attempt**2
  sleep_for = sleep_for <= 60 ? sleep_for : 60
  (sleep_for/2) + (rand(0..sleep_for)/2)
end

Private Instance Methods

convert_mapping(mapping, event) click to toggle source
# File lib/logstash/outputs/influx.rb, line 307
def convert_mapping(mapping, event)
  if mapping.is_a?(Hash)
    mapping.reduce({}) do |acc, kv|
      k, v = kv
      acc[k] = convert_mapping(v, event)
      acc
    end
  elsif mapping.is_a?(Array)
    mapping.map { |elem| convert_mapping(elem, event) }
  else
    event.sprintf(mapping)
  end
end
custom_headers(event) click to toggle source
# File lib/logstash/outputs/influx.rb, line 330
def custom_headers(event)
  return nil unless @headers

  @headers.reduce({}) do |acc,kv|
    k,v = kv
    acc[k] = event.sprintf(v)
    acc
  end
end
encode(hash) click to toggle source

TODO Extract this to a codec

# File lib/logstash/outputs/influx.rb, line 341
def encode(hash)
  return hash.collect do |key, value|
    CGI.escape(key) + "=" + CGI.escape(value.to_s)
  end.join("&")
end
event_body(event) click to toggle source

Format the HTTP body

# File lib/logstash/outputs/influx.rb, line 291
def event_body(event)
  # TODO: Create an HTTP post data codec, use that here
  event.map {|e| map_event(e) }.join("\n")

end
event_headers(event) click to toggle source
# File lib/logstash/outputs/influx.rb, line 326
def event_headers(event)
  custom_headers(event) || {}
end
gzip(data) click to toggle source

gzip data

# File lib/logstash/outputs/influx.rb, line 298
def gzip(data)
  gz = StringIO.new
  gz.set_encoding("BINARY")
  z = Zlib::GzipWriter.new(gz)
  z.write(data)
  z.close
  gz.string
end
log_failure(message, opts) click to toggle source

This is split into a separate method mostly to help testing

# File lib/logstash/outputs/influx.rb, line 286
def log_failure(message, opts)
  @logger.error("[HTTP Output Failure] #{message}", opts)
end
map_event(event) click to toggle source
# File lib/logstash/outputs/influx.rb, line 321
def map_event(event)
  event_hash = event.to_hash
  event_hash['payload']
end
response_success?(response) click to toggle source
# File lib/logstash/outputs/influx.rb, line 271
def response_success?(response)
  code = response.code
  return true if @ignorable_codes && @ignorable_codes.include?(code)
  return code >= 200 && code <= 299
end
retryable_exception?(exception) click to toggle source
# File lib/logstash/outputs/influx.rb, line 281
def retryable_exception?(exception)
  RETRYABLE_MANTICORE_EXCEPTIONS.any? {|me| exception.is_a?(me) }
end
retryable_response?(response) click to toggle source
# File lib/logstash/outputs/influx.rb, line 277
def retryable_response?(response)
  @retryable_codes && @retryable_codes.include?(response.code)
end
validate_format!() click to toggle source
# File lib/logstash/outputs/influx.rb, line 348
def validate_format!
  if @format == "message"
    if @message.nil?
      raise "message must be set if message format is used"
    end

    if @content_type.nil?
      raise "content_type must be set if message format is used"
    end

    unless @mapping.nil?
      @logger.warn "mapping is not supported and will be ignored if message format is used"
    end
  end
end