class Sidekiq::Group::Collection

Constants

CID_EXPIRE_TTL
LOCK_TTL

Attributes

callback_class[R]
callback_options[R]
cid[R]

Public Class Methods

new(cid = nil) click to toggle source
# File lib/sidekiq/group/collection.rb, line 12
# Creates a handle for one group ("collection") of jobs.
#
# @param cid [String, nil] id of an existing collection; when it is nil (or
#   false) a fresh URL-safe id is generated from 16 random bytes
def initialize(cid = nil)
  @cid = cid
  @cid ||= SecureRandom.urlsafe_base64(16)
end

Public Instance Methods

add(jid) click to toggle source
# File lib/sidekiq/group/collection.rb, line 26
# Registers a child job with this collection.
#
# Adds +jid+ to the "<cid>-jids" Redis set and resets that set's expiry to
# CID_EXPIRE_TTL, with both commands queued in one MULTI transaction.
#
# @param jid [String] id of the child job being scheduled
# @return [Object] whatever the +multi+ call returns
def add(jid)
  Sidekiq::Logging.logger.info "Scheduling child job #{jid} for parent #{@cid}" if Sidekiq::Group.debug

  jids_key = "#{@cid}-jids"

  Sidekiq.redis do |r|
    # Queue the commands on the object yielded by +multi+ rather than on the
    # outer connection: newer Redis clients only include commands issued on
    # the yielded transaction, while older ones yield the connection itself,
    # so this form works with both.
    r.multi do |tx|
      tx.sadd(jids_key, jid)
      tx.expire(jids_key, CID_EXPIRE_TTL)
    end
  end
end
callback_class=(value) click to toggle source
# File lib/sidekiq/group/collection.rb, line 16
# Stores the callback class on this instance and writes it to the
# 'callback_class' field via #persist.
#
# @param value [Object] callback class identifier to remember
def callback_class=(value)
  @callback_class = value
  persist('callback_class', @callback_class)
end
callback_options=(value) click to toggle source
# File lib/sidekiq/group/collection.rb, line 21
# Stores the callback options on this instance and writes their JSON
# encoding to the 'callback_options' field via #persist.
#
# @param value [Object] options to remember; must respond to +to_json+
def callback_options=(value)
  @callback_options = value
  encoded = value.to_json
  persist('callback_options', encoded)
end
spawned_jobs!() click to toggle source
# File lib/sidekiq/group/collection.rb, line 37
# Writes the 'spawned_jobs' field for this collection, using the collection
# id as the stored value. #spawned_all_jobs? later reads this field.
def spawned_jobs!
  marker = cid
  persist('spawned_jobs', marker)
end
success(jid) click to toggle source
# File lib/sidekiq/group/collection.rb, line 41
# Records that child job +jid+ finished and, once the whole group is done,
# enqueues the callback worker and deletes the group's Redis keys.
#
# The callback is enqueued only when #processed_all_jobs? is true and
# #locked? is falsy; #locked? is not consulted while jobs are still pending.
#
# @param jid [String] id of the child job that completed
# @return [Object, nil] nil when nothing was enqueued, otherwise the result
#   of #cleanup_redis
def success(jid)
  remove_processed(jid)

  # `||` short-circuits, so the lock is only attempted after the
  # all-jobs-processed check has passed.
  return if !processed_all_jobs? || locked?

  worker_name, raw_options = callback_data
  options = JSON(raw_options)

  if Sidekiq::Group.debug
    Sidekiq::Logging.logger.info "Scheduling callback job #{worker_name} with #{options}"
  end
  Sidekiq::Group::Worker.perform_async(worker_name, options)

  cleanup_redis
end

Private Instance Methods

callback_data() click to toggle source
# File lib/sidekiq/group/collection.rb, line 82
# Reads the persisted callback settings for this collection.
#
# Both hash fields are fetched in one MULTI transaction.
#
# @return [Array] two-element array: the 'callback_class' field followed by
#   the 'callback_options' field of the Redis hash stored under the
#   collection id
def callback_data
  Sidekiq.redis do |r|
    # Issue the reads on the object yielded by +multi+: newer Redis clients
    # only queue commands sent to the yielded transaction, and older ones
    # yield the connection itself, so this form works with both.
    r.multi do |tx|
      tx.hget(@cid, 'callback_class')
      tx.hget(@cid, 'callback_options')
    end
  end
end
cleanup_redis() click to toggle source
# File lib/sidekiq/group/collection.rb, line 100
# Deletes this collection's Redis keys: the hash stored under the collection
# id and the "<cid>-jids" set, in a single DEL call.
def cleanup_redis
  Sidekiq.redis do |conn|
    conn.del(@cid, "#{@cid}-jids")
  end
end
locked?() click to toggle source
# File lib/sidekiq/group/collection.rb, line 104
# Takes the "<cid>-finished" marker and reports whether it was already set.
#
# In one MULTI transaction the marker is set to 1 with GETSET and its expiry
# is reset to LOCK_TTL. The GETSET reply (the previous value) is returned.
#
# @return [String, nil] previous marker value: nil when the marker did not
#   exist (the caller now holds it), truthy when it was already set
def locked?
  lock_key = "#{@cid}-finished"

  replies = Sidekiq.redis do |r|
    # Issue the commands on the object yielded by +multi+: newer Redis
    # clients only queue commands sent to the yielded transaction, and older
    # ones yield the connection itself, so this form works with both.
    r.multi do |tx|
      tx.getset(lock_key, 1)
      tx.expire(lock_key, LOCK_TTL)
    end
  end

  replies.first
end
pending() click to toggle source
# File lib/sidekiq/group/collection.rb, line 68
# Number of job ids currently in the "<cid>-jids" set.
#
# The count is fetched with SCARD on first use and then cached on this
# instance, so later calls on the same object do not hit Redis again.
#
# @return [Integer] cached SCARD reply
def pending
  @pending ||= Sidekiq.redis do |conn|
    conn.scard("#{@cid}-jids")
  end
end
persist(attribute, value) click to toggle source
# File lib/sidekiq/group/collection.rb, line 91
# Writes one field of this collection's Redis hash and refreshes the hash's
# expiry to CID_EXPIRE_TTL, with both commands in one MULTI transaction.
#
# @param attribute [String] hash field name
# @param value [Object] value to store in that field
# @return [Object] whatever the +multi+ call returns
def persist(attribute, value)
  Sidekiq.redis do |r|
    # Issue the commands on the object yielded by +multi+: newer Redis
    # clients only queue commands sent to the yielded transaction, and older
    # ones yield the connection itself, so this form works with both.
    r.multi do |tx|
      tx.hset(@cid, attribute, value)
      tx.expire(@cid, CID_EXPIRE_TTL)
    end
  end
end
processed_all_jobs?() click to toggle source
# File lib/sidekiq/group/collection.rb, line 72
# Tells whether the group is complete: #spawned_all_jobs? holds and the
# pending count is zero.
#
# When debugging is enabled the pending count is logged first. Otherwise
# #pending is only consulted once #spawned_all_jobs? has returned a truthy
# value.
#
# @return [Boolean] true when every job of the group has been processed
def processed_all_jobs?
  if Sidekiq::Group.debug
    Sidekiq::Logging.logger.info "Pending jobs: #{pending}"
  end

  everything_spawned = spawned_all_jobs?
  return everything_spawned unless everything_spawned

  pending.zero?
end
remove_processed(jid) click to toggle source
# File lib/sidekiq/group/collection.rb, line 58
# Removes a finished child job from the "<cid>-jids" set, retrying once
# after one second when the first SREM did not remove anything.
#
# @param jid [String] id of the child job that completed
# @return [Object, nil] nil when the first attempt removed the job,
#   otherwise the reply of the retried SREM
def remove_processed(jid)
  Sidekiq::Logging.logger.info "Child job #{jid} completed" if Sidekiq::Group.debug

  jids_key = "#{@cid}-jids"
  reply = Sidekiq.redis { |r| r.srem(jids_key, jid) }

  # Depending on the Redis client, SREM replies with a boolean or with the
  # number of removed members. An Integer 0 is truthy in Ruby, so the reply
  # has to be interpreted explicitly or the retry below would never run.
  removed = reply == true || (reply.is_a?(Integer) && reply.positive?)
  return if removed

  Sidekiq::Logging.logger.info "Could not remove child job #{jid} from Redis" if Sidekiq::Group.debug
  sleep 1
  Sidekiq.redis { |r| r.srem(jids_key, jid) }
end
spawned_all_jobs?() click to toggle source
# File lib/sidekiq/group/collection.rb, line 78
# Tells whether the 'spawned_jobs' field has been written for this
# collection (see #spawned_jobs!).
#
# @return [Boolean] true when the field holds a non-blank value
def spawned_all_jobs?
  marker = Sidekiq.redis { |r| r.hget(@cid, 'spawned_jobs') }

  # Core-Ruby stand-in for ActiveSupport's `present?`, so the check does not
  # raise NoMethodError when ActiveSupport is not loaded: nil, empty and
  # whitespace-only values all count as "not set".
  !marker.to_s.match?(/\A[[:space:]]*\z/)
end