class Fluent::Plugin::RedisStoreOutput
Constants
- DEFAULT_BUFFER_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_store.rb, line 39
# Runs the superclass initializer, then loads the libraries this plugin uses.
def initialize
  super
  # Only load the redis library when no Redis constant is defined yet.
  require 'redis' if defined?(Redis) != 'constant'
  require 'msgpack'
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_store.rb, line 45
# Converts buffer compatibility parameters, lets the superclass process the
# configuration, then checks that a key source has been configured.
#
# @param conf the configuration object passed to the plugin
# @raise [Fluent::ConfigError] when both @key_path and @key are nil
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  return unless @key_path.nil? && @key.nil?

  raise Fluent::ConfigError, "either key_path or key is required"
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 70
# Serializes one event as a MessagePack-encoded [tag, time, record] triple.
#
# @param tag    the event tag
# @param time   the event time; converted with to_f before packing
# @param record the event record
# @return the result of Array#to_msgpack on the triple
def format(tag, time, record)
  event = [tag, time.to_f, record]
  event.to_msgpack
end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 74
# Always answers true.
# NOTE(review): presumably tells the host framework that #format emits
# MessagePack binary - confirm against the Fluentd output plugin API.
#
# @return [Boolean] true
def formatted_to_msgpack_binary?
  true
end
generate_ltrim_script(key, maxlen, order)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 196
# Builds the Lua source that trims the list stored at +key+ to +maxlen+ items.
#
# The generated script reads the list length with LLEN and does nothing unless
# it exceeds +maxlen+. For order 'asc' it keeps the last +maxlen+ items
# (LTRIM l..-1); for any other order it keeps the first +maxlen+ items
# (LTRIM 0..maxlen-1).
#
# +key+ and +order+ are embedded as Lua single-quoted string literals, so
# backslashes, single quotes, newlines and carriage returns are escaped.
# Without that, a key containing one of those characters would produce a
# broken or altered script. For values without such characters the output is
# unchanged.
#
# @param key    the list key (converted with to_s)
# @param maxlen the maximum length (converted with to_s and emitted verbatim)
# @param order  'asc' or any other value (converted with to_s)
# @return [String] the Lua script text
def generate_ltrim_script(key, maxlen, order)
  lua_escapes = { "\\" => "\\\\", "'" => "\\'", "\n" => "\\n", "\r" => "\\r" }
  lua_literal = ->(text) { "'" + text.to_s.gsub(/[\\'\n\r]/, lua_escapes) + "'" }

  lines = [
    "local key = " + lua_literal.call(key),
    "local maxlen = " + maxlen.to_s,
    "local order =" + lua_literal.call(order),
    "local len = tonumber(redis.call('LLEN', key))",
    "if len > maxlen then",
    " if order == 'asc' then",
    " local l = len - maxlen",
    " return redis.call('LTRIM', key, l, -1)",
    " else",
    " return redis.call('LTRIM', key, 0, maxlen - 1)",
    " end",
    "end"
  ]
  lines.map { |line| line + "\n" }.join
end
generate_zremrangebyrank_script(key, maxlen, order)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 178
# Builds the Lua source that caps the sorted set stored at +key+ at +maxlen+
# members.
#
# The generated script counts the members with ZCOUNT over -inf..+inf and does
# nothing unless the count exceeds +maxlen+. For order 'asc' it removes the
# lowest-ranked surplus members; for any other order it removes every member
# from rank +maxlen+ upward.
#
# ZREMRANGEBYRANK treats both rank bounds as inclusive, so removing the
# l = len - maxlen surplus members means the range 0..l-1. The previous range
# 0..l removed one member too many and left only maxlen - 1.
#
# +key+ and +order+ are embedded as Lua single-quoted string literals, so
# backslashes, single quotes, newlines and carriage returns are escaped.
#
# @param key    the sorted-set key (converted with to_s)
# @param maxlen the maximum member count (converted with to_s, emitted verbatim)
# @param order  'asc' or any other value (converted with to_s)
# @return [String] the Lua script text
def generate_zremrangebyrank_script(key, maxlen, order)
  lua_escapes = { "\\" => "\\\\", "'" => "\\'", "\n" => "\\n", "\r" => "\\r" }
  lua_literal = ->(text) { "'" + text.to_s.gsub(/[\\'\n\r]/, lua_escapes) + "'" }

  lines = [
    "local key = " + lua_literal.call(key),
    "local maxlen = " + maxlen.to_s,
    "local order =" + lua_literal.call(order),
    "local len = tonumber(redis.call('ZCOUNT', key, '-inf', '+inf'))",
    "if len > maxlen then",
    " if order == 'asc' then",
    " local l = len - maxlen",
    " if l > 0 then",
    " return redis.call('ZREMRANGEBYRANK', key, 0, l - 1)",
    " end",
    " else",
    " return redis.call('ZREMRANGEBYRANK', key, maxlen, -1)",
    " end",
    "end"
  ]
  lines.map { |line| line + "\n" }.join
end
get_key_from(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 224
# Resolves the Redis key for a record.
#
# Uses the fixed @key when it is set; otherwise looks up @key_path in the
# record and stringifies the result. The base key is wrapped in @key_prefix
# and @key_suffix.
#
# @param record the event record
# @return [String] the full key
# @raise [Fluent::ConfigError] when the resulting key is the empty string
def get_key_from(record)
  base = @key ? @key : traverse(record, @key_path).to_s
  full_key = @key_prefix + base + @key_suffix
  raise Fluent::ConfigError, "key is empty" if full_key.empty?

  full_key
end
get_score_from(record, time)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 248
# Picks the score for a sorted-set entry.
#
# @param record the event record
# @param time   the event time, used when no @score_path is set
# @return the value found at @score_path in the record, or +time+
def get_score_from(record, time)
  return time unless @score_path

  traverse(record, @score_path)
end
get_value_from(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 236
# Extracts the value at @value_path from the record and encodes it according
# to @format_type.
#
# @param record the event record
# @return the value as JSON for 'json', as MessagePack for 'msgpack', and
#   untouched for any other format type
def get_value_from(record)
  raw = traverse(record, @value_path)
  return raw.to_json if @format_type == 'json'
  return raw.to_msgpack if @format_type == 'msgpack'

  raw
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 78
# Always answers true.
# NOTE(review): presumably declares the plugin safe to run under multiple
# workers - confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
operation_for_list(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 148
# Stores a record's value in a Redis list.
#
# Pushes to the tail (RPUSH) when @order is 'asc' and to the head (LPUSH)
# otherwise, applies the key expiry, and, when @value_length is positive,
# evaluates the trim script for the key.
#
# @param record the event record
def operation_for_list(record)
  key = get_key_from(record)
  value = get_value_from(record)

  if @order != 'asc'
    @redis.lpush(key, value)
  else
    @redis.rpush(key, value)
  end

  set_key_expire(key)

  return unless 0 < @value_length

  @redis.eval(generate_ltrim_script(key, @value_length, @order))
end
operation_for_publish(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 172
# Publishes a record's value, using the record's key as the channel argument
# of PUBLISH.
#
# @param record the event record
# @return the result of the publish call
def operation_for_publish(record)
  channel = get_key_from(record)
  payload = get_value_from(record)
  @redis.publish(channel, payload)
end
operation_for_set(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 141
# Adds a record's value to the Redis set at the record's key (SADD), then
# applies the key expiry.
#
# @param record the event record
def operation_for_set(record)
  set_key = get_key_from(record)
  member = get_value_from(record)
  @redis.sadd(set_key, member)
  set_key_expire(set_key)
end
operation_for_string(record)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 164
# Writes a record's value as a plain string at the record's key (SET), then
# applies the key expiry.
#
# @param record the event record
def operation_for_string(record)
  string_key = get_key_from(record)
  payload = get_value_from(record)
  @redis.set(string_key, payload)
  set_key_expire(string_key)
end
operation_for_zset(record, time)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 116
# Stores a record's value in a Redis sorted set.
#
# Steps, in order:
# 1. ZADD the value with its score. With @collision_policy 'NX' or 'XX' the
#    matching option is passed; with no policy a plain ZADD is issued; any
#    other policy value issues no ZADD at all.
# 2. Apply the key expiry.
# 3. When @value_expire is positive, remove members whose score is at most
#    (current epoch seconds - @value_expire).
# 4. When @value_length is positive, evaluate the rank-trimming script.
#
# @param record the event record
# @param time   the event time, used as the score when no score path is set
def operation_for_zset(record, time)
  key = get_key_from(record)
  value = get_value_from(record)
  score = get_score_from(record, time)

  if !@collision_policy
    @redis.zadd(key, score, value)
  elsif @collision_policy == 'NX'
    @redis.zadd(key, score, value, :nx => true)
  elsif @collision_policy == 'XX'
    @redis.zadd(key, score, value, :xx => true)
  end

  set_key_expire(key)

  if 0 < @value_expire
    cutoff = Time.now.to_i - @value_expire
    @redis.zremrangebyscore(key, '-inf', cutoff)
  end

  return unless 0 < @value_length

  @redis.eval(generate_zremrangebyrank_script(key, @value_length, @order))
end
set_key_expire(key)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 256
# Sets a time-to-live on +key+ when @key_expire is positive.
#
# @param key the Redis key
# @return the result of the expire call, or nil when no expiry is configured
def set_key_expire(key)
  return unless 0 < @key_expire

  @redis.expire(key, @key_expire)
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_store.rb, line 65
# Closes the Redis connection with QUIT, then runs the superclass shutdown.
def shutdown
  @redis.quit

  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_redis_store.rb, line 54
# Runs the superclass start, then opens the Redis connection.
#
# Connects through @path when it is set, otherwise through @host and @port.
# Password, timeout, thread_safe and db are passed in both cases.
def start
  super

  endpoint = @path ? { :path => @path } : { :host => @host, :port => @port }
  shared = { :password => @password, :timeout => @timeout, :thread_safe => true, :db => @db }
  @redis = Redis.new(**endpoint.merge(shared))
end
traverse(data, key)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 212
# Walks a dot-separated path through nested hash-like data.
#
# Each path segment is looked up in the current value. The walk stops and
# returns nil as soon as a segment is missing, or the current value cannot be
# queried for keys at all (for example nil or a String in the middle of the
# path). Previously the second case raised NoMethodError instead of returning
# nil like the missing-key case. An empty path returns +data+ itself.
#
# @param data the structure to search, normally the event record
# @param key  [String] the dot-separated path, e.g. "a.b.c"
# @return the value at the path, or nil when the path cannot be followed
def traverse(data, key)
  key.split('.').reduce(data) do |node, segment|
    return nil unless node.respond_to?(:has_key?) && node.has_key?(segment)

    node[segment]
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_redis_store.rb, line 82
# Writes every event of a buffer chunk to Redis inside one pipelined block.
#
# Each unpacked message is destructured as [tag, time, record] (the tag is
# ignored) and dispatched on @store_type to the matching operation_for_*
# method. Unknown store types are skipped.
#
# A record that raises NoMethodError or Encoding::UndefinedConversionError is
# dropped, and the error and the record are reported through the plugin
# logger. Earlier versions printed the error to stdout with puts, and for
# NoMethodError did not report the record.
#
# @param chunk the buffer chunk; must respond to #open and yield an IO
# @return the result of the pipelined call
def write(chunk)
  @redis.pipelined {
    chunk.open { |io|
      begin
        MessagePack::Unpacker.new(io).each { |message|
          begin
            (_, time, record) = message
            case @store_type
            when 'zset'
              operation_for_zset(record, time)
            when 'set'
              operation_for_set(record)
            when 'list'
              operation_for_list(record)
            when 'string'
              operation_for_string(record)
            when 'publish'
              operation_for_publish(record)
            end
          rescue NoMethodError, Encoding::UndefinedConversionError => e
            log.error "Plugin error: " + e.to_s
            log.error "Original record: " + record.to_s
          end
        }
      rescue EOFError
        # EOFError is raised once the end of the chunk is reached; nothing to do.
      end
    }
  }
end