class Fluent::UniqueCounterOutput
Constants
- UNITS
Attributes
counts[RW]
last_checked[RW]
tick[RW]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_unique_counter.rb, line 22 def configure(conf) super if @unit and UNITS[@unit.to_sym].nil? raise Fluent::ConfigError, "'unit' must be one of minutes/hours/days" end @count_interval = UNITS[@unit.to_sym] if @unit @counts = [] @mutex = Mutex.new end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_unique_counter.rb, line 69 def emit(tag, es, chain) c = [] es.each do |time,record| value = record[@unique_key] next if value.nil? c << value.to_s.force_encoding('ASCII-8BIT') end @mutex.synchronize { @counts += c } chain.next end
flush_emit()
click to toggle source
# File lib/fluent/plugin/out_unique_counter.rb, line 45 def flush_emit flushed,@counts = @counts,[] message = {'unique_count' => flushed.uniq.count } Fluent::Engine.emit(@tag, Fluent::Engine.now, message) end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_unique_counter.rb, line 39 def shutdown super @watcher.terminate @watcher.join end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_unique_counter.rb, line 34 def start super start_watch end
start_watch()
click to toggle source
# File lib/fluent/plugin/out_unique_counter.rb, line 51 def start_watch # for internal, or tests only @watcher = Thread.new(&method(:watch)) end
watch()
click to toggle source
# File lib/fluent/plugin/out_unique_counter.rb, line 56 def watch # instance variable, and public accessable, for test @last_checked = Fluent::Engine.now while true sleep 0.5 if (Fluent::Engine.now - @last_checked) >= @count_interval now = Fluent::Engine.now flush_emit @last_checked = now end end end