class Fluent::Plugin::FlowCounterOutput

Constants

FOR_MISSING

Attributes

count_all[RW]
counts[RW]
last_checked[RW]
tick[R]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_flowcounter.rb, line 26
def configure(conf)
  super

  @tick = case @unit
          when :second then 1
          when :minute then 60
          when :hour then 3600
          when :day then 86400
          else
            raise Fluent::ConfigError, "flowcounter unit allows second/minute/hour/day"
          end
  if @output_style == :tagged and @aggregate != :tag
    raise Fluent::ConfigError, "flowcounter aggregate must be 'tag' when output_style is 'tagged'"
  end
  if @input_tag_remove_prefix
    @removed_prefix_string = @input_tag_remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end
  @count_all = false
  if @count_keys && !@count_keys.empty?
    @count_all = (@count_keys == ['*'])
    @count_bytes = true
  else
    @count_bytes = false
  end

  if @timestamp_counting
    @timestamp_timezone_offset = 0
    if @unit == :second
      raise Fluent::ConfigError, "timestamp_counting cannot be enabled with unit: second"
    elsif @unit == :day
      unless @timestamp_timezone
        raise Fluent::ConfigError, "timestamp_counting requires timestamp_timezone to be configured (e.g., '-0700') if unit is day"
      end
      @timestamp_timezone_offset = Time.zone_offset(@timestamp_timezone)
      unless @timestamp_timezone_offset
        raise Fluent::ConfigError, "invalid timestamp_timezone value (specify like '-0700')"
      end
    end
    @last_checked = nil
    @initializer = ->{ now = Fluent::EventTime.now.to_i; @last_checked = now - (now % @tick) - @timestamp_timezone_offset }
    @checker = ->{ Fluent::EventTime.now.to_i - @last_checked >= @tick }
    @updater = ->{ @last_checked += @tick; return Fluent::EventTime.new(@last_checked, 0), @tick }
  else
    @last_checked = nil
    @initializer = ->{ @last_checked = Fluent::Clock.now }
    @checker = ->{ Fluent::Clock.now - @last_checked >= @tick }
    @updater = ->{ prev = @last_checked; @last_checked = Fluent::Clock.now; return Fluent::EventTime.now, @last_checked - prev }
  end

  @counts = count_initialized
  @mutex = Mutex.new
end
count_initialized(keys=nil) click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 96
def count_initialized(keys=nil)
  if @aggregate == :all
    if @count_bytes
      {'count' => 0, 'bytes' => 0}
    else
      {'count' => 0}
    end
  elsif keys
    values = Array.new(keys.length){|i| 0 }
    Hash[[keys, values].transpose]
  else
    {}
  end
end
countup(name, counts, bytes) click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 111
def countup(name, counts, bytes)
  c = 'count'
  b = 'bytes'
  if @aggregate == :tag
    c = name + delimiter + 'count'
    b = name + delimiter + 'bytes' if @count_bytes
  end
  @mutex.synchronize {
    @counts[c] = (@counts[c] || 0) + counts
    @counts[b] = (@counts[b] || 0) + bytes if @count_bytes
  }
end
flush(step) click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 132
def flush(step)
  keys = delete_idle ? nil : @counts.keys
  flushed,@counts = @counts,count_initialized(keys)
  generate_output(flushed, step)
end
flush_emit(now, step) click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 155
def flush_emit(now, step)
  if @output_style == :tagged
    tagged_flush(step).each do |data|
      router.emit(@tag, now, data)
    end
  else
    router.emit(@tag, now, flush(step))
  end
end
generate_output(counts, step) click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 124
def generate_output(counts, step)
  rates = {}
  counts.keys.each {|key|
    rates[key + '_rate'] = ((counts[key] * 100.0) / (1.00 * step)).floor / 100.0
  }
  counts.update(rates)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 80
def multi_workers_ready?
  true
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 167
def process(tag, es)
  name = tag
  if @input_tag_remove_prefix and
      ( (tag.start_with?(@removed_prefix_string) and tag.length > @removed_length) or tag == @input_tag_remove_prefix)
    name = tag[@removed_length..-1]
  end
  c,b = 0,0
  if @count_all
    es.each {|time,record|
      c += 1
      b += record.to_msgpack.bytesize if @count_bytes
    }
  else
    es.each {|time,record|
      c += 1
      b += @count_keys.inject(0){|s,k| s + (record[k] || FOR_MISSING).bytesize} if @count_bytes
    }
  end
  countup(name, c, b)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_flowcounter.rb, line 84
def start
  super

  @initializer.call
  timer_execute(:out_flowcounter_watcher, 0.5) do
    if @checker.call
      now, interval = @updater.call
      flush_emit(now, interval)
    end
  end
end
tagged_flush(step) click to toggle source
# File lib/fluent/plugin/out_flowcounter.rb, line 138
def tagged_flush(step)
  keys = delete_idle ? nil : @counts.keys
  flushed,@counts = @counts,count_initialized(keys)
  names = flushed.keys.select {|x| x.end_with?(delimiter + 'count')}.map {|x| x.chomp(delimiter + 'count')}
  names.map {|name|
    counts = {
      'count' => flushed[name + delimiter + 'count'],
    }
    if @count_bytes
      counts['bytes'] = flushed[name + delimiter + 'bytes']
    end
    data = generate_output(counts, step)
    data['tag'] = name
    data
  }
end