class Fluent::Plugin::FlowCounterOutput
Constants
- FOR_MISSING
Attributes
count_all[RW]
counts[RW]
last_checked[RW]
tick[R]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_flowcounter.rb, line 26
#
# Validates the plugin parameters and derives the runtime state used by the
# other methods: the flush period (@tick), tag-prefix stripping data, the
# byte-counting flags, and three lambdas that drive the flush clock.
#
# conf - the configuration object, passed through unchanged to the superclass.
#
# Raises Fluent::ConfigError when:
# * @unit is not one of :second, :minute, :hour, :day
# * @output_style is :tagged but @aggregate is not :tag
# * @timestamp_counting is set together with unit :second
# * @timestamp_counting is set with unit :day and @timestamp_timezone is
#   missing or is rejected by Time.zone_offset
def configure(conf)
  super

  # Flush period in seconds for the configured unit.
  @tick = case @unit
          when :second then 1
          when :minute then 60
          when :hour   then 3600
          when :day    then 86400
          else
            raise Fluent::ConfigError, "flowcounter unit allows second/minute/hour/day"
          end

  if @output_style == :tagged && @aggregate != :tag
    raise Fluent::ConfigError, "flowcounter aggregate must be 'tag' when output_style is 'tagged'"
  end

  if @input_tag_remove_prefix
    # Precomputed so #process can strip "<prefix>." without re-allocating.
    @removed_prefix_string = @input_tag_remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  # Bytes are counted only when count_keys is non-empty; the single key '*'
  # switches to counting the whole record (see #process).
  @count_all = false
  if @count_keys && !@count_keys.empty?
    @count_all = (@count_keys == ['*'])
    @count_bytes = true
  else
    @count_bytes = false
  end

  # Stays nil until @initializer is called.
  @last_checked = nil

  if @timestamp_counting
    @timestamp_timezone_offset = 0
    if @unit == :second
      raise Fluent::ConfigError, "timestamp_counting cannot be enabled with unit: second"
    elsif @unit == :day
      unless @timestamp_timezone
        raise Fluent::ConfigError, "timestamp_counting requires timestamp_timezone to be configured (e.g., '-0700') if unit is day"
      end
      @timestamp_timezone_offset = Time.zone_offset(@timestamp_timezone)
      unless @timestamp_timezone_offset
        raise Fluent::ConfigError, "invalid timestamp_timezone value (specify like '-0700')"
      end
    end

    # Timestamp mode: @last_checked is aligned to a multiple of @tick
    # (shifted by the timezone offset) and advances by exactly @tick.
    @initializer = -> {
      now = Fluent::EventTime.now.to_i
      @last_checked = now - (now % @tick) - @timestamp_timezone_offset
    }
    @checker = -> { Fluent::EventTime.now.to_i - @last_checked >= @tick }
    @updater = -> {
      @last_checked += @tick
      [Fluent::EventTime.new(@last_checked, 0), @tick]
    }
  else
    # Clock mode: periods are measured with Fluent::Clock and the updater
    # reports the actually elapsed interval.
    @initializer = -> { @last_checked = Fluent::Clock.now }
    @checker = -> { Fluent::Clock.now - @last_checked >= @tick }
    @updater = -> {
      prev = @last_checked
      @last_checked = Fluent::Clock.now
      [Fluent::EventTime.now, @last_checked - prev]
    }
  end

  @counts = count_initialized
  @mutex = Mutex.new
end
count_initialized(keys=nil)
click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 96
#
# Builds a fresh counter hash with every value set to zero.
#
# keys - optional Array of counter names to pre-populate; ignored when
#        @aggregate is :all.
#
# Returns a new Hash on every call:
# * @aggregate == :all -> {'count' => 0} plus 'bytes' => 0 when @count_bytes
# * keys given         -> each key mapped to 0, in the given order
# * otherwise          -> {}
def count_initialized(keys=nil)
  if @aggregate == :all
    return @count_bytes ? {'count' => 0, 'bytes' => 0} : {'count' => 0}
  end
  return {} unless keys

  keys.each_with_object({}) { |key, zeroed| zeroed[key] = 0 }
end
countup(name, counts, bytes)
click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 111
#
# Adds one batch of counts to the running totals in @counts.
#
# name   - counter name (the possibly prefix-stripped tag); only used when
#          @aggregate is :tag, where keys become name + delimiter + suffix.
# counts - number of records to add.
# bytes  - number of bytes to add; only applied when @count_bytes is set.
#
# The read-modify-write on @counts is done while holding @mutex.
def countup(name, counts, bytes)
  per_tag = (@aggregate == :tag)
  count_key = per_tag ? name + delimiter + 'count' : 'count'
  bytes_key = per_tag && @count_bytes ? name + delimiter + 'bytes' : 'bytes'

  @mutex.synchronize do
    @counts[count_key] = (@counts[count_key] || 0) + counts
    @counts[bytes_key] = (@counts[bytes_key] || 0) + bytes if @count_bytes
  end
end
flush(step)
click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 132
#
# Takes the counters accumulated so far, installs a fresh counter hash, and
# returns the taken counters with their rates added.
#
# step - length of the flushed period, passed to generate_output as the
#        divisor for the rates.
#
# When delete_idle is truthy the new hash starts without the old keys;
# otherwise the old keys are kept and reset to zero.
#
# Returns the Hash produced by generate_output.
def flush(step)
  # countup mutates @counts under @mutex, so the swap must hold the same
  # lock; otherwise an in-flight increment can land in the wrong hash.
  flushed = @mutex.synchronize do
    keys = delete_idle ? nil : @counts.keys
    previous = @counts
    @counts = count_initialized(keys)
    previous
  end
  # Rates are computed outside the lock: `flushed` is no longer shared.
  generate_output(flushed, step)
end
flush_emit(now, step)
click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 155
#
# Flushes the counters and emits the result through router under @tag.
#
# now  - event time to attach to the emitted record(s).
# step - length of the flushed period, forwarded to flush / tagged_flush.
#
# With @output_style == :tagged one record is emitted per element returned
# by tagged_flush; otherwise a single record from flush is emitted.
def flush_emit(now, step)
  return router.emit(@tag, now, flush(step)) unless @output_style == :tagged

  tagged_flush(step).each { |data| router.emit(@tag, now, data) }
end
generate_output(counts, step)
click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 124
#
# Adds a "<key>_rate" entry for every counter in counts.
#
# counts - Hash of counter name => numeric total. It is modified in place.
# step   - divisor for the rates (the length of the counted period).
#
# Each rate is total / step, truncated (floored) to two decimal places.
#
# Returns the same counts Hash, now also holding the rate entries.
def generate_output(counts, step)
  rates = counts.each_with_object({}) do |(key, total), acc|
    # Scale by 100, floor, scale back: keeps two decimals without rounding up.
    acc[key + '_rate'] = ((total * 100.0) / (1.00 * step)).floor / 100.0
  end
  counts.update(rates)
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 80
#
# Always returns true.
# NOTE(review): presumably the Fluentd hook that declares a plugin usable
# under multiple workers - confirm against the plugin base class.
def multi_workers_ready?
  true
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 167
#
# Counts the records (and optionally bytes) of one event stream and adds
# them to the running totals via countup.
#
# tag - tag of the incoming events. When @input_tag_remove_prefix is set,
#       a leading "<prefix>." is stripped to form the counter name; a tag
#       equal to the bare prefix yields the empty name ''.
# es  - event stream; iterated with each, yielding (time, record).
#
# Byte counting (only when @count_bytes):
# * @count_all -> msgpack-serialized size of the whole record
# * otherwise  -> sum of the byte sizes of the @count_keys fields, with
#                 FOR_MISSING standing in for absent (nil/false) values
def process(tag, es)
  name = tag
  if @input_tag_remove_prefix
    if tag == @input_tag_remove_prefix
      # Slicing the bare prefix at @removed_length (prefix + '.') runs past
      # the end of the string and returns nil, which made countup fail on
      # `nil + delimiter`. Nothing is left after removal, so use ''.
      name = ''
    elsif tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
      name = tag[@removed_length..-1]
    end
  end

  c, b = 0, 0
  if @count_all
    es.each { |_time, record|
      c += 1
      b += record.to_msgpack.bytesize if @count_bytes
    }
  else
    es.each { |_time, record|
      c += 1
      if @count_bytes
        # to_s lets non-String field values (e.g. Integers) be measured
        # instead of raising NoMethodError on #bytesize.
        b += @count_keys.inject(0) { |s, k| s + (record[k] || FOR_MISSING).to_s.bytesize }
      end
    }
  end
  countup(name, c, b)
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_flowcounter.rb, line 84
#
# Starts the plugin: runs the superclass start, initializes the flush clock
# through @initializer, and registers the :out_flowcounter_watcher timer.
#
# On each timer callback @checker decides whether a flush is due; if so,
# @updater advances the clock and returns the (event time, interval) pair
# that is handed to flush_emit.
# NOTE(review): the 0.5 passed to timer_execute is presumably the polling
# interval in seconds - confirm against the timer helper.
def start
  super
  @initializer.call

  timer_execute(:out_flowcounter_watcher, 0.5) do
    next unless @checker.call

    emit_time, elapsed = @updater.call
    flush_emit(emit_time, elapsed)
  end
end
tagged_flush(step)
click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 138
#
# Takes the counters accumulated so far, installs a fresh counter hash, and
# returns one output record per counter name.
#
# step - length of the flushed period, passed to generate_output as the
#        divisor for the rates.
#
# Counter names are recovered from the keys ending in delimiter + 'count'.
# When delete_idle is truthy the new hash starts without the old keys;
# otherwise the old keys are kept and reset to zero.
#
# Returns an Array of Hashes, each holding 'count' (plus 'bytes' when
# @count_bytes), whatever generate_output adds, and 'tag' => name.
def tagged_flush(step)
  # countup mutates @counts under @mutex, so the swap must hold the same
  # lock; otherwise an in-flight increment can land in the wrong hash.
  flushed = @mutex.synchronize do
    keys = delete_idle ? nil : @counts.keys
    previous = @counts
    @counts = count_initialized(keys)
    previous
  end

  count_suffix = delimiter + 'count'
  bytes_suffix = delimiter + 'bytes'

  names = flushed.keys
                 .select { |key| key.end_with?(count_suffix) }
                 .map { |key| key.chomp(count_suffix) }

  names.map do |name|
    counts = {'count' => flushed[name + count_suffix]}
    counts['bytes'] = flushed[name + bytes_suffix] if @count_bytes
    data = generate_output(counts, step)
    data['tag'] = name
    data
  end
end