class Fluent::Plugin::NotifierOutput

Constants

NOTIFICATION_LEVELS
STATES_CLEAN_INTERVAL
STATES_EXPIRE_SECONDS

Attributes

defs[RW]
match_cache[RW]
negative_cache[RW]
states[RW]
tests[RW]

Public Instance Methods

check(tag, es) click to toggle source
# File lib/fluent/plugin/out_notifier.rb, line 198
def check(tag, es)
  notifications = []

  tag = if @input_tag_remove_prefix and
            tag.start_with?(@input_tag_remove_prefix_string) and tag.length > @input_tag_remove_prefix_length
          tag[@input_tag_remove_prefix_length..-1]
        else
          tag
        end

  es.each do |time,record|
    record.keys.each do |key|
      next if @negative_cache[key]

      defs = @match_cache[key]
      unless defs
        defs = []
        @defs.each do |d|
          defs.push(d) if d.match?(key)
        end
        @negative_cache[key] = true if defs.size < 1
      end

      defs.each do |d|
        next unless @tests.reduce(true){|r,t| r and t.test(tag, record)}

        alert = d.check(tag, time, record, key)
        if alert
          notifications.push(alert)
        end
      end
    end
  end

  notifications
end
configure(conf) click to toggle source

<match httpstatus.blog>

type notifier
default_tag notification
default_interval_1st 1m
default_repetitions_1st 5
default_interval_2nd 5m
default_repetitions_2nd 5
default_interval_3rd 30m
<test>
  check numeric
  target_key xxx
  lower_threshold xxx
  upper_threshold xxx
</test>
<test>
  check regexp
  target_key xxx
  include_pattern ^.$
  exclude_pattern ^.$
</test>
<def>
  pattern http_status_errors
  check numeric_upward
  warn_threshold 25
  crit_threshold 50
  tag alert

# tag_warn alert.warn # tag_crit alert.crit

  # target_keys blog_5xx_percentage
  target_key_pattern ^.*_5xx_percentage$
</def>
<def>
  pattern log_checker
  check string_find
  crit_pattern 'ERROR'
  warn_pattern 'WARNING'
  tag alert
  # target_keys message
  target_key_pattern ^.*_message$
</def>

</match>

Calls superclass method
# File lib/fluent/plugin/out_notifier.rb, line 123
def configure(conf)
  super

  @match_cache = {} # cache which has map (fieldname => definition(s))
  @negative_cache = {}
  @tests = []
  @defs = []
  @states = {} # key: tag+field ?

  if @input_tag_remove_prefix
    @input_tag_remove_prefix_string = @input_tag_remove_prefix + '.'
    @input_tag_remove_prefix_length = @input_tag_remove_prefix_string.length
  end

  if @default_interval_1st || @default_interval_2nd || @default_interval_3rd
    @default_intervals = [
      @default_interval_1st || @default_intervals[0],
      @default_interval_2nd || @default_intervals[1],
      @default_interval_3rd || @default_intervals[2],
    ]
  end
  if @default_repetitions_1st || @default_repetitions_2nd
    @default_repetitions = [
      @default_repetitions_1st || @default_repetitions[0],
      @default_repetitions_2nd || @default_repetitions[1],
    ]
  end

  @test_configs.each do |test_config|
    @tests << Test.new(test_config)
  end
  @def_configs.each do |def_config|
    @defs << Definition.new(def_config, self)
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_notifier.rb, line 159
def multi_workers_ready?
  true
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_notifier.rb, line 235
def process(tag, es)
  notifications = check(tag, es)

  if notifications.size > 0
    @mutex.synchronize do
      suppressed_emit(notifications)
    end
  end

  if Fluent::Engine.now - @last_status_cleaned > STATES_CLEAN_INTERVAL
    @mutex.synchronize do
      states_cleanup
      @last_status_cleaned = Fluent::Engine.now
    end
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_notifier.rb, line 163
def start
  super
  @mutex = Mutex.new
  @last_status_cleaned = Fluent::Engine.now
end
states_cleanup() click to toggle source
# File lib/fluent/plugin/out_notifier.rb, line 189
def states_cleanup
  now = Fluent::Engine.now
  @states.keys.each do |key|
    if now - @states[key].last_notified > STATES_EXPIRE_SECONDS
      @states.delete(key)
    end
  end
end
suppressed_emit(notifications) click to toggle source
# File lib/fluent/plugin/out_notifier.rb, line 169
def suppressed_emit(notifications)
  now = Fluent::Engine.now
  notifications.each do |n|
    hashkey = n.delete(:hashkey)
    definition = n.delete(:match_def)
    tag = n.delete(:emit_tag)

    state = @states[hashkey]
    if state
      unless state.suppress?(definition, n)
        router.emit(tag, now, n)
        state.update_notified(definition, n)
      end
    else
      router.emit(tag, now, n)
      @states[hashkey] = State.new(n)
    end
  end
end