class SidekiqUniqueJobs::Orphans::RubyReaper

Class DeleteOrphans provides deletion of orphaned digests

@note this is a much slower version of the lua script but does not crash redis

@author Mikael Henriksson <mikael@mhenrixon.com>

Constants

RUN_SUFFIX

@return [String] the suffix for :RUN locks

Attributes

digests[R]

@!attribute [r] digests

@return [SidekiqUniqueJobs::Digests] digest collection
retried[R]

@!attribute [r] retried

@return [Redis::SortedSet] the Sidekiq RetrySet
scheduled[R]

@!attribute [r] scheduled

@return [Redis::SortedSet] the Sidekiq ScheduleSet

Public Class Methods

new(conn) click to toggle source

Initialize a new instance of DeleteOrphans

@param [Redis] conn a connection to redis

Calls superclass method SidekiqUniqueJobs::Orphans::Reaper::new
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 34
def initialize(conn)
  super(conn)
  @digests   = SidekiqUniqueJobs::Digests.new
  @scheduled = Redis::SortedSet.new(SCHEDULE)
  @retried   = Redis::SortedSet.new(RETRY)
end

Public Instance Methods

active?(digest) click to toggle source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 123
def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
  Sidekiq.redis do |conn|
    procs = conn.sscan_each("processes").to_a
    return false if procs.empty?

    procs.sort.each do |key|
      valid, workers = conn.pipelined do
        # TODO: Remove the if statement in the future
        if conn.respond_to?(:exists?)
          conn.exists?(key)
        else
          conn.exists(key)
        end
        conn.hgetall("#{key}:workers")
      end

      next unless valid
      next unless workers.any?

      workers.each_pair do |_tid, job|
        next unless (item = safe_load_json(job))

        payload = safe_load_json(item[PAYLOAD])

        return true if match?(digest, payload[LOCK_DIGEST])
        return true if considered_active?(payload[CREATED_AT])
      end
    end

    false
  end
end
belongs_to_job?(digest) click to toggle source

Checks if the digest has a matching job.

1. It checks the scheduled set
2. It checks the retry set
3. It goes through all queues

@param [String] digest the digest to search for

@return [true] when either of the checks return true @return [false] when no job was found for this digest

# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 78
def belongs_to_job?(digest)
  scheduled?(digest) || retried?(digest) || enqueued?(digest) || active?(digest)
end
call() click to toggle source

Delete orphaned digests

@return [Integer] the number of reaped locks

# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 47
def call
  BatchDelete.call(orphans, conn)
end
considered_active?(time_f) click to toggle source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 162
def considered_active?(time_f)
  (Time.now - reaper_timeout).to_f < time_f
end
enqueued?(digest) click to toggle source

Checks if the digest exists in a Sidekiq::Queue

@param [String] digest the current digest

@return [true] when digest exists in any queue

# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 111
def enqueued?(digest)
  Sidekiq.redis do |conn|
    queues(conn) do |queue|
      entries(conn, queue) do |entry|
        return true if entry.include?(digest)
      end
    end

    false
  end
end
entries(conn, queue, &block) click to toggle source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 179
def entries(conn, queue, &block) # rubocop:disable Metrics/MethodLength
  queue_key    = "queue:#{queue}"
  initial_size = conn.llen(queue_key)
  deleted_size = 0
  page         = 0
  page_size    = 50

  loop do
    range_start = (page * page_size) - deleted_size
    range_end   = range_start + page_size - 1
    entries     = conn.lrange(queue_key, range_start, range_end)
    page       += 1

    break if entries.empty?

    entries.each(&block)

    deleted_size = initial_size - conn.llen(queue_key)
  end
end
in_sorted_set?(key, digest) click to toggle source

Checks a sorted set for the existance of this digest

@param [String] key the key for the sorted set @param [String] digest the digest to scan for

@return [true] when found @return [false] when missing

# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 210
def in_sorted_set?(key, digest)
  conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a.any?
end
match?(key_one, key_two) click to toggle source
# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 156
def match?(key_one, key_two)
  return false if key_one.nil? || key_two.nil?

  key_one.delete_suffix(RUN_SUFFIX) == key_two.delete_suffix(RUN_SUFFIX)
end
orphans() click to toggle source

Find orphaned digests

@return [Array<String>] an array of orphaned digests

# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 57
def orphans
  conn.zrevrange(digests.key, 0, -1).each_with_object([]) do |digest, memo|
    next if belongs_to_job?(digest)

    memo << digest
    break if memo.size >= reaper_count
  end
end
queues(conn, &block) click to toggle source

Loops through all the redis queues and yields them one by one

@param [Redis] conn the connection to use for fetching queues

@return [void]

@yield queues one at a time

# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 175
def queues(conn, &block)
  conn.sscan_each("queues", &block)
end
retried?(digest) click to toggle source

Checks if the digest exists in the Sidekiq::RetrySet

@param [String] digest the current digest

@return [true] when digest exists in retry set

# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 100
def retried?(digest)
  in_sorted_set?(RETRY, digest)
end
scheduled?(digest) click to toggle source

Checks if the digest exists in the Sidekiq::ScheduledSet

@param [String] digest the current digest

@return [true] when digest exists in scheduled set

# File lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb, line 89
def scheduled?(digest)
  in_sorted_set?(SCHEDULE, digest)
end