class Qyu::Store::Redis::Adapter

Qyu::Store::Redis::Adapter

Constants

TYPE

Public Class Methods

new(config) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 16
# Builds the adapter by handing +config+ to +init_client+, which creates
# the Redis client stored in +@client+.
#
# @param config [Hash] connection settings, forwarded unchanged to +init_client+
def initialize(config)
  init_client(config)
end
valid_config?(config) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 11
# Checks +config+ with +ConfigurationValidator+.
#
# @param config [Object] configuration handed to the validator as-is
# @return the result of +ConfigurationValidator#valid?+ (presumably a boolean)
def valid_config?(config)
  ConfigurationValidator.new(config).valid?
end

Public Instance Methods

clear_completed_jobs() click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 100
# Placeholder: not implemented for this store yet. The body is empty, so
# calling it changes nothing and returns nil.
def clear_completed_jobs
  # TODO
end
count_jobs() click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 91
# Counts stored jobs by listing every key that matches "job:*".
#
# @return [Integer] number of matching keys
def count_jobs
  job_keys = @client.keys("job:*")
  job_keys.count
end
delete_job(id) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 95
# Removes the hash stored under "job:<id>".
#
# @param id [String] job identifier
# @return [Boolean] true when the delete reported at least one removed key
def delete_job(id)
  @client.del("job:#{id}").positive?
end
delete_workflow(id) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 45
# Deletes the workflow whose key ends in +id+ ("workflow:<name>:<id>").
#
# @param id [String] workflow identifier
# @return [Boolean] true when a key was removed; false when no workflow
#   with that id exists
def delete_workflow(id)
  workflow_key = @client.keys("workflow:*:#{id}").first
  # Nothing matched: report "not deleted" rather than sending a nil key to DEL.
  return false unless workflow_key

  @client.del(workflow_key).positive?
end
delete_workflow_by_name(name) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 51
# Deletes the workflow stored under +name+ ("workflow:<name>:<id>").
#
# @param name [String] workflow name
# @return [Boolean] true when a key was removed; false when no workflow
#   with that name exists
def delete_workflow_by_name(name)
  workflow_key = @client.keys("workflow:#{name}:*").first
  # Nothing matched: report "not deleted" rather than sending a nil key to DEL.
  return false unless workflow_key

  @client.del(workflow_key).positive?
end
find_job(id) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 69
# Loads the job hash stored under "job:<id>".
#
# The 'id' field is overwritten with the requested +id+, and the 'payload'
# and 'workflow' fields are decoded with +parse+.
#
# @param id [String] job identifier
# @return [Hash, nil] the job record, or nil when the stored hash is empty
def find_job(id)
  record = @client.hgetall("job:#{id}")
  return if record.empty?

  record['id'] = id
  %w[payload workflow].each do |field|
    record[field] = parse { record[field] }
  end
  record
end
find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) click to toggle source

Task

# File lib/qyu/store/redis/adapter.rb, line 105
# Returns the id of an existing task with the same name, queue, job, parent
# and payload, or stores a new task hash and returns its freshly generated id.
#
# Candidates are the keys matching
# "task:<name>:<queue_name>:<job_id>:<parent_task_id>:*"; they are loaded one
# at a time and the search stops at the first one whose decoded payload
# matches +payload+ according to +compare_payloads+.
#
# @param name [String] task name
# @param queue_name [String] queue the task belongs to
# @param payload [Hash] task payload; stored through +serialize+
# @param job_id [String] owning job id
# @param parent_task_id [String, nil] id of the parent task
# @return [String] id of the matched or newly stored task
def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id)
  key_prefix = "task:#{name}:#{queue_name}:#{job_id}:#{parent_task_id}"

  # Lazy so that no further hashes are fetched once a match is found.
  matching = @client.keys("#{key_prefix}:*").lazy
                    .map { |task_key| @client.hgetall(task_key) }
                    .find { |stored| compare_payloads(parse { stored['payload'] }, payload) }
  existing_id = matching && matching['id']
  return existing_id if existing_id

  new_id = SecureRandom.uuid
  @client.hmset(
    "#{key_prefix}:#{new_id}",
    :id, new_id,
    :name, name,
    :queue_name, queue_name,
    :payload, serialize { payload },
    :job_id, job_id,
    :parent_task_id, parent_task_id
  )
  new_id
end
find_task(id) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 136
# Looks up a task by id via the key pattern "task:*:*:*:*:<id>" and returns
# its hash with the 'payload' field decoded by +parse+.
#
# @param id [String] task identifier
# @return [Hash, nil] the task record, or nil when no key matches
def find_task(id)
  task_key, = @client.keys("task:*:*:*:*:#{id}")
  return unless task_key

  @client.hgetall(task_key).tap do |task|
    task['payload'] = parse { task['payload'] }
  end
end
find_task_ids_by_job_id_and_name(job_id, name) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 144
# Collects the distinct 'id' fields of every task of job +job_id+ named
# +name+ (keys matching "task:<name>:*:<job_id>:*:*").
#
# @param job_id [String] owning job id
# @param name [String] task name
# @return [Array<String>, nil] unique task ids, or nil when no key matches
def find_task_ids_by_job_id_and_name(job_id, name)
  matching_keys = @client.keys("task:#{name}:*:#{job_id}:*:*")
  return if matching_keys.empty?

  ids = matching_keys.map { |task_key| @client.hget(task_key, 'id') }
  ids.uniq
end
find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 152
# Collects the distinct 'id' fields of the tasks of job +job_id+ named +name+
# whose parent is one of +parent_task_ids+. One key lookup
# ("task:<name>:*:<job_id>:<parent_task_id>:*") is issued per parent id.
#
# @param job_id [String] owning job id
# @param name [String] task name
# @param parent_task_ids [Array<String>] candidate parent task ids
# @return [Array<String>, nil] unique task ids, or nil when no key matches
def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
  matching_keys = parent_task_ids.each_with_object([]) do |parent_task_id, found|
    found.concat(@client.keys("task:#{name}:*:#{job_id}:#{parent_task_id}:*"))
  end
  return if matching_keys.empty?

  ids = matching_keys.map { |task_key| @client.hget(task_key, 'id') }
  ids.uniq
end
find_workflow(id) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 29
# Looks up a workflow by id via the key pattern "workflow:*:<id>" and returns
# its hash with the 'descriptor' field decoded by +parse+.
#
# @param id [String] workflow identifier
# @return [Hash, nil] the workflow record, or nil when no key matches
def find_workflow(id)
  workflow_key, = @client.keys("workflow:*:#{id}")
  return unless workflow_key

  @client.hgetall(workflow_key).tap do |workflow|
    workflow['descriptor'] = parse { workflow['descriptor'] }
  end
end
find_workflow_by_name(name) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 37
# Looks up a workflow by name via the key pattern "workflow:<name>:*" and
# returns its hash with the 'descriptor' field decoded by +parse+.
#
# @param name [String] workflow name
# @return [Hash, nil] the workflow record, or nil when no key matches
def find_workflow_by_name(name)
  workflow_key, = @client.keys("workflow:#{name}:*")
  return unless workflow_key

  @client.hgetall(workflow_key).tap do |workflow|
    workflow['descriptor'] = parse { workflow['descriptor'] }
  end
end
lock_task!(id, lease_time) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 172
# Tries to take the lock on task +id+ for +lease_time+ seconds.
#
# The lock is granted when the task has no active lease: its 'locked_until'
# field is missing, blank, or lies in the past. A blank value is treated as
# unlocked because a cleared field can be read back as an empty string, which
# must not reach DateTime.parse (it raises on "").
#
# NOTE(review): the lease check and the write are separate commands, so two
# workers could both pass the check; confirm whether callers rely on
# stronger guarantees.
#
# @param id [String] task identifier
# @param lease_time [Numeric] lease duration in seconds
# @return [Array] [lease_token, locked_until] on success, [nil, nil] when the
#   task is currently locked or the write did not answer 'OK'
# @raise [Qyu::Store::Redis::Errors::TaskNotFound] when no task key matches +id+
def lock_task!(id, lease_time)
  Qyu.logger.debug '[LOCK] lock_task!'

  # A single key lookup serves both the existence check and the later write.
  task_key = @client.keys("task:*:*:*:*:#{id}").first
  raise Qyu::Store::Redis::Errors::TaskNotFound unless task_key

  current_lease = @client.hget(task_key, 'locked_until').to_s
  lease_active = !current_lease.empty? && DateTime.parse(current_lease) >= DateTime.now
  return [nil, nil] if lease_active

  uuid = SecureRandom.uuid
  Qyu.logger.debug "[LOCK] uuid = #{uuid}"

  locked_until = seconds_after_time(lease_time)
  Qyu.logger.debug "[LOCK] locked_until = #{locked_until}"

  response = @client.hmset(task_key, 'locked_by', uuid, 'locked_until', locked_until)
  response.eql?('OK') ? [uuid, locked_until] : [nil, nil]
end
persist_job(workflow, payload) click to toggle source

Job

# File lib/qyu/store/redis/adapter.rb, line 58
# Stores a new job hash under "job:<uuid>" with the workflow and payload
# encoded by +serialize+.
#
# @param workflow [Hash] workflow the job runs
# @param payload [Hash] job payload
# @return [Hash] the new job with string keys 'id', 'workflow' and 'payload'
#   (workflow and payload are returned as given, not serialized)
def persist_job(workflow, payload)
  job_id = SecureRandom.uuid
  fields = [
    :id, job_id,
    :workflow, serialize { workflow },
    :payload, serialize { payload }
  ]
  @client.hmset("job:#{job_id}", *fields)
  { 'id' => job_id, 'workflow' => workflow, 'payload' => payload }
end
persist_workflow(name, descriptor) click to toggle source

Workflow

# File lib/qyu/store/redis/adapter.rb, line 21
# Stores a new workflow hash under "workflow:<name>:<uuid>" after
# +validate_workflow!+ has confirmed the name is not taken.
#
# @param name [String] workflow name
# @param descriptor [Hash] workflow descriptor; stored through +serialize+
# @return [Hash] the new workflow with string keys 'name', 'id' and
#   'descriptor' (descriptor is returned as given, not serialized)
# @raise whatever +validate_workflow!+ raises for a duplicate name
def persist_workflow(name, descriptor)
  workflow_id = SecureRandom.uuid
  validate_workflow!(name)

  record = { 'name' => name, 'id' => workflow_id, 'descriptor' => descriptor }
  @client.hmset(
    "workflow:#{name}:#{workflow_id}",
    :name, name,
    :id, workflow_id,
    :descriptor, serialize { descriptor }
  )
  record
end
renew_lock_lease(id, lease_time, lease_token) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 205
# Extends the lease on task +id+ by +lease_time+ seconds, provided the caller
# still holds it.
#
# The lease is renewed only when the task exists, has a non-blank
# 'locked_until' that is still in the future, and its 'locked_by' equals
# +lease_token+. A task that was never locked has no 'locked_until' field at
# all, so the field is read with a nil-tolerant lookup instead of
# Hash#fetch (which raises KeyError for a missing key).
#
# @param id [String] task identifier
# @param lease_time [Numeric] new lease duration in seconds, counted from now
# @param lease_token [String] token returned when the lock was taken
# @return [Time, nil] the new expiry, or nil when the lease was not renewed
def renew_lock_lease(id, lease_time, lease_token)
  Qyu.logger.debug "renew_lock_lease id = #{id}, lease_time = #{lease_time}, lease_token = #{lease_token}"

  # A single key lookup serves both the read and the later write.
  task_key = @client.keys("task:*:*:*:*:#{id}").first
  return nil unless task_key

  task = @client.hgetall(task_key)
  current_lease = task['locked_until'].to_s
  # Missing or blank expiry means the task is not locked; nothing to renew.
  return nil if current_lease.empty?
  return nil unless task['locked_by'].eql?(lease_token)
  return nil unless DateTime.parse(current_lease) > DateTime.now

  locked_until = seconds_after_time(lease_time)
  Qyu.logger.debug "[LOCK] locked_until = #{locked_until}"

  @client.hmset(task_key, 'locked_until', locked_until)
  locked_until
end
select_jobs(limit, offset, order = nil) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 78
# Returns one page of jobs, taken from the list of keys matching 'job:*'.
#
# The page is the slice of that key list starting at +offset+ with at most
# +limit+ entries (both coerced with +to_i+). Each job's 'payload' and
# 'workflow' fields are decoded with +parse+.
#
# @param limit [Integer, String] maximum number of jobs to return
# @param offset [Integer, String] number of keys to skip
# @param order [Object, nil] accepted but not used by this implementation
# @return [Array<Hash>] job records; empty when +offset+ is past the end
def select_jobs(limit, offset, order = nil)
  page = @client.keys('job:*')[offset.to_i, limit.to_i]
  return [] unless page

  page.map do |job_key|
    @client.hgetall(job_key).tap do |job|
      %w[payload workflow].each { |field| job[field] = parse { job[field] } }
    end
  end
end
select_tasks_by_job_id(job_id) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 162
# Loads every task of job +job_id+ (keys matching "task:*:*:<job_id>:*:*"),
# decoding each task's 'payload' field with +parse+.
#
# @param job_id [String] owning job id
# @return [Array<Hash>, nil] task records, or nil when no key matches
def select_tasks_by_job_id(job_id)
  matching_keys = @client.keys("task:*:*:#{job_id}:*:*")
  return if matching_keys.empty?

  matching_keys.map do |task_key|
    @client.hgetall(task_key).tap do |task|
      task['payload'] = parse { task['payload'] }
    end
  end
end
transaction() { || ... } click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 233
# Runs the given block and returns its result.
#
# No transactional commands are issued here: the block is simply yielded to,
# so the operations inside it are not grouped or rolled back by this method.
#
# @yield the work the caller wants to run
# @return the value of the block
def transaction
  # TODO
  yield
end
unlock_task!(id, lease_token) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 193
# Releases the lock on task +id+ when +lease_token+ matches the stored
# 'locked_by' value.
#
# The lock fields are cleared by writing explicit empty strings. Passing nil
# as a hash value depends on the Redis client coercing it to "" and is
# rejected by clients that refuse nil command arguments (TODO confirm for the
# client version in use); writing "" keeps the same stored value either way.
#
# @param id [String] task identifier
# @param lease_token [String] token returned when the lock was taken
# @return [Boolean] true when the lock fields were cleared, false when the
#   token does not match
# @raise [Qyu::Store::Redis::Errors::TaskNotFound] when no task key matches +id+
def unlock_task!(id, lease_token)
  # A single key lookup serves both the existence check and the later write.
  task_key = @client.keys("task:*:*:*:*:#{id}").first
  raise Qyu::Store::Redis::Errors::TaskNotFound unless task_key

  return false unless @client.hget(task_key, 'locked_by').eql?(lease_token)

  @client.hmset(task_key, 'locked_by', '', 'locked_until', '').eql?('OK')
end
update_status(id, status) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 223
# Writes +status+ into the 'status' field of task +id+.
#
# @param id [String] task identifier
# @param status [String] new status value
# @return [Boolean] true when the write answered 'OK'
# @raise [Qyu::Store::Redis::Errors::TaskNotFound] when no task key matches +id+
def update_status(id, status)
  task_key, = @client.keys("task:*:*:*:*:#{id}")
  raise Qyu::Store::Redis::Errors::TaskNotFound if task_key.nil?

  reply = @client.hmset(task_key, 'status', status)
  reply.eql?('OK')
end
with_connection() { || ... } click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 229
# Runs the given block and returns its result. Nothing is acquired or
# released around the block; it is simply yielded to.
#
# @yield the work the caller wants to run
# @return the value of the block
def with_connection
  yield
end

Private Instance Methods

compare_payloads(payload1, payload2) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 253
# Tells whether two payloads are equal once their top-level keys have been
# converted to symbols by +symbolize_hash+, so { 'a' => 1 } matches { a: 1 }.
#
# @param payload1 [Hash] first payload
# @param payload2 [Hash] second payload
# @return [Boolean] true when the symbolized payloads are equal
def compare_payloads(payload1, payload2)
  left, right = [payload1, payload2].map { |payload| symbolize_hash(payload) }
  left == right
end
init_client(config) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 261
# Loads +config+ into the shared Redis configuration and builds the
# namespaced client kept in +@client+.
#
# The plain Redis connection is created from the full configuration hash and
# then wrapped in a Redis::Namespace using the configuration's :namespace
# entry.
#
# @param config [Hash] connection settings, passed to +load_config+
# @return [Redis::Namespace] the namespaced client
def init_client(config)
  load_config(config)
  connection = ::Redis.new(@@redis_configuration)
  namespace = @@redis_configuration[:namespace]
  @client = ::Redis::Namespace.new(namespace, redis: connection)
end
load_config(config) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 267
# Translates +config+ into the connection options kept in the class variable
# +@@redis_configuration+.
#
# When +config+ has a :url, only that URL is used as the base; otherwise the
# base is built from :host, :port, :password and :db (default 0), with nil
# entries removed. The optional :ssl_params and timeout settings are copied
# over when present, and :namespace defaults to 'qyu'.
#
# NOTE(review): the options live in a class variable, so the most recent call
# overwrites the settings seen by every adapter instance.
#
# @param config [Hash] connection settings with symbol keys
# @return [true]
def load_config(config)
  @@redis_configuration =
    if config[:url]
      { url: config[:url] }
    else
      {
        host: config[:host],
        port: config[:port],
        password: config[:password],
        db: config.fetch(:db) { 0 }
      }.compact
    end

  # Optional settings are copied only when the caller supplied a truthy value.
  %i[ssl_params timeout connect_timeout read_timeout write_timeout].each do |option|
    @@redis_configuration[option] = config[option] if config[option]
  end

  @@redis_configuration[:namespace] = config.fetch(:namespace) { 'qyu' }
  true
end
parse() { || ... } click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 244
# Decodes the JSON text produced by the given block.
#
# @yieldreturn [String] JSON document
# @return [Object] the decoded value
def parse
  raw = yield
  JSON.parse(raw)
end
seconds_after_time(seconds, start_time = Time.now.utc) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 288
# Returns the moment +seconds+ after +start_time+.
#
# @param seconds [Numeric] offset added to +start_time+
# @param start_time [Time] starting point; defaults to the current UTC time
# @return [Time] +start_time+ plus +seconds+
def seconds_after_time(seconds, start_time = Time.now.utc)
  start_time + seconds
end
serialize() { || ... } click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 240
# Encodes the value produced by the given block as JSON text by calling
# +to_json+ on it.
#
# @yieldreturn [Object] value to encode
# @return [String] JSON document
def serialize
  value = yield
  value.to_json
end
symbolize_hash(hash) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 257
# Returns a new hash with every top-level key converted via +to_sym+.
# Values, including nested hashes, are left untouched.
#
# @param hash [Hash] source hash
# @return [Hash] copy of +hash+ keyed by symbols
def symbolize_hash(hash)
  hash.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
validate_workflow!(name) click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 248
# Ensures no workflow is stored under +name+ yet (no key matches
# "workflow:<name>:*").
#
# @param name [String] workflow name to check
# @return [nil] when the name is free
# @raise [Qyu::Store::Redis::Errors::WorkflowNotUnique] when a workflow with
#   that name already exists
def validate_workflow!(name)
  existing_keys = @client.keys("workflow:#{name}:*")
  raise Qyu::Store::Redis::Errors::WorkflowNotUnique unless existing_keys.empty?
end