class Resque::Plugins::Stages::StagedGroupStage

rubocop:disable Metrics/ClassLength

Attributes

group_stage_id[R]

Public Class Methods

new(group_stage_id) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 20
# Builds a stage object for the given identifier.
#
# Only stores the identifier in @group_stage_id; nothing else is done at
# construction time.
def initialize(group_stage_id)
  @group_stage_id = group_stage_id
end

Public Instance Methods

<=>(other) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 24
# Compares two stages by their group_stage_id.
#
# Returns nil when +other+ is not a StagedGroupStage; otherwise returns the
# result of comparing the two group_stage_id values with <=>.
def <=>(other)
  # The modifier-if yields nil implicitly for foreign types.
  group_stage_id <=> other.group_stage_id if other.is_a?(Resque::Plugins::Stages::StagedGroupStage)
end
add_job(staged_group_job) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 177
# Appends the job's id to the end of the redis list stored under stage_key.
#
# Returns whatever redis.rpush returns.
def add_job(staged_group_job)
  redis.rpush(stage_key, staged_group_job.job_id)
end
blank?() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 211
# Reports whether this stage has no data in redis.
#
# Returns true only when neither stage_key nor staged_group_key exists.
#
# Newer redis clients return an Integer count from +exists+; 0 is truthy in
# Ruby, so negating that result directly would never report a blank stage.
# The boolean +exists?+ is used when the client offers it, and an Integer
# result from +exists+ is normalised to a boolean otherwise.
def blank?
  [stage_key, staged_group_key].none? do |key|
    found = redis.respond_to?(:exists?) ? redis.exists?(key) : redis.exists(key)

    found.is_a?(Integer) ? found.positive? : found
  end
end
delete() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 185
# Removes this stage and everything it holds.
#
# Deletes every job in the stage, asks the owning staged group (if there is
# one) to remove this stage, then deletes both redis keys used by the stage.
# Returns the result of the last redis.del call.
def delete
  jobs.each { |job| job.delete }

  owner = staged_group
  owner.remove_stage(self) unless owner.nil?

  redis.del(stage_key)
  redis.del(staged_group_key)
end
enqueue(klass, *args) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 30
# Creates a staged job for +klass+ / +args+ and, unless the stage is still
# pending, enqueues it through Resque.enqueue.
#
# When the stage status is :pending the job is only created and returned.
# Otherwise the stage is switched to :running (if it is not already), the job
# is marked :queued and its enqueue_args are passed to Resque.enqueue.
#
# Returns the created job in every case.
def enqueue(klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue(*staged_job.enqueue_args)
  end

  staged_job
end
enqueue_at(timestamp, klass, *args) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 56
# Creates a staged job for +klass+ / +args+ and, unless the stage is still
# pending, schedules it through Resque.enqueue_at for +timestamp+.
#
# When the stage status is :pending the job is only created and returned.
# Otherwise the stage is switched to :running (if it is not already), the job
# is marked :queued and +timestamp+ plus the job's enqueue_args are passed to
# Resque.enqueue_at.
#
# Returns the created job in every case.
def enqueue_at(timestamp, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_at(timestamp, *staged_job.enqueue_args)
  end

  staged_job
end
enqueue_at_with_queue(queue, timestamp, klass, *args) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 69
# Creates a staged job for +klass+ / +args+ and, unless the stage is still
# pending, schedules it through Resque.enqueue_at_with_queue.
#
# When the stage status is :pending the job is only created and returned.
# Otherwise the stage is switched to :running (if it is not already), the job
# is marked :queued and +queue+, +timestamp+ and the job's enqueue_args are
# passed to Resque.enqueue_at_with_queue.
#
# Returns the created job in every case.
def enqueue_at_with_queue(queue, timestamp, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_at_with_queue(queue, timestamp, *staged_job.enqueue_args)
  end

  staged_job
end
enqueue_in(number_of_seconds_from_now, klass, *args) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 82
# Creates a staged job for +klass+ / +args+ and, unless the stage is still
# pending, schedules it through Resque.enqueue_in.
#
# When the stage status is :pending the job is only created and returned.
# Otherwise the stage is switched to :running (if it is not already), the job
# is marked :queued and +number_of_seconds_from_now+ plus the job's
# enqueue_args are passed to Resque.enqueue_in.
#
# Returns the created job in every case.
def enqueue_in(number_of_seconds_from_now, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_in(number_of_seconds_from_now, *staged_job.enqueue_args)
  end

  staged_job
end
enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 95
# Creates a staged job for +klass+ / +args+ and, unless the stage is still
# pending, schedules it through Resque.enqueue_in_with_queue.
#
# When the stage status is :pending the job is only created and returned.
# Otherwise the stage is switched to :running (if it is not already), the job
# is marked :queued and +queue+, +number_of_seconds_from_now+ and the job's
# enqueue_args are passed to Resque.enqueue_in_with_queue.
#
# Returns the created job in every case.
def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_in_with_queue(queue, number_of_seconds_from_now, *staged_job.enqueue_args)
  end

  staged_job
end
enqueue_to(queue, klass, *args) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 43
# Creates a staged job for +klass+ / +args+ and, unless the stage is still
# pending, enqueues it on +queue+ through Resque.enqueue_to.
#
# When the stage status is :pending the job is only created and returned.
# Otherwise the stage is switched to :running (if it is not already), the job
# is marked :queued and +queue+ plus the job's enqueue_args are passed to
# Resque.enqueue_to.
#
# Returns the created job in every case.
def enqueue_to(queue, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_to(queue, *staged_job.enqueue_args)
  end

  staged_job
end
initiate() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 194
# Starts the stage.
#
# Sets the status to :running, calls enqueue_job on every job that is neither
# completed nor queued, and finishes by calling job_completed (whose result
# is returned).
def initiate
  self.status = :running

  jobs.each do |staged_job|
    staged_job.enqueue_job unless staged_job.completed? || staged_job.queued?
  end

  job_completed
end
job_completed() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 207
# Marks the stage :complete once every job in it reports completed?.
#
# Does nothing (and returns nil) while at least one job is not completed.
def job_completed
  return unless jobs.all? { |staged_job| staged_job.completed? }

  self.status = :complete
end
jobs(start = 0, stop = -1) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 140
# Returns StagedJob objects for the ids stored in the stage's redis list.
#
# +start+ and +stop+ are handed straight to redis.lrange; the defaults
# (0, -1) are the bounds used elsewhere in this class to read the whole list.
def jobs(start = 0, stop = -1)
  job_ids = redis.lrange(stage_key, start, stop)

  job_ids.map { |job_id| Resque::Plugins::Stages::StagedJob.new(job_id) }
end
jobs_by_status(status) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 144
# Returns the jobs of this stage whose status equals +status+.
def jobs_by_status(status)
  wanted = status

  jobs.select do |staged_job|
    staged_job.status == wanted
  end
end
num_jobs() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 173
# Returns the length of the redis list stored under stage_key, as reported
# by redis.llen.
def num_jobs
  list_key = stage_key

  redis.llen(list_key)
end
number() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 118
# Returns the stage number stored in the "number" field of the stage's redis
# hash, converted with to_i. Falls back to 1 when the field is not set.
def number
  stored = redis.hget(staged_group_key, "number")

  stored.nil? ? 1 : stored.to_i
end
number=(value) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 122
# Stores +value+ in the "number" field of the stage's redis hash.
def number=(value)
  redis.hset staged_group_key, "number", value
end
order_param(sort_option, current_sort, current_order) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 163
# Works out the sort order to use for a column.
#
# A column other than the current sort column always yields "asc". For the
# current sort column the order is flipped: "asc" (or an unset order) becomes
# "desc", anything else becomes "asc".
def order_param(sort_option, current_sort, current_order)
  return "asc" unless sort_option == current_sort

  (current_order || "asc") == "asc" ? "desc" : "asc"
end
paginated_jobs(sort_key = :class_name, sort_order = "asc", page_num = 1, queue_page_size = 20) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 148
# Returns one page of this stage's jobs, sorted by +sort_key+.
#
# sort_key        - key handed to sorted_jobs.
# sort_order      - "desc" reverses the sorted list; anything else leaves it
#                   in the order sorted_jobs returned.
# page_num        - 1-based page number; coerced with to_i so string values
#                   are accepted.
# queue_page_size - jobs per page; coerced with to_i, values below 1 fall
#                   back to 20.
#
# A page that starts before the first job or at/after the end of the list
# falls back to the first page, so a non-empty stage never yields an empty
# page for an out-of-range page number.
def paginated_jobs(sort_key = :class_name,
                   sort_order = "asc",
                   page_num = 1,
                   queue_page_size = 20)
  queue_page_size = queue_page_size.to_i
  queue_page_size = 20 if queue_page_size < 1

  job_list = sorted_jobs(sort_key)
  job_list = job_list.reverse if sort_order == "desc"

  page_start = (page_num.to_i - 1) * queue_page_size
  # >= (not >): an offset equal to the list length is already past the end.
  page_start = 0 if page_start.negative? || page_start >= job_list.length

  job_list[page_start, queue_page_size]
end
remove_job(staged_group_job) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 181
# Removes the job's id from the redis list stored under stage_key.
#
# The count argument 0 is passed to redis.lrem unchanged; returns whatever
# redis.lrem returns.
def remove_job(staged_group_job)
  redis.lrem stage_key, 0, staged_group_job.job_id
end
staged_group() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 126
# Returns the StagedGroup this stage belongs to, or nil when the stage has no
# staged_group_id.
#
# The group object is built on first use and cached in @staged_group.
def staged_group
  if staged_group_id.blank?
    nil
  else
    @staged_group ||= Resque::Plugins::Stages::StagedGroup.new(staged_group_id)
  end
end
staged_group=(value) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 132
# Attaches this stage to the staged group +value+.
#
# Caches the group and its group_id on the instance, registers this stage
# with the group via add_stage, and writes the group's id to the
# "staged_group_id" field of the stage's redis hash.
def staged_group=(value)
  @staged_group    = value
  @staged_group_id = value.group_id

  # The group is told about the stage before the id is written to redis.
  value.add_stage(self)
  redis.hset(staged_group_key, "staged_group_id", value.group_id)
end
status() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 108
# Returns the stage status as a Symbol, read from the "status" field of the
# stage's redis hash. Falls back to :pending when the field is not set.
def status
  stored = redis.hget(staged_group_key, "status")

  stored.nil? ? :pending : stored.to_sym
end
status=(value) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 112
# Writes +value+ (as a string) to the "status" field of the stage's redis
# hash.
#
# The status is then read back; when it is :complete and the stage has a
# staged group, the group's stage_completed is called.
def status=(value)
  redis.hset(staged_group_key, "status", value.to_s)

  return unless status == :complete

  staged_group&.stage_completed
end
verify() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 215
# Checks the link between this stage and its staged group.
#
# When there is no staged group, a new structure is built through
# build_new_structure; otherwise the group is asked to verify this stage.
def verify
  if staged_group.blank?
    build_new_structure
  else
    staged_group.verify_stage(self)
  end
end
verify_job(job) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 221
# Makes sure +job+ is listed in this stage.
#
# Reads the full list of ids under stage_key and pushes the job's id onto the
# front of the list (lpush) only when it is not already there. Returns nil
# when the id was already present.
def verify_job(job)
  known_ids = redis.lrange(stage_key, 0, -1)

  redis.lpush(stage_key, job.job_id) unless known_ids.include?(job.job_id)
end

Private Instance Methods

build_new_structure() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 231
# Creates a StagedGroup with a freshly generated UUID as its id and assigns
# it as this stage's staged group. Returns the new group.
def build_new_structure
  self.staged_group = Resque::Plugins::Stages::StagedGroup.new(SecureRandom.uuid)
end
create_enqueue_job(klass, args) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 237
# Delegates to StagedJob.create_job, passing this stage, +klass+ and the
# splatted +args+ array.
def create_enqueue_job(klass, args)
  Resque::Plugins::Stages::StagedJob.create_job(self, klass, *args)
end
job_sort_value(job, sort_key) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 259
# Returns the value a job is sorted by for +sort_key+.
#
# :class_name, :status and :status_message return the job's attribute as is;
# :queue_time returns the attribute converted with to_s. Any other key
# returns nil.
def job_sort_value(job, sort_key)
  key = sort_key.to_sym

  if %i[class_name status status_message].include?(key)
    job.public_send(sort_key)
  elsif key == :queue_time
    job.public_send(sort_key).to_s
  end
end
sorted_jobs(sort_key) click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 253
# Returns the stage's jobs sorted by the value job_sort_value yields for
# +sort_key+.
def sorted_jobs(sort_key)
  jobs.sort_by { |staged_job| job_sort_value(staged_job, sort_key) }
end
stage_key() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 241
# Redis key for this stage, derived from group_stage_id. The job id list is
# stored under it, and staged_group_key is built from it.
def stage_key
  "StagedGroupStage::#{group_stage_id}"
end
staged_group_id() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 249
# Returns the id of the staged group this stage belongs to.
#
# A value already cached in @staged_group_id is returned as is; otherwise the
# "staged_group_id" field of the stage's redis hash is read and cached. A nil
# result is not cached, so the next call reads redis again.
def staged_group_id
  return @staged_group_id if @staged_group_id

  @staged_group_id = redis.hget(staged_group_key, "staged_group_id")
end
staged_group_key() click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 245
# Redis key of the hash holding this stage's "status", "number" and
# "staged_group_id" fields; derived from stage_key.
def staged_group_key
  "#{stage_key}::staged_group"
end