class Gush::Client

Attributes

configuration[R]
sidekiq[R]

Public Class Methods

new(config = Gush.configuration) click to toggle source
# File lib/gush/client.rb, line 5
# Creates a client for the given configuration.
#
# @param config configuration object kept on the client; defaults to
#   Gush.configuration
def initialize(config = Gush.configuration)
  @configuration = config
  # build_sidekiq reads the configuration, so it must run after the assignment above.
  @sidekiq = build_sidekiq
end

Public Instance Methods

all_workflows() click to toggle source
# File lib/gush/client.rb, line 76
# Loads every workflow whose Redis key matches the prefixed
# "gush.workflows.*" pattern.
#
# @return [Array] one find_workflow result per matching key
def all_workflows
  key_prefix = build_redis_key("gush.workflows.")
  pattern = build_redis_key("gush.workflows.*")

  connection_pool.with do |redis|
    # Strip the key prefix so only the workflow id remains.
    workflow_ids = redis.keys(pattern).map { |key| key.sub(key_prefix, "") }
    workflow_ids.map { |workflow_id| find_workflow(workflow_id) }
  end
end
configure() { |configuration| ... } click to toggle source
# File lib/gush/client.rb, line 10
# Yields the client's configuration to the given block, then rebuilds the
# Sidekiq client.
#
# @yield [configuration] the configuration object held by this client
def configure
  yield configuration
  # Rebuild after the block ran so build_sidekiq sees whatever the block changed.
  @sidekiq = build_sidekiq
end
create_workflow(name) click to toggle source
# File lib/gush/client.rb, line 15
# Creates a workflow by resolving +name+ to a class and calling its +create+
# class method.
#
# @param name [String] name of the workflow class (resolved with #constantize)
# @return the object returned by the class's +create+
# @raise [WorkflowNotFound] when a NameError is raised while resolving the
#   class or creating the workflow
def create_workflow(name)
  # The created workflow is the method's result; previously it was discarded
  # and an undefined local was returned instead.
  name.constantize.create
rescue NameError
  raise WorkflowNotFound.new("Workflow with given name doesn't exist")
end
destroy_job(workflow_id, job) click to toggle source
# File lib/gush/client.rb, line 151
# Deletes the Redis key holding +job+ for the workflow +workflow_id+.
#
# @param workflow_id id of the workflow the job belongs to
# @param job object responding to #name
# @return the result of the Redis DEL call
def destroy_job(workflow_id, job)
  job_key = build_redis_key("gush.jobs.#{workflow_id}.#{job.name}")
  connection_pool.with { |redis| redis.del(job_key) }
end
destroy_workflow(workflow) click to toggle source
# File lib/gush/client.rb, line 144
# Deletes the workflow's own Redis key, then deletes each of its jobs via
# destroy_job.
#
# @param workflow object responding to #id and #jobs
# @return the workflow's jobs collection (the receiver of the final #each)
def destroy_workflow(workflow)
  workflow_key = build_redis_key("gush.workflows.#{workflow.id}")
  connection_pool.with { |redis| redis.del(workflow_key) }

  workflow.jobs.each do |job|
    destroy_job(workflow.id, job)
  end
end
enqueue_job(workflow_id, job) click to toggle source
# File lib/gush/client.rb, line 165
# Calls job.enqueue!, persists the job, and pushes a Gush::Worker item for it
# to Sidekiq on the configured queue.
#
# @param workflow_id id of the workflow the job belongs to
# @param job object responding to #enqueue! and #name
# @return the result of sidekiq.push
def enqueue_job(workflow_id, job)
  job.enqueue!
  persist_job(workflow_id, job)

  payload = {
    'class' => Gush::Worker,
    'queue' => configuration.sidekiq_queue,
    'args'  => [workflow_id, job.name]
  }
  sidekiq.push(payload)
end
find_workflow(id) click to toggle source
# File lib/gush/client.rb, line 87
# Loads the workflow stored under the prefixed "gush.workflows.<id>" key
# together with every job stored under "gush.jobs.<id>.*".
#
# @param id workflow id
# @return the object built by workflow_from_hash
# @raise [WorkflowNotFound] when no data is stored for +id+
def find_workflow(id)
  connection_pool.with do |redis|
    data = redis.get(build_redis_key("gush.workflows.#{id}"))
    raise WorkflowNotFound.new("Workflow with given id doesn't exist") if data.nil?

    hash = Gush::JSON.decode(data, symbolize_keys: true)
    keys = redis.keys(build_redis_key("gush.jobs.#{id}.*"))
    # MGET needs at least one key, so a workflow without stored jobs is
    # answered with an empty node list instead of issuing the command.
    payloads = keys.empty? ? [] : redis.mget(*keys)
    nodes = payloads.map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
    workflow_from_hash(hash, nodes)
  end
end
load_job(workflow_id, job_id) click to toggle source
# File lib/gush/client.rb, line 122
# Loads a single job of a workflow. The job is looked up with a KEYS glob
# built from +job_id+, and the first matching key is read.
#
# @param workflow_id id of the workflow (loaded first through find_workflow)
# @param job_id [String] either "<klass>-<identifier>" or a bare class name
# @return the object built by Gush::Job.from_hash, or nil when no key matches
#   or the matched key holds no data
def load_job(workflow_id, job_id)
  workflow = find_workflow(workflow_id)
  # A job_id without a "-<identifier>" part gets a hyphen appended, so the
  # glob becomes "<job_id>-*" rather than "<job_id>*".
  hyphen = '-' if /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id).nil?
  pattern = build_redis_key("gush.jobs.#{workflow_id}.#{job_id}#{hyphen}*")

  data = connection_pool.with do |redis|
    # KEYS answers with an empty list when nothing matches; never issue a
    # GET without a key in that case.
    key = Array(redis.keys(pattern)).first
    redis.get(key) unless key.nil?
  end
  return nil if data.nil?

  Gush::Job.from_hash(workflow, Gush::JSON.decode(data, symbolize_keys: true))
end
next_free_job_id(workflow_id,job_klass) click to toggle source
# File lib/gush/client.rb, line 45
# Generates a job identifier of the form "<job_klass>-<uuid>" that has no key
# under the prefixed "gush.jobs.<workflow_id>." namespace yet, retrying with a
# fresh UUID while the candidate key exists.
#
# @param workflow_id id of the workflow the job will belong to
# @param job_klass class (or class name) used as the identifier's prefix
# @return [String] the unused job identifier
def next_free_job_id(workflow_id, job_klass)
  loop do
    job_identifier = "#{job_klass}-#{SecureRandom.uuid}"
    key = build_redis_key("gush.jobs.#{workflow_id}.#{job_identifier}")
    taken = connection_pool.with do |redis|
      # Newer redis-rb releases return an Integer count from #exists, and 0
      # is truthy in Ruby, which would keep this loop spinning forever.
      # Prefer the boolean #exists? whenever the client provides it.
      redis.respond_to?(:exists?) ? redis.exists?(key) : redis.exists(key)
    end

    return job_identifier unless taken
  end
end
next_free_workflow_id() click to toggle source
# File lib/gush/client.rb, line 61
# Generates a UUID that has no workflow stored under it yet, retrying with a
# fresh UUID while the candidate key exists.
#
# @return [String] the unused workflow id
def next_free_workflow_id
  loop do
    id = SecureRandom.uuid
    # Probe the same "gush.workflows.<id>" key that persist_workflow and
    # find_workflow use; the singular "gush.workflow." never matches anything.
    key = build_redis_key("gush.workflows.#{id}")
    taken = connection_pool.with do |redis|
      # Newer redis-rb releases return an Integer count from #exists, and 0
      # is truthy in Ruby, which would keep this loop spinning forever.
      # Prefer the boolean #exists? whenever the client provides it.
      redis.respond_to?(:exists?) ? redis.exists?(key) : redis.exists(key)
    end

    return id unless taken
  end
end
persist_job(workflow_id, job) click to toggle source
# File lib/gush/client.rb, line 115
# Stores the job's JSON under the prefixed
# "gush.jobs.<workflow_id>.<job name>" key.
#
# @param workflow_id id of the workflow the job belongs to
# @param job object responding to #name and #to_json
# @return the result of the Redis SET call
def persist_job(workflow_id, job)
  job_key = build_redis_key("gush.jobs.#{workflow_id}.#{job.name}")
  serialized = job.to_json
  connection_pool.with { |redis| redis.set(job_key, serialized) }
end
persist_workflow(workflow) click to toggle source
# File lib/gush/client.rb, line 104
# Stores the workflow's JSON under the prefixed "gush.workflows.<id>" key,
# persists each of its jobs, and then calls workflow.mark_as_persisted.
#
# @param workflow object responding to #id, #to_json, #jobs and
#   #mark_as_persisted
# @return [true]
def persist_workflow(workflow)
  workflow_key = build_redis_key("gush.workflows.#{workflow.id}")
  connection_pool.with { |redis| redis.set(workflow_key, workflow.to_json) }

  workflow.jobs.each do |job|
    persist_job(workflow.id, job)
  end
  workflow.mark_as_persisted

  true
end
start_workflow(workflow, job_names = []) click to toggle source
# File lib/gush/client.rb, line 24
# Marks the workflow as started, persists it, and enqueues jobs: the
# workflow's initial jobs when +job_names+ is empty, otherwise the jobs found
# by name.
#
# @param workflow object responding to #mark_as_started, #initial_jobs,
#   #find_job and #id
# @param job_names [Array] names of the jobs to enqueue; empty selects
#   workflow.initial_jobs
# @return the collection of jobs that were enqueued
def start_workflow(workflow, job_names = [])
  workflow.mark_as_started
  persist_workflow(workflow)

  selected_jobs =
    if job_names.empty?
      workflow.initial_jobs
    else
      job_names.map { |job_name| workflow.find_job(job_name) }
    end

  selected_jobs.each { |job| enqueue_job(workflow.id, job) }
end
stop_workflow(id) click to toggle source
# File lib/gush/client.rb, line 39
# Loads the workflow with the given id, marks it as stopped, and persists it.
#
# @param id workflow id passed to find_workflow
# @return the result of persist_workflow
def stop_workflow(id)
  stopped_workflow = find_workflow(id).tap(&:mark_as_stopped)
  persist_workflow(stopped_workflow)
end
worker_report(message) click to toggle source
# File lib/gush/client.rb, line 157
# Publishes +message+ through #report under the "gush.workers.status" key.
#
# @param message payload handed to #report
# @return the result of #report
def worker_report(message)
  status_key = "gush.workers.status"
  report(status_key, message)
end
workflow_report(message) click to toggle source
# File lib/gush/client.rb, line 161
# Publishes +message+ through #report under the "gush.workflows.status" key.
#
# @param message payload handed to #report
# @return the result of #report
def workflow_report(message)
  status_key = "gush.workflows.status"
  report(status_key, message)
end

Private Instance Methods

build_redis() click to toggle source
# File lib/gush/client.rb, line 216
# Creates a new Redis client from the configured URL and prefix.
#
# NOTE(review): the prefix is passed as a :namespace option straight to
# Redis.new; whether the Redis client in use honors that option is not
# visible here — confirm.
#
# @return [Redis]
def build_redis
  Redis.new(url: configuration.redis_url, namespace: configuration.redis_prefix)
end
build_redis_key(key) click to toggle source
# File lib/gush/client.rb, line 198
# Prepends the configured Redis prefix and a ":" separator to +key+.
#
# @param key [String] un-prefixed key
# @return [String] "<redis_prefix>:<key>"
def build_redis_key(key)
  prefix = configuration.redis_prefix
  prefix + ":" + key
end
build_sidekiq() click to toggle source
# File lib/gush/client.rb, line 202
# Points the Sidekiq client configuration at the configured Redis URL, prefix
# and queue, then returns a fresh Sidekiq::Client.
#
# @return [Sidekiq::Client]
def build_sidekiq
  Sidekiq.configure_client do |sidekiq_config|
    sidekiq_config.redis = {
      url: configuration.redis_url,
      namespace: configuration.redis_prefix,
      queue: configuration.sidekiq_queue
    }
  end

  # TODO: use connection pool
  #Sidekiq::Client.new(connection_pool)
  Sidekiq::Client.new
end
connection_pool() click to toggle source
# File lib/gush/client.rb, line 227
# Returns the memoized ConnectionPool, creating it on first use. The pool is
# sized by configuration.concurrency, uses a timeout of 10, and builds its
# connections with build_redis.
#
# @return [ConnectionPool]
def connection_pool
  @connection_pool ||= ConnectionPool.new(size: configuration.concurrency, timeout: 10) { build_redis }
end
report(key, message) click to toggle source
# File lib/gush/client.rb, line 192
# Publishes +message+, encoded with Gush::JSON, on the Redis channel named by
# the prefixed +key+.
#
# @param key [String] un-prefixed channel name
# @param message payload to encode and publish
# @return the result of the Redis PUBLISH call
def report(key, message)
  channel = build_redis_key(key)
  payload = Gush::JSON.encode(message)
  connection_pool.with { |redis| redis.publish(channel, payload) }
end
workflow_from_hash(hash, nodes = nil) click to toggle source
# File lib/gush/client.rb, line 179
# Rebuilds a workflow object from its decoded hash.
#
# @param hash [Hash] symbol-keyed data; reads :klass, :arguments, :stopped,
#   :id and, when +nodes+ is not given, :nodes
# @param nodes [Array, nil] decoded job hashes; falls back to hash[:nodes]
# @return the workflow instance with its jobs, stopped flag and id assigned
def workflow_from_hash(hash, nodes = nil)
  # Parenthesized so the splat is not parsed as an ambiguous argument prefix.
  flow = hash[:klass].constantize.new(*hash[:arguments])
  flow.jobs = []
  flow.stopped = hash.fetch(:stopped, false)
  flow.id = hash[:id]

  # A workflow with neither explicit nodes nor a :nodes entry simply has no jobs.
  (nodes || hash[:nodes] || []).each do |node|
    flow.jobs << Gush::Job.from_hash(flow, node)
  end

  flow
end