class Sidekiq::Merger::Redis
Constants
- KEY_PREFIX
Public Class Methods
lock_key(key)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 36 def lock_key(key) "#{KEY_PREFIX}:lock:#{key}" end
merges_key()
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 20 def merges_key "#{KEY_PREFIX}:merges" end
msg_key(key)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 28 def msg_key(key) "#{KEY_PREFIX}:msg:#{key}" end
purge!()
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 7 def purge! redis do |conn| conn.eval script = <<-SCRIPT for i=1, #ARGV do redis.call('del', unpack(redis.call('keys', ARGV[i]))) end return true SCRIPT conn.eval(script, [], [merges_key, unique_msg_key("*"), msg_key("*"), lock_key("*")]) end end
redis(&block)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 40 def redis(&block) Sidekiq.redis(&block) end
time_key(key)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 32 def time_key(key) "#{KEY_PREFIX}:time:#{key}" end
unique_msg_key(key)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 24 def unique_msg_key(key) "#{KEY_PREFIX}:unique_msg:#{key}" end
Public Instance Methods
all_merges()
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 83 def all_merges redis { |conn| conn.smembers(merges_key) } end
delete_merge(key)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 111 def delete_merge(key) redis do |conn| conn.multi do conn.del(unique_msg_key(key)) conn.del(msg_key(key)) conn.del(time_key(key)) conn.del(lock_key(key)) conn.srem(merges_key, key) end end end
delete_message(key, msg)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 57 def delete_message(key, msg) msg_json = msg.to_json redis do |conn| conn.multi do conn.srem(unique_msg_key(key), msg_json) conn.lrem(msg_key(key), 0, msg_json) end end end
get_merge(key)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 91 def get_merge(key) msgs = [] redis { |conn| msgs = conn.lrange(msg_key(key), 0, -1) } msgs.map { |msg| JSON.parse(msg) } end
lock_merge(key, ttl)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 87 def lock_merge(key, ttl) redis { |conn| conn.set(lock_key(key), true, nx: true, ex: ttl) } end
merge_execution_time(key)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 67 def merge_execution_time(key) redis do |conn| t = conn.get(time_key(key)) Time.at(t.to_i) unless t.nil? end end
merge_exists?(key, msg)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 78 def merge_exists?(key, msg) msg_json = msg.to_json redis { |conn| conn.sismember(unique_msg_key(key), msg_json) } end
merge_size(key)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 74 def merge_size(key) redis { |conn| conn.llen(msg_key(key)) } end
pluck_merge(key)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 97 def pluck_merge(key) msgs = [] redis do |conn| conn.multi do msgs = conn.lrange(msg_key(key), 0, -1) conn.del(unique_msg_key(key)) conn.del(msg_key(key)) conn.del(time_key(key)) conn.srem(merges_key, key) end end extract_future_value(msgs).map { |msg| JSON.parse(msg) } end
push_message(key, msg, execution_time)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 45 def push_message(key, msg, execution_time) msg_json = msg.to_json redis do |conn| conn.multi do conn.sadd(merges_key, key) conn.setnx(time_key(key), execution_time.to_i) conn.lpush(msg_key(key), msg_json) conn.sadd(unique_msg_key(key), msg_json) end end end
Private Instance Methods
extract_future_value(future)
click to toggle source
# File lib/sidekiq/merger/redis.rb, line 127 def extract_future_value(future) while future.value.is_a?(Redis::FutureNotReady) sleep(0.001) end future.value end