class Fluent::RedisMultiTypeCounterOutput
Attributes
db_number[R]
host[R]
password[R]
patterns[R]
port[R]
redis[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 8 def initialize super require 'redis' require 'msgpack' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 14 def configure(conf) super @host = conf.has_key?('host') ? conf['host'] : 'localhost' @port = conf.has_key?('port') ? conf['port'].to_i : 6379 @password = conf.has_key?('password') ? conf['password'] : nil @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil @patterns = [] conf.elements.select { |element| element.name == 'pattern' }.each { |element| begin @patterns << Pattern.new(element) rescue RedisMultiTypeCounterException => e raise Fluent::ConfigError, e.message end } end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 45 def format(tag, time, record) [tag, time, record].to_msgpack end
shutdown()
click to toggle source
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 41 def shutdown @redis.quit end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 32 def start super @redis = Redis.new( :host => @host, :port => @port, :password => @password, :thread_safe => true, :db => @db_number ) end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_redis_multi_type_counter.rb, line 49 def write(chunk) table = {} table.default = 0 chunk.open { |io| begin MessagePack::Unpacker.new(io).each { |message| (tag, time, record) = message @patterns.select { |pattern| pattern.is_match?(record) }.each{ |pattern| count_key = pattern.get_count_key(time, record) count_hash_key = pattern.get_count_hash_key(record) count_zset_key = pattern.get_count_zset_key(record) store_list = pattern.store_list key = RecordKey.new(count_key, count_hash_key, count_zset_key, store_list) if store_list if table[key] == 0 table[key] = [] end table[key] << pattern.get_count_value(record) else table[key] += pattern.get_count_value(record) end } } rescue EOFError # EOFError always occured when reached end of chunk. end } table.each_pair.select { |key, value| value != 0 }.each_slice(@max_pipelining) { |items| @redis.pipelined do items.each do |key, value| if key.count_hash_key != nil @redis.hincrby(key.count_key, key.count_hash_key, value) elsif key.count_zset_key != nil @redis.zincrby(key.count_key, value, key.count_zset_key) else if key.store_list @redis.rpush(key.count_key, value) else @redis.incrby(key.count_key, value) end end end end } end