class Qyu::Store::Memory::Adapter
Constants
- TYPE
Public Class Methods
new(_config)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 10 def initialize(_config) @workflows = {} @jobs = {} @tasks = {} @locks = {} @semaphore = Mutex.new end
valid_config?(_config)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 18 def self.valid_config?(_config) # TODO true end
Public Instance Methods
clear_completed_jobs()
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 77 def clear_completed_jobs # TODO end
count_jobs()
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 81 def count_jobs @jobs.count end
delete_job(id)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 73 def delete_job(id) @jobs.delete(id) end
delete_workflow(id)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 43 def delete_workflow(id) @workflows.delete(id) end
delete_workflow_by_name(name)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 47 def delete_workflow_by_name(name) workflow = find_workflow_by_name(name) return unless workflow delete_workflow(workflow['id']) end
find_job(id)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 53 def find_job(id) @jobs[id] end
find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) { |id| ... }
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 90 def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) matching_task = @tasks.detect do |_id, attrs| attrs['job_id'] == job_id \ && attrs['name'] == name \ && attrs['payload'] == payload \ && attrs['queue_name'] == queue_name \ && attrs['parent_task_id'] == parent_task_id end return matching_task[0] if matching_task id = Qyu::Utils.uuid @tasks[id] = { 'name' => name, 'queue_name' => queue_name, 'parent_task_id' => parent_task_id, 'status' => Qyu::Status::QUEUED, 'payload' => payload, 'job_id' => job_id } yield(id) id end
find_task(id)
click to toggle source
Task
methods
# File lib/qyu/store/memory/adapter.rb, line 86 def find_task(id) @tasks[id] end
find_task_ids_by_job_id_and_name(job_id, name)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 113 def find_task_ids_by_job_id_and_name(job_id, name) @tasks.select do |_id, attrs| attrs['job_id'] == job_id && attrs['name'] == name end.map { |(id, _attr)| id } end
find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 119 def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) @tasks.select do |_id, attrs| attrs['job_id'] == job_id && attrs['name'] == name && parent_task_ids.include?(attrs['parent_task_id']) end.map { |(id, _attr)| id } end
find_workflow(id)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 23 def find_workflow(id) @workflows[id] end
find_workflow_by_name(name)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 27 def find_workflow_by_name(name) @workflows.detect do |_id, wflow| wflow['name'] == name end.last end
lock_task!(id, lease_time)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 136 def lock_task!(id, lease_time) uuid = Qyu::Utils.uuid locked = false locked_until = nil @semaphore.synchronize do if @locks[id].nil? || @locks[id][:locked_until] < Time.now locked_until = Qyu::Utils.seconds_after_time(lease_time) @locks[id] = { locked_by: uuid, locked_until: locked_until } locked = true end end return [nil, nil] unless locked [uuid, locked_until] end
persist_job(workflow, payload)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 64 def persist_job(workflow, payload) id = Qyu::Utils.uuid @jobs[id] = { 'payload' => payload, 'workflow' => workflow } id end
persist_workflow(name, descriptor)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 33 def persist_workflow(name, descriptor) id = Qyu::Utils.uuid @workflows[id] = { 'id' => id, 'name' => name, 'descriptor' => descriptor } id end
renew_lock_lease(id, lease_time, lease_token)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 169 def renew_lock_lease(id, lease_time, lease_token) locked_until = nil @semaphore.synchronize do if @locks[id][:locked_by] == lease_token && Time.now <= @locks[id][:locked_until] locked_until = Qyu::Utils.seconds_after_time(lease_time) @locks[id] = { locked_by: lease_token, locked_until: locked_until } end end locked_until end
select_jobs(limit, offset, order = :asc)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 57 def select_jobs(limit, offset, order = :asc) ids = @jobs.keys[offset, limit] selected = ids.map { |id| { id: id }.merge(@jobs[id]) } return selected if order == :asc selected.reverse end
select_tasks_by_job_id(job_id)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 127 def select_tasks_by_job_id(job_id) @tasks.select { |_id, attrs| attrs['job_id'] == job_id }.map { |id, attrs| attrs.merge('id' => id) } end
task_status_counts(job_id)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 131 def task_status_counts(job_id) # TODO {} end
transaction() { || ... }
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 181 def transaction # TODO yield end
unlock_task!(id, lease_token)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 153 def unlock_task!(id, lease_token) unlocked = false @semaphore.synchronize do if @locks[id][:locked_by] == lease_token @locks.delete(id) unlocked = true end end unlocked end
update_status(id, status)
click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 165 def update_status(id, status) @tasks[id]['status'] = status end