module Sidekiq::Status::Storage
Constants
- BATCH_LIMIT
- RESERVED_FIELDS
Protected Instance Methods
Unschedules the job and deletes the Status
@param [String] job_id job id
@param [Num] job_unix_time unix timestamp for the scheduled job (optional)
@return [Boolean] true if the job was found and removed, false otherwise
# File lib/sidekiq-status/storage.rb, line 39
def delete_and_unschedule(job_id, job_unix_time = nil)
  Sidekiq::Status.redis_adapter do |conn|
    scan_options = {offset: 0, conn: conn, start: (job_unix_time || '-inf'), end: (job_unix_time || '+inf')}

    while not (jobs = schedule_batch(scan_options)).empty?
      match = scan_scheduled_jobs_for_jid jobs, job_id
      unless match.nil?
        conn.zrem "schedule", match
        conn.del key(job_id)
        return true # Done
      end
      scan_options[:offset] += BATCH_LIMIT
    end
  end
  false
end
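The paging loop in delete_and_unschedule can be tried without Redis. The sketch below is only an illustration under stated assumptions: DEMO_SCHEDULE, DEMO_BATCH_LIMIT, fake_schedule_batch and find_scheduled_payload are hypothetical names that do not exist in the library, and an in-memory Array stands in for the "schedule" sorted set.

```ruby
# Hypothetical in-memory stand-in for the Redis "schedule" sorted set:
# each entry is [score, json_payload], kept sorted by score.
DEMO_BATCH_LIMIT = 2 # assumed small value for illustration, not the library's BATCH_LIMIT

DEMO_SCHEDULE = [
  [100, '{"jid":"aaa","class":"MailJob"}'],
  [200, '{"jid":"bbb","class":"MailJob"}'],
  [300, '{"jid":"ccc","class":"MailJob"}']
]

# Returns at most DEMO_BATCH_LIMIT payloads, starting at the given offset.
# Array#slice returns nil once the offset is past the end, hence the fallback.
def fake_schedule_batch(offset)
  DEMO_SCHEDULE.map(&:last).slice(offset, DEMO_BATCH_LIMIT) || []
end

# Pages through the schedule until the job id is found or a batch comes back empty.
def find_scheduled_payload(job_id)
  offset = 0
  until (jobs = fake_schedule_batch(offset)).empty?
    match = jobs.find { |payload| payload.include?(%("jid":"#{job_id}")) }
    return match if match
    offset += DEMO_BATCH_LIMIT
  end
  nil
end
```

Here find_scheduled_payload("ccc") needs two batches before it finds the entry, while an unknown id walks every batch and ends with nil, which mirrors the false return of the real method.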
Deletes status hash info for given job id
@param [String] id job id
@return [Integer] number of keys that were removed
# File lib/sidekiq-status/storage.rb, line 59
def delete_status(id)
  redis_connection do |conn|
    conn.del(key(id))
  end
end
Gets a single value from the job status hash
@param [String] id job id
@param [String, Symbol] field fetched field name
@return [String, nil] value stored in the field, or nil if the field or key does not exist
# File lib/sidekiq-status/storage.rb, line 69
def read_field_for_id(id, field)
  Sidekiq::Status.redis_adapter do |conn|
    conn.hget(key(id), field)
  end
end
Gets the whole status hash from the job status
@param [String] id job id
@return [Hash] Hash stored in redis
# File lib/sidekiq-status/storage.rb, line 78
def read_hash_for_id(id)
  Sidekiq::Status.redis_adapter do |conn|
    conn.hgetall(key(id))
  end
end
Stores multiple values into a job’s status hash, sets the last update time, and publishes the job id on the "status_updates" channel
@param [String] id job id
@param [Hash] status_updates updated values (converted to strings in place)
@param [Integer] expiration optional expire time in seconds
@param [ConnectionPool] redis_pool optional redis connection pool
@return [Integer] reply to the HSET command (the first reply of the MULTI transaction)
# File lib/sidekiq-status/storage.rb, line 14
def store_for_id(id, status_updates, expiration = nil, redis_pool=nil)
  status_updates.transform_values!(&:to_s)
  redis_connection(redis_pool) do |conn|
    conn.multi do |pipeline|
      pipeline.hset key(id), 'update_time', Time.now.to_i, *(status_updates.to_a.flatten(1))
      pipeline.expire key(id), (expiration || Sidekiq::Status::DEFAULT_EXPIRY)
      pipeline.publish "status_updates", id
    end[0]
  end
end
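To see what store_for_id hands to HSET after the 'update_time' pair, the argument-building step can be reproduced in plain Ruby. This is a minimal sketch: the field names in status_updates are made up for illustration, and the non-mutating transform_values is used here so the example input is left untouched.

```ruby
# Assumed example input; the field names are illustrative only.
status_updates = { status: :working, pct_complete: 42 }

# Same conversion store_for_id applies: every value becomes a String.
stringified = status_updates.transform_values(&:to_s)

# Flatten one level into the alternating field/value list that is splatted into hset.
hset_args = stringified.to_a.flatten(1)
# => [:status, "working", :pct_complete, "42"]
```

Because the method itself calls transform_values!, a hash passed to store_for_id is modified in place; callers that reuse the hash afterwards will see string values.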
Stores the job status by delegating to store_for_id with a {status: status} hash; as the source below shows, the expiration time is therefore set on every call, not only for :failed or :stopped jobs
@param [String] id job id
@param [Symbol] status job status
@param [Integer] expiration optional expire time in seconds
@param [ConnectionPool] redis_pool optional redis connection pool
@return [Integer] reply to the HSET command, as returned by store_for_id
# File lib/sidekiq-status/storage.rb, line 32
def store_status(id, status, expiration = nil, redis_pool=nil)
  store_for_id id, {status: status}, expiration, redis_pool
end
Private Instance Methods
# File lib/sidekiq-status/storage.rb, line 121
def key(id)
  "sidekiq:status:#{id}"
end
Yields redis connection. Uses redis pool if available.
@param [ConnectionPool] redis_pool optional redis connection pool
# File lib/sidekiq-status/storage.rb, line 109
def redis_connection(redis_pool=nil)
  if redis_pool
    redis_pool.with do |conn|
      yield conn
    end
  else
    Sidekiq.redis do |conn|
      yield conn
    end
  end
end
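The pool-or-default choice made by redis_connection can be illustrated without Sidekiq or Redis. In the sketch below FakePool, DEFAULT_CONN and with_connection are hypothetical stand-ins: FakePool only mimics the #with method of a connection pool, and the symbol DEFAULT_CONN takes the place of the connection that Sidekiq.redis would yield.

```ruby
# Hypothetical stand-in for a connection pool: anything responding to #with.
class FakePool
  def initialize(conn)
    @conn = conn
  end

  # Yields the wrapped connection and returns the block's result.
  def with
    yield @conn
  end
end

# Stand-in for the connection the default (Sidekiq.redis) path would yield.
DEFAULT_CONN = :default_connection

# Uses the given pool when present, otherwise falls back to the default connection.
def with_connection(pool = nil, &block)
  if pool
    pool.with(&block)
  else
    block.call(DEFAULT_CONN)
  end
end

pooled  = with_connection(FakePool.new(:pooled_connection)) { |conn| conn }
default = with_connection { |conn| conn }
```

In both branches the block's return value is passed back to the caller, which is why the storage methods can return the result of the Redis call made inside the block.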
Searches the jobs Array for the job_id
@param [Array] scheduled_jobs results of Redis schedule key
@param [String] job_id job id
@return [String, nil] first job listing whose "jid" matches, or nil if none does
# File lib/sidekiq-status/storage.rb, line 100
def scan_scheduled_jobs_for_jid(scheduled_jobs, job_id)
  # A little sketchy, I know, but the structure of these internal JSON
  # is predefined in such a way where this will not catch unintentional elements,
  # and this is notably faster than performing JSON.parse() for every listing:
  scheduled_jobs.select { |job_listing| job_listing.match(/\"jid\":\"#{job_id}\"/) }[0]
end
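The string-matching shortcut can be checked against sample payloads. The sketch below assumes the scheduled entries are compact JSON strings (no whitespace around the colon), which is what JSON.generate produces; the payload contents are invented for illustration. It also wraps the id in Regexp.escape, which the library method does not do, so that an id containing regex metacharacters cannot change the pattern.

```ruby
require "json"

# Assumed payloads, shaped like compact JSON job entries.
scheduled_jobs = [
  JSON.generate("class" => "MailJob", "jid" => "abc123", "args" => [1]),
  JSON.generate("class" => "MailJob", "jid" => "def456", "args" => [2])
]

job_id = "def456"

# Regexp.escape is an addition in this sketch, not part of the library code.
pattern = /"jid":"#{Regexp.escape(job_id)}"/
match = scheduled_jobs.find { |listing| listing.match?(pattern) }

# Only the single matched entry is parsed, instead of every listing.
parsed = JSON.parse(match)
```

Using find stops at the first hit, whereas select { ... }[0] in the original scans the whole batch before taking the first element; the result is the same.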
Gets the batch of scheduled jobs based on input options.
Uses Redis zrangebyscore for log(n) search, if unix-time is provided.
@param [Hash] options options hash containing (REQUIRED) keys:
- conn: Redis connection
- start: start score (i.e. -inf or a unix timestamp)
- end: end score (i.e. +inf or a unix timestamp)
- offset: current progress through (all) jobs (e.g. 100 if you want the next batch of up to BATCH_LIMIT jobs starting at position 100)
# File lib/sidekiq-status/storage.rb, line 93
def schedule_batch(options)
  Sidekiq::Status.wrap_redis_connection(options[:conn]).schedule_batch("schedule", options.merge(limit: BATCH_LIMIT))
end
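The effect of the start, end, offset and limit options can be approximated in memory. The following sketch is not the adapter's implementation: range_by_score is a hypothetical helper that imitates a score-range query with an offset and a limit over an Array of [score, member] pairs, to show why passing a unix timestamp narrows the batch to jobs scheduled for exactly that time.

```ruby
# Hypothetical in-memory equivalent of a score-range query with offset and limit.
# entries is an Array of [score, member] pairs, assumed sorted by score.
def range_by_score(entries, start:, stop:, offset:, limit:)
  low  = start == "-inf" ? -Float::INFINITY : start
  high = stop == "+inf" ? Float::INFINITY : stop
  in_range = entries.select { |score, _member| score >= low && score <= high }
  # Array#slice returns nil once the offset is past the end, hence the fallback.
  (in_range.slice(offset, limit) || []).map(&:last)
end

entries = [[100, "a"], [200, "b"], [200, "c"], [300, "d"]]

# With a timestamp, start and end are the same score, so only that slot is scanned.
exact = range_by_score(entries, start: 200, stop: 200, offset: 0, limit: 100)

# Without a timestamp the full range is paged through, one batch at a time.
first_page  = range_by_score(entries, start: "-inf", stop: "+inf", offset: 0, limit: 2)
second_page = range_by_score(entries, start: "-inf", stop: "+inf", offset: 2, limit: 2)
```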