class Fluent::DedupOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dedup.rb, line 17
# Validates the plugin configuration and resets the per-tag state table.
#
# conf - configuration object; must answer `include?('key')` truthily.
#
# Raises Fluent::ConfigError when the `key` parameter is absent.
def configure(conf)
  super

  # Fail before any state is initialised when the dedup key is not configured.
  raise Fluent::ConfigError, "config parameter `key` is required" unless conf.include?('key')

  # Maps tag => cache of already-seen key values; filled lazily elsewhere.
  @states = {}
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 37
# Re-emits every record of the event stream that `dup?` does not flag,
# under the tag "dedup.<tag>", then advances the output chain.
#
# tag   - tag of the incoming events.
# es    - event stream yielding (time, record) pairs from `each`.
# chain - object whose `next` is invoked once after the stream is processed.
def emit(tag, es, chain)
  es.each do |time, record|
    unless dup?(tag, record)
      Fluent::Engine.emit("dedup.#{tag}", time, record)
    end
  end

  chain.next
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dedup.rb, line 31
# Shuts the plugin down: delegates to the superclass first, then writes the
# dedup state out via `save_states`.
def shutdown
  super
  save_states
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dedup.rb, line 25
# Starts the plugin: delegates to the superclass first, then loads any
# previously saved dedup state via `restore_states`.
def start
  super
  restore_states
end

Private Instance Methods

dup?(tag, record) click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 70
# Reports whether the record's `@key` value has already been seen for `tag`,
# remembering it when it has not.
#
# tag    - tag used to select the per-tag cache in `@states`.
# record - record answering `include?`, `[]` and `to_json`.
#
# Returns true for a repeated value, false otherwise. A record without the
# key is logged as a warning and never treated as a duplicate.
def dup?(tag, record)
  unless record.include?(@key)
    log.warn "record does not have key `#{@key}`, record: #{record.to_json}"
    return false
  end

  # Create the cache for this tag on first use.
  @states[tag] = new_lru unless @states.include?(tag)

  seen = @states[tag]
  value = record[@key]
  return true if seen.fetch(value)

  seen[value] = true
  false
end
new_lru() click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 85
# Builds an empty cache sized by `@cache_per_tag`.
#
# Returns a LruRedux::TTL::ThreadSafeCache (constructed with `@cache_ttl`)
# when `@cache_ttl` is greater than zero, otherwise a
# LruRedux::ThreadSafeCache.
def new_lru
  return LruRedux::ThreadSafeCache.new(@cache_per_tag) unless 0 < @cache_ttl

  LruRedux::TTL::ThreadSafeCache.new(@cache_per_tag, @cache_ttl)
end
restore_states() click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 47
# Rebuilds `@states` from the JSON state file at `@file`, if one exists.
#
# The file is expected to hold a JSON object mapping each tag to an array of
# ids. Every id is inserted into a fresh cache from `new_lru`, which is then
# stored under its tag in `@states`.
#
# Restoring is best-effort: a missing, unreadable or malformed file is logged
# as a warning and leaves `@states` untouched instead of raising.
def restore_states
  return if @file.nil? || !File.file?(@file)

  dump =
    begin
      # File.read closes the handle itself; File.open(...).read left it open.
      JSON.parse(File.read(@file))
    rescue StandardError => e
      log.warn "failed to restore states from `#{@file}`: #{e.message}"
      {}
    end

  # Valid JSON that is not an object (e.g. `null`) cannot be iterated as
  # tag => ids pairs, so skip it rather than fail.
  unless dump.is_a?(Hash)
    log.warn "ignoring state file `#{@file}`: expected a JSON object"
    return
  end

  dump.each do |tag, ids|
    lru = new_lru
    # NOTE(review): ids are inserted in file order; if the cache's `to_a`
    # lists most-recent entries first, this inverts recency - confirm.
    Array(ids).each { |id| lru[id] = true }
    @states[tag] = lru
  end
end
save_states() click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 58
# Writes the ids held in every per-tag cache to the JSON state file at
# `@file`, as an object mapping tag => array of ids. Does nothing when
# `@file` is nil.
#
# The payload is serialized completely before the file is opened: opening
# with 'wb' truncates the file, so building the dump inside the open block
# would wipe the previously saved state if serialization raised.
def save_states
  return if @file.nil?

  dump = @states.each_with_object({}) do |(tag, lru), acc|
    # `to_a` yields [id, value] pairs; only the ids are persisted.
    acc[tag] = lru.to_a.map(&:first)
  end
  payload = dump.to_json

  File.open(@file, 'wb') { |f| f.print(payload) }
end