class LogStash::Outputs::InfluxDB
This output lets you send metrics to InfluxDB (>= 0.9.0-rc31).
The configuration attempts to be as friendly as possible, minimizing the need for multiple definitions when writing to multiple measurements while still being efficient.
The InfluxDB API lets you do some semblance of a bulk operation per HTTP call, but each call is database-specific.
You can learn more at the InfluxDB homepage (influxdb.com).
Public Instance Methods
# File lib/logstash/outputs/influxdb.rb, line 200
def close
  buffer_flush(:final => true)
end
# File lib/logstash/outputs/influxdb.rb, line 237
def coerce_value(value_type, value)
  case value_type.to_sym
  when :integer
    value.to_i
  when :float
    value.to_f
  when :string
    value.to_s
  else
    @logger.warn("Don't know how to convert to #{value_type}. Returning value unchanged")
    value
  end
end
Coerce values in the event data to their appropriate type. This requires foreknowledge of what is in the data point, which is less than ideal. An alternative is to use a `code` filter and manipulate the individual point's data before it is sent to the output pipeline.
# File lib/logstash/outputs/influxdb.rb, line 220
def coerce_values!(event_data)
  @coerce_values.each do |column, value_type|
    if event_data.has_key?(column)
      begin
        @logger.debug? and @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{event_data[column]}")
        event_data[column] = coerce_value(value_type, event_data[column])
      rescue => e
        @logger.error("Unhandled exception", :error => e.message)
      end
    end
  end

  event_data
end
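The coercion step can be tried outside Logstash. The following is a minimal sketch, assuming a plain hash in place of the plugin's @coerce_values setting (COERCE_VALUES is a hypothetical name) and with logging left out; it is not the plugin's own code path.

```ruby
# Standalone sketch of the coercion step. COERCE_VALUES is a hypothetical
# stand-in for the plugin's @coerce_values setting; logging is omitted.
COERCE_VALUES = { "count" => "integer", "load" => "float", "code" => "string" }

def coerce_value(value_type, value)
  case value_type.to_sym
  when :integer then value.to_i
  when :float   then value.to_f
  when :string  then value.to_s
  else value # unknown type: leave the value unchanged
  end
end

def coerce_values!(event_data, coerce_values = COERCE_VALUES)
  coerce_values.each do |column, value_type|
    # Columns that are absent from the data point are skipped.
    next unless event_data.key?(column)
    event_data[column] = coerce_value(value_type, event_data[column])
  end
  event_data
end

point = coerce_values!({ "count" => "42", "load" => "0.64", "code" => 200, "host" => "server01" })
puts point.inspect
```

Fields that have no entry in the mapping, such as "host" here, pass through untouched.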
Create a data point from an event. If @use_event_fields_for_data_points is true, convert the event to a hash. Otherwise, use @data_points. Each key and value is run through event#sprintf, except for non-String values, which are passed through unchanged.
# File lib/logstash/outputs/influxdb.rb, line 209
def create_point_from_event(event)
  Hash[ (@use_event_fields_for_data_points ? event.to_hash : @data_points).map do |k,v|
    [event.sprintf(k), (String === v ? event.sprintf(v) : v)]
  end ]
end
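To see how keys and values are expanded, here is a rough sketch that swaps the real event for a small stand-in. FakeEvent is hypothetical and only expands `%{field}` references, which is a small subset of what event#sprintf does; the two instance variables are passed as arguments so the example is self-contained.

```ruby
# FakeEvent is a hypothetical stand-in for the Logstash event class. It only
# expands %{field} references, a small subset of the real event#sprintf.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def to_hash
    @fields.dup
  end

  def sprintf(template)
    template.gsub(/%\{([^}]+)\}/) { @fields[$1].to_s }
  end
end

# Same mapping logic as the method above, with the instance variables
# passed in as arguments.
def create_point_from_event(event, data_points, use_event_fields)
  source = use_event_fields ? event.to_hash : data_points
  Hash[
    source.map do |k, v|
      [event.sprintf(k), (String === v ? event.sprintf(v) : v)]
    end
  ]
end

event = FakeEvent.new("host" => "server01", "load" => "0.64")
data_points = { "%{host}_load" => "%{load}", "samples" => 3 }

from_config = create_point_from_event(event, data_points, false)
from_event  = create_point_from_event(event, data_points, true)
puts from_config.inspect
puts from_event.inspect
```

Note that an expanded value is still a String ("0.64" here), which is why the coercion step exists.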
# File lib/logstash/outputs/influxdb.rb, line 188
def dowrite(events, database)
  begin
    @influxdbClient.write_points(events, @time_precision, @retention_policy, @db )
  rescue InfluxDB::AuthenticationError => ae
    @logger.warn("Authentication Error while writing to InfluxDB", :exception => ae)
  rescue InfluxDB::ConnectionError => ce
    @logger.warn("Connection Error while writing to InfluxDB", :exception => ce)
  rescue Exception => e
    @logger.warn("Non recoverable exception while writing to InfluxDB", :exception => e)
  end
end
Remove a set of fields from the event data before sending it to InfluxDB. This is useful for removing @timestamp, @version, etc.
# File lib/logstash/outputs/influxdb.rb, line 257
def exclude_fields!(event_data)
  @exclude_fields.each { |field| event_data.delete(field) }
end
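A short sketch of the in-place removal, assuming a plain array in place of @exclude_fields (EXCLUDED is a hypothetical name). It also shows that the method's return value is the list of field names, not the cleaned hash, so callers should rely on the mutation.

```ruby
# EXCLUDED is a hypothetical stand-in for the plugin's @exclude_fields setting.
EXCLUDED = ["@timestamp", "@version"]

def exclude_fields!(event_data, exclude_fields = EXCLUDED)
  # Array#each returns its receiver, so this method returns the field list.
  exclude_fields.each { |field| event_data.delete(field) }
end

point = { "@timestamp" => "2015-06-11T20:46:02Z", "@version" => "1", "value" => 0.64 }
result = exclude_fields!(point)

puts point.inspect   # the hash was modified in place
puts result.inspect  # the list of excluded field names
```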
# File lib/logstash/outputs/influxdb.rb, line 183
def flush(events, database, teardown = false)
  @logger.debug? and @logger.debug("Flushing #{events.size} events to #{database} - Teardown? #{teardown}")
  dowrite(events, database)
end
# File lib/logstash/outputs/influxdb.rb, line 145
def receive(event)
  @logger.debug? and @logger.debug("Influxdb output: Received event: #{event}")

  # An Influxdb 0.9 event looks like this:
  # cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000
  #  ^ measurement  ^ tags (optional)            ^ fields   ^ timestamp (optional)
  #
  # Since we'll be buffering them to send as a batch, we'll only collect
  # the values going into the points array

  time  = timestamp_at_precision(event.timestamp, @time_precision.to_sym)
  point = create_point_from_event(event)
  exclude_fields!(point)
  coerce_values!(point)

  if point.has_key?('time')
    unless @allow_time_override
      logger.error("Cannot override value of time without 'allow_time_override'. Using event timestamp")
    else
      time = point.delete("time")
    end
  end

  tags, point = extract_tags(point)

  event_hash = {
    :series    => event.sprintf(@measurement),
    :timestamp => time,
    :values    => point
  }
  event_hash[:tags] = tags unless tags.empty?

  buffer_receive(event_hash, event.sprintf(@db))
end
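The time-override branch and the shape of the buffered hash can be illustrated on their own. This is a sketch under stated assumptions: resolve_time and build_event_hash are hypothetical helpers that do not exist in the plugin, the error is written with Kernel#warn instead of the Logstash logger, and tags are passed in directly because extract_tags is not shown in this section.

```ruby
# Returns the timestamp to write. Mirrors the branch in receive: a "time" field
# in the point replaces the event timestamp only when overriding is allowed.
def resolve_time(point, event_time, allow_time_override)
  return event_time unless point.key?("time")

  if allow_time_override
    point.delete("time")
  else
    warn "Cannot override value of time without 'allow_time_override'. Using event timestamp"
    event_time
  end
end

# Hypothetical helper that assembles the hash handed to the buffer.
# The :tags key is only present when there is at least one tag.
def build_event_hash(measurement, time, point, tags = {})
  event_hash = { :series => measurement, :timestamp => time, :values => point }
  event_hash[:tags] = tags unless tags.empty?
  event_hash
end

point = { "value" => 0.64, "time" => 1434055500 }
time = resolve_time(point, 1434055562, true)
event_hash = build_event_hash("cpu_load_short", time, point, { "host" => "server01" })
puts event_hash.inspect
```

When the override is not allowed, the "time" field is left in the point, so it is still written as an ordinary value alongside the event timestamp.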
# File lib/logstash/outputs/influxdb.rb, line 128
def register
  require 'cgi'

  @queue = []

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
  @auth_method = @user.nil? ? 'none'.freeze : "params".freeze

  @influxdbClient = InfluxDB::Client.new host: @host,
    port: @port,
    time_precision: @time_precision,
    use_ssl: @ssl,
    verify_ssl: false,
    retry: @max_retries,
    initial_delay: @initial_delay,
    auth_method: @auth_method,
    username: @user,
    password: @password.value
end
Returns the numeric value of the given timestamp in the requested precision. precision must be one of the valid values for time_precision (:h, :m, :s, :ms, :u or :n).
# File lib/logstash/outputs/influxdb.rb, line 293
def timestamp_at_precision( timestamp, precision )
  multiplier = case precision
    when :h  then 1.0/3600
    when :m  then 1.0/60
    when :s  then 1
    when :ms then 1000
    when :u  then 1000000
    when :n  then 1000000000
  end

  (timestamp.to_f * multiplier).to_i
end
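As a quick illustration of the conversion, the sketch below calls the same logic with a plain Ruby Time, which is an assumption made for convenience: anything that responds to to_f with epoch seconds behaves the same way. The epoch value is the one from the line-protocol comment in receive.

```ruby
# Same conversion as above, callable on any object whose to_f returns
# seconds since the epoch (a plain Time is used here for illustration).
def timestamp_at_precision(timestamp, precision)
  multiplier = case precision
    when :h  then 1.0/3600
    when :m  then 1.0/60
    when :s  then 1
    when :ms then 1000
    when :u  then 1000000
    when :n  then 1000000000
  end

  (timestamp.to_f * multiplier).to_i
end

ts = Time.at(1434055562)

[:h, :m, :s, :ms].each do |precision|
  puts "#{precision}: #{timestamp_at_precision(ts, precision)}"
end
```

The result is truncated, not rounded, so :h and :m drop any partial hour or minute. A precision outside the list leaves multiplier as nil, and the multiplication then raises a TypeError.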