class Attr::Gather::Workflow::TaskGraph
@api private
Attributes
tasks_hash[R]
Public Class Methods
new(tasks: [])
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 19
# Builds a graph and registers every given task description.
#
# @param tasks [Enumerable<Hash>] task descriptions; each one is handed to
#   {#<<}, which reads its +:name+ and +:depends_on+ entries
def initialize(tasks: [])
  @tasks_hash = {}

  tasks.each do |task_spec|
    self << task_spec
  end
end
Public Instance Methods
<<(hash)
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 24
# Registers the task described by +hash+.
#
# The task is built from the +:name+ and +:depends_on+ entries, validated,
# and then linked into the graph: every already-registered task that
# depends on the new one gets it appended to its dependency list, and the
# new task is stored together with the registered tasks it depends on.
#
# @param hash [Hash] task description with +:name+ and +:depends_on+ keys
# @return [Array] the dependency list stored for the new task
# @raise [InvalidTaskDepedencyError] via validate_for_insert! when a
#   dependency is not registered
def <<(hash)
  name, depends_on = hash.values_at(:name, :depends_on)
  new_task = build_task(name, depends_on)
  validate_for_insert!(new_task)

  registered_tasks.each do |existing|
    dependencies = tasks_hash[existing]
    dependencies << new_task if existing.depends_on?(new_task)
    # De-duplicate in place so the stored array object stays the same.
    dependencies.uniq!
  end

  tasks_hash[new_task] = all_dependencies_for_task(new_task)
end
each_batch() { |batch| ... }
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 43
# Yields the tasks in successive batches, walking the +tsort+ ordering.
#
# Each batch is the leading run of pending tasks that report themselves
# fulfilled given the tasks still pending. Without a block an Enumerator
# over the batches is returned instead.
#
# @yieldparam batch [Array] tasks that can be executed together
# @return [Enumerator, nil] an Enumerator when no block is given
# @raise [UnfinishableError] via validate_finishable! when tasks remain but
#   none of them can be scheduled
def each_batch
  return enum_for(:each_batch) unless block_given?

  pending = tsort

  until pending.empty?
    ready = pending.take_while { |task| task.fullfilled_given_remaining_tasks?(pending) }
    pending -= ready

    # Guards against spinning forever when no progress can be made.
    validate_finishable!(ready, pending)

    yield ready
  end
end
runnable_tasks()
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 37
# Returns the leading run of tasks (in +tsort+ order) that report themselves
# fulfilled when every registered task is still considered remaining.
#
# @return [Array] tasks that can run before anything else has executed
def runnable_tasks
  # registered_tasks builds a new array on each call; compute it once
  # instead of once per task visited by take_while.
  remaining = registered_tasks

  tsort.take_while do |task|
    task.fullfilled_given_remaining_tasks?(remaining)
  end
end
to_dot(preview: false)
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 67
# Serializes the graph through a DotSerializer.
#
# @param preview [Boolean] when truthy, the serializer's +preview+ result is
#   returned instead of its +to_dot+ result
# @return [Object] whatever the chosen serializer method returns
def to_dot(preview: false)
  dot = DotSerializer.new(self)
  return dot.preview if preview

  dot.to_dot
end
to_h()
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 63
# Exposes the graph as a hash.
#
# @return [Hash] the underlying +tasks_hash+ itself (not a copy), mapping
#   each task to the array stored for it
def to_h
  tasks_hash
end
Private Instance Methods
all_dependencies_for_task(input_task)
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 106
# Collects the registered tasks that +input_task+ depends on.
#
# @param input_task [#depends_on?] task whose dependencies are looked up
# @return [Array] registered tasks for which +input_task.depends_on?+ is truthy
def all_dependencies_for_task(input_task)
  registered_tasks.select do |candidate|
    input_task.depends_on?(candidate)
  end
end
build_task(name, depends_on)
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 74
# Builds a Task whose dependencies are resolved against the registered tasks.
#
# Each dependency name is replaced by the first registered task with that
# name. A name without a match resolves to +nil+, so the resulting list can
# contain +nil+ entries.
#
# @param name [Object] name of the new task
# @param depends_on [Array, nil] names of the tasks it depends on; +nil+
#   (e.g. a missing +:depends_on+ key) is treated as "no dependencies"
# @return [Task] the newly built task
def build_task(name, depends_on)
  # Snapshot once: registered_tasks allocates a new array on every call.
  known_tasks = registered_tasks

  # Array() turns nil into [] instead of failing on nil.map.
  deps = Array(depends_on).map do |dep_name|
    known_tasks.find { |task| task.name == dep_name }
  end

  Task.new(name: name, depends_on: deps)
end
depended_on_tasks_exist?(task)
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 114
# Checks whether every dependency of +task+ is already registered.
#
# @param task [#depends_on] task whose dependency list is inspected
# @return [Boolean] true when all dependencies are among the registered
#   tasks (trivially true for an empty dependency list)
def depended_on_tasks_exist?(task)
  # registered_tasks allocates a new array per call; look it up once rather
  # than once per dependency.
  known_tasks = registered_tasks

  task.depends_on.all? { |dependency| known_tasks.include?(dependency) }
end
registered_tasks()
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 110
# Lists the tasks currently stored in the graph.
#
# @return [Array] the keys of +tasks_hash+, as a new array on every call
def registered_tasks
  tasks_hash.keys
end
tsort_each_child(node, &blk)
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 84
# Passes each entry stored for +node+ in the graph hash to the block.
# NOTE(review): the name matches the TSort mixin hook — confirm the class
# includes TSort.
#
# @param node [Object] a key of the graph hash
# @yieldparam child [Object] one entry of the array stored for +node+
def tsort_each_child(node, &blk)
  children = to_h[node]
  children.each(&blk)
end
tsort_each_node(&blk)
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 88
# Passes every key of the graph hash to the block.
# NOTE(review): the name matches the TSort mixin hook — confirm the class
# includes TSort.
#
# @yieldparam node [Object] a key of the graph hash
def tsort_each_node(&blk)
  graph = to_h
  graph.each_key(&blk)
end
validate_finishable!(batch, to_execute)
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 92
# Raises when scheduling has stalled: nothing could be batched although
# tasks are still waiting to run.
#
# @param batch [Array] tasks selected for the current batch
# @param to_execute [Array] tasks still waiting after the batch was removed
# @return [nil] when the batch is non-empty or nothing is left to execute
# @raise [UnfinishableError] when +batch+ is empty but +to_execute+ is not
def validate_finishable!(batch, to_execute)
  stalled = batch.empty? && !to_execute.empty?

  # TODO: statically verify this
  raise UnfinishableError, 'task dependencies are not fulfillable' if stalled
end
validate_for_insert!(task)
click to toggle source
# File lib/attr/gather/workflow/task_graph.rb, line 99
# Ensures that +task+ can be inserted, i.e. that all of its dependencies
# are already registered.
#
# @param task [#name, #depends_on] the task about to be inserted
# @return [nil] when every dependency is registered
# @raise [InvalidTaskDepedencyError] when at least one dependency is missing
def validate_for_insert!(task)
  return if depended_on_tasks_exist?(task)

  message = "could not find a matching task for #{task.name}"
  raise InvalidTaskDepedencyError, message
end