class Qyu::Store::Memory::Adapter

Qyu::Store::Memory::Adapter

Constants

TYPE

Public Class Methods

new(_config) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 10
# Builds an empty in-memory store: one hash per record type (workflows,
# jobs, tasks, locks) plus a mutex used by the lock-related methods.
#
# @param _config [Object] accepted for interface compatibility; unused
def initialize(_config)
  # Array.new with a block yields a distinct hash for every table.
  @workflows, @jobs, @tasks, @locks = Array.new(4) { {} }
  @semaphore = Mutex.new
end
valid_config?(_config) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 18
# Reports whether a configuration is acceptable for this adapter.
# No validation is implemented yet, so every configuration is accepted.
#
# @param _config [Object] ignored
# @return [Boolean] always true
def self.valid_config?(_config)
  # TODO: add real validation
  true
end

Public Instance Methods

clear_completed_jobs() click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 77
# Placeholder: not implemented for the in-memory store. The body is empty,
# so no job is removed and the method returns nil.
def clear_completed_jobs
  # TODO: remove completed jobs
end
count_jobs() click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 81
# Number of job records currently held in memory.
#
# @return [Integer]
def count_jobs
  @jobs.size
end
delete_job(id) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 73
# Removes the job stored under +id+.
#
# @param id [Object] key of the job to remove
# @return [Hash, nil] the removed record, or nil when no job had that id
def delete_job(id)
  @jobs.delete(id)
end
delete_workflow(id) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 43
# Removes the workflow stored under +id+.
#
# @param id [Object] key of the workflow to remove
# @return [Hash, nil] the removed record, or nil when no workflow had that id
def delete_workflow(id)
  @workflows.delete(id)
end
delete_workflow_by_name(name) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 47
# Removes the workflow registered under +name+, if there is one.
#
# @param name [Object] value compared against each workflow's 'name' entry
# @return [Object, nil] whatever delete_workflow returns, or nil when the
#   lookup by name yields nothing
def delete_workflow_by_name(name)
  found = find_workflow_by_name(name)
  delete_workflow(found['id']) if found
end
find_job(id) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 53
# Looks up a job record by id.
#
# @param id [Object] key the job was stored under
# @return [Hash, nil] the stored record, or nil when no job has that id
def find_job(id)
  @jobs[id]
end
find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) { |id| ... } click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 90
# Returns the id of an existing task whose job, name, payload, queue and
# parent all match the arguments; otherwise stores a new task with status
# Qyu::Status::QUEUED, yields its id to the block and returns that id.
# The block is only invoked when a new task is created.
#
# @param name [Object] task name
# @param queue_name [Object] queue the task belongs to
# @param payload [Object] task payload, compared with ==
# @param job_id [Object] id of the owning job
# @param parent_task_id [Object, nil] id of the parent task, if any
# @yieldparam id [Object] id of the freshly stored task
# @return [Object] id of the matching or newly created task
def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id)
  identity = {
    'job_id' => job_id,
    'name' => name,
    'payload' => payload,
    'queue_name' => queue_name,
    'parent_task_id' => parent_task_id
  }
  existing = @tasks.find do |_task_id, stored|
    identity.all? { |field, expected| stored[field] == expected }
  end
  return existing.first if existing

  Qyu::Utils.uuid.tap do |new_id|
    @tasks[new_id] = {
      'name' => name,
      'queue_name' => queue_name,
      'parent_task_id' => parent_task_id,
      'status' => Qyu::Status::QUEUED,
      'payload' => payload,
      'job_id' => job_id
    }
    yield(new_id)
  end
end
find_task(id) click to toggle source

Task methods

# File lib/qyu/store/memory/adapter.rb, line 86
# Looks up a task record by id.
#
# @param id [Object] key the task was stored under
# @return [Hash, nil] the stored attributes, or nil when no task has that id
def find_task(id)
  @tasks[id]
end
find_task_ids_by_job_id_and_name(job_id, name) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 113
# Ids of every task that belongs to +job_id+ and is called +name+.
#
# @param job_id [Object] value compared against each task's 'job_id'
# @param name [Object] value compared against each task's 'name'
# @return [Array] matching task ids, in the order the tasks were stored
def find_task_ids_by_job_id_and_name(job_id, name)
  matching = @tasks.select do |_task_id, task|
    task['job_id'] == job_id && task['name'] == name
  end
  matching.keys
end
find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 119
# Ids of every task that belongs to +job_id+, is called +name+ and whose
# 'parent_task_id' is one of +parent_task_ids+.
#
# @param job_id [Object] value compared against each task's 'job_id'
# @param name [Object] value compared against each task's 'name'
# @param parent_task_ids [#include?] accepted parent ids
# @return [Array] matching task ids, in the order the tasks were stored
def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
  @tasks.each_with_object([]) do |(task_id, task), ids|
    next unless task['job_id'] == job_id
    next unless task['name'] == name

    ids << task_id if parent_task_ids.include?(task['parent_task_id'])
  end
end
find_workflow(id) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 23
# Looks up a workflow record by id.
#
# @param id [Object] key the workflow was stored under
# @return [Hash, nil] the stored record, or nil when no workflow has that id
def find_workflow(id)
  @workflows[id]
end
find_workflow_by_name(name) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 27
# Finds the first stored workflow whose 'name' entry equals +name+.
#
# @param name [Object] value compared against each workflow's 'name'
# @return [Hash, nil] the workflow record, or nil when none matches
def find_workflow_by_name(name)
  # find yields nil when nothing matches; destructuring nil leaves both
  # locals nil, so a miss returns nil instead of raising NoMethodError.
  _id, workflow = @workflows.find { |_wid, wflow| wflow['name'] == name }
  workflow
end
lock_task!(id, lease_time) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 136
# Tries to take the lock for task +id+. The lock is granted when no lock is
# recorded for the task or the recorded one has a :locked_until earlier
# than the current time. The check and the write happen under the mutex.
#
# @param id [Object] task id
# @param lease_time [Object] passed to Qyu::Utils.seconds_after_time
#   (presumably a number of seconds — confirm against that helper)
# @return [Array] [lease_token, locked_until] on success, [nil, nil] otherwise
def lock_task!(id, lease_time)
  token = Qyu::Utils.uuid

  @semaphore.synchronize do
    held = @locks[id]
    # Someone else still holds an unexpired lease.
    next [nil, nil] unless held.nil? || held[:locked_until] < Time.now

    deadline = Qyu::Utils.seconds_after_time(lease_time)
    @locks[id] = { locked_by: token, locked_until: deadline }
    [token, deadline]
  end
end
persist_job(workflow, payload) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 64
# Stores a new job under a freshly generated id.
#
# @param workflow [Object] saved under the 'workflow' key
# @param payload [Object] saved under the 'payload' key
# @return [Object] the id produced by Qyu::Utils.uuid
def persist_job(workflow, payload)
  Qyu::Utils.uuid.tap do |job_id|
    @jobs[job_id] = { 'payload' => payload, 'workflow' => workflow }
  end
end
persist_workflow(name, descriptor) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 33
# Stores a new workflow under a freshly generated id. The record carries
# its own id alongside the name and descriptor.
#
# @param name [Object] saved under the 'name' key
# @param descriptor [Object] saved under the 'descriptor' key
# @return [Object] the id produced by Qyu::Utils.uuid
def persist_workflow(name, descriptor)
  workflow_id = Qyu::Utils.uuid
  record = { 'id' => workflow_id, 'name' => name, 'descriptor' => descriptor }
  @workflows.store(workflow_id, record)
  workflow_id
end
renew_lock_lease(id, lease_time, lease_token) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 169
# Extends the lease on task +id+ when the caller still owns it. The lease
# is renewed only if a lock is recorded for the task, it was taken with
# +lease_token+, and its :locked_until has not yet passed.
#
# @param id [Object] task id
# @param lease_time [Object] passed to Qyu::Utils.seconds_after_time
#   (presumably a number of seconds — confirm against that helper)
# @param lease_token [Object] token returned when the lock was taken
# @return [Object, nil] the new expiry, or nil when the lease was not renewed
def renew_lock_lease(id, lease_time, lease_token)
  locked_until = nil
  @semaphore.synchronize do
    lock = @locks[id]
    # A task that was never locked (or was already unlocked) has no entry;
    # treat that as "not renewed" rather than calling [] on nil.
    if lock && lock[:locked_by] == lease_token && Time.now <= lock[:locked_until]
      locked_until = Qyu::Utils.seconds_after_time(lease_time)
      @locks[id] = { locked_by: lease_token, locked_until: locked_until }
    end
  end

  locked_until
end
select_jobs(limit, offset, order = :asc) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 57
# Returns one page of jobs. Jobs are taken in the order they were stored,
# starting at +offset+ and spanning at most +limit+ entries; each result is
# the stored record merged with an :id entry (symbol key).
#
# Any +order+ other than :asc reverses the selected page only, not the
# whole collection before slicing.
#
# @param limit [Integer] maximum number of jobs to return
# @param offset [Integer] index of the first job to return
# @param order [Symbol] :asc (default) or anything else for reversed
# @return [Array<Hash>] the page, empty when the offset is past the last job
def select_jobs(limit, offset, order = :asc)
  # Array#[start, length] returns nil, not [], once start exceeds the size.
  ids = @jobs.keys[offset, limit] || []
  selected = ids.map { |id| { id: id }.merge(@jobs[id]) }
  return selected if order == :asc

  selected.reverse
end
select_tasks_by_job_id(job_id) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 127
# Collects every task that belongs to +job_id+. Each result is a copy of
# the stored attributes with the task id added under the 'id' key; the
# stored records themselves are not modified.
#
# @param job_id [Object] value compared against each task's 'job_id'
# @return [Array<Hash>] matching tasks, in the order they were stored
def select_tasks_by_job_id(job_id)
  @tasks.each_with_object([]) do |(task_id, task), rows|
    rows << task.merge('id' => task_id) if task['job_id'] == job_id
  end
end
task_status_counts(job_id) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 131
# Placeholder: status counting is not implemented for the in-memory store.
#
# @param job_id [Object] currently unused
# @return [Hash] always an empty hash
def task_status_counts(job_id)
  # TODO: count the job's tasks per status
  {}
end
transaction() { || ... } click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 181
# Runs the given block and returns its result. No transactional behaviour
# is implemented: nothing is rolled back if the block raises.
#
# @yield the work to perform
# @return [Object] the block's return value
def transaction
  # TODO: provide real transaction semantics
  yield
end
unlock_task!(id, lease_token) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 153
# Releases the lock on task +id+ when it is held with +lease_token+.
#
# @param id [Object] task id
# @param lease_token [Object] token returned when the lock was taken
# @return [Boolean] true when the lock was removed; false when the task is
#   not locked or is locked with a different token
def unlock_task!(id, lease_token)
  unlocked = false
  @semaphore.synchronize do
    lock = @locks[id]
    # No entry means there is nothing to release; report false instead of
    # calling [] on nil.
    if lock && lock[:locked_by] == lease_token
      @locks.delete(id)
      unlocked = true
    end
  end

  unlocked
end
update_status(id, status) click to toggle source
# File lib/qyu/store/memory/adapter.rb, line 165
# Overwrites the 'status' entry of the task stored under +id+.
#
# @param id [Object] id of an existing task
# @param status [Object] new status value
# @return [Object] the status that was assigned
def update_status(id, status)
  task = @tasks[id]
  task['status'] = status
end