class Fluent::DedupOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dedup.rb, line 17 def configure(conf) super unless conf.include?('key') raise Fluent::ConfigError, "config parameter `key` is required" end @states = {} end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 37 def emit(tag, es, chain) es.each do |time, record| next if dup?(tag, record) Fluent::Engine.emit("dedup.#{tag}", time, record) end chain.next end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dedup.rb, line 31 def shutdown super save_states end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dedup.rb, line 25 def start super restore_states end
Private Instance Methods
dup?(tag, record)
click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 70 def dup?(tag, record) is_dup = false if record.include?(@key) @states[tag] = new_lru unless @states.include?(tag) if @states[tag].fetch(record[@key]) is_dup = true else @states[tag][record[@key]] = true end else log.warn "record does not have key `#{@key}`, record: #{record.to_json}" end is_dup end
new_lru()
click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 85 def new_lru if 0 < @cache_ttl LruRedux::TTL::ThreadSafeCache.new(@cache_per_tag, @cache_ttl) else LruRedux::ThreadSafeCache.new(@cache_per_tag) end end
restore_states()
click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 47 def restore_states if not @file.nil? and File.file?(@file) dump = JSON.parse(File.open(@file).read) rescue {} dump.each do |tag, ids| lru = new_lru ids.each {|id| lru[id] = true} @states[tag] = lru end end end
save_states()
click to toggle source
# File lib/fluent/plugin/out_dedup.rb, line 58 def save_states unless @file.nil? File.open(@file, 'wb') do |f| dump = {} @states.each do |tag, lru| dump[tag] = lru.to_a.map(&:first) end f.print(dump.to_json) end end end