class Fluent::GroupCounterOutput
Constants
- PATTERN_MAX_NUM
Attributes
count_interval[RW]
counts[RW]
last_checked[RW]
saved_at[RW]
saved_duration[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_groupcounter.rb, line 6 def initialize super require 'pathname' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_groupcounter.rb, line 39 def configure(conf) super if @count_interval @count_interval = @count_interval.to_i else @count_interval = case @unit when 'minute' then 60 when 'hour' then 3600 when 'day' then 86400 else raise RuntimeError, "@unit must be one of minute/hour/day" end end @aggregate = case @aggregate when 'tag' then :tag when 'all' then :all else raise Fluent::ConfigError, "groupcounter aggregate allows tag/all" end @add_tag_prefix ||= @tag_prefix @remove_tag_prefix ||= @input_tag_remove_prefix if @output_per_tag raise Fluent::ConfigError, "add_tag_prefix must be specified with output_per_tag" unless @add_tag_prefix end if @add_tag_prefix @tag_prefix_string = @add_tag_prefix + '.' else @tag_prefix_string = '' end if @remove_tag_prefix @removed_prefix_string = @remove_tag_prefix + '.' @removed_length = @removed_prefix_string.length end @group_by_keys = @group_by_keys.split(',') if @group_by_keys @pattern = {} (1..PATTERN_MAX_NUM).each do |i| next unless conf["pattern#{i}"] replace, regexp = conf["pattern#{i}"].split(/ +/, 2) raise Fluent::ConfigError, "pattern#{i} does not contain 2 parameters" unless regexp @pattern[replace] = Regexp.compile(regexp) end if @store_file f = Pathname.new(@store_file) if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?) raise Fluent::ConfigError, "#{@store_file} is not writable" end end @counts = count_initialized @hostname = Socket.gethostname @mutex = Mutex.new end
count_initialized()
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 111 def count_initialized {} end
countup(counts, count)
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 232 def countup(counts, count) counts[:count] = sum(counts[:count], count[:count]) counts[:sum] = sum(counts[:sum], count[:sum]) if @avg_key and count[:sum] counts[:max] = max(counts[:max], count[:max]) if @max_key and count[:max] counts[:min] = min(counts[:min], count[:min]) if @min_key and count[:min] end
emit(tag, es, chain)
click to toggle source
recieve messages at here
# File lib/fluent/plugin/out_groupcounter.rb, line 193 def emit(tag, es, chain) group_counts = {} tags = tag.split('.') es.each do |time, record| count = {} count[:count] = 1 count[:sum] = record[@avg_key].to_f if @avg_key and record[@avg_key] count[:max] = record[@max_key].to_f if @max_key and record[@max_key] count[:min] = record[@min_key].to_f if @min_key and record[@min_key] group_key = group_key(tag, time, record) group_counts[group_key] ||= {} countup(group_counts[group_key], count) end summarize_counts(tag, group_counts) chain.next rescue => e $log.warn "#{e.class} #{e.message} #{e.backtrace.first}" end
flush()
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 152 def flush flushed, @counts = @counts, count_initialized() generate_output(flushed) end
flush_emit()
click to toggle source
this method emits messages (periodically called)
# File lib/fluent/plugin/out_groupcounter.rb, line 158 def flush_emit time = Fluent::Engine.now if @output_per_tag flush.each do |tag, message| Fluent::Engine.emit("#{@tag_prefix_string}#{tag}", time, message) end else message = flush Fluent::Engine.emit(@tag, time, message) unless message.empty? end end
generate_fields(counts_per_tag, output = {}, key_prefix = '')
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 115 def generate_fields(counts_per_tag, output = {}, key_prefix = '') return {} unless counts_per_tag # total_count = counts_per_tag.delete('__total_count') counts_per_tag.each do |group_key, count| group_key_with = group_key.empty? ? "" : group_key + @delimiter output[key_prefix + group_key + @count_suffix] = count[:count] if count[:count] output[key_prefix + group_key_with + "#{@min_key}#{@min_suffix}"] = count[:min] if count[:min] output[key_prefix + group_key_with + "#{@max_key}#{@max_suffix}"] = count[:max] if count[:max] output[key_prefix + group_key_with + "#{@avg_key}#{@avg_suffix}"] = count[:sum] / (count[:count] * 1.0) if count[:sum] and count[:count] > 0 # output[key_prefix + group_key_with + "rate"] = ((count[:count] * 100.0) / (1.00 * step)).floor / 100.0 # output[key_prefix + group_key_with + "percentage"] = count[:count] * 100.0 / (1.00 * total_count) if total_count > 0 end output end
generate_output(counts)
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 132 def generate_output(counts) if @output_per_tag # tag => output return {'all' => generate_fields(counts['all'])} if @aggregate == :all output_pairs = {} counts.keys.each do |tag| output_pairs[stripped_tag(tag)] = generate_fields(counts[tag]) end output_pairs else return generate_fields(counts['all']) if @aggregate == :all output = {} counts.keys.each do |tag| generate_fields(counts[tag], output, stripped_tag(tag) + @delimiter) end output end end
group_key(tag, time, record)
click to toggle source
Expand record with @group_by_keys, and get a value to be a group_key
# File lib/fluent/plugin/out_groupcounter.rb, line 240 def group_key(tag, time, record) if @group_by_expression tags = tag.split('.') group_key = expand_placeholder(@group_by_expression, record, tag, tags, Time.at(time)) elsif @group_by_keys values = @group_by_keys.map {|key| record[key] || 'undef'} group_key = values.join(@delimiter) else return "" end group_key = group_key.to_s.force_encoding('ASCII-8BIT') @pattern.each {|replace, regexp| break if group_key.gsub!(regexp, replace) } group_key end
load_status(file_path, count_interval)
click to toggle source
Load internal status from a file
@param [String] file_path @param [Interger] count_interval
# File lib/fluent/plugin/out_groupcounter.rb, line 306 def load_status(file_path, count_interval) return unless (f = Pathname.new(file_path)).exist? begin f.open('rb') do |f| stored = Marshal.load(f) if stored[:aggregate] == @aggregate and stored[:group_by_keys] == @group_by_keys and if Fluent::Engine.now <= stored[:saved_at] + count_interval @counts = stored[:counts] @saved_at = stored[:saved_at] @saved_duration = stored[:saved_duration] # skip the saved duration to continue counting @last_checked = Fluent::Engine.now - @saved_duration else $log.warn "out_groupcounter: stored data is outdated. ignore stored data" end else $log.warn "out_groupcounter: configuration param was changed. ignore stored data" end end rescue => e $log.warn "out_groupcounter: Can't load store_file #{e.class} #{e.message}" end end
max(a, b)
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 262 def max(a, b) return b if a.nil? return a if b.nil? a > b ? a : b end
min(a, b)
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 268 def min(a, b) return b if a.nil? return a if b.nil? a > b ? b : a end
save_status(file_path)
click to toggle source
Store internal status into a file
@param [String] file_path
# File lib/fluent/plugin/out_groupcounter.rb, line 283 def save_status(file_path) begin Pathname.new(file_path).open('wb') do |f| @saved_at = Fluent::Engine.now @saved_duration = @saved_at - @last_checked Marshal.dump({ :counts => @counts, :saved_at => @saved_at, :saved_duration => @saved_duration, :aggregate => @aggregate, :group_by_keys => @group_by_keys, }, f) end rescue => e $log.warn "out_groupcounter: Can't write store_file #{e.class} #{e.message}" end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_groupcounter.rb, line 104 def shutdown super @watcher.terminate @watcher.join save_status(@store_file) if @store_file end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_groupcounter.rb, line 98 def start super load_status(@store_file, @count_interval) if @store_file start_watch end
start_watch()
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 170 def start_watch # for internal, or tests only @watcher = Thread.new(&method(:watch)) end
stripped_tag(tag)
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 274 def stripped_tag(tag) return tag unless @remove_tag_prefix return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) and tag.length > @removed_length tag end
sum(a, b)
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 256 def sum(a, b) a ||= 0 b ||= 0 a + b end
summarize_counts(tag, group_counts)
click to toggle source
Summarize counts for each tag
# File lib/fluent/plugin/out_groupcounter.rb, line 217 def summarize_counts(tag, group_counts) tag = 'all' if @aggregate == :all @counts[tag] ||= {} @mutex.synchronize { group_counts.each do |group_key, count| @counts[tag][group_key] ||= {} countup(@counts[tag][group_key], count) end # total_count = group_counts.map {|group_key, count| count[:count] }.inject(:+) # @counts[tag]['__total_count'] = sum(@counts[tag]['__total_count'], total_count) } end
watch()
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 175 def watch # instance variable, and public accessable, for test @last_checked ||= Fluent::Engine.now while true sleep 0.5 begin if Fluent::Engine.now - @last_checked >= @count_interval now = Fluent::Engine.now flush_emit @last_checked = now end rescue => e $log.warn "#{e.class} #{e.message} #{e.backtrace.first}" end end end
Private Instance Methods
expand_placeholder(str, record, tag, tags, time)
click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 336 def expand_placeholder(str, record, tag, tags, time) struct = UndefOpenStruct.new(record) struct.tag = tag struct.tags = tags struct.time = time struct.hostname = @hostname str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..} eval "\"#{str}\"", struct.instance_eval { binding } end