class Fluent::LossyCountOutput

Public Instance Methods

configure(config) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_lossycount.rb, line 16
def configure(config)
    super
    unless config.has_key?('key_name')
        raise Fluent::ConfigError, "you must set 'key_name'"
    end
    unless config.has_key?('output_tag')
        raise Fluent::ConfigError, "you must set 'output_tag'"
    end

    @mutex = Mutex.new
    @sleep_wait = 0.4
    init_counter()
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 93
def emit(tag, es, chain)
    es.each {|time,record|
        k = traverse(record, @key_name)
        if k
            if k.is_a?(Array)
                k.each{ |v|
                    @counter.add(v.to_s)
                }
            else
                @counter.add(k.to_s)
            end
        end
    }
    chain.next
end
flush_counter(step) click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 45
def flush_counter(step)
    freq_counter = {}
    metrics = {}
    @mutex.synchronize {
        freq_counter = @counter.get()
        if @enable_metrics
            metrics = @counter.get_metrics()
        end 
        init_counter()
    }
    flush_time = Fluent::Engine.now.to_i
    if @verbose
        $log.info "flushtime : " + flush_time.to_s
        $log.info "{ "
    end
    freq_counter.each_pair { |key, value|
        map = {@output_key_name => key, @output_timestamp_name => flush_time, @output_value_name => value}
        if @verbose
            $log.info map
        end
        Fluent::Engine.emit(@output_tag, Fluent::Engine.now, map)
    }
    if @verbose
        $log.info "}"
    end

    if @enable_metrics
        if @verbose
            $log.info "metrics : " + metrics.to_s
        end
        Fluent::Engine.emit(@metrics_tag, Fluent::Engine.now, metrics)
    end
end
init_counter() click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 41
def init_counter()
    @counter = Counter::LossyCounter.new({:gamma => @gamma, :epsilon => @epsilon})
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_lossycount.rb, line 35
def shutdown
    super
    @watcher.terminate
    @watcher.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_lossycount.rb, line 30
def start
    super
    start_watch
end
start_watch() click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 79
def start_watch
    @watcher = Thread.new{
        @last_checked = Fluent::Engine.now.to_i
        while true
            sleep @sleep_wait
            now = Fluent::Engine.now.to_i
            if now - @last_checked >= @time_windows
                flush_counter(now - @last_checked)
                @last_checked = now
            end
        end
    }
end
traverse(data, key) click to toggle source
# File lib/fluent/plugin/out_lossycount.rb, line 109
def traverse(data, key)
    val = data
    key.split('.').each{ |k|
        if val.has_key?(k)
            val = val[k]
        else
            return nil
        end
    }
    return val
end