class Rhcf::Timeseries::RedisMgetLuaStrategy
Public Instance Methods
crunch_values(manager, subject, resolution_id, point, filter, limit = 1000)
click to toggle source
# File lib/rhcf/timeseries/redis_strategies.rb, line 90
#
# Runs the :mget_matching_smembers Lua script for one subject/resolution/point
# and turns its reply into a Hash of event name => Integer count.
#
# manager       - object responding to +prefix+ and +connection_to_use+.
# subject       - subject segment used when building the Redis keys.
# resolution_id - resolution segment used when building the Redis keys.
# point         - point segment used when building the Redis keys.
# filter        - nil/false for no filter, otherwise an object responding to
#                 +to_lua_pattern+; the pattern is forwarded to the script.
# limit         - maximum number of entries, forwarded to the script (default 1000).
#
# Returns a Hash; it is empty when the script reply is nil.
# Any error raised while reading the reply propagates to the caller unchanged.
def crunch_values(manager, subject, resolution_id, point, filter, limit = 1000)
  # Scripts must be loaded before evalsha_for can hand out their SHA.
  register_lua_scripts!(manager.connection_to_use)

  point_prefix = [manager.prefix, EVENT_POINT_TOKEN, subject, resolution_id, point].join(NAMESPACE_SEPARATOR)
  set_key = [manager.prefix, EVENT_SET_TOKEN, resolution_id, point, subject].join(NAMESPACE_SEPARATOR)

  data = manager.connection_to_use.evalsha(
    evalsha_for(:mget_matching_smembers),
    keys: [set_key],
    argv: [point_prefix, filter && filter.to_lua_pattern, limit]
  )
  return {} if data.nil?

  # The reply is a pair of parallel arrays: [event names, counts].
  events, counts = data.first, data.last
  events.zip(counts).each_with_object({}) do |(evt, count), result|
    result[evt] = count.to_i
  end
end
evalsha_for(sym_os_lua_script)
click to toggle source
# File lib/rhcf/timeseries/redis_strategies.rb, line 113
#
# Looks up the value stored for a Lua script in @lua_script_register.
#
# sym_os_lua_script - Symbol key of the script (e.g. :smembers_matching).
#
# Returns the registered value for that script.
# Raises RuntimeError when the script is unknown or when no scripts have been
# registered yet (the register is still nil).
def evalsha_for(sym_os_lua_script)
  # Guard the nil register so callers get the descriptive error below instead
  # of a NoMethodError on nil.
  sha = @lua_script_register && @lua_script_register[sym_os_lua_script]
  sha || raise("Script for '#{sym_os_lua_script}' not registered")
end
events_for_subject_on(manager, subject, point, resolution_id, filter)
click to toggle source
# File lib/rhcf/timeseries/redis_strategies.rb, line 79
#
# Lists the members of the event set stored for a subject at a given point.
#
# manager       - object responding to +prefix+ and +connection_to_use+.
# subject       - subject segment used when building the set key.
# point         - point segment used when building the set key.
# resolution_id - resolution segment used when building the set key.
# filter        - nil/false to return every member via SMEMBERS, otherwise an
#                 object responding to +to_lua_pattern+; matching is then done
#                 by the :smembers_matching Lua script.
#
# Returns whatever the connection returns for SMEMBERS / EVALSHA.
def events_for_subject_on(manager, subject, point, resolution_id, filter)
  key = [manager.prefix, EVENT_SET_TOKEN, resolution_id, point, subject].join(NAMESPACE_SEPARATOR)
  connection = manager.connection_to_use
  return connection.smembers(key) unless filter

  # The script must be registered before its SHA can be looked up; without
  # this a filtered call made before any crunch_values call would fail.
  register_lua_scripts!(connection)
  connection.evalsha(evalsha_for(:smembers_matching), keys: [key], argv: [filter.to_lua_pattern])
end
id()
click to toggle source
# File lib/rhcf/timeseries/redis_strategies.rb, line 77
#
# Returns the fixed String 'ME' (presumably the short identifier of this
# strategy; confirm against callers).
def id
  'ME'
end
register_lua_scripts!(connection)
click to toggle source
# File lib/rhcf/timeseries/redis_strategies.rb, line 117
#
# Loads the Lua scripts used by this strategy through +connection+ and
# memoizes the values returned by SCRIPT LOAD in @lua_script_register.
#
# connection - object responding to +script(:load, source)+.
#
# Returns the Hash { mget_matching_smembers: ..., smembers_matching: ... }.
# Later calls return the memoized Hash without contacting +connection+ again.
#
# NOTE(review): because of the memoization, scripts are only ever loaded
# through the first connection passed in; presumably all connections point at
# the same Redis server. Confirm against callers.
def register_lua_scripts!(connection)
  @lua_script_register ||= begin
    # KEYS[1]: set key, ARGV[1]: Lua pattern.
    # Returns the set members that match the pattern.
    smembers_matching = <<-EOF
      local matches = {}
      for _, val in ipairs(redis.call('smembers', KEYS[1])) do
        if string.match(val, ARGV[1]) then
          table.insert(matches, val)
        end
      end
      return matches
    EOF

    # KEYS[1]: set key, ARGV[1]: key prefix, ARGV[2]: Lua pattern, ARGV[3]: limit.
    # Collects the matching set members, MGETs "<prefix><separator><member>"
    # for each of them in batches of 1024, sums the values per member, sorts
    # by value in descending order and returns at most `limit` entries as
    # {members, values}.
    mget_matching_smembers = <<-EOF
      local set_key = KEYS[1]
      local key_prefix = ARGV[1]
      local filter_pattern = ARGV[2]
      local limit = tonumber(ARGV[3])
      local keys = {}
      local keys_to_mget = {}

      local function log(msg)
        -- redis.call('publish', 'log', msg)
      end

      local function mget_in_batches(keys_to_mget)
        local step = 1024
        local results = {}
        local last_end = 0
        local partial = {}

        local function mget_batch(ini , fin)
          log("Getting from " .. ini .. ' to ' .. fin .. ' on a total of ' .. #keys_to_mget)
          partial = redis.call('MGET', unpack(keys_to_mget, ini, fin))
          for _, value in pairs(partial) do
            table.insert(results, value)
          end
        end

        for ending = step, #keys_to_mget, step do
          mget_batch(last_end + 1, ending)
          last_end = ending
        end

        if last_end < #keys_to_mget then
          mget_batch(last_end + 1, #keys_to_mget)
        end

        return results;
      end

      local function sort_and_limit_tuples(subjects, values)
        local dictionary = {}
        for i, subject in pairs(subjects) do
          local value = values[i] or 0
          -- redis.call('publish', 'log', subject .. ' += ' .. value)
          dictionary[subject] = (dictionary[subject] or 0) + value
        end

        local tuples = {}
        for subject, value in pairs(dictionary) do
          -- redis.call('publish', 'log', subject .. ' = ' .. value)
          table.insert(tuples, { subject, value } )
        end

        table.sort(tuples, function(a, b) return b[2] < a[2] end )

        local new_subjects = {}
        local new_counts = {}
        for i, tuple in pairs(tuples) do
          if #new_subjects >= limit then break end
          local subject = tuple[1]
          local value = tuple[2]
          table.insert(new_subjects, subject)
          table.insert(new_counts, value)
        end

        return {new_subjects, new_counts}
      end

      for _, val in ipairs(redis.call('smembers', set_key)) do
        if (filter_pattern and string.match(val, filter_pattern)) or not filter_pattern then
          table.insert(keys, val)
          table.insert(keys_to_mget, key_prefix .. '#{NAMESPACE_SEPARATOR}' .. val)
        end
      end

      if #keys > 0 then
        local values = mget_in_batches(keys_to_mget)
        local sorted = sort_and_limit_tuples(keys, values)
        log ("Values card " .. #values .. " | keys card: " .. #keys)
        return sorted
      else
        return {{},{}}
      end
    EOF

    {
      mget_matching_smembers: connection.script(:load, mget_matching_smembers),
      smembers_matching: connection.script(:load, smembers_matching)
    }
  end
end