module KubeQueue::Worker::ClassMethods

Attributes

template [W] (attribute writer)

Public Instance Methods

active_job?()
# File lib/kube_queue/worker.rb, line 21
# Reports whether the receiver descends from ActiveJob::Base.
#
# Returns nil when the ActiveJob constant is not defined at all; otherwise
# returns true/false depending on whether ActiveJob::Base appears in the
# receiver's ancestor chain.
def active_job?
  # Guard first so ActiveJob::Base is never referenced when ActiveJob is absent.
  return unless defined?(ActiveJob)

  ancestors.include?(ActiveJob::Base)
end
enqueue(*args)
# File lib/kube_queue/worker.rb, line 51
# Instantiates the receiver with +args+ and submits the resulting job to
# KubeQueue.executor.
#
# Returns the newly built job (not the executor's return value).
def enqueue(*args)
  new(*args).tap { |job| KubeQueue.executor.enqueue(job) }
end
Also aliased as: perform_async
enqueue_at(*args)
# File lib/kube_queue/worker.rb, line 58
# Builds a job scheduled for a given time and submits it to KubeQueue.executor.
#
# The LAST element of +args+ is taken as the timestamp and assigned to the
# job's +scheduled_at+; every preceding element is passed to +new+.
#
# Returns the newly built job.
def enqueue_at(*args)
  # Split off the trailing timestamp; with no arguments the timestamp is nil.
  *job_args, timestamp = args

  scheduled = new(*job_args)
  scheduled.scheduled_at = timestamp
  KubeQueue.executor.enqueue(scheduled)
  scheduled
end
find(job_id)
# File lib/kube_queue/worker.rb, line 36
# Looks up a single job resource by +job_id+ and rebuilds a worker instance
# from it.
#
# The resource is fetched through KubeQueue.client.get_job using the namespace
# and job name derived from +job_spec+. The worker class is resolved from the
# 'kube-queue-job-class' annotation and its arguments are decoded from the
# 'kube-queue-job-payload' annotation.
#
# Returns the rebuilt job with its +resource+ set to the fetched resource.
def find(job_id)
  ns = job_spec.namespace
  resource_name = job_spec.job_name(job_id)
  resource = KubeQueue.client.get_job(ns, resource_name)

  annotations = resource.metadata.annotations
  worker_class = KubeQueue.fetch_worker(annotations['kube-queue-job-class'])
  arguments = deserialize_annotation_payload(annotations['kube-queue-job-payload'])

  worker_class.new(*arguments).tap { |job| job.resource = resource }
end
list()
# File lib/kube_queue/worker.rb, line 25
# Lists the job resources for this worker's job class and rebuilds a worker
# instance for each one.
#
# Resources are fetched through KubeQueue.client.list_job using the job class
# and namespace from +job_spec+. For every resource the worker class is
# resolved from the 'kube-queue-job-class' annotation and its arguments are
# decoded from the 'kube-queue-job-payload' annotation, mirroring +find+.
#
# Returns an Array of rebuilt jobs, each with +resource+ set.
def list
  namespace = job_spec.namespace

  KubeQueue.client.list_job(job_spec.job_class, namespace).map do |res|
    annotations = res.metadata.annotations
    worker = KubeQueue.fetch_worker(annotations['kube-queue-job-class'])

    # Previously `payload` was referenced without ever being assigned, which
    # raised NameError for the first listed job. Decode it from the resource.
    payload = deserialize_annotation_payload(annotations['kube-queue-job-payload'])

    job = worker.new(*payload)
    job.resource = res
    job
  end
end
manifest()
# File lib/kube_queue/worker.rb, line 71
# Convenience wrapper: builds an instance with no arguments and returns
# whatever that instance's +manifest+ returns.
def manifest
  instance = new
  instance.manifest
end
perform_async(*args)
Alias for: enqueue
read_template()
# File lib/kube_queue/worker.rb, line 67
# Reads the file located at +template+ and returns its full contents as a
# String. Errors raised by File.read (e.g. for a missing file) propagate.
def read_template
  template_path = template
  File.read(template_path)
end
template()
# File lib/kube_queue/worker.rb, line 17
# Returns the template path for this worker.
#
# If @template already holds a truthy value it is returned as-is; otherwise
# the default path ('../../../template/job.yaml' expanded relative to this
# file) is computed, memoized in @template, and returned.
def template
  return @template if @template

  @template = File.expand_path('../../../template/job.yaml', __FILE__)
end

Private Instance Methods

deserialize_annotation_payload(payload)
# File lib/kube_queue/worker.rb, line 77
# Decodes the JSON string stored in a job's payload annotation into the
# argument list used to rebuild the worker.
#
# payload - JSON String, or nil when the annotation is missing.
#
# Returns +payload+ unchanged when it is nil or empty. Otherwise returns an
# Array: the parsed JSON (wrapped in an Array if it was not one already),
# additionally passed through ActiveJob::Arguments.deserialize when that
# constant is defined.
#
# Raises JSON::ParserError when +payload+ is non-empty but not valid JSON.
def deserialize_annotation_payload(payload)
  # A missing annotation arrives as nil; calling #empty? on it used to raise
  # NoMethodError. Treat it like the empty-string case.
  return payload if payload.nil? || payload.empty?

  decoded = JSON.parse(payload)

  # Compatibility for ActiveJob serialized payload
  decoded = [decoded] unless decoded.is_a?(Array)

  decoded = ActiveJob::Arguments.deserialize(decoded) if defined?(ActiveJob::Arguments)

  decoded
end