class Fluent::Plugin::InfluxdbDeduplicationFilter

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_influxdb_deduplication.rb, line 20
# Validates the deduplication settings once the superclass has processed +conf+.
#
# Exactly one of +@time+ / +@tag+ must be set, and whichever one is set must
# expose a +key+ that is neither nil nor the empty string.
#
# @param conf the configuration object; only forwarded to the superclass here
# @return [nil] when the configuration is valid
# @raise [Fluent::ConfigError] if neither or both modes are set, or if the
#   selected mode has a nil/empty +key+
def configure(conf)
  super

  if @time.nil? && @tag.nil?
    raise Fluent::ConfigError, "one of tag or time deduplication needs to be set."
  elsif !@time.nil? && !@tag.nil?
    raise Fluent::ConfigError, "tag and time deduplication are mutually exclusive."
  elsif !@time.nil? && (@time.key.nil? || @time.key == "")
    raise Fluent::ConfigError, "an output 'key' field is required for time deduplication"
  elsif !@tag.nil? && (@tag.key.nil? || @tag.key == "")
    # The nil test must look at @tag.key (as the time branch does); testing
    # @tag itself can never be true here and lets a nil key slip through.
    raise Fluent::ConfigError, "an output 'key' field is required for tag deduplication"
  end
end
filter(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_influxdb_deduplication.rb, line 41
# Dispatches a record to the active deduplication strategy.
#
# When +@time+ is truthy the record goes through #time_deduplication,
# otherwise through #tag_deduplication. The +tag+ argument is not used.
#
# @param tag the event tag (unused here)
# @param time the event time, passed through to the strategy
# @param record the event record, passed through to the strategy
# @return whatever the selected strategy returns
def filter(tag, time, record)
  return time_deduplication(time, record) if @time

  tag_deduplication(time, record)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_influxdb_deduplication.rb, line 34
# Resets the deduplication state after the superclass has started.
#
# Both the per-timestamp sequence counter and the last seen timestamp
# are set back to zero.
def start
  super

  @sequence = 0
  @last_timestamp = 0
end
tag_deduplication(time, record) click to toggle source
# File lib/fluent/plugin/filter_influxdb_deduplication.rb, line 91
# Stamps +record+ with a sequence number under +@tag.key+ so that records
# sharing one timestamp can be told apart.
#
# An Integer +time+ is compared as given; a Fluent::EventTime is converted
# to +sec * 1000000000 + nsec+. Any other type is logged as an error and
# the method returns nil.
#
# Relative to +@last_timestamp+:
# * older: if +@order_key+ is set the record is flagged +false+ under it and
#   returned without a sequence number, otherwise nil is returned;
# * equal: +@sequence+ is incremented and written to the record;
# * newer: +@sequence+ restarts at 0 and +@last_timestamp+ advances.
#
# @param time [Integer, Fluent::EventTime] the event time
# @param record [Hash] the event record, mutated in place
# @return [Hash, nil] the mutated record, or nil when it is dropped
def tag_deduplication(time, record)
  stamp =
    case time
    when Integer then time
    when Fluent::EventTime then time.sec * 1000000000 + time.nsec
    end

  unless stamp
    @log.error("unreadable time")
    return nil
  end

  if stamp < @last_timestamp
    @log.debug("out of sequence timestamp")
    unless @order_key
      @log.debug("out of order record dropped")
      return nil
    end
    record[@order_key] = false
    return record
  end

  # In-order record: either continue the current sequence or begin a new one.
  if stamp == @last_timestamp
    @sequence += 1
  else
    @sequence = 0
    @last_timestamp = stamp
  end

  record[@tag.key] = @sequence
  record[@order_key] = true if @order_key
  record
end
time_deduplication(time, record) click to toggle source
# File lib/fluent/plugin/filter_influxdb_deduplication.rb, line 49
# Writes a nanosecond timestamp under +@time.key+, built from the event's
# whole seconds plus a per-second sequence number, so that records from the
# same second receive distinct values.
#
# An Integer +time+ is wrapped in Fluent::EventTime; a Fluent::EventTime is
# used as is. Any other type is logged as an error and nil is returned.
# Only +sec+ of the event time is used.
#
# Relative to +@last_timestamp+ (seconds):
# * older: if +@order_key+ is set the record is flagged +false+ and stamped
#   with +sec * 1000000000+, otherwise nil is returned;
# * equal: +@sequence+ is incremented and added to the stamp; once
#   +@sequence+ is exactly 999999999 an error is logged and nil is returned;
# * otherwise: +@sequence+ restarts at 0 and +@last_timestamp+ is updated.
#
# @param time [Integer, Fluent::EventTime] the event time
# @param record [Hash] the event record, mutated in place
# @return [Hash, nil] the mutated record, or nil when it is dropped
def time_deduplication(time, record)
  event_time =
    case time
    when Integer then Fluent::EventTime.new(time)
    when Fluent::EventTime then time
    end

  unless event_time
    @log.error("unreadable time")
    return nil
  end

  seconds = event_time.sec
  nano_time = seconds * 1000000000

  if seconds < @last_timestamp
    @log.debug("out of sequence timestamp")
    unless @order_key
      @log.debug("out of order record dropped")
      return nil
    end
    record[@order_key] = false
    record[@time.key] = nano_time
    return record
  end

  same_second = seconds == @last_timestamp

  if same_second && @sequence == 999999999
    @log.error("received more then 999999999 records in a second")
    return nil
  end

  # Continue the sequence within the same second; any other case restarts it.
  if same_second && @sequence < 999999999
    @sequence += 1
  else
    @sequence = 0
    @last_timestamp = seconds
  end

  record[@time.key] = nano_time + @sequence
  record[@order_key] = true if @order_key
  record
end