class Fluent::LossyCountOutput
Public Instance Methods
configure(config)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_lossycount.rb, line 16
#
# Validates the plugin configuration and prepares runtime state.
#
# After delegating to the superclass, requires that both 'key_name' and
# 'output_tag' are present in +config+, then creates the mutex, sets the
# watcher polling interval (@sleep_wait, in seconds as passed to +sleep+)
# and builds the first counter via init_counter.
#
# @param config the configuration object; only +has_key?+ is called on it here
# @raise [Fluent::ConfigError] if 'key_name' or 'output_tag' is missing
def configure(config)
  super

  # 'key_name' is checked first so the error order matches the declaration order.
  raise Fluent::ConfigError, "you must set 'key_name'" unless config.has_key?('key_name')
  raise Fluent::ConfigError, "you must set 'output_tag'" unless config.has_key?('output_tag')

  @mutex = Mutex.new
  @sleep_wait = 0.4
  init_counter
end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 93
#
# Feeds every record of the event stream into the counter.
#
# For each record the value found at @key_name (resolved with +traverse+)
# is added to @counter as a string. An Array value contributes one entry
# per element. Records where the lookup yields nil or false are skipped.
#
# flush_counter reads and replaces @counter while holding @mutex, so the
# additions here take the same lock; without it an addition could land on
# a counter that has already been snapshotted and discarded, or mutate it
# while it is being read.
#
# @param tag   the incoming tag (unused here)
# @param es    event stream; must yield (time, record) pairs from +each+
# @param chain object whose +next+ is called once all records are processed
# @return whatever +chain.next+ returns
def emit(tag, es, chain)
  es.each do |_time, record|
    found = traverse(record, @key_name)
    next unless found

    # Only a real Array is fanned out; any other value is counted as one item.
    values = found.is_a?(Array) ? found : [found]

    # Lock per record to keep the critical section short.
    @mutex.synchronize do
      values.each { |v| @counter.add(v.to_s) }
    end
  end
  chain.next
end
flush_counter(step)
click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 45
#
# Snapshots the current counter, resets it, and emits the snapshot.
#
# Under @mutex the counts (and, when @enable_metrics is set, the metrics)
# are read from @counter and a fresh counter is installed via init_counter.
# Outside the lock, one event per counted key is emitted on @output_tag,
# followed by a single metrics event on @metrics_tag when metrics are
# enabled. With @verbose set, the same data is also written to $log.
#
# @param step not referenced by this method; the caller passes the number
#   of seconds since the previous flush
# @return the result of the metrics emit, or nil when metrics are disabled
def flush_counter(step)
  counts, stats = @mutex.synchronize do
    snapshot = @counter.get
    snapshot_stats = @enable_metrics ? @counter.get_metrics : {}
    init_counter
    [snapshot, snapshot_stats]
  end

  # One timestamp is stamped into every emitted row of this flush.
  flush_time = Fluent::Engine.now.to_i
  if @verbose
    $log.info "flushtime : " + flush_time.to_s
    $log.info "{ "
  end

  counts.each_pair do |name, count|
    row = {
      @output_key_name => name,
      @output_timestamp_name => flush_time,
      @output_value_name => count
    }
    $log.info row if @verbose
    Fluent::Engine.emit(@output_tag, Fluent::Engine.now, row)
  end
  $log.info "}" if @verbose

  return unless @enable_metrics

  $log.info "metrics : " + stats.to_s if @verbose
  Fluent::Engine.emit(@metrics_tag, Fluent::Engine.now, stats)
end
init_counter()
click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 41
#
# Replaces @counter with a new Counter::LossyCounter built from the
# current @gamma and @epsilon settings. Any counts held by the previous
# counter object are no longer reachable through @counter afterwards.
#
# @return the newly created counter
def init_counter
  # Passed as a single positional Hash, exactly as the counter is constructed elsewhere.
  options = { gamma: @gamma, epsilon: @epsilon }
  @counter = Counter::LossyCounter.new(options)
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_lossycount.rb, line 35
#
# Stops the plugin: delegates to the superclass, then terminates the
# watcher thread created by start_watch and waits for it to finish.
#
# @return the watcher thread (the value returned by +join+)
def shutdown
  super
  # Thread#terminate returns the thread itself, so the join can be chained.
  @watcher.terminate.join
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_lossycount.rb, line 30
#
# Starts the plugin: runs the superclass start-up first, then launches
# the background watcher via start_watch.
#
# @return the value returned by start_watch
def start
  super
  start_watch()
end
start_watch()
click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 79
#
# Spawns the watcher thread and stores it in @watcher.
#
# The thread wakes every @sleep_wait seconds, and once at least
# @time_windows seconds (by Fluent::Engine.now) have passed since the last
# flush it calls flush_counter with the elapsed seconds and records the new
# flush time in @last_checked. The loop never exits on its own.
#
# @return [Thread] the watcher thread
def start_watch
  @watcher = Thread.new do
    @last_checked = Fluent::Engine.now.to_i
    while true
      sleep @sleep_wait
      current = Fluent::Engine.now.to_i
      elapsed = current - @last_checked
      # Window not reached yet: go back to sleep.
      next unless elapsed >= @time_windows

      flush_counter(elapsed)
      @last_checked = current
    end
  end
end
traverse(data, key)
click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 109
#
# Resolves a dot-separated key path inside a nested record.
#
# Each segment of +key+ (split on '.') is looked up in turn, starting from
# +data+. The walk stops and nil is returned as soon as a segment is
# missing, or as soon as the current value cannot be descended into
# because it does not respond to +has_key?+ (for example a String, an
# Array or nil reached before the path is exhausted). Previously that
# second case raised NoMethodError.
#
# @param data the record to search; normally a Hash with String keys
# @param key [String] path such as "a.b.c"
# @return the value at the end of the path, or nil if it cannot be reached
def traverse(data, key)
  key.split('.').reduce(data) do |node, part|
    # Bail out of the whole method, not just this block iteration.
    return nil unless node.respond_to?(:has_key?) && node.has_key?(part)

    node[part]
  end
end