class Fluent::DeriveOutput

Constants

KEY_MAX_NUM

Attributes

key_pattern[R]

for test

key_pattern_adjustment[R]
keys[R]
prev[R]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_derive.rb, line 26
def configure(conf)
  super

  if @key_pattern
    key_pattern, @key_pattern_adjustment = @key_pattern.split(/ +/, 2)
    @key_pattern_adjustment = parse_adjustment(@key_pattern_adjustment)
    @key_pattern = Regexp.compile(key_pattern)
  else
    @keys = {}
    (1..KEY_MAX_NUM).each do |i|
      next unless conf["key#{i}"] 
      key, adjustment = conf["key#{i}"].split(/ +/, 2)
      adjustment = parse_adjustment(adjustment)
      @keys[key] = adjustment
    end
  end
  raise Fluent::ConfigError, "Either of `key_pattern` or `key1` must be specified" if (@key_pattern.nil? and @keys.empty?)

  raise Fluent::ConfigError, "Either of `tag`, `add_tag_prefix`, or `remove_tag_prefix` must be specified" if (@tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil?)
  @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix
  @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix
  @tag_proc =
    if @tag
      Proc.new {|tag| @tag }
    elsif @tag_prefix and @tag_prefix_match
      Proc.new {|tag| "#{@tag_prefix}#{lstrip(tag, @tag_prefix_match)}" }
    elsif @tag_prefix_match
      Proc.new {|tag| lstrip(tag, @tag_prefix_match) }
    elsif @tag_prefix
      Proc.new {|tag| "#{@tag_prefix}#{tag}" }
    else
      Proc.new {|tag| tag }
    end

  raise Fluent::ConfigError, "`max` must be greater than `min`" if (@min && @max && @min >= @max)

  @prev = {}
  @mutex = Mutex.new
rescue => e
  raise Fluent::ConfigError, "#{e.class} #{e.message} #{e.backtrace.first}"
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_derive.rb, line 76
def emit(tag, es, chain)
  emit_tag = @tag_proc.call(tag)

  if @key_pattern
    es.each do |time, record|
      record.each do |key, value|
        next unless key =~ @key_pattern
        value = value.to_i
        prev_time, prev_value = get_prev_value(tag, key)
        unless prev_time && prev_value
          save_to_prev(time, tag, key, value)
          record[key] = nil
          next
        end
        # adjustment
        rate = calc_rate(tag, key, value, prev_value, time, prev_time, @key_pattern_adjustment)
        rate = truncate_min(rate, @min) if @min
        rate = truncate_max(rate, @max) if @max
        # Set new value
        record[key] = rate
        save_to_prev(time, tag, key, value)
      end
      Fluent::Engine.emit(emit_tag, time, record)
    end
  else #keys
    es.each do |time, record|
      @keys.each do |key, adjustment|
        next unless  value = record[key]
        value = value.to_i
        prev_time, prev_value = get_prev_value(tag, key)
        unless prev_time && prev_value
          save_to_prev(time, tag, key, value)
          record[key] = nil
          next
        end
        # adjustment
        rate = calc_rate(tag, key, value, prev_value, time, prev_time, adjustment)
        rate = truncate_min(rate, @min) if @min
        rate = truncate_max(rate, @max) if @max
        # Set new value
        record[key] = rate
        save_to_prev(time, tag, key, value)
      end
      Fluent::Engine.emit(emit_tag, time, record)
    end
  end

  chain.next
rescue => e
  log.warn e.message
  log.warn e.backtrace.join(', ')
end
get_prev_value(tag, key) click to toggle source

@return [Array] time, value

# File lib/fluent/plugin/out_derive.rb, line 130
def get_prev_value(tag, key)
  @prev["#{tag}:#{key}"] || []
end
save_to_prev(time, tag, key, value) click to toggle source
# File lib/fluent/plugin/out_derive.rb, line 134
def save_to_prev(time, tag, key, value)
  @mutex.synchronize { @prev["#{tag}:#{key}"] = [time, value] }
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_derive.rb, line 72
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_derive.rb, line 68
def start
  super
end

Private Instance Methods

_calc_rate(cur_value, prev_value) click to toggle source
# File lib/fluent/plugin/out_derive.rb, line 174
def _calc_rate(cur_value, prev_value)
  rate = cur_value - prev_value
  if @counter_mode
    if rate < 0
      rate += 2 ** 32 - 1
    end
    if rate < 0
      rate += 2 ** 64 - 2 ** 32
    end
  end
  rate
end
calc_rate(tag, key, cur_value, prev_value, cur_time, prev_time, adjustment = nil) click to toggle source
# File lib/fluent/plugin/out_derive.rb, line 155
def calc_rate(tag, key, cur_value, prev_value, cur_time, prev_time, adjustment = nil)
  if cur_time - prev_time <= 0
    log.warn "Could not calculate the rate. multiple input less than one second or minus delta of seconds on tag=#{tag}, key=#{key}"
    return nil
  end
  if @time_unit_division
    rate = _calc_rate(cur_value , prev_value)/(cur_time - prev_time)
  else
    rate = _calc_rate(cur_value , prev_value)
  end
  if adjustment && adjustment[0] == '*'
    rate * adjustment[1]
  elsif adjustment && adjustment[0] == '/'
    rate / adjustment[1]
  else
    rate
  end
end
lstrip(string, substring) click to toggle source
# File lib/fluent/plugin/out_derive.rb, line 140
def lstrip(string, substring)
  string.index(substring) == 0 ? string[substring.size..-1] : string
end
parse_adjustment(str) click to toggle source
# File lib/fluent/plugin/out_derive.rb, line 144
def parse_adjustment(str)
  case str
  when /^\*(\d+)$/
    ['*', $1.to_i]
  when /^\/(\d+)$/
    ['/', $1.to_i]
  else
    nil
  end
end
truncate_max(value, max) click to toggle source
# File lib/fluent/plugin/out_derive.rb, line 192
def truncate_max(value, max)
  return nil unless value
  (value > max) ? max : value
end
truncate_min(value, min) click to toggle source
# File lib/fluent/plugin/out_derive.rb, line 187
def truncate_min(value, min)
  return nil unless value
  (value < min) ? min : value
end