class Fluent::GroupCounterOutput

Constants

PATTERN_MAX_NUM

Attributes

count_interval[RW]
counts[RW]
last_checked[RW]
saved_at[RW]
saved_duration[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_groupcounter.rb, line 6
def initialize
  super
  require 'pathname'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_groupcounter.rb, line 39
def configure(conf)
  super

  if @count_interval
    @count_interval = @count_interval.to_i
  else
    @count_interval = case @unit
            when 'minute' then 60
            when 'hour' then 3600
            when 'day' then 86400
            else 
              raise RuntimeError, "@unit must be one of minute/hour/day"
            end
  end

  @aggregate = case @aggregate
               when 'tag' then :tag
               when 'all' then :all
               else
                 raise Fluent::ConfigError, "groupcounter aggregate allows tag/all"
               end

  @add_tag_prefix ||= @tag_prefix
  @remove_tag_prefix ||= @input_tag_remove_prefix
  if @output_per_tag
    raise Fluent::ConfigError, "add_tag_prefix must be specified with output_per_tag" unless @add_tag_prefix
  end
  if @add_tag_prefix
    @tag_prefix_string = @add_tag_prefix + '.'
  else
    @tag_prefix_string = ''
  end
  if @remove_tag_prefix
    @removed_prefix_string = @remove_tag_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  @group_by_keys = @group_by_keys.split(',') if @group_by_keys

  @pattern = {}
  (1..PATTERN_MAX_NUM).each do |i|
    next unless conf["pattern#{i}"]
    replace, regexp = conf["pattern#{i}"].split(/ +/, 2)
    raise Fluent::ConfigError, "pattern#{i} does not contain 2 parameters" unless regexp
    @pattern[replace] = Regexp.compile(regexp)
  end

  if @store_file
    f = Pathname.new(@store_file)
    if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?)
      raise Fluent::ConfigError, "#{@store_file} is not writable"
    end
  end

  @counts = count_initialized
  @hostname = Socket.gethostname
  @mutex = Mutex.new
end
count_initialized() click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 111
def count_initialized
  {}
end
countup(counts, count) click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 232
def countup(counts, count)
  counts[:count] = sum(counts[:count], count[:count])
  counts[:sum]   = sum(counts[:sum], count[:sum]) if @avg_key and count[:sum]
  counts[:max]   = max(counts[:max], count[:max]) if @max_key and count[:max]
  counts[:min]   = min(counts[:min], count[:min]) if @min_key and count[:min]
end
emit(tag, es, chain) click to toggle source

recieve messages at here

# File lib/fluent/plugin/out_groupcounter.rb, line 193
def emit(tag, es, chain)
  group_counts = {}

  tags = tag.split('.')
  es.each do |time, record|
    count = {}
    count[:count] = 1
    count[:sum] = record[@avg_key].to_f if @avg_key and record[@avg_key]
    count[:max] = record[@max_key].to_f if @max_key and record[@max_key]
    count[:min] = record[@min_key].to_f if @min_key and record[@min_key]

    group_key = group_key(tag, time, record)

    group_counts[group_key] ||= {}
    countup(group_counts[group_key], count)
  end
  summarize_counts(tag, group_counts)

  chain.next
rescue => e
  $log.warn "#{e.class} #{e.message} #{e.backtrace.first}"
end
flush() click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 152
def flush
  flushed, @counts = @counts, count_initialized()
  generate_output(flushed)
end
flush_emit() click to toggle source

this method emits messages (periodically called)

# File lib/fluent/plugin/out_groupcounter.rb, line 158
def flush_emit
  time = Fluent::Engine.now
  if @output_per_tag
    flush.each do |tag, message|
      Fluent::Engine.emit("#{@tag_prefix_string}#{tag}", time, message)
    end
  else
    message = flush
    Fluent::Engine.emit(@tag, time, message) unless message.empty?
  end
end
generate_fields(counts_per_tag, output = {}, key_prefix = '') click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 115
def generate_fields(counts_per_tag, output = {}, key_prefix = '')
  return {} unless counts_per_tag
  # total_count = counts_per_tag.delete('__total_count')

  counts_per_tag.each do |group_key, count|
    group_key_with = group_key.empty? ? "" : group_key + @delimiter
    output[key_prefix + group_key + @count_suffix] = count[:count] if count[:count]
    output[key_prefix + group_key_with + "#{@min_key}#{@min_suffix}"] = count[:min] if count[:min]
    output[key_prefix + group_key_with + "#{@max_key}#{@max_suffix}"] = count[:max] if count[:max]
    output[key_prefix + group_key_with + "#{@avg_key}#{@avg_suffix}"] = count[:sum] / (count[:count] * 1.0) if count[:sum] and count[:count] > 0
    # output[key_prefix + group_key_with + "rate"] = ((count[:count] * 100.0) / (1.00 * step)).floor / 100.0
    # output[key_prefix + group_key_with + "percentage"] = count[:count] * 100.0 / (1.00 * total_count) if total_count > 0
  end

  output
end
generate_output(counts) click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 132
def generate_output(counts)
  if @output_per_tag # tag => output
    return {'all' => generate_fields(counts['all'])} if @aggregate == :all

    output_pairs = {}
    counts.keys.each do |tag|
      output_pairs[stripped_tag(tag)] = generate_fields(counts[tag])
    end
    output_pairs
  else
    return generate_fields(counts['all']) if @aggregate == :all

    output = {}
    counts.keys.each do |tag|
      generate_fields(counts[tag], output, stripped_tag(tag) + @delimiter)
    end
    output
  end
end
group_key(tag, time, record) click to toggle source

Expand record with @group_by_keys, and get a value to be a group_key

# File lib/fluent/plugin/out_groupcounter.rb, line 240
def group_key(tag, time, record)
  if @group_by_expression
    tags = tag.split('.')
    group_key = expand_placeholder(@group_by_expression, record, tag, tags, Time.at(time))
  elsif @group_by_keys
    values = @group_by_keys.map {|key| record[key] || 'undef'}
    group_key = values.join(@delimiter)
  else
    return ""
  end
  group_key = group_key.to_s.force_encoding('ASCII-8BIT')

  @pattern.each {|replace, regexp| break if group_key.gsub!(regexp, replace) }
  group_key
end
load_status(file_path, count_interval) click to toggle source

Load internal status from a file

@param [String] file_path @param [Interger] count_interval

# File lib/fluent/plugin/out_groupcounter.rb, line 306
def load_status(file_path, count_interval)
  return unless (f = Pathname.new(file_path)).exist?

  begin
    f.open('rb') do |f|
      stored = Marshal.load(f)
      if stored[:aggregate] == @aggregate and
        stored[:group_by_keys] == @group_by_keys and

        if Fluent::Engine.now <= stored[:saved_at] + count_interval
          @counts = stored[:counts]
          @saved_at = stored[:saved_at]
          @saved_duration = stored[:saved_duration]

          # skip the saved duration to continue counting
          @last_checked = Fluent::Engine.now - @saved_duration
        else
          $log.warn "out_groupcounter: stored data is outdated. ignore stored data"
        end
      else
        $log.warn "out_groupcounter: configuration param was changed. ignore stored data"
      end
    end
  rescue => e
    $log.warn "out_groupcounter: Can't load store_file #{e.class} #{e.message}"
  end
end
max(a, b) click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 262
def max(a, b)
  return b if a.nil?
  return a if b.nil?
  a > b ? a : b
end
min(a, b) click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 268
def min(a, b)
  return b if a.nil?
  return a if b.nil?
  a > b ? b : a
end
save_status(file_path) click to toggle source

Store internal status into a file

@param [String] file_path

# File lib/fluent/plugin/out_groupcounter.rb, line 283
def save_status(file_path)

  begin
    Pathname.new(file_path).open('wb') do |f|
      @saved_at = Fluent::Engine.now
      @saved_duration = @saved_at - @last_checked
      Marshal.dump({
        :counts           => @counts,
        :saved_at         => @saved_at,
        :saved_duration   => @saved_duration,
        :aggregate        => @aggregate,
        :group_by_keys    => @group_by_keys,
      }, f)
    end
  rescue => e
    $log.warn "out_groupcounter: Can't write store_file #{e.class} #{e.message}"
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_groupcounter.rb, line 104
def shutdown
  super
  @watcher.terminate
  @watcher.join
  save_status(@store_file) if @store_file
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_groupcounter.rb, line 98
def start
  super
  load_status(@store_file, @count_interval) if @store_file
  start_watch
end
start_watch() click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 170
def start_watch
  # for internal, or tests only
  @watcher = Thread.new(&method(:watch))
end
stripped_tag(tag) click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 274
def stripped_tag(tag)
  return tag unless @remove_tag_prefix
  return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) and tag.length > @removed_length
  tag
end
sum(a, b) click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 256
def sum(a, b)
  a ||= 0
  b ||= 0
  a + b
end
summarize_counts(tag, group_counts) click to toggle source

Summarize counts for each tag

# File lib/fluent/plugin/out_groupcounter.rb, line 217
def summarize_counts(tag, group_counts)
  tag = 'all' if @aggregate == :all
  @counts[tag] ||= {}
  
  @mutex.synchronize {
    group_counts.each do |group_key, count|
      @counts[tag][group_key] ||= {}
      countup(@counts[tag][group_key], count)
    end

    # total_count = group_counts.map {|group_key, count| count[:count] }.inject(:+)
    # @counts[tag]['__total_count'] = sum(@counts[tag]['__total_count'], total_count)
  }
end
watch() click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 175
def watch
  # instance variable, and public accessable, for test
  @last_checked ||= Fluent::Engine.now
  while true
    sleep 0.5
    begin
      if Fluent::Engine.now - @last_checked >= @count_interval
        now = Fluent::Engine.now
        flush_emit
        @last_checked = now
      end
    rescue => e
      $log.warn "#{e.class} #{e.message} #{e.backtrace.first}"
    end
  end
end

Private Instance Methods

expand_placeholder(str, record, tag, tags, time) click to toggle source
# File lib/fluent/plugin/out_groupcounter.rb, line 336
def expand_placeholder(str, record, tag, tags, time)
  struct = UndefOpenStruct.new(record)
  struct.tag  = tag
  struct.tags = tags
  struct.time = time
  struct.hostname = @hostname
  str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..}
  eval "\"#{str}\"", struct.instance_eval { binding }
end