class Taskflow::Flow
Public Class Methods
can_launch?(klass,opts={})
click to toggle source
opts supports the :params key
# File lib/taskflow/flow.rb, line 16
# Reports whether a flow may be launched for the given klass and input.
#
# klass - value compared against the stored `klass` attribute.
# opts  - options hash (string or symbol keys); only :params is read and it
#         is compared against the stored `input` attribute.
#
# Returns false when a flow with the same klass and input exists whose state
# is not 'stopped'; true otherwise.
def can_launch?(klass, opts = {})
  params = HashWithIndifferentAccess.new(opts)[:params]
  unfinished_flows = Taskflow::Flow.where.not(state: 'stopped')
  !unfinished_flows.where(klass: klass, input: params).exists?
end
launch(klass,opts={})
click to toggle source
# File lib/taskflow/flow.rb, line 21
# Creates a flow of the named class, attaches its logger record and schedules it.
#
# klass - constant name of the flow class, resolved with Kernel.const_get;
#         the class must define a NAME constant.
# opts  - options hash (string or symbol keys). Keys read here:
#         :params               - stored as the flow's input
#         :launched_by          - defaults to 'task-flow-engine'
#         :next_workflow_config - when present, saved as next_config
#         :workflow_description - passed to create_tflogger as description
#
# Returns whatever flow.schedule returns.
def launch(klass, opts = {})
  options = HashWithIndifferentAccess.new(opts)
  flow_class = Kernel.const_get(klass)
  flow_name = flow_class.const_get('NAME')
  options[:launched_by] ||= 'task-flow-engine'

  flow = flow_class.create(
    name: flow_name,
    input: options[:params],
    launched_by: options[:launched_by]
  )
  # next_config is written in a separate update, after the flow has been created.
  flow.update(next_config: options[:next_workflow_config]) if options[:next_workflow_config]
  flow.create_tflogger(name: flow_name, description: options[:workflow_description])
  flow.schedule
end
Public Instance Methods
resume()
click to toggle source
# File lib/taskflow/flow.rb, line 77
# Calls #resume on every task of this flow whose state is 'paused' and whose
# result is 'error'.
#
# Returns the collection that was iterated.
def resume
  paused_with_error = tasks.where(state: 'paused', result: 'error')
  paused_with_error.each(&:resume)
end
run(klass,opts={})
click to toggle source
opts supports the :name, :params, :before and :after keys
# File lib/taskflow/flow.rb, line 40
# Creates a task of the given class, wires it into the dependency graph and
# appends it to this flow's tasks.
#
# klass - task class; must respond to .create.
# opts  - options hash. Keys read here:
#         :name   - task name, defaults to klass.to_s
#         :params - stored as the task's input
#         :before - task (or Array of tasks) that the new task precedes
#         :after  - task (or Array of tasks) that the new task follows
#
# When neither :before nor :after is given and the flow already has tasks,
# the new task is chained after the current last task.
#
# Returns the created task.
def run(klass, opts = {})
  before = opts[:before]
  after = opts[:after]

  attributes = {
    klass: klass.to_s,
    name: opts[:name] || klass.to_s,
    input: opts[:params],
    index: tasks.size + 1
  }
  # Entries whose value is nil/false are not passed to create.
  task = klass.create(attributes.select { |_key, value| value })

  if before
    task.downstream << before
    successors = before.is_a?(Array) ? before : [before]
    successors.each { |successor| successor.upstream << task }
  end

  if after
    task.upstream << after
    predecessors = after.is_a?(Array) ? after : [after]
    predecessors.each { |predecessor| predecessor.downstream << task }
  end

  if before.nil? && after.nil? && tasks.last
    tasks.last.downstream << task
    task.upstream << tasks.last
  end

  tasks << task
  task
end
running_steps()
click to toggle source
# File lib/taskflow/flow.rb, line 35
# Returns this flow's tasks whose state is 'running' or 'paused'.
def running_steps
  active_states = %w[running paused]
  tasks.where(state: active_states)
end
schedule()
click to toggle source
# File lib/taskflow/flow.rb, line 83
# Hands every runnable pending task to Taskflow::Worker.perform_async.
#
# Does nothing (returns nil) when halt_by is set or the state is 'stopped'.
# A flow still in 'pending' is first moved to 'running' with started_at set
# to the current time.
#
# Returns self after the tasks have been handed to the worker.
def schedule
  return if halt_by || state == 'stopped'

  update_attributes!(state: 'running', started_at: Time.now) if state == 'pending'

  done_states = %w[skipped stopped]
  pending_tasks = reload.tasks.where(state: 'pending')
  runnable = pending_tasks.select do |task|
    # All upstream tasks completed: either there are none, or every one of
    # them is in the 'skipped' or 'stopped' state.
    task.upstream.empty? || task.upstream.all? { |parent| done_states.include?(parent.state) }
  end
  runnable_ids = runnable.map { |task| task.id.to_s }

  runnable_ids.each { |task_id| Taskflow::Worker.perform_async(id.to_s, task_id) }
  self
end
stop!(user_id=nil)
click to toggle source
# File lib/taskflow/flow.rb, line 72
# Marks this flow as stopped and records who halted it.
#
# The flow's progress is set to the mean of its tasks' progress values
# (sum divided by count). A flow without tasks keeps its current progress
# (0 when unset) instead of raising on a division by zero.
#
# user_id - identifier stored in halt_by (default: nil).
#
# Also sets ended_at to the current time, state to 'stopped' and result to
# 'warning'. Returns the result of update_attributes!.
def stop!(user_id = nil)
  # Read the task progress values once so sum and count refer to the same set.
  progress_values = tasks.map(&:progress)
  percent =
    if progress_values.empty?
      self.progress || 0
    else
      progress_values.sum / progress_values.size
    end

  update_attributes!(
    progress: percent,
    halt_by: user_id,
    ended_at: Time.now,
    state: 'stopped',
    result: 'warning'
  )
end
Private Instance Methods
configure_tasks()
click to toggle source
# File lib/taskflow/flow.rb, line 98
# Runs #configure and then numbers the tasks via sort_index(1, []).
#
# If either step raises, the flow is destroyed and the same error is
# re-raised. On success the flow is reloaded; the reload happens outside the
# rescue, so an error raised by reload itself does not destroy the flow.
#
# Returns the result of reload.
def configure_tasks
  begin
    configure
    sort_index(1, [])
  rescue => error
    destroy
    raise error
  end

  reload
end
set_default_property()
click to toggle source
# File lib/taskflow/flow.rb, line 119
# Fills in a default for each attribute that is currently nil/false:
# klass (this object's class name), state ('pending'), category ('simple'),
# input (empty hash) and progress (0). Values already set are left untouched.
#
# Returns the resulting progress value.
def set_default_property
  self.klass = self.class.to_s unless klass
  self.state = 'pending' unless state
  self.category = 'simple' unless category
  self.input = {} unless input
  progress || (self.progress = 0)
end
sort_index(i,scanned)
click to toggle source
# File lib/taskflow/flow.rb, line 109
# Assigns an index to the flow's tasks level by level.
#
# On each pass, the tasks whose id is not in `scanned` and whose upstream
# tasks are all in `scanned` (or that have no upstream) get the current
# index; their ids are then added to `scanned` and the index is incremented.
# Processing stops when a pass finds no such task.
#
# i       - index given to the first level.
# scanned - Array of task id strings already indexed; mutated in place.
#
# Returns nil.
def sort_index(i, scanned)
  level = i

  loop do
    ready = tasks.where.not(id: scanned).select do |task|
      task.upstream.empty? ||
        task.upstream.all? { |parent| scanned.include?(parent.id.to_s) }
    end
    break if ready.empty?

    ready.each do |task|
      task.update_attributes(index: level)
      scanned << task.id.to_s
    end
    level += 1
  end

  nil
end