class Tennis::Backend::Redis

Public Class Methods

new(logger:, url:, namespace: "tennis") click to toggle source
Calls superclass method
# File lib/tennis/backend/redis.rb, line 15
def initialize(logger:, url:, namespace: "tennis")
  super(logger: logger)
  @redis_url = url
  @redis_namespace = namespace
end

Public Instance Methods

ack(_) click to toggle source
# File lib/tennis/backend/redis.rb, line 36
def ack(_)
  # Nothing to do here
end
enqueue(job:, method:, args:, delay: nil) click to toggle source

Delayed jobs are not yet supported with Redis backend

# File lib/tennis/backend/redis.rb, line 22
def enqueue(job:, method:, args:, delay: nil)
  meta = { "enqueued_at" => Time.now.to_i }
  task = Task.new(self, generate_task_id, job, method, args, meta)
  client_push(task)
end
receive(job_classes:, timeout: 1) click to toggle source
# File lib/tennis/backend/redis.rb, line 28
def receive(job_classes:, timeout: 1)
  unordered_queues = queues(job_classes).shuffle
  serialized_task = timeout && timeout >= 1 ?
    client_pop_with_timeout(unordered_queues, timeout) :
    client_pop(unordered_queues)
  serialized_task && deserialize_task(serialized_task)
end
requeue(task) click to toggle source
# File lib/tennis/backend/redis.rb, line 40
def requeue(task)
  (task.meta["requeued_at"] ||= []) << Time.now.to_i
  client_push(task)
end

Private Instance Methods

client() click to toggle source
# File lib/tennis/backend/redis.rb, line 47
def client
  @client ||= begin
    redis = ::Redis.new(url: @redis_url)
    ::Redis::Namespace.new(@redis_namespace, redis: redis)
  end
end
client_pop(unordered_queues) click to toggle source
# File lib/tennis/backend/redis.rb, line 65
      def client_pop(unordered_queues)
        script_lines = ["local element = nil"]
        unordered_queues.each do |queue_name|
          script_lines << <<-LUA
            element = redis.call("RPOP", "#{@redis_namespace}:#{queue_name}")
            if element ~= nil then
              return element
            end
          LUA
        end
        client.eval(script_lines.join("\n"))
      end
client_pop_with_timeout(unordered_queues, timeout) click to toggle source
# File lib/tennis/backend/redis.rb, line 59
def client_pop_with_timeout(unordered_queues, timeout)
  unordered_queues << timeout.to_i
  _, serialized_task = client.brpop(*unordered_queues)
  serialized_task
end
client_push(task) click to toggle source
# File lib/tennis/backend/redis.rb, line 54
def client_push(task)
  serialized_task = serialize(task)
  client.lpush(queue_name(task.job.class), serialized_task)
end
deserialize_task(serialized_task) click to toggle source
# File lib/tennis/backend/redis.rb, line 88
def deserialize_task(serialized_task)
  hash = Serializer.new.load(serialized_task)
  Task.new(self, hash["id"], hash["job"], hash["method"], hash["args"], hash["meta"])
end
generate_task_id() click to toggle source
# File lib/tennis/backend/redis.rb, line 93
def generate_task_id
  SecureRandom.hex(10)
end
queue_name(klass) click to toggle source
# File lib/tennis/backend/redis.rb, line 102
def queue_name(klass)
  @queue_names ||= {}
  @queue_names[klass] ||= "queue:#{klass.name.gsub("::", "-").downcase}"
end
queues(job_classes) click to toggle source
# File lib/tennis/backend/redis.rb, line 97
def queues(job_classes)
  @queues ||= {}
  @queues[job_classes] ||= job_classes.map { |klass| queue_name(klass) }
end
serialize(task) click to toggle source
# File lib/tennis/backend/redis.rb, line 78
def serialize(task)
  Serializer.new.dump({
    "id"     => task.task_id,
    "job"    => task.job,
    "method" => task.method,
    "args"   => task.args,
    "meta"   => task.meta,
  })
end