class SidekiqRobustJob::SidekiqJobManager

Attributes

clock[R]
digest_generator[R]
jobs_repository[R]
memory_monitor[R]

Public Class Methods

new(jobs_repository:, clock:, digest_generator:, memory_monitor:) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 6
def initialize(jobs_repository:, clock:, digest_generator:, memory_monitor:)
  @jobs_repository = jobs_repository
  @clock = clock
  @digest_generator = digest_generator
  @memory_monitor = memory_monitor
end

Public Instance Methods

perform(job_id) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 44
def perform(job_id)
  job = jobs_repository.find(job_id)
  return if job.unprocessable?

  job.started(memory_monitor: memory_monitor)
  jobs_repository.save(job)
  job.execute
end
perform_async(job_class, *arguments) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 13
def perform_async(job_class, *arguments)
  job = create_job(job_class, *arguments)
  return if job.unprocessable?
  job_class.original_perform_async(job.id).tap do |sidekiq_jid|
    job.assign_sidekiq_data(execute_at: clock.now, sidekiq_jid: sidekiq_jid)
    jobs_repository.save(job)
  end
end
perform_at(job_class, time, *arguments) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 31
def perform_at(job_class, time, *arguments)
  job = create_job(job_class, *arguments)
  return if job.unprocessable?
  job_class.original_perform_at(time, job.id).tap do |sidekiq_jid|
    job.assign_sidekiq_data(execute_at: time, sidekiq_jid: sidekiq_jid)
    jobs_repository.save(job)
  end
end
perform_in(job_class, interval, *arguments) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 22
def perform_in(job_class, interval, *arguments)
  job = create_job(job_class, *arguments)
  return if job.unprocessable?
  job_class.original_perform_in(interval, job.id).tap do |sidekiq_jid|
    job.assign_sidekiq_data(execute_at: clock.now + interval, sidekiq_jid: sidekiq_jid)
    jobs_repository.save(job)
  end
end
set(job_class, options = {}) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 40
def set(job_class, options = {})
  SidekiqRobustJob::DependenciesContainer["setter_proxy_job"].build(job_class, options)
end

Private Instance Methods

create_job(job_class, *arguments) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 55
def create_job(job_class, *arguments)
  jobs_repository.build(
    job_class: job_class,
    arguments: Array.wrap(arguments),
    enqueued_at: clock.now,
    digest: digest_generator.generate(job_class, *arguments),
    queue: job_class.get_sidekiq_options.fetch("queue", "default"),
    uniqueness_strategy: job_class.get_sidekiq_options.fetch("uniqueness_strategy",
      SidekiqRobustJob::UniquenessStrategy.no_uniqueness),
    enqueue_conflict_resolution_strategy: job_class.get_sidekiq_options.fetch("enqueue_conflict_resolution_strategy",
      SidekiqRobustJob::EnqueueConflictResolutionStrategy.do_nothing)
  ).tap do |job|
    jobs_repository.save(job) if persist_job_immediately?(job_class)
    jobs_repository.transaction do
      resolve_potential_conflict_for_enqueueing(job)
      jobs_repository.save(job) if persist_after_resolving_conflict_for_enqueueing(job, job_class)
    end
  end
end
persist_after_resolving_conflict_for_enqueueing(job, job_class) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 85
def persist_after_resolving_conflict_for_enqueueing(job, job_class)
  return true if persist_self_dropped_jobs?(job_class)

  !job.dropped?
end
persist_job_immediately?(job_class) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 81
def persist_job_immediately?(job_class)
  persist_self_dropped_jobs?(job_class)
end
persist_self_dropped_jobs?(job_class) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 91
def persist_self_dropped_jobs?(job_class)
  job_class.get_sidekiq_options.fetch("persist_self_dropped_jobs", true)
end
resolve_potential_conflict_for_enqueueing(job) click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 75
def resolve_potential_conflict_for_enqueueing(job)
  SidekiqRobustJob::DependenciesContainer["enqueue_conflict_resolution_resolver"]
    .resolve(job.enqueue_conflict_resolution_strategy)
    .execute(job)
end