class Sidekiq::Cron::Job

Constants

LAST_ENQUEUE_TIME_FORMAT
REMEMBER_THRESHOLD

How long we would like to store information about previous enqueues.

Attributes

args[RW]
cron[RW]
description[RW]
fetch_missing_args[R]
klass[RW]
last_enqueue_time[R]
message[RW]
name[RW]

Public Class Methods

all() click to toggle source

get all cron jobs

# File lib/sidekiq/cron/job.rb, line 202
# Loads every cron job registered in the jobs set.
#
# Reads the member keys of the set stored under +jobs_key+, fetches every job
# hash in one pipelined round trip, drops nil/empty hashes and wraps the rest
# in Sidekiq::Cron::Job instances.
#
# Returns an Array of Sidekiq::Cron::Job.
def self.all
  job_hashes = nil
  Sidekiq.redis do |conn|
    set_members = conn.smembers(jobs_key)
    job_hashes = conn.pipelined do |pipeline|
      # Queue the commands on the object yielded by +pipelined+; issuing them
      # on +conn+ itself is deprecated in newer redis clients. Fall back to
      # +conn+ for clients that yield nothing.
      target = pipeline || conn
      set_members.each { |key| target.hgetall(key) }
    end
  end
  job_hashes.compact.reject(&:empty?).map do |h|
    # no need to fetch missing args from redis since we just got this hash from there
    Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
  end
end
count() click to toggle source
# File lib/sidekiq/cron/job.rb, line 218
# Number of members in the Redis set stored under +jobs_key+.
def self.count
  Sidekiq.redis { |conn| conn.scard(jobs_key) }
end
create(hash) click to toggle source

create new instance of cron job

# File lib/sidekiq/cron/job.rb, line 240
# Builds a job from +hash+ and saves it; returns whatever +save+ returns.
def self.create(hash)
  job = new(hash)
  job.save
end
destroy(name) click to toggle source

destroy job by name

# File lib/sidekiq/cron/job.rb, line 245
# Destroys the job called +name+.
#
# name - a job name, or a Hash carrying the name under :name or 'name'.
#
# Returns the result of the job's +destroy+, or false when no job is found.
def self.destroy(name)
  # a Hash may be passed instead of a plain name
  name = name[:name] || name['name'] if name.is_a?(Hash)

  job = find(name)
  return false unless job

  job.destroy
end
destroy_all!() click to toggle source

Remove all jobs from cron.

# File lib/sidekiq/cron/job.rb, line 511
# Calls +destroy+ on every job returned by +all+, then logs the deletion.
def self.destroy_all!
  all.each(&:destroy)
  logger.info { "Cron Jobs - deleted all jobs" }
end
destroy_removed_jobs(new_job_names) click to toggle source

Destroy the jobs that currently exist but are missing from the new list of job names; returns the names of the destroyed jobs.

# File lib/sidekiq/cron/job.rb, line 519
# Destroys every existing job whose name is not listed in +new_job_names+.
#
# Returns the Array of names that were destroyed.
def self.destroy_removed_jobs(new_job_names)
  stale_names = Sidekiq::Cron::Job.all.map(&:name) - new_job_names
  # +each+ returns its receiver, so the stale names are the return value
  stale_names.each { |stale| Sidekiq::Cron::Job.destroy(stale) }
end
exists?(name) click to toggle source
# File lib/sidekiq/cron/job.rb, line 540
# Tells whether a job hash is stored in Redis for +name+.
#
# Depending on the Redis client version, +exists+ answers with a boolean or
# with an Integer count. An Integer 0 is truthy in Ruby, so the reply is
# normalised before being returned.
#
# Returns true or false.
def self.exists? name
  Sidekiq.redis do |conn|
    reply = conn.exists(redis_key(name))
    reply.is_a?(Integer) ? reply > 0 : !!reply
  end
end
find(name) click to toggle source
# File lib/sidekiq/cron/job.rb, line 226
# Looks a job up by name.
#
# name - a job name, or a Hash carrying the name under :name or 'name'.
#
# Returns a Job built from the stored hash, or nil when +exists?+ says the
# job is not stored.
def self.find(name)
  # a Hash may be passed instead of a plain name
  name = name[:name] || name['name'] if name.is_a?(Hash)

  Sidekiq.redis do |conn|
    Job.new(conn.hgetall(redis_key(name))) if exists?(name)
  end
end
load_from_array(array) click to toggle source

load cron jobs from Array input structure should look like: [

{
  'name'        => 'name_of_job',
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
{
  'name'  => 'Cool Job for Second Class',
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

]

# File lib/sidekiq/cron/job.rb, line 184
# Builds and saves one job per element of +array+.
#
# Returns a Hash mapping the name of each job that failed to save to its
# errors; the Hash is empty when every job was saved.
def self.load_from_array(array)
  array.each_with_object({}) do |job_data, failures|
    job = new(job_data)
    failures[job.name] = job.errors unless job.save
  end
end
load_from_array!(array) click to toggle source

Like {#load_from_array}, but first destroys the jobs that exist in Redis and are no longer present in the given array.

# File lib/sidekiq/cron/job.rb, line 195
# Same as +load_from_array+, but first destroys the existing jobs whose names
# are not present in +array+.
#
# The name is looked up under both the "name" and the :name key, because the
# job constructor accepts either form. Reading only the string key would treat
# every symbol-keyed entry as nameless and mark all current jobs as removed.
#
# Returns the errors Hash produced by +load_from_array+.
def self.load_from_array! array
  job_names = array.map { |job| job["name"] || job[:name] }
  destroy_removed_jobs(job_names)
  load_from_array(array)
end
load_from_hash(hash) click to toggle source

load cron jobs from Hash input structure should look like: {

'name_of_job' => {
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
'My super iber cool job' => {
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

}

# File lib/sidekiq/cron/job.rb, line 152
# Loads jobs from a Hash keyed by job name.
#
# Each value is copied with its 'name' set to the key and the resulting Array
# is handed to +load_from_array+. Copies are used so the caller's hashes are
# not modified, which also lets frozen hashes be passed in.
#
# Returns the errors Hash produced by +load_from_array+.
def self.load_from_hash hash
  array = hash.map { |key, job| job.merge('name' => key) }
  load_from_array array
end
load_from_hash!(hash) click to toggle source

Like {#load_from_hash}, but first destroys the jobs that exist in Redis and are no longer present in the given hash.

# File lib/sidekiq/cron/job.rb, line 162
# Same as +load_from_hash+, but first destroys the existing jobs whose names
# are not keys of +hash+.
def self.load_from_hash!(hash)
  kept_names = hash.keys
  destroy_removed_jobs(kept_names)
  load_from_hash(hash)
end
new(input_args = {}) click to toggle source
# File lib/sidekiq/cron/job.rb, line 259
# Builds a cron job from a Hash of attributes (String or Symbol keys).
#
# Keys read: name, cron, description, klass/class, status, last_enqueue_time,
# args, date_as_argument, active_job, queue_name_prefix, queue_name_delimiter,
# message, queue and fetch_missing_args.
#
# When fetch_missing_args is not false, a missing status and
# last_enqueue_time are looked up through +status_from_redis+ and
# +last_enqueue_time_from_redis+.
def initialize input_args = {}
  args = Hash[input_args.map{ |k, v| [k.to_s, v] }]
  @fetch_missing_args = args.delete('fetch_missing_args')
  @fetch_missing_args = true if @fetch_missing_args.nil?

  @name = args["name"]
  @cron = args["cron"]
  @description = args["description"] if args["description"]

  #get class from klass or class
  @klass = args["klass"] || args["class"]

  #set status of job
  @status = args['status'] || status_from_redis

  #set last enqueue time - from args or from existing job
  if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
    @last_enqueue_time = parse_enqueue_time(args['last_enqueue_time'])
  else
    @last_enqueue_time = last_enqueue_time_from_redis
  end

  #get right arguments for job
  @args = args["args"].nil? ? [] : parse_args( args["args"] )
  @args += [Time.now.to_f] if args["date_as_argument"]

  @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @active_job_queue_name_prefix = args["queue_name_prefix"]
  @active_job_queue_name_delimiter = args["queue_name_delimiter"]

  if args["message"]
    @message = args["message"]
    message_data = Sidekiq.load_json(@message) || {}
    @queue = message_data['queue'] || "default"
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    #get right data for message
    #only if message wasn't specified before
    klass_data = case @klass
      when Class
        @klass.get_sidekiq_options
      when String
        begin
          Sidekiq::Cron::Support.constantize(@klass).get_sidekiq_options
        rescue StandardError, ScriptError
          # Unknown or unloadable class: fall back to the default queue.
          # Signals, SystemExit and NoMemoryError are deliberately not rescued.
          {"queue"=>"default"}
        end
      else
        # Neither a Class nor a String: keep the object constructible so that
        # valid? can report the problem instead of failing on nil.merge below.
        {"queue"=>"default"}
    end

    message_data = klass_data.merge(message_data)
    #override queue if set in config
    #only if message is hash - can be string (dumped JSON)
    if args['queue']
      @queue = message_data['queue'] = args['queue']
    else
      @queue = message_data['queue'] || "default"
    end

    #the message stays a Hash here; to_hash dumps it to JSON
    @message = message_data
  end

  @queue_name_with_prefix = queue_name_with_prefix
end

Private Class Methods

jid_history_key(name) click to toggle source
# File lib/sidekiq/cron/job.rb, line 622
# Redis key under which the jid history of the job called +name+ is stored.
def self.jid_history_key(name)
  "cron_job:#{name}:jid_history"
end
job_enqueued_key(name) click to toggle source

Redis key for storing one cron job run times (when poller added job to queue)

# File lib/sidekiq/cron/job.rb, line 618
# Redis key holding the enqueue times recorded for the job called +name+.
def self.job_enqueued_key(name)
  "cron_job:#{name}:enqueued"
end
jobs_key() click to toggle source

Redis key for set of all cron jobs

# File lib/sidekiq/cron/job.rb, line 602
# Redis key of the set that lists all cron jobs.
def self.jobs_key
  'cron_jobs'
end
redis_key(name) click to toggle source

Redis key for storing one cron job

# File lib/sidekiq/cron/job.rb, line 607
# Redis key under which the hash of the job called +name+ is stored.
def self.redis_key(name)
  "cron_job:#{name}"
end

Public Instance Methods

active_job_message() click to toggle source

Active Job loads its data from the Sidekiq queue with a different structure: it creates a wrapper around the job.

# File lib/sidekiq/cron/job.rb, line 122
# Builds the Sidekiq payload for an Active Job: the job class, a fresh job id,
# the prefixed queue name and the arguments are nested inside a
# JobWrapper message.
def active_job_message
  wrapped_job = {
    'job_class'  => @klass,
    'job_id'     => SecureRandom.uuid,
    'queue_name' => @queue_name_with_prefix,
    'arguments'  => @args
  }

  {
    'class'        => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
    'wrapped'      => @klass,
    'queue'        => @queue_name_with_prefix,
    'description'  => @description,
    'args'         => [wrapped_job]
  }
end
add_jid_history(jid) click to toggle source
# File lib/sidekiq/cron/job.rb, line 476
# Pushes a {jid, enqueued} record onto the job's jid history list and trims
# the list to the configured size (Sidekiq.options[:cron_history_size],
# 10 when unset).
def add_jid_history(jid)
  entry = { jid: jid, enqueued: @last_enqueue_time }
  # ltrim takes an inclusive end index, hence the - 1
  @history_size ||= (Sidekiq.options[:cron_history_size] || 10).to_i - 1

  Sidekiq.redis do |conn|
    conn.lpush(jid_history_key, Sidekiq.dump_json(entry))
    # keep only the newest entries, in a fifo manner
    conn.ltrim(jid_history_key, 0, @history_size)
  end
end
destroy() click to toggle source

remove job from cron jobs by name input:

first arg: name (string) - name of job (must be same - case sensitive)
# File lib/sidekiq/cron/job.rb, line 493
# Removes this job from Redis: its entry in the jobs set, its recorded
# enqueue times, its jid history and finally the job hash itself. Logs the
# deletion afterwards.
def destroy
  Sidekiq.redis do |conn|
    # delete from the set of all jobs
    conn.srem(self.class.jobs_key, redis_key)

    # delete the run timestamps, the jid history and the main job hash
    [job_enqueued_key, jid_history_key, redis_key].each do |key|
      conn.del(key)
    end
  end
  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end
disable!() click to toggle source
# File lib/sidekiq/cron/job.rb, line 333
# Marks the job as "disabled" and persists it; returns the result of +save+.
def disable!
  @status = "disabled"
  save
end
disabled?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 347
# True unless the job is enabled.
def disabled?
  enabled? ? false : true
end
enable!() click to toggle source
# File lib/sidekiq/cron/job.rb, line 338
# Marks the job as "enabled" and persists it; returns the result of +save+.
def enable!
  @status = "enabled"
  save
end
enabled?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 343
# True when the job's status is exactly "enabled".
def enabled?
  "enabled" == @status
end
enque!(time = Time.now.utc) click to toggle source

enque cron job to queue

# File lib/sidekiq/cron/job.rb, line 49
# Enqueues the job and records the enqueue.
#
# time - the Time stored (formatted) as the last enqueue time; defaults to now.
#
# When the job class can be resolved it is enqueued through Active Job or as a
# Sidekiq worker; otherwise the prepared message is pushed with
# Sidekiq::Client. The last enqueue time and the jid are saved afterwards.
def enque!(time = Time.now.utc)
  @last_enqueue_time = time.strftime(LAST_ENQUEUE_TIME_FORMAT)

  klass_const =
    begin
      Sidekiq::Cron::Support.constantize(@klass.to_s)
    rescue NameError
      # class is not loadable in this process
      nil
    end

  jid =
    if !klass_const
      # unknown class: push a ready-made message instead
      Sidekiq::Client.push(@active_job ? active_job_message : sidekiq_worker_message)
    elsif defined?(ActiveJob::Base) && klass_const < ActiveJob::Base
      enqueue_active_job(klass_const).try(:provider_job_id)
    else
      enqueue_sidekiq_worker(klass_const)
    end

  save_last_enqueue_time
  add_jid_history(jid)
  logger.debug { "enqueued #{@name}: #{@message}" }
end
enqueue_active_job(klass_const) click to toggle source
# File lib/sidekiq/cron/job.rb, line 85
# Enqueues +klass_const+ through +perform_later+ on the job's queue, passing
# the job's arguments.
def enqueue_active_job(klass_const)
  configured_job = klass_const.set(queue: @queue)
  configured_job.perform_later(*@args)
end
enqueue_sidekiq_worker(klass_const) click to toggle source
# File lib/sidekiq/cron/job.rb, line 89
# Enqueues +klass_const+ through +perform_async+ on the prefixed queue name,
# passing the job's arguments.
def enqueue_sidekiq_worker(klass_const)
  configured_worker = klass_const.set(queue: queue_name_with_prefix)
  configured_worker.perform_async(*@args)
end
errors() click to toggle source
# File lib/sidekiq/cron/job.rb, line 407
# Validation errors collected so far; an empty Array is created on first use.
def errors
  @errors = [] unless @errors
  @errors
end
exists?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 548
# Delegates to the class-level +exists?+ with this job's name.
def exists?
  self.class.exists?(@name)
end
formated_enqueue_time(now = Time.now.utc) click to toggle source
# File lib/sidekiq/cron/job.rb, line 532
# The last scheduled run time relative to +now+, as a String holding its UTC
# epoch seconds (float).
def formated_enqueue_time(now = Time.now.utc)
  scheduled = last_time(now).getutc
  scheduled.to_f.to_s
end
formated_last_time(now = Time.now.utc) click to toggle source
# File lib/sidekiq/cron/job.rb, line 536
# The last scheduled run time relative to +now+, as an ISO 8601 String in UTC.
def formated_last_time(now = Time.now.utc)
  scheduled = last_time(now).getutc
  scheduled.iso8601
end
is_active_job?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 79
# Truthy when the job was flagged as an Active Job, or when ActiveJob::Base is
# defined and the job class descends from it. Returns false when the class
# name cannot be resolved.
def is_active_job?
  return @active_job if @active_job

  defined?(ActiveJob::Base) && Sidekiq::Cron::Support.constantize(@klass.to_s) < ActiveJob::Base
rescue NameError
  false
end
jid_history_from_redis() click to toggle source
# File lib/sidekiq/cron/job.rb, line 378
# Reads the whole jid history list from Redis and parses every entry as JSON.
#
# Returns an Array of parsed entries, or nil when reading the list failed.
def jid_history_from_redis
  raw_entries =
    Sidekiq.redis do |conn|
      begin
        conn.lrange(jid_history_key, 0, -1)
      rescue StandardError
        # best effort: a failed read yields nil
        nil
      end
    end

  return raw_entries unless raw_entries

  raw_entries.map { |raw| Sidekiq.load_json(raw) }
end
klass_valid() click to toggle source
# File lib/sidekiq/cron/job.rb, line 431
# Checks the job class: true for a Class, true for a non-empty String, false
# for an empty String and nil for anything else.
def klass_valid
  return true if @klass.is_a?(Class)

  @klass.size > 0 if @klass.is_a?(String)
end
last_enqueue_time_from_redis() click to toggle source
# File lib/sidekiq/cron/job.rb, line 368
# Reads and parses the stored "last_enqueue_time" field of the job hash.
#
# Returns nil when missing args are not to be fetched, or when reading or
# parsing the value fails.
def last_enqueue_time_from_redis
  return nil unless fetch_missing_args

  Sidekiq.redis do |conn|
    begin
      parse_enqueue_time(conn.hget(redis_key, "last_enqueue_time"))
    rescue StandardError
      # best effort: an unreadable value counts as "never enqueued"
      nil
    end
  end
end
last_time(now = Time.now.utc) click to toggle source

Parse cron specification '* * * * *' and returns time when last run should be performed

# File lib/sidekiq/cron/job.rb, line 528
# The previous time (in UTC) at which the parsed cron specification matched,
# relative to +now+.
def last_time(now = Time.now.utc)
  previous = parsed_cron.previous_time(now.utc)
  previous.utc
end
pretty_message() click to toggle source
# File lib/sidekiq/cron/job.rb, line 351
# Pretty-printed JSON of the job message.
#
# The message may be a JSON String or an already-built Hash; a Hash is
# generated directly because parsing it as JSON raises a TypeError.
#
# Returns the formatted String, or the raw message when it is a String that
# is not valid JSON.
def pretty_message
  payload = message.is_a?(String) ? Sidekiq.load_json(message) : message
  JSON.pretty_generate(payload)
rescue JSON::ParserError
  message
end
queue_name_with_prefix() click to toggle source
# File lib/sidekiq/cron/job.rb, line 98
# Queue name used for enqueuing.
#
# For jobs that are not Active Jobs this is the plain queue. Otherwise the
# queue is prefixed with the job's own prefix or, failing that,
# ActiveJob::Base.queue_name_prefix; the delimiter comes from the job, then
# ActiveJob::Base.queue_name_delimiter, then '_'. Without any prefix the plain
# queue is returned.
def queue_name_with_prefix
  return @queue unless is_active_job?

  delimiter =
    if !@active_job_queue_name_delimiter.to_s.empty?
      @active_job_queue_name_delimiter
    elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
      ActiveJob::Base.queue_name_delimiter
    else
      '_'
    end

  if !@active_job_queue_name_prefix.to_s.empty?
    "#{@active_job_queue_name_prefix}#{delimiter}#{@queue}"
  elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !ActiveJob::Base.queue_name_prefix.to_s.empty?
    "#{ActiveJob::Base.queue_name_prefix}#{delimiter}#{@queue}"
  else
    @queue
  end
end
remove_previous_enques(time) click to toggle source

Remove stored information about previous run times. This cleans up Redis so that its memory does not fill up with old entries.

# File lib/sidekiq/cron/job.rb, line 32
# Deletes recorded enqueue entries whose score is below
# +time+ minus REMEMBER_THRESHOLD (upper bound exclusive).
def remove_previous_enques(time)
  # a leading "(" makes the upper bound exclusive
  upper_bound = "(#{time.to_f - REMEMBER_THRESHOLD}"
  Sidekiq.redis do |conn|
    conn.zremrangebyscore(job_enqueued_key, 0, upper_bound)
  end
end
save() click to toggle source

add job to cron jobs input:

name: (string) - name of job
cron: (string, e.g. '* * * * *') - cron specification of when to run the job
class: (string|class) - which class to perform

optional input:

queue: (string) - which queue to use for enquing (will override class queue)
args: (array|hash|nil) - arguments for the perform method
# File lib/sidekiq/cron/job.rb, line 450
# Persists the job to Redis.
#
# Adds the job key to the set of all jobs, writes the job hash and, when no
# enqueue record exists yet, seeds one with the last scheduled time so the
# job is not enqueued right after the poller starts.
#
# Returns false when the job is invalid; otherwise the result of the final
# logger call.
def save
  #if job is invalid return false
  return false unless valid?

  Sidekiq.redis do |conn|

    #add to set of all jobs
    conn.sadd self.class.jobs_key, redis_key

    #add information for this job!
    conn.hmset redis_key, *hash_to_redis(to_hash)

    #add information about last time! - don't enque right after scheduler poller starts!
    time = Time.now.utc
    # +exists+ may answer with a boolean or an Integer count depending on the
    # client version; Integer 0 is truthy in Ruby, so normalise it first.
    already_recorded = conn.exists(job_enqueued_key)
    already_recorded = already_recorded > 0 if already_recorded.is_a?(Integer)
    conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless already_recorded
  end
  logger.info { "Cron Jobs - add job with name: #{@name}" }
end
save_last_enqueue_time() click to toggle source
# File lib/sidekiq/cron/job.rb, line 469
# Writes the current last enqueue time into the job's Redis hash.
def save_last_enqueue_time
  Sidekiq.redis { |conn| conn.hset(redis_key, 'last_enqueue_time', @last_enqueue_time) }
end
should_enque?(time) click to toggle source

crucial part of whole enquing job

# File lib/sidekiq/cron/job.rb, line 18
# Decides whether the job should be enqueued for +time+.
#
# The job must be enabled, the scheduled time must not be too far in the past
# and the job must not already have been enqueued for it. Only then is the
# run recorded with zadd, whose result is the final answer.
def should_enque?(time)
  Sidekiq.redis do |conn|
    status == "enabled" &&
      not_past_scheduled_time?(time) &&
      not_enqueued_after?(time) &&
      conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
  end
end
sidekiq_worker_message() click to toggle source

Sidekiq worker message

# File lib/sidekiq/cron/job.rb, line 94
# The job message as parsed data: a String message is loaded as JSON, anything
# else is returned unchanged.
def sidekiq_worker_message
  return @message unless @message.is_a?(String)

  Sidekiq.load_json(@message)
end
sort_name() click to toggle source
# File lib/sidekiq/cron/job.rb, line 552
# Sort key that puts enabled jobs first ("0_" prefix, others "1_"), followed
# by the downcased name.
def sort_name
  rank = status == "enabled" ? 0 : 1
  "#{rank}_#{name}".downcase
end
status() click to toggle source
# File lib/sidekiq/cron/job.rb, line 329
# Current status of the job (the value of @status).
def status
  @status
end
status_from_redis() click to toggle source
# File lib/sidekiq/cron/job.rb, line 357
# Status stored in the job's Redis hash.
#
# Returns "enabled" when missing args are not to be fetched or when no status
# is stored.
def status_from_redis
  return "enabled" unless fetch_missing_args

  stored = Sidekiq.redis { |conn| conn.hget(redis_key, "status") }
  stored || "enabled"
end
test_and_enque_for_time!(time) click to toggle source

Test whether the job should be enqueued for the given time; if so, add it to the queue.

# File lib/sidekiq/cron/job.rb, line 39
# Enqueues the job when +should_enque?+ approves +time+, then removes old
# enqueue records. Returns nil when the job is not enqueued.
def test_and_enque_for_time!(time)
  return unless should_enque?(time)

  enque!
  remove_previous_enques(time)
end
to_hash() click to toggle source

export job data to hash

# File lib/sidekiq/cron/job.rb, line 391
# Exports the job's data as a Hash with Symbol keys.
#
# args and message are emitted as Strings: an existing String is kept as is,
# anything else is dumped to JSON (nil becomes an empty Array / Hash).
def to_hash
  dumped_args = @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || [])
  dumped_message = @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {})

  {
    name: @name,
    klass: @klass,
    cron: @cron,
    description: @description,
    args: dumped_args,
    message: dumped_message,
    status: @status,
    active_job: @active_job,
    queue_name_prefix: @active_job_queue_name_prefix,
    queue_name_delimiter: @active_job_queue_name_delimiter,
    last_enqueue_time: @last_enqueue_time,
  }
end
valid?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 411
# Validates name, cron and class, rebuilding the errors list from scratch.
# A parsable cron string is stored in @parsed_cron as a side effect.
#
# Returns true when no errors were collected.
def valid?
  # start from a clean slate on every validation
  @errors = []

  errors << "'name' must be set" if @name.nil? || @name.size == 0

  cron_missing = @cron.nil? || @cron.size == 0
  errors << "'cron' must be set" if cron_missing

  unless cron_missing
    begin
      @parsed_cron = Fugit.do_parse_cron(@cron)
    rescue => parse_error
      errors << "'cron' -> #{@cron.inspect} -> #{parse_error.class}: #{parse_error.message}"
    end
  end

  errors << "'klass' (or class) must be set" unless klass_valid

  errors.empty?
end

Private Instance Methods

hash_to_redis(hash) click to toggle source

Give Hash returns array for using it for redis.hmset

# File lib/sidekiq/cron/job.rb, line 638
# Flattens a Hash into [key1, value1, key2, value2, ...] for hmset.
def hash_to_redis(hash)
  hash.flat_map { |pair| [pair[0], pair[1]] }
end
jid_history_key() click to toggle source
# File lib/sidekiq/cron/job.rb, line 632
# Redis key of this job's jid history (class-level helper applied to @name).
def jid_history_key
  self.class.jid_history_key(@name)
end
job_enqueued_key() click to toggle source

Redis key for storing one cron job run times (when poller added job to queue)

# File lib/sidekiq/cron/job.rb, line 628
# Redis key of this job's enqueue records (class-level helper applied to @name).
def job_enqueued_key
  self.class.job_enqueued_key(@name)
end
not_enqueued_after?(time) click to toggle source
# File lib/sidekiq/cron/job.rb, line 562
# True when the job has not been enqueued at or after the last scheduled run
# time relative to +time+.
#
# @last_enqueue_time may be a parsed time, or a formatted String once enque!
# has run on this instance. A String is parsed first, because String#to_i
# would only yield the leading digits (the year) and make the comparison
# meaningless. An empty String is treated as "never enqueued".
def not_enqueued_after?(time)
  last_enqueued = @last_enqueue_time
  if last_enqueued.is_a?(String)
    last_enqueued = last_enqueued.empty? ? nil : parse_enqueue_time(last_enqueued)
  end

  last_enqueued.nil? || last_enqueued.to_i < last_time(time).to_i
end
not_past_scheduled_time?(current_time) click to toggle source
# File lib/sidekiq/cron/job.rb, line 593
# True when the previous scheduled time is at most 60 seconds before
# +current_time+ (compared in whole seconds).
def not_past_scheduled_time?(current_time)
  last_cron_time = parsed_cron.previous_time(current_time).utc
  seconds_late = current_time.to_i - last_cron_time.to_i
  seconds_late <= 60
end
parse_args(args) click to toggle source

Try parsing inbound args into an array. args from Redis will be encoded JSON; try to load JSON, then failover to string array.

# File lib/sidekiq/cron/job.rb, line 570
# Turns inbound args into an Array.
#
# A Hash is wrapped in an Array, an Array is returned untouched and a String
# is loaded as JSON, falling back to a one-element Array when it is not valid
# JSON. Anything else is splatted into an Array.
def parse_args(args)
  return [args] if args.is_a?(Hash)   # just put hash into array
  return args if args.is_a?(Array)    # already an array
  return [*args] unless args.is_a?(String)

  begin
    Sidekiq.load_json(args)
  rescue JSON::ParserError
    [*args]   # not JSON: keep the raw string as the single argument
  end
end
parse_enqueue_time(timestamp) click to toggle source
# File lib/sidekiq/cron/job.rb, line 587
# Parses +timestamp+ into a UTC Time.
#
# LAST_ENQUEUE_TIME_FORMAT is tried first; when the String does not match it,
# generic DateTime parsing is used instead.
def parse_enqueue_time(timestamp)
  begin
    DateTime.strptime(timestamp, LAST_ENQUEUE_TIME_FORMAT).to_time.utc
  rescue ArgumentError
    DateTime.parse(timestamp).to_time.utc
  end
end
parsed_cron() click to toggle source
# File lib/sidekiq/cron/job.rb, line 558
# The cron specification parsed with Fugit, memoized in @parsed_cron.
def parsed_cron
  return @parsed_cron if @parsed_cron

  @parsed_cron = Fugit.parse_cron(@cron)
end
redis_key() click to toggle source

Redis key for storing one cron job

# File lib/sidekiq/cron/job.rb, line 612
# Redis key of this job's hash (class-level helper applied to @name).
def redis_key
  self.class.redis_key(@name)
end