class Resque::Kubernetes::JobsManager
Spins up Kubernetes
Jobs to run Resque
workers.
Attributes
owner[R]
Public Class Methods
new(owner)
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 16
# Creates a manager bound to +owner+.
#
# owner - object stored in @owner; other methods of this class read
#         +job_manifest+ and +max_workers+ from it.
def initialize(owner)
  # Fallback namespace; build_client overwrites it when the context names one.
  @default_namespace = "default"
  @owner = owner
end
Public Instance Methods
apply_kubernetes_job()
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 41
# Creates a Kubernetes Job from the owner's manifest, unless the job
# group named in the manifest metadata is already at its maximum.
#
# Returns nil when the limit is reached, otherwise whatever
# jobs_client.create_job returns.
def apply_kubernetes_job
  manifest = DeepHash.new.merge!(owner.job_manifest)
  ensure_namespace(manifest)

  metadata = manifest["metadata"]
  # Only start a new job while we are below the maximum count.
  unless jobs_maxed?(metadata["name"], metadata["namespace"])
    adjust_manifest(manifest)
    resource = Kubeclient::Resource.new(manifest)
    jobs_client.create_job(resource)
  end
end
reap_finished_jobs()
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 21
# Deletes every job returned by finished_jobs.
#
# A KubeException with error code 404 is ignored; any other
# KubeException is re-raised and stops the sweep.
def reap_finished_jobs
  finished_jobs.each do |finished|
    begin
      jobs_client.delete_job(finished.metadata.name, finished.metadata.namespace)
    rescue KubeException => error
      # 404: nothing to delete (presumably already removed elsewhere).
      next if error.error_code == 404

      raise
    end
  end
end
reap_finished_pods()
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 31
# Deletes every pod returned by finished_pods.
#
# A KubeException with error code 404 is ignored; any other
# KubeException is re-raised and stops the sweep.
def reap_finished_pods
  finished_pods.each do |finished|
    begin
      pods_client.delete_pod(finished.metadata.name, finished.metadata.namespace)
    rescue KubeException => error
      # 404: nothing to delete (presumably already removed elsewhere).
      next if error.error_code == 404

      raise
    end
  end
end
Private Instance Methods
build_client(scope)
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 70
# Builds a Kubeclient::Client from the context supplied by ContextFactory.
#
# scope - String appended to the context endpoint (callers in this class
#         pass "/apis/batch" or "").
#
# Side effect: when the context carries a namespace, it replaces
# @default_namespace.
#
# Returns the new client, or nil when no context is available.
def build_client(scope)
  context = ContextFactory.context

  if context
    namespace = context.namespace
    @default_namespace = namespace if namespace

    Kubeclient::Client.new(context.endpoint + scope, context.version, context.options)
  end
end
client(scope)
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 64
# Returns a RetriableClient for +scope+.
#
# A client configured on Resque::Kubernetes.kubeclient takes precedence
# and is wrapped as-is (+scope+ is not applied to it); otherwise one is
# built with build_client.
#
# Returns nil when neither source yields a client.
def client(scope)
  configured = Resque::Kubernetes.kubeclient
  return RetriableClient.new(configured) if configured

  built = build_client(scope)
  built ? RetriableClient.new(built) : nil
end
finished_jobs()
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 78
# Lists the jobs labelled "resque-kubernetes=job" whose succeeded count
# has reached the requested number of completions.
#
# A job whose status reports no succeeded count (nil) is never treated
# as finished. Without that check, a job with neither spec.completions
# nor status.succeeded set would compare nil == nil and be reported as
# finished before doing any work.
#
# Returns the matching subset of what jobs_client.get_jobs returned.
def finished_jobs
  resque_jobs = jobs_client.get_jobs(label_selector: "resque-kubernetes=job")
  resque_jobs.select do |job|
    succeeded = job.status.succeeded
    !succeeded.nil? && succeeded == job.spec.completions
  end
end
finished_pods()
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 83
# Lists the pods labelled "resque-kubernetes=pod" that are in phase
# "Succeeded" and whose container statuses all report a terminated
# state with reason "Completed".
def finished_pods
  pods = pods_client.get_pods(label_selector: "resque-kubernetes=pod")

  pods.select do |pod|
    # Container statuses are only inspected once the phase matches.
    next false unless pod.status.phase == "Succeeded"

    pod.status.containerStatuses.all? do |container|
      container.state.terminated.reason == "Completed"
    end
  end
end
jobs_client()
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 56
# Memoized client for the "/apis/batch" scope.
#
# A nil result from client is not cached, so the lookup is retried on
# the next call.
def jobs_client
  return @jobs_client if @jobs_client

  @jobs_client = client("/apis/batch")
end
jobs_maxed?(name, namespace)
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 92
# Tells whether the job group already has as many unfinished jobs as
# the owner allows.
#
# name      - group name, matched against the "resque-kubernetes-group" label.
# namespace - namespace passed through to jobs_client.get_jobs.
#
# A job counts as still running unless its status reports a succeeded
# count equal to spec.completions. A missing (nil) succeeded count always
# counts as running. Without that check, a job with neither value set
# would compare nil == nil, be treated as finished, and let the group
# grow past owner.max_workers.
#
# Returns true when the running count is at or above owner.max_workers.
def jobs_maxed?(name, namespace)
  resque_jobs = jobs_client.get_jobs(
    label_selector: "resque-kubernetes=job,resque-kubernetes-group=#{name}",
    namespace:      namespace
  )

  running = resque_jobs.count do |job|
    succeeded = job.status.succeeded
    succeeded.nil? || succeeded != job.spec.completions
  end

  running >= owner.max_workers
end
pods_client()
click to toggle source
# File lib/resque/kubernetes/jobs_manager.rb, line 60
# Memoized client for the "" (empty) scope.
#
# A nil result from client is not cached, so the lookup is retried on
# the next call.
def pods_client
  return @pods_client if @pods_client

  @pods_client = client("")
end