class Taskflow::Flow

Public Class Methods

can_launch?(klass,opts={}) click to toggle source

opts supports :params

# File lib/taskflow/flow.rb, line 16
# Tells whether a flow of +klass+ may be launched: true unless a flow that is
# not in the 'stopped' state already exists with the same klass and the same
# input (opts[:params]).
#
# klass - value matched against the stored `klass` column.
# opts  - options hash (string or symbol keys); only :params is read.
#
# Returns true or false.
def can_launch?(klass,opts={})
    params = HashWithIndifferentAccess.new(opts)[:params]
    unfinished = Taskflow::Flow.where.not(state: 'stopped')
    duplicates = unfinished.where(klass: klass,input: params)
    !duplicates.exists?
end
launch(klass,opts={}) click to toggle source
# File lib/taskflow/flow.rb, line 21
# Creates a flow of the class named by +klass+, attaches its logger and
# schedules it.
#
# klass - name of the flow class; resolved with Kernel.const_get. The class
#         must define a NAME constant, which is used as the flow's name.
# opts  - options hash (string or symbol keys). Keys read here:
#         :params, :launched_by (defaults to 'task-flow-engine'),
#         :next_workflow_config, :workflow_description.
#
# Returns whatever the created flow's #schedule returns.
def launch(klass,opts={})
    options = HashWithIndifferentAccess.new(opts)
    options[:launched_by] ||= 'task-flow-engine'

    flow_class = Kernel.const_get(klass)
    flow_name = flow_class.const_get('NAME')
    flow = flow_class.create(name: flow_name,input: options[:params],launched_by: options[:launched_by])

    # The follow-up configuration is stored only when one was supplied.
    following = options[:next_workflow_config]
    flow.update(next_config: following) if following

    flow.create_tflogger(name: flow_name,description: options[:workflow_description])
    flow.schedule
end

Public Instance Methods

resume() click to toggle source
# File lib/taskflow/flow.rb, line 77
# Calls #resume on every task of this flow whose state is 'paused' and whose
# result is 'error'.
def resume
    failed_paused = self.tasks.where(state: 'paused',result: 'error')
    failed_paused.each(&:resume)
end
run(klass,opts={}) click to toggle source

opts supports :name, :params, :before and :after

# File lib/taskflow/flow.rb, line 40
# Creates a task of class +klass+, wires it into the dependency graph and
# appends it to this flow's tasks.
#
# klass - task class; must respond to .create.
# opts  - options hash:
#         :name   - task name (defaults to klass.to_s)
#         :params - stored as the task's input
#         :before - task or Array of tasks that come after the new task
#         :after  - task or Array of tasks that come before the new task
#
# When neither :before nor :after is given, the new task is chained after the
# flow's current last task (if there is one).
#
# Returns the created task.
def run(klass,opts={})
    label = klass.to_s
    # Attributes whose value is nil/false are dropped before creation.
    attributes = {
        klass: label,
        name: opts[:name] || label,
        input: opts[:params],
        index: self.tasks.size + 1
    }.select{|_key,value| value }
    task = klass.create attributes

    before = opts[:before]
    after = opts[:after]

    if before
        task.downstream << before
        successors = before.is_a?(Array) ? before : [before]
        successors.each{|successor| successor.upstream << task }
    end

    if after
        task.upstream << after
        predecessors = after.is_a?(Array) ? after : [after]
        predecessors.each{|predecessor| predecessor.downstream << task }
    end

    if before.nil? && after.nil?
        previous = self.tasks.last
        if previous
            previous.downstream << task
            task.upstream << previous
        end
    end

    self.tasks << task
    task
end
running_steps() click to toggle source
# File lib/taskflow/flow.rb, line 35
# Returns this flow's tasks whose state is 'running' or 'paused'.
def running_steps
    active_states = ['running','paused']
    tasks.where(state: active_states)
end
schedule() click to toggle source
# File lib/taskflow/flow.rb, line 83
# Enqueues every pending task whose upstream tasks have all finished.
#
# Does nothing (and returns nil) when the flow has been halted (halt_by is
# set) or its state is 'stopped'. A flow that is still 'pending' is first
# switched to 'running' and stamped with started_at.
#
# Returns self when scheduling was attempted.
def schedule
    return if self.halt_by || self.state == 'stopped'

    if self.state == 'pending'
        self.update_attributes! state: 'running',started_at: Time.now
    end

    finished_states = %w(skipped stopped)
    ready = self.reload.tasks.where(state: 'pending').select do |task|
        # All upstream tasks have finished (or there are none).
        parents = task.upstream
        parents.empty? || parents.all?{|parent| finished_states.include? parent.state }
    end

    # Ids are collected first; jobs are enqueued only after the scan is complete.
    ready_ids = ready.map{|task| task.id.to_s }
    ready_ids.each{|task_id| Taskflow::Worker.perform_async self.id.to_s,task_id }
    self
end
stop!(user_id=nil) click to toggle source
# File lib/taskflow/flow.rb, line 72
# Marks the flow as stopped.
#
# Stores the average of the tasks' progress values as the flow's progress,
# records who halted it and when, and sets state 'stopped' with result
# 'warning'.
#
# user_id - identifier stored in halt_by (defaults to nil).
#
# A flow without tasks is stopped with progress 0 instead of raising
# ZeroDivisionError on the average.
#
# Returns the result of update_attributes!.
def stop!(user_id=nil)
    progresses = self.tasks.map(&:progress)
    percent = progresses.empty? ? 0 : progresses.sum / progresses.size
    self.update_attributes! progress: percent,halt_by: user_id,ended_at: Time.now, state: 'stopped',result: 'warning'
end

Private Instance Methods

configure_tasks() click to toggle source
# File lib/taskflow/flow.rb, line 98
# Runs #configure and then assigns task indexes via sort_index(1, []).
# If either step raises, the flow is destroyed and the same error is raised
# again. On success the record is reloaded; the result of #reload is returned.
def configure_tasks
    begin
        configure
        sort_index(1, [])
    rescue => error
        # Remove the flow before propagating the failure.
        destroy
        raise error
    end
    # Kept outside the rescue scope: a failing reload does not destroy the flow.
    reload
end
set_default_property() click to toggle source
# File lib/taskflow/flow.rb, line 119
# Fills in defaults for attributes that are still nil/false:
# klass (the record's class name), state ('pending'), category ('simple'),
# input ({}) and progress (0). Attributes that already hold a value are left
# untouched.
def set_default_property
    self.klass = self.class.to_s unless self.klass
    self.state = 'pending' unless self.state
    self.category = 'simple' unless self.category
    self.input = {} unless self.input
    # Last expression: evaluates to the (possibly defaulted) progress value.
    self.progress ||= 0
end
sort_index(i,scanned) click to toggle source
# File lib/taskflow/flow.rb, line 109
# Assigns a level index to every task, layer by layer along the dependency
# graph.
#
# i       - index given to the first layer processed by this call.
# scanned - Array of task ids (as strings) that already have an index; it is
#           appended to in place.
#
# Each pass picks the not-yet-scanned tasks whose upstream tasks are all
# scanned (or that have no upstream), gives them the current index, and moves
# on to the next index. The loop ends when a pass finds no such task, so tasks
# whose upstream never becomes fully scanned are left without a new index.
#
# Returns nil.
def sort_index(i,scanned)
    loop do
        layer = self.tasks.where.not(id: scanned).select do |task|
            task.upstream.empty? || task.upstream.all?{|parent| scanned.include?(parent.id.to_s) }
        end
        break if layer.empty?

        layer.each do |task|
            task.update_attributes index: i
            scanned << task.id.to_s
        end
        i += 1
    end
end