class Resque::Plugins::Loner::Helpers

Public Class Methods

cleanup_loners(queue) click to toggle source
# File lib/resque-loner/helpers.rb, line 79
# Removes all loner lock keys stored for +queue+.
#
# Looks up every Redis key matching "loners:queue:<queue>:job:*" and deletes
# them in a single DEL call.
#
# queue - queue name interpolated into the key pattern.
#
# Returns whatever redis.del returns, or nil when no key matched (DEL is
# skipped in that case because it is never called with an empty key list).
def self.cleanup_loners(queue)
  loner_keys = redis.keys("loners:queue:#{queue}:job:*")
  return if loner_keys.empty?

  redis.del(*loner_keys)
end
item_is_a_unique_job?(item) click to toggle source
# File lib/resque-loner/helpers.rb, line 38
# Tells whether the job class named in +item+ includes
# ::Resque::Plugins::UniqueJob.
#
# item - job payload; the class name is read from item[:class], falling back
#        to item['class'].
#
# Returns true or false. Any StandardError raised while resolving or
# inspecting the class yields false.
def self.item_is_a_unique_job?(item)
  job_class = constantize(item[:class] || item['class'])
  job_class.included_modules.include?(::Resque::Plugins::UniqueJob)
rescue
  # The Resque testsuite also submits plain strings as job classes when
  # calling Resque.enqueue, so resque-loner must not blow up in that case.
  false
end
item_ttl(item) click to toggle source
# File lib/resque-loner/helpers.rb, line 47
# Reads the loner TTL configured on the job class named in +item+.
#
# item - job payload; the class name is read from item[:class], falling back
#        to item['class'].
#
# Returns the value of the class's loner_ttl, or -1 when resolving the class
# or calling loner_ttl raises a StandardError.
def self.item_ttl(item)
  job_class = constantize(item[:class] || item['class'])
  job_class.loner_ttl
rescue
  -1
end
job_destroy(queue, klass, *args) click to toggle source
# File lib/resque-loner/helpers.rb, line 63
# Releases the loner lock of every entry in +queue+ that matches the given
# job class (and, optionally, arguments).
#
# The whole "queue:<queue>" list is read and decoded entry by entry; each
# match is handed to Helpers.mark_loner_as_unqueued. Nothing is removed from
# the list here.
#
# queue - queue name.
# klass - job class (or its name); compared via to_s against the entry's
#         'class' field.
# args  - when given, an entry must also have exactly these 'args' to match;
#         when omitted, every entry of the class matches.
#
# Returns the list returned by redis.lrange (the receiver of #each).
def self.job_destroy(queue, klass, *args)
  class_name = klass.to_s
  list_key   = "queue:#{queue}"

  redis.lrange(list_key, 0, -1).each do |raw_entry|
    payload = decode(raw_entry)

    next unless payload['class'] == class_name
    next unless args.empty? || payload['args'] == args

    Resque::Plugins::Loner::Helpers.mark_loner_as_unqueued(queue, payload)
  end
end
loner_lock_after_execution_period(item) click to toggle source
# File lib/resque-loner/helpers.rb, line 55
# Reads loner_lock_after_execution_period from the job class named in +item+.
#
# item - job payload; the class name is read from item[:class], falling back
#        to item['class'].
#
# Returns the value of the class's loner_lock_after_execution_period, or 0
# when resolving the class or calling that method raises a StandardError.
def self.loner_lock_after_execution_period(item)
  job_class = constantize(item[:class] || item['class'])
  job_class.loner_lock_after_execution_period
rescue
  0
end
loner_queued?(queue, item) click to toggle source
# File lib/resque-loner/helpers.rb, line 9
# Checks whether a loner lock is currently recorded for +item+ on +queue+.
#
# queue - queue name used to build the lock key.
# item  - job payload.
#
# Returns false when item_is_a_unique_job? rejects the item; otherwise true
# only if the value stored under the item's lock key is the string '1'.
def self.loner_queued?(queue, item)
  if item_is_a_unique_job?(item)
    lock_key = unique_job_queue_key(queue, item)
    redis.get(lock_key) == '1'
  else
    false
  end
end
mark_loner_as_queued(queue, item) click to toggle source
# File lib/resque-loner/helpers.rb, line 14
# Records a loner lock for +item+ on +queue+.
#
# Sets the item's lock key to 1 and, when item_ttl reports something other
# than -1, puts that TTL on the key.
#
# queue - queue name used to build the lock key.
# item  - job payload.
#
# Returns nil when the item is not a unique job or when no TTL is applied;
# otherwise the result of redis.expire.
def self.mark_loner_as_queued(queue, item)
  return unless item_is_a_unique_job?(item)

  lock_key = unique_job_queue_key(queue, item)
  redis.set(lock_key, 1)

  # -1 is the default value; skip the extra EXPIRE round trip for it.
  ttl = item_ttl(item)
  redis.expire(lock_key, ttl) unless ttl == -1
end
mark_loner_as_unqueued(queue, job) click to toggle source
# File lib/resque-loner/helpers.rb, line 23
# Releases the loner lock for +job+ on +queue+.
#
# queue - queue name used to build the lock key.
# job   - either a Resque::Job (its payload is used) or the payload itself.
#
# When loner_lock_after_execution_period reports 0 the lock key is deleted;
# for any other value the key is kept and given that value as its TTL.
#
# Returns nil when the item is not a unique job; otherwise the result of
# redis.del or redis.expire.
def self.mark_loner_as_unqueued(queue, job)
  payload = job
  payload = job.payload if job.is_a?(Resque::Job)
  return unless item_is_a_unique_job?(payload)

  linger   = loner_lock_after_execution_period(payload)
  lock_key = unique_job_queue_key(queue, payload)

  if linger == 0
    redis.del(lock_key)
  else
    redis.expire(lock_key, linger)
  end
end
unique_job_queue_key(queue, item) click to toggle source
# File lib/resque-loner/helpers.rb, line 33
# Builds the Redis key under which the loner lock for +item+ on +queue+ lives.
#
# queue - queue name interpolated into the key.
# item  - job payload; the class name is read from item[:class], falling back
#         to item['class'], and the class's redis_key(item) supplies the
#         job-specific suffix.
#
# Returns a String of the form "loners:queue:<queue>:job:<redis_key>".
# Errors raised while resolving the class or computing redis_key propagate.
def self.unique_job_queue_key(queue, item)
  job_class = constantize(item[:class] || item['class'])
  suffix    = job_class.redis_key(item)
  "loners:queue:#{queue}:job:#{suffix}"
end