class Qyu::Store::Redis::Adapter
Constants
- TYPE
Public Class Methods
new(config)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 16 def initialize(config) init_client(config) end
valid_config?(config)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 11 def valid_config?(config) ConfigurationValidator.new(config).valid? end
Public Instance Methods
clear_completed_jobs()
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 100 def clear_completed_jobs # TODO end
count_jobs()
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 91 def count_jobs @client.keys("job:*").count end
delete_job(id)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 95 def delete_job(id) deleted = @client.del("job:#{id}") deleted > 0 end
delete_workflow(id)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 45 def delete_workflow(id) workflow_key = @client.keys("workflow:*:#{id}").first deleted = @client.del(workflow_key) deleted > 0 end
delete_workflow_by_name(name)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 51 def delete_workflow_by_name(name) workflow_key = @client.keys("workflow:#{name}:*").first deleted = @client.del(workflow_key) deleted > 0 end
find_job(id)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 69 def find_job(id) job = @client.hgetall("job:#{id}") return if job.eql?({}) job['id'] = id job['payload'] = parse { job['payload'] } job['workflow'] = parse { job['workflow'] } job end
find_or_persist_task(name, queue_name, payload, job_id, parent_task_id)
click to toggle source
Task
# File lib/qyu/store/redis/adapter.rb, line 105 def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) task_id = nil existent_keys = @client.keys("task:#{name}:#{queue_name}:#{job_id}:#{parent_task_id}:*") existent_keys.each do |task_key| task = @client.hgetall(task_key) task_payload = parse { task['payload'] } if compare_payloads(task_payload, payload) task_id = task['id'] break end end unless task_id task_id = SecureRandom.uuid key = "task:#{name}:#{queue_name}:#{job_id}:#{parent_task_id}:#{task_id}" @client.hmset( key, :id, task_id, :name, name, :queue_name, queue_name, :payload, serialize { payload }, :job_id, job_id, :parent_task_id, parent_task_id ) end task_id end
find_task(id)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 136 def find_task(id) task_key = @client.keys("task:*:*:*:*:#{id}").first return if task_key.nil? task = @client.hgetall(task_key) task['payload'] = parse { task['payload'] } task end
find_task_ids_by_job_id_and_name(job_id, name)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 144 def find_task_ids_by_job_id_and_name(job_id, name) task_keys = @client.keys("task:#{name}:*:#{job_id}:*:*") return if task_keys.empty? task_keys.map do |task_key| @client.hget(task_key, 'id') end.uniq end
find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 152 def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) task_keys = parent_task_ids.flat_map do |parent_task_id| @client.keys("task:#{name}:*:#{job_id}:#{parent_task_id}:*") end return if task_keys.empty? task_keys.map do |task_key| @client.hget(task_key, 'id') end.uniq end
find_workflow(id)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 29 def find_workflow(id) workflow_key = @client.keys("workflow:*:#{id}").first return if workflow_key.nil? workflow = @client.hgetall(workflow_key) workflow['descriptor'] = parse { workflow['descriptor'] } workflow end
find_workflow_by_name(name)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 37 def find_workflow_by_name(name) workflow_key = @client.keys("workflow:#{name}:*").first return if workflow_key.nil? workflow = @client.hgetall(workflow_key) workflow['descriptor'] = parse { workflow['descriptor'] } workflow end
lock_task!(id, lease_time)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 172 def lock_task!(id, lease_time) Qyu.logger.debug '[LOCK] lock_task!' task_key = @client.keys("task:*:*:*:*:#{id}").first task = find_task(id) raise Qyu::Store::Redis::Errors::TaskNotFound unless task response = nil if task['locked_until'].nil? || DateTime.parse(task['locked_until']) < DateTime.now uuid = SecureRandom.uuid Qyu.logger.debug "[LOCK] uuid = #{uuid}" locked_until = seconds_after_time(lease_time) Qyu.logger.debug "[LOCK] locked_until = #{locked_until}" response = @client.hmset(task_key, 'locked_by', uuid, 'locked_until', locked_until) end response.eql?('OK') ? [uuid, locked_until] : [nil, nil] end
persist_job(workflow, payload)
click to toggle source
Job
# File lib/qyu/store/redis/adapter.rb, line 58 def persist_job(workflow, payload) id = SecureRandom.uuid key = "job:#{id}" @client.hmset(key, :id, id, :workflow, serialize { workflow }, :payload, serialize { payload } ) { 'id' => id, 'workflow' => workflow, 'payload' => payload } end
persist_workflow(name, descriptor)
click to toggle source
Workflow
# File lib/qyu/store/redis/adapter.rb, line 21 def persist_workflow(name, descriptor) id = SecureRandom.uuid validate_workflow!(name) key = "workflow:#{name}:#{id}" @client.hmset(key, :name, name, :id, id, :descriptor, serialize { descriptor }) { 'name' => name, 'id' => id, 'descriptor' => descriptor } end
renew_lock_lease(id, lease_time, lease_token)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 205 def renew_lock_lease(id, lease_time, lease_token) Qyu.logger.debug "renew_lock_lease id = #{id}, lease_time = #{lease_time}, lease_token = #{lease_token}" task_key = @client.keys("task:*:*:*:*:#{id}").first task = find_task(id) return nil unless task&.fetch('locked_until') if task['locked_by'].eql?(lease_token) && DateTime.parse(task['locked_until']) > DateTime.now locked_until = seconds_after_time(lease_time) Qyu.logger.debug "[LOCK] locked_until = #{locked_until}" @client.hmset(task_key, 'locked_until', locked_until) locked_until else return nil end end
select_jobs(limit, offset, order = nil)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 78 def select_jobs(limit, offset, order = nil) job_keys = @client.keys('job:*') partial_job_keys = job_keys[offset.to_i, limit.to_i] return [] unless partial_job_keys partial_job_keys.map do |job_key| job = @client.hgetall(job_key) job['payload'] = parse { job['payload'] } job['workflow'] = parse { job['workflow'] } job end end
select_tasks_by_job_id(job_id)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 162 def select_tasks_by_job_id(job_id) task_keys = @client.keys("task:*:*:#{job_id}:*:*") return if task_keys.empty? task_keys.map do |task_key| task = @client.hgetall(task_key) task['payload'] = parse { task['payload'] } task end end
transaction() { || ... }
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 233 def transaction # TODO yield end
unlock_task!(id, lease_token)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 193 def unlock_task!(id, lease_token) task_key = @client.keys("task:*:*:*:*:#{id}").first task = find_task(id) raise Qyu::Store::Redis::Errors::TaskNotFound unless task if task['locked_by'].eql?(lease_token) @client.hmset(task_key, 'locked_by', nil, 'locked_until', nil).eql?('OK') else false end end
update_status(id, status)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 223 def update_status(id, status) task_key = @client.keys("task:*:*:*:*:#{id}").first raise Qyu::Store::Redis::Errors::TaskNotFound unless task_key @client.hmset(task_key, 'status', status).eql?('OK') end
with_connection() { || ... }
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 229 def with_connection yield end
Private Instance Methods
compare_payloads(payload1, payload2)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 253 def compare_payloads(payload1, payload2) symbolize_hash(payload1) == symbolize_hash(payload2) end
init_client(config)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 261 def init_client(config) load_config(config) redis_client = ::Redis.new(@@redis_configuration) @client = ::Redis::Namespace.new(@@redis_configuration[:namespace], :redis => redis_client) end
load_config(config)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 267 def load_config(config) if config[:url] @@redis_configuration = { url: config[:url] } else @@redis_configuration = { host: config[:host], port: config[:port], password: config[:password], db: config.fetch(:db) { 0 } }.compact end @@redis_configuration[:ssl_params] = config[:ssl_params] if config[:ssl_params] @@redis_configuration[:timeout] = config[:timeout] if config[:timeout] @@redis_configuration[:connect_timeout] = config[:connect_timeout] if config[:connect_timeout] @@redis_configuration[:read_timeout] = config[:read_timeout] if config[:read_timeout] @@redis_configuration[:write_timeout] = config[:write_timeout] if config[:write_timeout] @@redis_configuration[:namespace] = config.fetch(:namespace) { 'qyu' } true end
parse() { || ... }
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 244 def parse JSON.parse(yield) end
seconds_after_time(seconds, start_time = Time.now.utc)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 288 def seconds_after_time(seconds, start_time = Time.now.utc) start_time + seconds end
serialize() { || ... }
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 240 def serialize yield.to_json end
symbolize_hash(hash)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 257 def symbolize_hash(hash) hash.map { |key, value| [key.to_sym, value] }.to_h end
validate_workflow!(name)
click to toggle source
# File lib/qyu/store/redis/adapter.rb, line 248 def validate_workflow!(name) workflow_keys = @client.keys("workflow:#{name}:*") raise Qyu::Store::Redis::Errors::WorkflowNotUnique if workflow_keys.any? end