class InfluxDBOutput

A buffered output plugin for Fluentd and InfluxDB 2

Constants

DEFAULT_BUFFER_TYPE

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb2.rb, line 81
def configure(conf)
  compat_parameters_convert(conf, :inject)
  super
  case @time_precision
  when 'ns' then
    @precision_formatter = ->(ns_time) { ns_time }
  when 'us' then
    @precision_formatter = ->(ns_time) { (ns_time / 1e3).round }
  when 'ms' then
    @precision_formatter = ->(ns_time) { (ns_time / 1e6).round }
  when 's' then
    @precision_formatter = ->(ns_time) { (ns_time / 1e9).round }
  else
    raise Fluent::ConfigError, "The time precision #{@time_precision} is not supported. You should use: " \
                               'second (s), millisecond (ms), microsecond (us), or nanosecond (ns).'
  end
  @precision = InfluxDB2::WritePrecision.new.get_from_value(@time_precision)
  raise Fluent::ConfigError, 'The InfluxDB URL should be defined.' if @url.empty?
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_influxdb2.rb, line 115
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb2.rb, line 110
def shutdown
  super
  @client.close!
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_influxdb2.rb, line 101
def start
  super
  log.info  "Connecting to InfluxDB: url: #{@url}, bucket: #{@bucket}, org: #{@org}, precision = #{@precision}, " \
            "use_ssl = #{@use_ssl}, verify_mode = #{@verify_mode}"
  @client = InfluxDB2::Client.new(@url, @token, bucket: @bucket, org: @org, precision: @precision, use_ssl: @use_ssl,
                                                verify_mode: @verify_mode)
  @write_api = @client.create_write_api
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_influxdb2.rb, line 119
def write(chunk)
  points = []
  tag = chunk.metadata.tag
  bucket, measurement = expand_placeholders(chunk)
  chunk.msgpack_each do |time, record|
    if time.is_a?(Integer)
      time_formatted = time
    else
      nano_seconds = time.sec * 1e9
      nano_seconds += time.nsec
      time_formatted = @precision_formatter.call(nano_seconds)
    end
    point = InfluxDB2::Point
            .new(name: measurement)
    record.each_pair do |k, v|
      if k.eql?(@time_key)
        time_formatted = v
      else
        _parse_field(k, v, point)
      end
      point.add_tag('fluentd', tag) if @tag_fluentd
    end
    point.time(time_formatted, @precision)
    points << point
  end
  @write_api.write(data: points, bucket: bucket)
  log.debug "Written points: #{points}"
end

Private Instance Methods

_parse_field(key, value, point) click to toggle source
# File lib/fluent/plugin/out_influxdb2.rb, line 160
def _parse_field(key, value, point)
  if @tag_keys.include?(key)
    point.add_tag(key, value)
  elsif @field_keys.empty? || @field_keys.include?(key)
    if @field_cast_to_float & value.is_a?(Integer)
      point.add_field(key, Float(value))
    elsif value.is_a?(Hash)
      value.each_pair do |nested_k, nested_v|
        _parse_field("#{key}.#{nested_k}", nested_v, point)
      end
    else
      point.add_field(key, value)
    end
  end
end
expand_placeholders(chunk) click to toggle source
# File lib/fluent/plugin/out_influxdb2.rb, line 150
def expand_placeholders(chunk)
  bucket = extract_placeholders(@bucket, chunk)
  measurement = if @measurement
                  extract_placeholders(@measurement, chunk)
                else
                  chunk.metadata.tag
                end
  [bucket, measurement]
end