class LogStash::Outputs::InfluxDB

This output lets you write metrics to InfluxDB (>= 0.9.0-rc31).

The configuration here attempts to be as friendly as possible and to minimize the need for multiple definitions when writing to multiple measurements, while still being efficient.

The InfluxDB API lets you do some semblance of bulk operations per HTTP call, but each call is database-specific.

You can learn more at the InfluxDB homepage (influxdb.com).

Public Instance Methods

close()
# File lib/logstash/outputs/influxdb.rb, line 200
def close
  buffer_flush(:final => true)
end
coerce_value(value_type, value)
# File lib/logstash/outputs/influxdb.rb, line 237
def coerce_value(value_type, value)
  case value_type.to_sym
  when :integer
    value.to_i
    
  when :float
    value.to_f

  when :string
    value.to_s
  
  else
    @logger.warn("Don't know how to convert to #{value_type}. Returning value unchanged")
    value  
  end
end
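
As a quick illustration of the coercion rules, here is a standalone sketch that mirrors the case statement above, with hypothetical inputs (the real method also logs a warning for unknown types):

coerce = lambda do |value_type, value|
  case value_type.to_sym
  when :integer then value.to_i
  when :float   then value.to_f
  when :string  then value.to_s
  else               value      # unknown types are returned unchanged
  end
end

coerce.call("integer", "42")    # => 42
coerce.call("float",   "0.64")  # => 0.64
coerce.call("string",  17)      # => "17"
coerce.call("boolean", true)    # => true (unchanged)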
coerce_values!(event_data)

Coerce values in the event data to their appropriate type. This requires foreknowledge of what's in the data point, which is less than ideal. An alternative is to use a `code` filter and manipulate the individual point's data before it is sent to the output pipeline.

# File lib/logstash/outputs/influxdb.rb, line 220
def coerce_values!(event_data)
  @coerce_values.each do |column, value_type|
    if event_data.has_key?(column)
      begin
        @logger.debug? and @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{event_data[column]}")
        event_data[column] = coerce_value(value_type, event_data[column])

      rescue => e
        @logger.error("Unhandled exception", :error => e.message)
      end
    end
  end

  event_data
end
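
For example, assuming a configured mapping of column name to target type (hypothetical field names and values), the effect on a data point looks like this standalone sketch:

coercions = { "load" => :float, "count" => :integer }
point     = { "host" => "server01", "load" => "0.64", "count" => "3" }

coercions.each do |column, type|
  next unless point.key?(column)
  point[column] = (type == :float ? point[column].to_f : point[column].to_i)
end

point  # => { "host" => "server01", "load" => 0.64, "count" => 3 }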
create_point_from_event(event)

Create a data point from an event. If @use_event_fields_for_data_points is true, convert the event to a hash; otherwise, use @data_points. Each key and value is run through event#sprintf, with the exception of non-String values, which are passed through unchanged.

# File lib/logstash/outputs/influxdb.rb, line 209
def create_point_from_event(event)
  Hash[ (@use_event_fields_for_data_points ? event.to_hash : @data_points).map do |k,v| 
    [event.sprintf(k), (String === v ? event.sprintf(v) : v)] 
  end ]
end
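
As a sketch of the sprintf expansion (hypothetical event data; the real method uses event.sprintf to expand %{field} references):

event       = { "host" => "server01", "load" => 0.64 }
expand      = ->(s) { s.gsub(/%\{(\w+)\}/) { event[$1].to_s } }
data_points = { "host" => "%{host}", "value" => "%{load}", "flag" => 1 }

point = Hash[ data_points.map { |k, v| [expand.call(k), String === v ? expand.call(v) : v] } ]
# point == { "host" => "server01", "value" => "0.64", "flag" => 1 }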
dowrite(events, database)
# File lib/logstash/outputs/influxdb.rb, line 188
def dowrite(events, database)
  begin
    @influxdbClient.write_points(events, @time_precision, @retention_policy, @db)
  rescue InfluxDB::AuthenticationError => ae
    @logger.warn("Authentication Error while writing to InfluxDB", :exception => ae)
  rescue InfluxDB::ConnectionError => ce
    @logger.warn("Connection Error while writing to InfluxDB", :exception => ce)
  rescue Exception => e
    @logger.warn("Non recoverable exception while writing to InfluxDB", :exception => e)
  end
end
exclude_fields!(event_data)

Remove a set of fields from the event data before sending it to InfluxDB. This is useful for removing @timestamp, @version, etc.

# File lib/logstash/outputs/influxdb.rb, line 257
def exclude_fields!(event_data)
  @exclude_fields.each { |field| event_data.delete(field) }
end
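
For example, with the excluded fields set to the bookkeeping fields Logstash adds to every event (hypothetical point data):

exclude = ["@timestamp", "@version"]
point   = { "@timestamp" => "2015-06-11T20:46:02Z", "@version" => "1", "value" => 0.64 }
exclude.each { |field| point.delete(field) }
# point == { "value" => 0.64 }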
extract_tags(fields)

Extract tags from a hash of fields. Returns a tuple containing a hash of tags (as configured by send_as_tags) and a hash of fields that excludes the tags. If fields contains a "tags" key holding an array, its elements will be moved to the tags hash (and each will be given a value of "true").

Example:

# Given send_as_tags: ["bar"]
original_fields = {"foo" => 1, "bar" => 2, "tags" => ["tag"]}
tags, fields = extract_tags(original_fields)
# tags: {"bar" => 2, "tag" => "true"} and fields: {"foo" => 1}
# File lib/logstash/outputs/influxdb.rb, line 273
def extract_tags(fields)
  remainder = fields.dup

  tags = if remainder.has_key?("tags") && remainder["tags"].respond_to?(:inject)
    remainder.delete("tags").inject({}) { |tags, tag| tags[tag] = "true"; tags }
  else
    {}
  end
  
  @send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) }

  tags.delete_if { |key,value| value.nil? || value == "" }
  remainder.delete_if { |key,value| value.nil? || value == "" }

  [tags, remainder]
end
flush(events, database, teardown = false)
# File lib/logstash/outputs/influxdb.rb, line 183
def flush(events, database, teardown = false)
  @logger.debug? and @logger.debug("Flushing #{events.size} events to #{database} - Teardown? #{teardown}")
  dowrite(events, database)
end
receive(event)
# File lib/logstash/outputs/influxdb.rb, line 145
def receive(event)
  @logger.debug? and @logger.debug("Influxdb output: Received event: #{event}")

  # An Influxdb 0.9 event looks like this:
  # cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000
  #  ^ measurement  ^ tags (optional)            ^ fields   ^ timestamp (optional)
  #
  # Since we'll be buffering them to send as a batch, we'll only collect
  # the values going into the points array
  
  time  = timestamp_at_precision(event.timestamp, @time_precision.to_sym)
  point = create_point_from_event(event)
  exclude_fields!(point)
  coerce_values!(point)

  if point.has_key?('time')
    unless @allow_time_override
      logger.error("Cannot override value of time without 'allow_time_override'. Using event timestamp")
    else
      time = point.delete("time")
    end
  end

  tags, point = extract_tags(point)

  event_hash = {
    :series    => event.sprintf(@measurement),
    :timestamp => time,
    :values    => point
  }
  event_hash[:tags] = tags unless tags.empty?

  buffer_receive(event_hash, event.sprintf(@db))
end
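
Putting the pieces together: an event with hypothetical fields host and value, a measurement of cpu_load_short, and host configured as a tag would be buffered as a hash shaped roughly like this:

event_hash = {
  :series    => "cpu_load_short",
  :timestamp => 1434055562,                # at the configured time_precision (seconds here)
  :values    => { "value" => 0.64 },
  :tags      => { "host" => "server01" }
}
# buffer_receive(event_hash, event.sprintf(@db)) keys the buffer by database,
# so points destined for the same database are flushed together in one write.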
register()
# File lib/logstash/outputs/influxdb.rb, line 128
def register
  require 'cgi'
  
  @queue = []

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
  @auth_method = @user.nil? ? "none".freeze : "params".freeze

  @influxdbClient = InfluxDB::Client.new host: @host, port: @port, time_precision: @time_precision,
    use_ssl: @ssl, verify_ssl: false, retry: @max_retries, initial_delay: @initial_delay,
    auth_method: @auth_method, username: @user, password: @password.value
end
timestamp_at_precision(timestamp, precision)

Returns the numeric value of the given timestamp at the requested precision. `precision` must be one of the valid values for time_precision.

# File lib/logstash/outputs/influxdb.rb, line 293
def timestamp_at_precision( timestamp, precision )
  multiplier = case precision
    when :h  then 1.0/3600
    when :m  then 1.0/60
    when :s  then 1
    when :ms then 1000
    when :u  then 1000000
    when :n  then 1000000000
  end
  
  (timestamp.to_f * multiplier).to_i
end
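
For example, calling the method above with a hypothetical event timestamp of 1434055562.5 seconds since the epoch:

timestamp_at_precision(1434055562.5, :s)   # => 1434055562
timestamp_at_precision(1434055562.5, :ms)  # => 1434055562500
timestamp_at_precision(1434055562.5, :m)   # => 23900926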