class Fluent::Plugin::DataCounterOutput

Constants

DEFAULT_STORAGE_TYPE
PATTERN_MAX_NUM

Attributes

counts[RW]
last_checked[RW]
saved_at[RW]
saved_duration[RW]
tick[RW]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datacounter.rb, line 57
# Validates the plugin configuration and prepares the counting state.
#
# Derives @tick from count_interval (or from unit), builds @patterns as
# [index, name, regexp] triples where index 0 is the implicit 'unmatched'
# bucket, prepares the tag prefix helpers, optionally creates the resume
# storage, and finally initializes @counts and @mutex.
#
# @param conf [#keys, #[]] configuration element holding the patternN entries
# @raise [Fluent::ConfigError] for a pattern without a regexp, a gap in the
#   pattern numbering, duplicated pattern names, or output_per_tag without
#   tag_prefix
# @raise [RuntimeError] when @unit is none of :minute, :hour, :day
def configure(conf)
  super

  @tick = if @count_interval
            @count_interval.to_i
          else
            case @unit
            when :minute then 60
            when :hour then 3600
            when :day then 86400
            else
              raise RuntimeError, "BUG: unknown unit: #{@unit}"
            end
          end

  @patterns = [[0, 'unmatched', nil]]
  pattern_names = ['unmatched']

  pattern_keys = conf.keys.select { |k| k =~ /^pattern(\d+)$/ }
  invalids = pattern_keys.reject do |key|
    (1..PATTERN_MAX_NUM).include?(key[/^pattern(\d+)/, 1].to_i)
  end
  if invalids.size > 0
    log.warn "invalid number patterns (valid pattern number:1-20)", invalids: invalids
  end

  (1..PATTERN_MAX_NUM).each do |i|
    next unless conf["pattern#{i}"]
    name, regexp = conf["pattern#{i}"].split(' ', 2)
    # A value without a second token would otherwise reach Regexp.new(nil)
    # and surface as a TypeError instead of a configuration error.
    unless regexp
      raise Fluent::ConfigError, "pattern#{i} must be specified as 'NAME REGEXP'"
    end
    @patterns.push([i, name, Regexp.new(regexp)])
    pattern_names.push(name)
  end

  # Pattern numbers must be contiguous from 1: #process sizes its counter
  # array by @patterns.length and indexes it by pattern number. Comparing
  # every loaded index with its position also covers two-digit numbers.
  unless @patterns.map(&:first) == (0...@patterns.length).to_a
    raise Fluent::ConfigError, "jump of pattern index found"
  end
  unless @patterns.length == pattern_names.uniq.length
    raise Fluent::ConfigError, "duplicated pattern names"
  end

  if @output_per_tag
    raise Fluent::ConfigError, "tag_prefix must be specified with output_per_tag" unless @tag_prefix
    @tag_prefix_string = @tag_prefix + '.'
  end

  if @input_tag_remove_prefix
    @removed_prefix_string = @input_tag_remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  if @store_storage
    @storage = storage_create(usage: 'resume')
  end

  if system_config.workers > 1
    log.warn "Fluentd is now working with multi process workers, and datacounter plugin will produce counter results in each separeted processes."
  end

  @counts = count_initialized
  @mutex = Mutex.new
end
count_initialized(keys=nil) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 134
# Builds a fresh, zero-filled counter table.
#
# Every row has one slot per entry of @patterns plus a trailing sum slot:
#   counts['tag'][num] = count
#   counts['tag'][-1]  = sum
#
# @param keys [Array, nil] tags that receive a zeroed row; ignored when
#   @aggregate is :all
# @return [Hash] a single 'all' row when aggregating everything, one row per
#   given key otherwise, or an empty hash when no keys are given
def count_initialized(keys=nil)
  return {'all' => Array.new(@patterns.length + 1, 0)} if @aggregate == :all
  return {} unless keys

  keys.each_with_object({}) do |key, table|
    table[key] = Array.new(@patterns.length + 1, 0)
  end
end
countups(tag, counts) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 149
# Adds one batch of per-pattern counts to the running totals.
#
# The row for the tag is created on demand. All updates happen while holding
# @mutex. When @aggregate is :all every batch is accumulated under 'all'.
#
# @param tag [String] tag the batch belongs to
# @param counts [Array<Integer>] per-pattern counts, indexed like @patterns
def countups(tag, counts)
  bucket = @aggregate == :all ? 'all' : tag

  @mutex.synchronize do
    row = (@counts[bucket] ||= Array.new(@patterns.length + 1, 0))
    batch_total = 0
    counts.each_with_index do |amount, slot|
      batch_total += amount
      row[slot] += amount
    end
    # the trailing slot holds the sum over all patterns
    row[-1] += batch_total
  end
end
flush(step) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 219
# Swaps out the current counters and renders them as one message.
#
# Under @mutex, @counts is replaced by a freshly initialized table that keeps
# only the tags whose sum slot was positive; the previous table is then
# passed to generate_output.
#
# @param step [Numeric] interval handed through to generate_output
# @return the value returned by generate_output
def flush(step) # returns one message
  snapshot = @mutex.synchronize do
    previous = @counts
    active_tags = previous.keys.select { |tag| previous[tag][-1] > 0 }
    @counts = count_initialized(active_tags)
    previous
  end
  generate_output(snapshot, step)
end
flush_emit(step) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 235
# Flushes the counters and emits the result through the router.
#
# With @output_per_tag set, one event per tag is emitted under
# @tag_prefix_string + tag. Otherwise a single event is emitted under @tag,
# and only when the flushed message has at least one key.
#
# @param step [Numeric] interval handed through to the flush methods
def flush_emit(step)
  emitted_at = Fluent::Engine.now
  if @output_per_tag
    # map of tag => message
    flush_per_tags(step).each_pair do |tag, record|
      router.emit(@tag_prefix_string + tag, emitted_at, record)
    end
  else
    record = flush(step)
    router.emit(@tag, emitted_at, record) if record.keys.size > 0
  end
end
flush_per_tags(step) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 227
# Swaps out the current counters and renders them as one message per tag.
#
# Under @mutex, @counts is replaced by a freshly initialized table that keeps
# only the tags whose sum slot was positive; the previous table is then
# passed to generate_output_per_tags.
#
# @param step [Numeric] interval handed through to generate_output_per_tags
# @return the value returned by generate_output_per_tags
def flush_per_tags(step) # returns map of tag - message
  snapshot = @mutex.synchronize do
    previous = @counts
    active_tags = previous.keys.select { |tag| previous[tag][-1] > 0 }
    @counts = count_initialized(active_tags)
    previous
  end
  generate_output_per_tags(snapshot, step)
end
generate_fields(step, target_counts, attr_prefix, output) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 172
# Renders one counter row into count / rate / percentage fields.
#
# For every pattern the fields "<prefix><name>_count" and
# "<prefix><name>_rate" are written; "<prefix><name>_percentage" is written
# when the reference sum is positive (and never for the unmatched bucket when
# @outcast_unmatched is set). With @output_messages the row's sum slot is
# written as "<prefix>messages".
#
# The given row is only read, so the same counts can be rendered repeatedly.
#
# @param step [Numeric] divisor for the rate (presumably the elapsed seconds
#   of the counting window; verify against the caller)
# @param target_counts [Array<Integer>] per-pattern counts followed by their sum
# @param attr_prefix [String] prefix for every generated field name
# @param output [Hash] hash the fields are written into
# @return [Hash] the same output hash
def generate_fields(step, target_counts, attr_prefix, output)
  # Slice instead of delete_at(-1): the caller's array must stay intact.
  messages = target_counts[-1]
  pattern_counts = target_counts[0..-2]

  sum = if @outcast_unmatched
          # percentages are relative to the matched patterns only
          pattern_counts.drop(1).inject(0, :+)
        else
          messages
        end

  pattern_counts.each_with_index do |count, i|
    name = @patterns[i][1]
    output[attr_prefix + name + '_count'] = count
    # rate is truncated (not rounded) to two decimal places
    output[attr_prefix + name + '_rate'] = ((count * 100.0) / (1.00 * step)).floor / 100.0
    unless i == 0 && @outcast_unmatched
      output[attr_prefix + name + '_percentage'] = count * 100.0 / (1.00 * sum) if sum > 0
    end
    # Kept inside the loop so the key keeps its original position in output.
    if @output_messages
      output[attr_prefix + 'messages'] = messages
    end
  end

  output
end
generate_output(counts, step) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 195
# Renders a counter table into a single flat message.
#
# When @aggregate is :all only the 'all' row is rendered, without a field
# prefix. Otherwise every tag's row is rendered into the same hash, with the
# stripped tag plus '_' as field prefix.
#
# @param counts [Hash] tag => counter row
# @param step [Numeric] interval handed through to generate_fields
# @return [Hash] the combined message
def generate_output(counts, step)
  return generate_fields(step, counts['all'], '', {}) if @aggregate == :all

  counts.keys.each_with_object({}) do |tag, merged|
    generate_fields(step, counts[tag], stripped_tag(tag) + '_', merged)
  end
end
generate_output_per_tags(counts, step) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 207
# Renders a counter table into one message per tag.
#
# When @aggregate is :all the result has the single key 'all'. Otherwise each
# tag's row is rendered into its own hash, keyed by the stripped tag. Field
# names carry no prefix in either case.
#
# @param counts [Hash] tag => counter row
# @param step [Numeric] interval handed through to generate_fields
# @return [Hash] stripped tag (or 'all') => message
def generate_output_per_tags(counts, step)
  if @aggregate == :all
    {'all' => generate_fields(step, counts['all'], '', {})}
  else
    counts.keys.each_with_object({}) do |tag, per_tag|
      per_tag[stripped_tag(tag)] = generate_fields(step, counts[tag], '', {})
    end
  end
end
load_status(tick) click to toggle source

Load internal status from a storage

@param [Integer] tick The count interval

# File lib/fluent/plugin/out_datacounter.rb, line 307
# Restores counters from the value kept under :stored_value in @storage.
#
# The stored state is applied only when its aggregate, count_key and patterns
# equal the current configuration and it was saved no longer than +tick+ ago.
# Otherwise a warning is logged and the stored data is ignored. Any error
# raised while reading the stored value is logged as a warning as well.
#
# @param tick [Integer] the count interval
def load_status(tick)
  snapshot = @storage.get(:stored_value)
  return unless snapshot

  begin
    # regexps are stored as source strings; rebuild them for the comparison
    restored_patterns = snapshot["patterns"].map do |idx, label, source|
      [idx, label, source ? Regexp.compile(source) : source]
    end

    config_unchanged = snapshot["aggregate"] == @aggregate.to_s &&
                       snapshot["count_key"] == @count_key &&
                       restored_patterns == @patterns

    if !config_unchanged
      log.warn "configuration param was changed. ignore stored data"
    elsif Fluent::Engine.now <= snapshot["saved_at"] + tick
      @mutex.synchronize do
        @counts = snapshot["counts"]
        @saved_at = snapshot["saved_at"]
        @saved_duration = snapshot["saved_duration"]

        # skip the saved duration to continue counting
        @last_checked = Fluent::Engine.now - @saved_duration
      end
    else
      log.warn "stored data is outdated. ignore stored data"
    end
  rescue => e
    log.warn "Can't load store_storage", error: e
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 116
# Always answers true.
#
# NOTE(review): presumably queried by Fluentd to decide whether this plugin
# may run under multiple workers -- confirm against the plugin API. In that
# setup #configure logs a warning that each worker process produces its own
# counter results.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 257
# Counts the records of an event stream per pattern and adds the result to
# the running totals via #countups.
#
# For every record the value under @count_key is matched, as a binary string,
# against the patterns in order; the first matching pattern is counted.
# Values matching no pattern go to slot 0 (unmatched). Records without the
# key are skipped.
#
# @param tag [String] tag of the event stream
# @param es [#each] event stream yielding (time, record) pairs
def process(tag, es)
  c = [0] * @patterns.length

  es.each do |_time, record|
    value = record[@count_key]
    next if value.nil?

    # String#b returns a binary *copy*. force_encoding on value.to_s would
    # change the encoding of the record's own String (to_s returns self for
    # Strings) and raise on frozen values.
    value = value.to_s.b
    hit = @patterns.find { |_index, _name, regexp| regexp && regexp.match(value) }
    c[hit ? hit[0] : 0] += 1
  end
  countups(tag, c)
end
save_status() click to toggle source

Store internal status into a storage

# File lib/fluent/plugin/out_datacounter.rb, line 279
# Stores the internal status into @storage under :stored_value.
#
# Records the counters, the save time, the time elapsed since the last flush
# check, and the configuration (aggregate, count_key, patterns) that
# #load_status later compares against. Regexps are stored as source strings.
# Errors are logged as warnings instead of being raised.
def save_status()
  @saved_at = Fluent::Engine.now
  @saved_duration = @saved_at - @last_checked
  patterns = @patterns.map do |idx, label, regexp|
    [idx, label, regexp ? regexp.source : regexp]
  end
  value = {
    "counts"         => @counts,
    "saved_at"       => @saved_at,
    "saved_duration" => @saved_duration,
    # Stored as a String because #load_status compares it with
    # @aggregate.to_s; a Symbol only round-trips when the storage backend
    # happens to stringify it.
    "aggregate"      => @aggregate.to_s,
    "count_key"      => @count_key.to_s,
    "patterns"       => patterns,
  }
  @storage.put(:stored_value, value)
rescue => e
  log.warn "Can't write store_storage", error: e
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datacounter.rb, line 129
# Persists the counter status when store_storage is enabled, then hands over
# to the superclass shutdown.
def shutdown
  if @store_storage
    save_status
  end
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_datacounter.rb, line 120
# Starts the plugin: initializes the flush reference time, restores a stored
# status when store_storage is enabled, and schedules #watch every @tick.
def start
  super

  # Set the default first: #load_status moves @last_checked back by the
  # saved duration when it restores counters, and assigning afterwards would
  # discard that offset.
  @last_checked = Fluent::Engine.now
  load_status(@tick) if @store_storage

  timer_execute(:out_datacounter_timer, @tick, &method(:watch))
end
stripped_tag(tag) click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 165
# Removes the configured input_tag_remove_prefix from a tag.
#
# @param tag [String] incoming tag
# @return [String] the tag without "<prefix>.", an empty string when the tag
#   equals the prefix itself, or the tag unchanged when the prefix is not
#   configured or does not apply
def stripped_tag(tag)
  return tag unless @input_tag_remove_prefix
  return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
  # @removed_length includes the trailing '.', so slicing from it on a tag
  # equal to the bare prefix yields nil; callers concatenate the result.
  return '' if tag == @input_tag_remove_prefix
  tag
end
watch() click to toggle source
# File lib/fluent/plugin/out_datacounter.rb, line 250
# Timer callback: flushes and emits the counters for the time elapsed since
# the previous check, then records the current time as the new reference.
# Kept public, with state in instance variables, so tests can drive it.
def watch
  current = Fluent::Engine.now
  elapsed = current - @last_checked
  flush_emit(elapsed)
  @last_checked = current
end