class Gush::Client

Attributes

configuration[R]

Public Class Methods

new(config = Gush.configuration) click to toggle source
# File lib/gush/client.rb, line 7
# Builds a client around the given configuration object.
#
# @param config the configuration to use; when omitted, the value returned
#   by Gush.configuration is used
def initialize(config = Gush.configuration)
  # Stored in @configuration; the class exposes a `configuration` reader.
  @configuration = config
end

Public Instance Methods

all_workflows() click to toggle source
# File lib/gush/client.rb, line 74
# Loads every workflow whose key matches "gush.workflows.*".
#
# Each matching key has the "gush.workflows." prefix stripped to obtain the
# workflow id, which is then passed to #find_workflow.
#
# @return [Array] the results of #find_workflow, one per matching key
def all_workflows
  prefix = "gush.workflows."

  connection_pool.with do |redis|
    workflow_keys = redis.scan_each(match: "#{prefix}*")
    workflow_keys.map { |key| find_workflow(key.sub(prefix, "")) }
  end
end
configure() { |configuration| ... } click to toggle source
# File lib/gush/client.rb, line 11
# Hands the client's configuration object to the caller's block.
#
# @yieldparam configuration the object returned by #configuration
# @return whatever the block returns
def configure
  yield(configuration)
end
create_workflow(name) click to toggle source
# File lib/gush/client.rb, line 15
# Instantiates and creates the workflow class with the given name.
#
# The previous implementation discarded the result of `create` and then
# referenced an undefined local (`flow`), so it raised NameError even when the
# workflow class existed. The created workflow is now returned directly.
#
# @param name [String] name of the workflow class; resolved via `constantize`
# @return the object returned by the workflow class's `create`
# @raise [WorkflowNotFound] when a NameError is raised while resolving the
#   class or calling `create` (NoMethodError is a NameError subclass, so a
#   constant without `create` is reported the same way)
def create_workflow(name)
  name.constantize.create
rescue NameError
  raise WorkflowNotFound.new("Workflow with given name doesn't exist")
end
destroy_job(workflow_id, job) click to toggle source
# File lib/gush/client.rb, line 141
# Deletes the Redis key "gush.jobs.<workflow_id>.<job.klass>".
#
# Note that #persist_job writes jobs into this same key as a hash keyed by
# job id, so deleting the key removes every entry stored under that klass for
# the workflow, not only the given job.
#
# @param workflow_id id of the workflow the job belongs to
# @param job object responding to #klass
# @return the result of the Redis DEL call
def destroy_job(workflow_id, job)
  jobs_key = "gush.jobs.#{workflow_id}.#{job.klass}"
  connection_pool.with { |redis| redis.del(jobs_key) }
end
destroy_workflow(workflow) click to toggle source
# File lib/gush/client.rb, line 134
# Removes a workflow and its jobs from Redis.
#
# Deletes the "gush.workflows.<id>" key first, then calls #destroy_job for
# each entry in workflow.jobs.
#
# @param workflow object responding to #id and #jobs
# @return the collection returned by workflow.jobs
def destroy_workflow(workflow)
  connection_pool.with { |redis| redis.del("gush.workflows.#{workflow.id}") }

  workflow.jobs.each do |job|
    destroy_job(workflow.id, job)
  end
end
enqueue_job(workflow_id, job) click to toggle source
# File lib/gush/client.rb, line 162
# Marks a job as enqueued, persists it, and schedules a Gush::Worker run.
#
# The worker is scheduled on job.queue when that is set, otherwise on
# configuration.namespace, and receives the workflow id and the job name.
#
# @param workflow_id id of the workflow the job belongs to
# @param job object responding to #enqueue!, #queue and #name
# @return the result of Gush::Worker's perform_later
def enqueue_job(workflow_id, job)
  job.enqueue!
  persist_job(workflow_id, job)

  target_queue = job.queue || configuration.namespace
  Gush::Worker.set(queue: target_queue).perform_later(workflow_id, job.name)
end
expire_job(workflow_id, job, ttl=nil) click to toggle source
# File lib/gush/client.rb, line 155
# Sets a time-to-live on the Redis hash that stores the given job.
#
# Jobs are written by #persist_job under "gush.jobs.<workflow_id>.<job.klass>".
# The previous implementation built the key from job.name instead, which is a
# different key from the one jobs are stored under, so the stored hash never
# received a TTL. The key is now built from job.klass to match #persist_job
# and #destroy_job.
#
# @param workflow_id id of the workflow the job belongs to
# @param job object responding to #klass
# @param ttl [Integer, nil] seconds until expiry; falls back to
#   configuration.ttl when nil
# @return the result of the Redis EXPIRE call
def expire_job(workflow_id, job, ttl=nil)
  ttl ||= configuration.ttl

  connection_pool.with do |redis|
    redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl)
  end
end
expire_workflow(workflow, ttl=nil) click to toggle source
# File lib/gush/client.rb, line 147
# Sets a time-to-live on a workflow's key and on each of its jobs.
#
# Expires "gush.workflows.<id>" and then calls #expire_job for every entry in
# workflow.jobs, passing along the same resolved ttl.
#
# @param workflow object responding to #id and #jobs
# @param ttl [Integer, nil] seconds until expiry; falls back to
#   configuration.ttl when nil
# @return the collection returned by workflow.jobs
def expire_workflow(workflow, ttl=nil)
  effective_ttl = ttl || configuration.ttl

  connection_pool.with { |redis| redis.expire("gush.workflows.#{workflow.id}", effective_ttl) }

  workflow.jobs.each do |job|
    expire_job(workflow.id, job, effective_ttl)
  end
end
find_job(workflow_id, job_name) click to toggle source
# File lib/gush/client.rb, line 119
# Looks up a stored job and rebuilds it as a Gush::Job.
#
# When job_name matches the hyphen-based pattern below, the lookup goes
# through #find_job_by_klass_and_id; otherwise through #find_job_by_klass.
# NOTE(review): the pattern keys on a "-" in the name, while
# #find_job_by_klass_and_id splits the name on "|" — confirm the expected
# name format with the code that generates job names.
#
# @param workflow_id id of the workflow the job belongs to
# @param job_name [String] job name, with or without an identifier part
# @return the result of Gush::Job.from_hash, or nil when nothing is stored
def find_job(workflow_id, job_name)
  if /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_name)
    raw = find_job_by_klass_and_id(workflow_id, job_name)
  else
    raw = find_job_by_klass(workflow_id, job_name)
  end

  return if raw.nil?

  attributes = Gush::JSON.decode(raw, symbolize_keys: true)
  Gush::Job.from_hash(attributes)
end
find_workflow(id) click to toggle source
# File lib/gush/client.rb, line 83
# Loads a workflow and all of its stored jobs from Redis.
#
# Reads the serialized workflow from "gush.workflows.<id>", then collects the
# values of every hash whose key matches "gush.jobs.<id>.*", decodes each one
# and passes everything to #workflow_from_hash.
#
# @param id the workflow id
# @return the object built by #workflow_from_hash
# @raise [WorkflowNotFound] when no value is stored for the workflow key
def find_workflow(id)
  connection_pool.with do |redis|
    serialized = redis.get("gush.workflows.#{id}")
    raise WorkflowNotFound.new("Workflow with given id doesn't exist") if serialized.nil?

    attributes = Gush::JSON.decode(serialized, symbolize_keys: true)
    job_keys = redis.scan_each(match: "gush.jobs.#{id}.*")

    # Each job key is a hash; flatten all of their decoded values into one list.
    job_nodes = job_keys.flat_map do |key|
      redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
    end

    workflow_from_hash(attributes, job_nodes)
  end
end
next_free_job_id(workflow_id, job_klass) click to toggle source
# File lib/gush/client.rb, line 45
# Generates a UUID that is not yet used as a field in the job hash
# "gush.jobs.<workflow_id>.<job_klass>".
#
# Keeps drawing new UUIDs until HEXISTS reports the field as absent.
#
# @param workflow_id id of the workflow the job will belong to
# @param job_klass class name used in the job hash key
# @return [String] an unused job id
def next_free_job_id(workflow_id, job_klass)
  loop do
    candidate = SecureRandom.uuid
    taken = connection_pool.with do |redis|
      redis.hexists("gush.jobs.#{workflow_id}.#{job_klass}", candidate)
    end

    return candidate unless taken
  end
end
next_free_workflow_id() click to toggle source
# File lib/gush/client.rb, line 60
# Generates a UUID that is not yet used as a workflow id.
#
# Workflows are stored by #persist_workflow under "gush.workflows.<id>". The
# previous implementation probed "gush.workflow.<id>" (singular), a key that
# is never written, so the collision check could never detect an existing
# workflow. The probe now uses the same key as the writer.
#
# The reply of `exists` is also normalised: some redis-rb versions return a
# boolean and others return the number of existing keys. Because 0 is truthy
# in Ruby, negating an Integer reply directly would never report "available"
# and the loop would not terminate.
#
# @return [String] an unused workflow id
def next_free_workflow_id
  loop do
    candidate = SecureRandom.uuid
    reply = connection_pool.with do |redis|
      redis.exists("gush.workflows.#{candidate}")
    end

    taken = reply.is_a?(Integer) ? reply > 0 : reply
    return candidate unless taken
  end
end
persist_job(workflow_id, job) click to toggle source
# File lib/gush/client.rb, line 113
# Stores a job's JSON in Redis.
#
# The job is written as field job.id of the hash
# "gush.jobs.<workflow_id>.<job.klass>".
#
# @param workflow_id id of the workflow the job belongs to
# @param job object responding to #klass, #id and #to_json
# @return the result of the Redis HSET call
def persist_job(workflow_id, job)
  connection_pool.with do |redis|
    bucket = "gush.jobs.#{workflow_id}.#{job.klass}"
    redis.hset(bucket, job.id, job.to_json)
  end
end
persist_workflow(workflow) click to toggle source
# File lib/gush/client.rb, line 102
# Writes a workflow and all of its jobs to Redis.
#
# Stores workflow.to_json under "gush.workflows.<id>", persists each job via
# #persist_job, then calls workflow.mark_as_persisted.
#
# @param workflow object responding to #id, #to_json, #jobs and
#   #mark_as_persisted
# @return [true] always
def persist_workflow(workflow)
  connection_pool.with { |redis| redis.set("gush.workflows.#{workflow.id}", workflow.to_json) }

  workflow.jobs.each do |job|
    persist_job(workflow.id, job)
  end

  workflow.mark_as_persisted
  true
end
start_workflow(workflow, job_names = []) click to toggle source
# File lib/gush/client.rb, line 24
# Marks a workflow as started, persists it, and enqueues its jobs.
#
# With no job names, the jobs returned by workflow.initial_jobs are enqueued;
# otherwise each name is resolved with workflow.find_job and those jobs are
# enqueued instead.
#
# @param workflow object responding to #mark_as_started, #initial_jobs,
#   #find_job and #id
# @param job_names [Array] optional names of the jobs to enqueue
# @return the collection of jobs that were enqueued
def start_workflow(workflow, job_names = [])
  workflow.mark_as_started
  persist_workflow(workflow)

  to_enqueue =
    if job_names.empty?
      workflow.initial_jobs
    else
      job_names.map { |name| workflow.find_job(name) }
    end

  to_enqueue.each { |job| enqueue_job(workflow.id, job) }
end
stop_workflow(id) click to toggle source
# File lib/gush/client.rb, line 39
# Loads a workflow by id, marks it as stopped, and persists the change.
#
# @param id the workflow id
# @return the result of #persist_workflow
def stop_workflow(id)
  stopped_flow = find_workflow(id).tap { |flow| flow.mark_as_stopped }
  persist_workflow(stopped_flow)
end

Private Instance Methods

build_redis() click to toggle source
# File lib/gush/client.rb, line 205
# Creates a Redis connection for configuration.redis_url.
#
# The new connection is also assigned to RedisClassy.redis before being
# returned.
#
# @return [Redis] the newly created connection
def build_redis
  instance = Redis.new(url: configuration.redis_url)
  RedisClassy.redis = instance
  instance
end
connection_pool() click to toggle source
# File lib/gush/client.rb, line 211
# Returns the memoized ConnectionPool, creating it on first use.
#
# The pool is sized by configuration.concurrency, uses a timeout of 1, and
# builds its connections with #build_redis.
#
# @return [ConnectionPool]
def connection_pool
  @connection_pool ||= begin
    pool_size = configuration.concurrency
    ConnectionPool.new(size: pool_size, timeout: 1) { build_redis }
  end
end
find_job_by_klass(workflow_id, job_name) click to toggle source
# File lib/gush/client.rb, line 180
# Fetches the first stored entry of the job hash
# "gush.jobs.<workflow_id>.<job_name>".
#
# Issues a single HSCAN from cursor 0 with count: 1 and returns the value of
# the first field/value pair in the reply; the returned cursor is ignored.
#
# @param workflow_id id of the workflow the job belongs to
# @param job_name [String] used verbatim as the last segment of the hash key
# @return the stored value of the first entry, or nil when the scan is empty
def find_job_by_klass(workflow_id, job_name)
  _cursor, entries = connection_pool.with do |redis|
    redis.hscan("gush.jobs.#{workflow_id}.#{job_name}", 0, count: 1)
  end

  return nil if entries.empty?

  _field, payload = *entries[0]
  payload
end
find_job_by_klass_and_id(workflow_id, job_name) click to toggle source
# File lib/gush/client.rb, line 172
# Fetches a single stored job by class name and id.
#
# job_name is split on "|": the first part selects the hash
# "gush.jobs.<workflow_id>.<klass>", the second part is the field to read.
#
# @param workflow_id id of the workflow the job belongs to
# @param job_name [String] name in "<klass>|<id>" form
# @return the result of the Redis HGET call
def find_job_by_klass_and_id(workflow_id, job_name)
  klass_part, id_part = job_name.split('|')

  connection_pool.with { |redis| redis.hget("gush.jobs.#{workflow_id}.#{klass_part}", id_part) }
end
workflow_from_hash(hash, nodes = []) click to toggle source
# File lib/gush/client.rb, line 192
# Rebuilds a workflow object from its decoded attributes and job nodes.
#
# The class named by hash[:klass] is instantiated with hash[:arguments]; its
# jobs are reset, stopped/id are copied from the hash, and finally jobs is
# replaced with one Gush::Job per node.
#
# @param hash [Hash] decoded workflow attributes (:klass, :arguments, :id and
#   optionally :stopped, which defaults to false)
# @param nodes [Array] decoded job hashes
# @return the rebuilt workflow instance
def workflow_from_hash(hash, nodes = [])
  workflow_class = hash[:klass].constantize

  workflow_class.new(*hash[:arguments]).tap do |flow|
    flow.jobs = []
    flow.stopped = hash.fetch(:stopped, false)
    flow.id = hash[:id]
    flow.jobs = nodes.map { |node| Gush::Job.from_hash(node) }
  end
end