class Sidekiq::Merger::Redis

Constants

KEY_PREFIX

Public Class Methods

lock_key(key) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 36
def lock_key(key)
  "#{KEY_PREFIX}:lock:#{key}"
end
merges_key() click to toggle source
# File lib/sidekiq/merger/redis.rb, line 20
def merges_key
  "#{KEY_PREFIX}:merges"
end
msg_key(key) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 28
def msg_key(key)
  "#{KEY_PREFIX}:msg:#{key}"
end
purge!() click to toggle source
# File lib/sidekiq/merger/redis.rb, line 7
    def purge!
      redis do |conn|
        conn.eval
        script = <<-SCRIPT
          for i=1, #ARGV do
            redis.call('del', unpack(redis.call('keys', ARGV[i])))
          end
          return true
        SCRIPT
        conn.eval(script, [], [merges_key, unique_msg_key("*"), msg_key("*"), lock_key("*")])
      end
    end
redis(&block) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 40
def redis(&block)
  Sidekiq.redis(&block)
end
time_key(key) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 32
def time_key(key)
  "#{KEY_PREFIX}:time:#{key}"
end
unique_msg_key(key) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 24
def unique_msg_key(key)
  "#{KEY_PREFIX}:unique_msg:#{key}"
end

Public Instance Methods

all_merges() click to toggle source
# File lib/sidekiq/merger/redis.rb, line 83
def all_merges
  redis { |conn| conn.smembers(merges_key) }
end
delete_merge(key) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 111
def delete_merge(key)
  redis do |conn|
    conn.multi do
      conn.del(unique_msg_key(key))
      conn.del(msg_key(key))
      conn.del(time_key(key))
      conn.del(lock_key(key))
      conn.srem(merges_key, key)
    end
  end
end
delete_message(key, msg) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 57
def delete_message(key, msg)
  msg_json = msg.to_json
  redis do |conn|
    conn.multi do
      conn.srem(unique_msg_key(key), msg_json)
      conn.lrem(msg_key(key), 0, msg_json)
    end
  end
end
get_merge(key) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 91
def get_merge(key)
  msgs = []
  redis { |conn| msgs = conn.lrange(msg_key(key), 0, -1) }
  msgs.map { |msg| JSON.parse(msg) }
end
lock_merge(key, ttl) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 87
def lock_merge(key, ttl)
  redis { |conn| conn.set(lock_key(key), true, nx: true, ex: ttl) }
end
merge_execution_time(key) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 67
def merge_execution_time(key)
  redis do |conn|
    t = conn.get(time_key(key))
    Time.at(t.to_i) unless t.nil?
  end
end
merge_exists?(key, msg) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 78
def merge_exists?(key, msg)
  msg_json = msg.to_json
  redis { |conn| conn.sismember(unique_msg_key(key), msg_json) }
end
merge_size(key) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 74
def merge_size(key)
  redis { |conn| conn.llen(msg_key(key)) }
end
pluck_merge(key) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 97
def pluck_merge(key)
  msgs = []
  redis do |conn|
    conn.multi do
      msgs = conn.lrange(msg_key(key), 0, -1)
      conn.del(unique_msg_key(key))
      conn.del(msg_key(key))
      conn.del(time_key(key))
      conn.srem(merges_key, key)
    end
  end
  extract_future_value(msgs).map { |msg| JSON.parse(msg) }
end
push_message(key, msg, execution_time) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 45
def push_message(key, msg, execution_time)
  msg_json = msg.to_json
  redis do |conn|
    conn.multi do
      conn.sadd(merges_key, key)
      conn.setnx(time_key(key), execution_time.to_i)
      conn.lpush(msg_key(key), msg_json)
      conn.sadd(unique_msg_key(key), msg_json)
    end
  end
end

Private Instance Methods

extract_future_value(future) click to toggle source
# File lib/sidekiq/merger/redis.rb, line 127
def extract_future_value(future)
  while future.value.is_a?(Redis::FutureNotReady)
    sleep(0.001)
  end
  future.value
end