class Gush::Client
Attributes
configuration[R]
Public Class Methods
new(config = Gush.configuration)
click to toggle source
# File lib/gush/client.rb, line 7
# Builds a client around the given configuration object.
#
# config - configuration to store on the client; when omitted,
#          Gush.configuration is used.
def initialize(config = Gush.configuration)
  @configuration = config
end
Public Instance Methods
all_workflows()
click to toggle source
# File lib/gush/client.rb, line 74
# Loads every workflow whose key matches "gush.workflows.*".
#
# Each matching key has its "gush.workflows." prefix removed and the remainder
# is passed to find_workflow as the workflow id.
#
# Returns an Array of whatever find_workflow returns for each id.
def all_workflows
  connection_pool.with do |redis|
    workflow_keys = redis.scan_each(match: "gush.workflows.*")
    workflow_keys.map { |key| find_workflow(key.sub("gush.workflows.", "")) }
  end
end
configure() { |configuration| ... }
click to toggle source
# File lib/gush/client.rb, line 11
# Yields this client's configuration object to the given block so the caller
# can adjust it in place.
#
# Returns the value of the block.
def configure
  yield(configuration)
end
create_workflow(name)
click to toggle source
# File lib/gush/client.rb, line 15
# Creates a workflow from its class name.
#
# name - String naming the workflow class; it is resolved with
#        String#constantize and `create` is called on the resulting class.
#
# Returns the object returned by the workflow class's `create`.
# Raises WorkflowNotFound when resolving or creating the workflow raises
# NameError (e.g. the constant does not exist).
def create_workflow(name)
  # The result of `create` is the method's value. The previous version
  # discarded it and then referenced an undefined local (`flow`), which raised
  # NameError on every successful creation.
  name.constantize.create
rescue NameError
  raise WorkflowNotFound, "Workflow with given name doesn't exist"
end
destroy_job(workflow_id, job)
click to toggle source
# File lib/gush/client.rb, line 141
# Deletes the Redis key "gush.jobs.<workflow_id>.<job.klass>".
#
# workflow_id - id of the workflow the job belongs to.
# job         - object responding to `klass`.
#
# Returns the result of the Redis DEL call.
def destroy_job(workflow_id, job)
  job_key = "gush.jobs.#{workflow_id}.#{job.klass}"
  connection_pool.with { |redis| redis.del(job_key) }
end
destroy_workflow(workflow)
click to toggle source
# File lib/gush/client.rb, line 134
# Removes a workflow and its jobs from Redis.
#
# The workflow key "gush.workflows.<id>" is deleted first, then destroy_job is
# called for every job in workflow.jobs.
#
# Returns the result of iterating workflow.jobs.
def destroy_workflow(workflow)
  workflow_key = "gush.workflows.#{workflow.id}"
  connection_pool.with { |redis| redis.del(workflow_key) }

  workflow.jobs.each do |job|
    destroy_job(workflow.id, job)
  end
end
enqueue_job(workflow_id, job)
click to toggle source
# File lib/gush/client.rb, line 162
# Marks a job as enqueued, persists it, and schedules a Gush::Worker for it.
#
# workflow_id - id of the workflow the job belongs to.
# job         - job object; must respond to enqueue!, queue and name.
#
# The worker is scheduled on job.queue, or on configuration.namespace when the
# job does not specify a queue. It receives the workflow id and the job name.
def enqueue_job(workflow_id, job)
  job.enqueue!
  persist_job(workflow_id, job)

  target_queue = job.queue || configuration.namespace
  worker = Gush::Worker.set(queue: target_queue)
  worker.perform_later(workflow_id, job.name)
end
expire_job(workflow_id, job, ttl=nil)
click to toggle source
# File lib/gush/client.rb, line 155
# Sets a time-to-live on the Redis key that stores a job.
#
# workflow_id - id of the workflow the job belongs to.
# job         - object responding to `klass`.
# ttl         - expiry passed to Redis EXPIRE; defaults to configuration.ttl
#               when nil.
#
# Returns the result of the Redis EXPIRE call.
def expire_job(workflow_id, job, ttl=nil)
  ttl ||= configuration.ttl

  # Jobs are written (persist_job) and deleted (destroy_job) under
  # "gush.jobs.<workflow_id>.<job.klass>". The TTL must target that same key;
  # keying on job.name addressed a key that is never written.
  job_key = "gush.jobs.#{workflow_id}.#{job.klass}"
  connection_pool.with { |redis| redis.expire(job_key, ttl) }
end
expire_workflow(workflow, ttl=nil)
click to toggle source
# File lib/gush/client.rb, line 147
# Sets a time-to-live on a workflow key and on each of its jobs.
#
# workflow - object responding to `id` and `jobs`.
# ttl      - expiry passed to Redis EXPIRE; defaults to configuration.ttl
#            when nil. The same value is forwarded to expire_job.
#
# Returns the result of iterating workflow.jobs.
def expire_workflow(workflow, ttl=nil)
  ttl ||= configuration.ttl
  workflow_key = "gush.workflows.#{workflow.id}"

  connection_pool.with { |redis| redis.expire(workflow_key, ttl) }

  workflow.jobs.each do |job|
    expire_job(workflow.id, job, ttl)
  end
end
find_job(workflow_id, job_name)
click to toggle source
# File lib/gush/client.rb, line 119
# Looks up a job of a workflow by name.
#
# When job_name matches the "<klass>-<identifier>" pattern below, the lookup
# goes through find_job_by_klass_and_id; otherwise find_job_by_klass is used.
#
# Returns a Gush::Job built from the decoded JSON, or nil when nothing is
# stored for the name.
def find_job(workflow_id, job_name)
  has_identifier = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_name)

  payload = has_identifier ? find_job_by_klass_and_id(workflow_id, job_name) : find_job_by_klass(workflow_id, job_name)
  return nil if payload.nil?

  attributes = Gush::JSON.decode(payload, symbolize_keys: true)
  Gush::Job.from_hash(attributes)
end
find_workflow(id)
click to toggle source
# File lib/gush/client.rb, line 83
# Loads a workflow and all of its jobs from Redis.
#
# id - workflow id; the workflow is read from "gush.workflows.<id>" and its
#      jobs from every hash matching "gush.jobs.<id>.*".
#
# Returns the workflow built by workflow_from_hash.
# Raises WorkflowNotFound when no data is stored for the id.
def find_workflow(id)
  connection_pool.with do |redis|
    raw = redis.get("gush.workflows.#{id}")
    raise WorkflowNotFound.new("Workflow with given id doesn't exist") if raw.nil?

    attributes = Gush::JSON.decode(raw, symbolize_keys: true)

    # Every value of every matching job hash is one JSON-encoded job.
    job_nodes = redis.scan_each(match: "gush.jobs.#{id}.*").flat_map do |key|
      redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
    end

    workflow_from_hash(attributes, job_nodes)
  end
end
next_free_job_id(workflow_id, job_klass)
click to toggle source
# File lib/gush/client.rb, line 45
# Generates a job id that is not yet a field of the job hash
# "gush.jobs.<workflow_id>.<job_klass>".
#
# Random UUIDs are drawn until one is found that HEXISTS reports as absent.
#
# Returns the free id as a String.
def next_free_job_id(workflow_id, job_klass)
  loop do
    candidate = SecureRandom.uuid
    taken = connection_pool.with do |redis|
      redis.hexists("gush.jobs.#{workflow_id}.#{job_klass}", candidate)
    end
    return candidate unless taken
  end
end
next_free_workflow_id()
click to toggle source
# File lib/gush/client.rb, line 60
# Generates a workflow id that is not yet in use.
#
# Random UUIDs are drawn until one is found for which no
# "gush.workflows.<id>" key exists.
#
# Returns the free id as a String.
def next_free_workflow_id
  loop do
    id = SecureRandom.uuid

    # Workflows are stored under "gush.workflows.<id>" (plural); the previous
    # singular "gush.workflow." prefix could never match a stored workflow.
    taken = connection_pool.with { |redis| redis.exists("gush.workflows.#{id}") }

    # Depending on the client version, `exists` answers with a boolean or with
    # an Integer count. 0 is truthy in Ruby, so a plain negation would never
    # report a free id when a count is returned.
    return id if !taken || taken == 0
  end
end
persist_job(workflow_id, job)
click to toggle source
# File lib/gush/client.rb, line 113
# Stores a job as JSON in the hash "gush.jobs.<workflow_id>.<job.klass>",
# using job.id as the hash field.
#
# Returns the result of the Redis HSET call.
def persist_job(workflow_id, job)
  hash_key = "gush.jobs.#{workflow_id}.#{job.klass}"
  connection_pool.with { |redis| redis.hset(hash_key, job.id, job.to_json) }
end
persist_workflow(workflow)
click to toggle source
# File lib/gush/client.rb, line 102
# Writes a workflow and all of its jobs to Redis.
#
# The workflow JSON is stored under "gush.workflows.<id>", each job is stored
# through persist_job, and the workflow is then marked as persisted.
#
# Returns true.
def persist_workflow(workflow)
  workflow_key = "gush.workflows.#{workflow.id}"
  connection_pool.with { |redis| redis.set(workflow_key, workflow.to_json) }

  workflow.jobs.each do |job|
    persist_job(workflow.id, job)
  end

  workflow.mark_as_persisted
  true
end
start_workflow(workflow, job_names = [])
click to toggle source
# File lib/gush/client.rb, line 24
# Marks a workflow as started, persists it, and enqueues its first jobs.
#
# workflow  - workflow to start.
# job_names - optional list of job names to enqueue; when empty, the
#             workflow's initial_jobs are enqueued instead.
#
# Returns the collection of jobs that were enqueued.
def start_workflow(workflow, job_names = [])
  workflow.mark_as_started
  persist_workflow(workflow)

  to_enqueue = job_names.empty? ? workflow.initial_jobs : job_names.map { |name| workflow.find_job(name) }

  to_enqueue.each { |job| enqueue_job(workflow.id, job) }
end
stop_workflow(id)
click to toggle source
# File lib/gush/client.rb, line 39
# Loads the workflow with the given id, marks it as stopped, and persists it.
#
# Returns the result of persist_workflow.
def stop_workflow(id)
  stopped = find_workflow(id).tap { |workflow| workflow.mark_as_stopped }
  persist_workflow(stopped)
end
Private Instance Methods
build_redis()
click to toggle source
# File lib/gush/client.rb, line 205
# Opens a Redis connection to configuration.redis_url and also assigns it to
# RedisClassy.redis.
#
# Returns the new Redis instance.
def build_redis
  instance = Redis.new(url: configuration.redis_url)
  RedisClassy.redis = instance
  instance
end
connection_pool()
click to toggle source
# File lib/gush/client.rb, line 211
# Memoized ConnectionPool of Redis connections created by build_redis.
#
# The pool is sized from configuration.concurrency and uses a timeout of 1.
def connection_pool
  return @connection_pool if @connection_pool

  @connection_pool = ConnectionPool.new(size: configuration.concurrency, timeout: 1) do
    build_redis
  end
end
find_job_by_klass(workflow_id, job_name)
click to toggle source
# File lib/gush/client.rb, line 180
# Fetches one stored job payload for a job class, without a specific id.
#
# A single HSCAN call (cursor 0, count: 1) is issued against
# "gush.jobs.<workflow_id>.<job_name>" and the value of the first returned
# entry is used.
#
# Returns the stored payload, or nil when the scan returned no entries.
def find_job_by_klass(workflow_id, job_name)
  hash_key = "gush.jobs.#{workflow_id}.#{job_name}"
  _cursor, entries = connection_pool.with { |redis| redis.hscan(hash_key, 0, count: 1) }

  return nil if entries.empty?

  # Each entry is a [field, value] pair; only the value is needed.
  _field, payload = *entries[0]
  payload
end
find_job_by_klass_and_id(workflow_id, job_name)
click to toggle source
# File lib/gush/client.rb, line 172
# Fetches the stored payload of one specific job.
#
# job_name is split on '|' into a class part and an id part; the id is read
# as a field of the hash "gush.jobs.<workflow_id>.<class part>".
#
# Returns the result of the Redis HGET call.
def find_job_by_klass_and_id(workflow_id, job_name)
  klass_part, id_part = job_name.split('|')
  hash_key = "gush.jobs.#{workflow_id}.#{klass_part}"

  connection_pool.with { |redis| redis.hget(hash_key, id_part) }
end
workflow_from_hash(hash, nodes = [])
click to toggle source
# File lib/gush/client.rb, line 192
# Rebuilds a workflow object from its decoded hash and job nodes.
#
# hash  - Hash with :klass (class name), :arguments (constructor arguments),
#         :id, and optionally :stopped (treated as false when absent).
# nodes - Array of job hashes, each turned into a job via Gush::Job.from_hash.
#
# Returns the workflow instance.
def workflow_from_hash(hash, nodes = [])
  workflow_class = hash[:klass].constantize

  workflow_class.new(*hash[:arguments]).tap do |flow|
    # Jobs are cleared first and replaced with the stored ones at the end.
    flow.jobs = []
    flow.stopped = hash.fetch(:stopped, false)
    flow.id = hash[:id]
    flow.jobs = nodes.map { |node| Gush::Job.from_hash(node) }
  end
end