class Fluent::RedisMultiTypeCounterOutput

Attributes

db_number[R]
host[R]
password[R]
patterns[R]
port[R]
redis[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 8
def initialize
  super
  require 'redis'
  require 'msgpack'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 14
def configure(conf)
  super
  @host = conf.has_key?('host') ? conf['host'] : 'localhost'
  @port = conf.has_key?('port') ? conf['port'].to_i : 6379
  @password = conf.has_key?('password') ? conf['password'] : nil
  @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil
  @patterns = []
  conf.elements.select { |element|
    element.name == 'pattern'
  }.each { |element|
    begin
      @patterns << Pattern.new(element)
    rescue RedisMultiTypeCounterException => e
      raise Fluent::ConfigError, e.message
    end
  }
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 45
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 41
def shutdown
  @redis.quit
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 32
def start
  super
  @redis = Redis.new(
    :host => @host, :port => @port,
    :password => @password,
    :thread_safe => true, :db => @db_number
  )
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 49
def write(chunk)
  table = {}
  table.default = 0
  chunk.open { |io|
    begin
      MessagePack::Unpacker.new(io).each { |message|
        (tag, time, record) = message
        @patterns.select { |pattern|
          pattern.is_match?(record)
        }.each{ |pattern|
          count_key = pattern.get_count_key(time, record)
          count_hash_key = pattern.get_count_hash_key(record)
          count_zset_key = pattern.get_count_zset_key(record)
          store_list = pattern.store_list

          key = RecordKey.new(count_key, count_hash_key, count_zset_key, store_list)
                      if store_list
                        if table[key] == 0
                              table[key] = []
                            end

                        table[key] << pattern.get_count_value(record)
                      else
            table[key] += pattern.get_count_value(record)
                  end
        }
      }
    rescue EOFError
      # EOFError always occured when reached end of chunk.
    end
  }

  table.each_pair.select { |key, value|
    value != 0
  }.each_slice(@max_pipelining) { |items|
    @redis.pipelined do
      items.each do |key, value|
        if key.count_hash_key != nil
          @redis.hincrby(key.count_key, key.count_hash_key, value)
        elsif key.count_zset_key != nil
          @redis.zincrby(key.count_key, value, key.count_zset_key)
        else
                      if key.store_list
                        @redis.rpush(key.count_key, value)
          else
            @redis.incrby(key.count_key, value)
                  end
        end
      end
    end
  }
end