class Fluent::RedisStoreOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_store.rb, line 27 def initialize super require 'oj' require 'redis' require 'msgpack' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_store.rb, line 34 def configure(conf) super if @key_path == nil and @key == nil raise Fluent::ConfigError, "either key_path or key is required" end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 57 def format(tag, time, record) [tag, time, record].to_msgpack end
generate_ltrim_script(key, maxlen, order)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 163 def generate_ltrim_script(key, maxlen, order) script = "local key = '" + key.to_s + "'\n" script += "local maxlen = " + maxlen.to_s + "\n" script += "local order ='" + order.to_s + "'\n" script += "local len = tonumber(redis.call('LLEN', key))\n" script += "if len > maxlen then\n" script += " if order == 'asc' then\n" script += " local l = len - maxlen\n" script += " return redis.call('LTRIM', key, l, -1)\n" script += " else\n" script += " return redis.call('LTRIM', key, 0, maxlen - 1)\n" script += " end\n" script += "end\n" return script end
generate_zremrangebyrank_script(key, maxlen, order)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 145 def generate_zremrangebyrank_script(key, maxlen, order) script = "local key = '" + key.to_s + "'\n" script += "local maxlen = " + maxlen.to_s + "\n" script += "local order ='" + order.to_s + "'\n" script += "local len = tonumber(redis.call('ZCOUNT', key, '-inf', '+inf'))\n" script += "if len > maxlen then\n" script += " if order == 'asc' then\n" script += " local l = len - maxlen\n" script += " if l >= 0 then\n" script += " return redis.call('ZREMRANGEBYRANK', key, 0, l)\n" script += " end\n" script += " else\n" script += " return redis.call('ZREMRANGEBYRANK', key, maxlen, -1)\n" script += " end\n" script += "end\n" return script end
get_key_from(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 191 def get_key_from(record) if @key k = @key else k = traverse(record, @key_path).to_s end key = @key_prefix + k + @key_suffix raise Fluent::ConfigError, "key is empty" if key == '' key end
get_score_from(record, time)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 216 def get_score_from(record, time) if @score_path traverse(record, @score_path) else time end end
get_value_from(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 203 def get_value_from(record) value = traverse(record, @value_path) case @format_type when 'json' #value.to_json value = Oj.dump(value) when 'msgpack' value.to_msgpack else value end end
operation_for_list(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 115 def operation_for_list(record) key = get_key_from(record) value = get_value_from(record) if @order == 'asc' @redis.rpush key, value else @redis.lpush key, value end set_key_expire key if 0 < @value_length script = generate_ltrim_script(key, @value_length, @order) @redis.eval script end end
operation_for_publish(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 139 def operation_for_publish(record) key = get_key_from(record) value = get_value_from(record) @redis.publish key, value end
operation_for_set(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 108 def operation_for_set(record) key = get_key_from(record) value = get_value_from(record) @redis.sadd key, value set_key_expire key end
operation_for_string(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 131 def operation_for_string(record) key = get_key_from(record) value = get_value_from(record) @redis.set key, value set_key_expire key end
operation_for_zset(record, time)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 91 def operation_for_zset(record, time) key = get_key_from(record) value = get_value_from(record) score = get_score_from(record, time) @redis.zadd key, score, value set_key_expire key if 0 < @value_expire now = Time.now.to_i @redis.zremrangebyscore key , '-inf' , (now - @value_expire) end if 0 < @value_length script = generate_zremrangebyrank_script(key, @value_length, @order) @redis.eval script end end
set_key_expire(key)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 224 def set_key_expire(key) if 0 < @key_expire @redis.expire key, @key_expire end end
shutdown()
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 53 def shutdown @redis.quit end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_store.rb, line 42 def start super if @path @redis = Redis.new(:path => @path, :password => @password, :timeout => @timeout, :thread_safe => true, :db => @db) else @redis = Redis.new(:host => @host, :port => @port, :password => @password, :timeout => @timeout, :thread_safe => true, :db => @db) end end
traverse(data, key)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 179 def traverse(data, key) val = data key.split('.').each{ |k| if val.has_key?(k) val = val[k] else return nil end } return val end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 61 def write(chunk) @redis.pipelined { chunk.open { |io| begin MessagePack::Unpacker.new(io).each { |message| begin (tag, time, record) = message case @store_type when 'zset' operation_for_zset(record, time) when 'set' operation_for_set(record) when 'list' operation_for_list(record) when 'string' operation_for_string(record) when 'publish' operation_for_publish(record) end rescue NoMethodError => e puts e end } rescue EOFError # EOFError always occured when reached end of chunk. end } } end