module Sidekiq::Status::Storage

Constants

BATCH_LIMIT
RESERVED_FIELDS

Protected Instance Methods

delete_and_unschedule(job_id, job_unix_time = nil) click to toggle source

Unschedules the job and deletes the Status @param [String] id job id @param [Num] job_unix_time, unix timestamp for the scheduled job

# File lib/sidekiq-status/storage.rb, line 39
def delete_and_unschedule(job_id, job_unix_time = nil)
  Sidekiq::Status.redis_adapter do |conn|
    scan_options = {offset: 0, conn: conn, start: (job_unix_time || '-inf'), end: (job_unix_time || '+inf')}

    while not (jobs = schedule_batch(scan_options)).empty?
      match = scan_scheduled_jobs_for_jid jobs, job_id
      unless match.nil?
        conn.zrem "schedule", match
        conn.del key(job_id)
        return true # Done
      end
      scan_options[:offset] += BATCH_LIMIT
    end
  end
  false
end
delete_status(id) click to toggle source

Deletes status hash info for given job id @param job id @retrun [Integer] number of keys that were removed

# File lib/sidekiq-status/storage.rb, line 59
def delete_status(id)
  redis_connection do |conn|
    conn.del(key(id))
  end
end
read_field_for_id(id, field) click to toggle source

Gets a single valued from job status hash @param [String] id job id @param [String] Symbol field fetched field name @return [String] Redis operation status code

# File lib/sidekiq-status/storage.rb, line 69
def read_field_for_id(id, field)
  Sidekiq::Status.redis_adapter do |conn|
    conn.hget(key(id), field)
  end
end
read_hash_for_id(id) click to toggle source

Gets the whole status hash from the job status @param [String] id job id @return [Hash] Hash stored in redis

# File lib/sidekiq-status/storage.rb, line 78
def read_hash_for_id(id)
  Sidekiq::Status.redis_adapter do |conn|
    conn.hgetall(key(id))
  end
end
store_for_id(id, status_updates, expiration = nil, redis_pool=nil) click to toggle source

Stores multiple values into a job’s status hash, sets last update time @param [String] id job id @param [Hash] status_updates updated values @param [Integer] expiration optional expire time in seconds @param [ConnectionPool] redis_pool optional redis connection pool @return [String] Redis operation status code

# File lib/sidekiq-status/storage.rb, line 14
def store_for_id(id, status_updates, expiration = nil, redis_pool=nil)
  status_updates.transform_values!(&:to_s)
  redis_connection(redis_pool) do |conn|
    conn.multi do |pipeline|
      pipeline.hset  key(id), 'update_time', Time.now.to_i, *(status_updates.to_a.flatten(1))
      pipeline.expire key(id), (expiration || Sidekiq::Status::DEFAULT_EXPIRY)
      pipeline.publish "status_updates", id
    end[0]
  end
end
store_status(id, status, expiration = nil, redis_pool=nil) click to toggle source

Stores job status and sets expiration time to it only in case of :failed or :stopped job @param [String] id job id @param [Symbol] job status @param [Integer] expiration optional expire time in seconds @param [ConnectionPool] redis_pool optional redis connection pool @return [String] Redis operation status code

# File lib/sidekiq-status/storage.rb, line 32
def store_status(id, status, expiration = nil, redis_pool=nil)
  store_for_id id, {status: status}, expiration, redis_pool
end

Private Instance Methods

key(id) click to toggle source
# File lib/sidekiq-status/storage.rb, line 121
def key(id)
  "sidekiq:status:#{id}"
end
redis_connection(redis_pool=nil) { |conn| ... } click to toggle source

Yields redis connection. Uses redis pool if available. @param [ConnectionPool] redis_pool optional redis connection pool

# File lib/sidekiq-status/storage.rb, line 109
def redis_connection(redis_pool=nil)
  if redis_pool
    redis_pool.with do |conn|
      yield conn
    end
  else
    Sidekiq.redis do |conn|
      yield conn
    end
  end
end
scan_scheduled_jobs_for_jid(scheduled_jobs, job_id) click to toggle source

Searches the jobs Array for the job_id @param [Array] scheduled_jobs, results of Redis schedule key @param [String] id job id

# File lib/sidekiq-status/storage.rb, line 100
def scan_scheduled_jobs_for_jid(scheduled_jobs, job_id)
  # A Little skecthy, I know, but the structure of these internal JSON
  # is predefined in such a way where this will not catch unintentional elements,
  # and this is notably faster than performing JSON.parse() for every listing:
  scheduled_jobs.select { |job_listing| job_listing.match(/\"jid\":\"#{job_id}\"/) }[0]
end
schedule_batch(options) click to toggle source

Gets the batch of scheduled jobs based on input options Uses Redis zrangebyscore for log(n) search, if unix-time is provided @param [Hash] options, options hash containing (REQUIRED) keys:

-  conn: Redis connection
-  start: start score (i.e. -inf or a unix timestamp)
-  end: end score (i.e. +inf or a unix timestamp)
-  offset: current progress through (all) jobs (e.g.: 100 if you want jobs from 100 to BATCH_LIMIT)
# File lib/sidekiq-status/storage.rb, line 93
def schedule_batch(options)
  Sidekiq::Status.wrap_redis_connection(options[:conn]).schedule_batch("schedule", options.merge(limit: BATCH_LIMIT))
end