class Fluent::UniqueCounterOutput

Constants

UNITS

Attributes

counts[RW]
last_checked[RW]
tick[RW]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_unique_counter.rb, line 22
# Applies the plugin configuration and resets the per-instance state.
#
# When @unit is set it must name a key of UNITS; the matching value
# becomes @count_interval. When @unit is not set, @count_interval is left
# untouched.
#
# conf - configuration object, forwarded to the superclass via +super+.
#
# Raises Fluent::ConfigError when @unit is set but is not a key of UNITS.
def configure(conf)
  super

  if @unit
    interval = UNITS[@unit.to_sym]
    raise Fluent::ConfigError, "'unit' must be one of minutes/hours/days" if interval.nil?

    @count_interval = interval
  end

  # Collected values, and the lock that guards appending to them in #emit.
  @counts = []
  @mutex = Mutex.new
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_unique_counter.rb, line 69
# Collects the @unique_key value of every record in the event stream and
# appends them to @counts.
#
# tag   - event tag (not used by this method).
# es    - event stream; must respond to +each+ yielding (time, record).
# chain - object whose +next+ is called once the values are stored.
#
# Records whose @unique_key value is nil are skipped. Every other value is
# converted with +to_s+ and stored as an ASCII-8BIT (binary) string.
#
# Returns whatever +chain.next+ returns.
def emit(tag, es, chain)
  values = []

  es.each do |_time, record|
    value = record[@unique_key]
    next if value.nil?

    # String#to_s returns the receiver itself, so forcing the encoding
    # directly would re-tag the string stored in the record (and raise on
    # a frozen string). Work on a copy so the record is left untouched.
    values << value.to_s.dup.force_encoding('ASCII-8BIT')
  end

  # @counts is shared with the watcher thread; append under the lock.
  @mutex.synchronize do
    @counts += values
  end

  chain.next
end
flush_emit() click to toggle source
# File lib/fluent/plugin/out_unique_counter.rb, line 45
# Emits the number of distinct values collected so far and starts a fresh
# collection.
#
# The emitted record is {'unique_count' => N}, sent through
# Fluent::Engine.emit with @tag and the current Fluent::Engine.now.
def flush_emit
  # #emit appends with a read-then-assign on @counts while holding @mutex.
  # Swapping under the same lock keeps a concurrent append from
  # overwriting the freshly reset array with already-flushed values.
  flushed = @mutex.synchronize do
    pending = @counts
    @counts = []
    pending
  end

  message = { 'unique_count' => flushed.uniq.count }
  Fluent::Engine.emit(@tag, Fluent::Engine.now, message)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_unique_counter.rb, line 39
# Stops the plugin: runs the superclass shutdown, then terminates the
# watcher thread stored in @watcher and waits for it to finish.
def shutdown
  super

  watcher = @watcher
  watcher.terminate
  watcher.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_unique_counter.rb, line 34
# Starts the plugin: runs the superclass start first, then launches the
# watcher via #start_watch.
#
# Returns the value of #start_watch.
def start
  super
  start_watch
end
start_watch() click to toggle source
# File lib/fluent/plugin/out_unique_counter.rb, line 51
# Spawns a thread that runs #watch and stores it in @watcher.
# Meant for internal use and for tests.
#
# Returns the new Thread.
def start_watch
  @watcher = Thread.new { watch }
end
watch() click to toggle source
# File lib/fluent/plugin/out_unique_counter.rb, line 56
# Polling loop that calls #flush_emit each time at least @count_interval
# has elapsed, measured with Fluent::Engine.now, since the last flush.
# Never returns on its own.
def watch
  # Kept in an instance variable (publicly accessible) so tests can
  # inspect or adjust it.
  @last_checked = Fluent::Engine.now

  while true
    sleep 0.5
    next if (Fluent::Engine.now - @last_checked) < @count_interval

    # Take the timestamp before flushing; it becomes the start of the
    # next interval.
    checked_at = Fluent::Engine.now
    flush_emit
    @last_checked = checked_at
  end
end