class DTK::State::WorkflowInstance

Constants

WORKFLOW_INSTANCE_CRD_VERSION

Attributes

assembly[R]
attribute_type_info[R]
attributes[R]
name[R]
namespace[R]
status[R]
workflow[RW]
workflow_template[R]

Public Class Methods

find_action(id, workflow = @workflow) click to toggle source
# File lib/state/workflow_instance.rb, line 134
# Searches a workflow's subtask tree, depth first, for the action with the
# given id. Ids are compared by their string form, so 1 and "1" match.
#
# @param id [#to_s] id of the action to look for
# @param workflow [#[], nil] node exposing +:subtasks+; every subtask is read
#   through +[:id]+ and +[:subtasks]+.
#   NOTE(review): the default +@workflow+ is evaluated on the class, not on an
#   instance, so it is presumably nil unless set elsewhere - confirm.
# @return the first matching subtask, or nil when the id is not found or the
#   workflow has no subtasks
def self.find_action(id, workflow = @workflow)
  subtasks = workflow && workflow[:subtasks]
  return nil unless subtasks

  wanted = id.to_s
  subtasks.each do |subtask|
    # Key access works for plain Hashes as well as struct-like objects,
    # matching how ids are read elsewhere in this class.
    return subtask if subtask[:id].to_s == wanted
    next unless subtask[:subtasks]

    nested = find_action(id, subtask)
    return nested unless nested.nil?
  end
  nil
end
get(namespace, name, opts = {}) click to toggle source
# File lib/state/workflow_instance.rb, line 29
# Fetches a workflow instance CRD and wraps it in a WorkflowInstance.
#
# Note: +opts+ is modified in place - its +:apiVersion+ entry is overwritten
# with WORKFLOW_INSTANCE_CRD_VERSION before the kube client is requested.
#
# @param namespace [String] namespace of the workflow instance
# @param name [String] name of the workflow instance
# @param opts [Hash] options forwarded to +::DTK::CrdClient.get_kubeclient+
# @return [WorkflowInstance]
def self.get(namespace, name, opts = {})
  opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION
  kubeclient  = ::DTK::CrdClient.get_kubeclient(opts)
  crd_content = kubeclient.get_workflow_instance(name, namespace)
  WorkflowInstance.new(namespace, name, crd_content)
end
get_action_attributes(namespace, name, action_id, opts = {}) click to toggle source
# File lib/state/workflow_instance.rb, line 81
# Loads a workflow instance and returns the attributes of one of its actions.
#
# @param namespace [String] namespace of the workflow instance
# @param name [String] name of the workflow instance
# @param action_id [#to_s] id of the action inside the instance's workflow
# @param opts [Hash] options forwarded to +get+
# @return [Hash, nil] the action's attributes converted with +to_h+ (empty
#   when the action has none), or nil when the action is not found
def self.get_action_attributes(namespace, name, action_id, opts = {})
  instance = get(namespace, name, opts)
  found    = WorkflowInstance.find_action(action_id, instance.workflow)
  if found
    (found[:attributes] || {}).to_h
  end
end
get_attributes(namespace, name, opts = {}) click to toggle source
# File lib/state/workflow_instance.rb, line 76
# Loads a workflow instance and returns its attributes converted with +to_h+.
#
# @param namespace [String] namespace of the workflow instance
# @param name [String] name of the workflow instance
# @param opts [Hash] options forwarded to +get+
# @return [Hash]
def self.get_attributes(namespace, name, opts = {})
  get(namespace, name, opts).attributes.to_h
end
get_with_influx_data(namespace, workflow_instance_name, opts = {}) click to toggle source
# File lib/state/workflow_instance.rb, line 35
# Loads a workflow instance and, for every top-level subtask, fills in the
# values of its temporal attributes from InfluxDB.
#
# For each subtask the +:component+ string is split on '.' into a component
# name and an action name, the matching executable action is looked up, and
# each of its attribute-type entries flagged +temporal+ is queried through
# +Influxdb#get+. When the query returns at least one record, that record's
# '_value' is written to +subtask[:attributes][attribute_name]+.
#
# Only the direct +:subtasks+ of the workflow are visited; nested subtasks are
# not traversed. An instance without subtasks is returned unchanged, and a
# subtask without an +:attributes+ container gets one created on demand.
#
# @param namespace [String] namespace of the workflow instance
# @param workflow_instance_name [String] name of the workflow instance
# @param opts [Hash] options forwarded to +get+, +ExecutableAction.get+ and
#   +Influxdb#get+
# @return [WorkflowInstance, nil] the enriched instance, or nil when +get+
#   returns nothing
def self.get_with_influx_data(namespace, workflow_instance_name, opts = {})
  workflow_instance = get(namespace, workflow_instance_name, opts)
  return unless workflow_instance

  # The workflow may legitimately carry no subtasks (it defaults to {}).
  subtasks = workflow_instance.workflow[:subtasks] || []
  subtasks.each do |subtask|
    component_name, action_name = subtask[:component].split('.')
    assembly_name = workflow_instance.assembly[:name]

    executable_action = ::DTK::State::ExecutableAction.get(namespace, assembly_name, component_name, action_name, opts)

    executable_action.attribute_type_info.each do |attr_info|
      next unless attr_info.temporal

      attribute_name = attr_info.name
      influxdb = ::DTK::State::Component::Attribute::Influxdb.new(:attributes)
      latest = influxdb.get(namespace, component_name, assembly_name, attribute_name, opts).first
      next unless latest

      # Create the container on demand so a subtask without attributes
      # does not abort the whole enrichment.
      subtask[:attributes] = {} unless subtask[:attributes]
      subtask[:attributes][attribute_name] = latest['_value']
    end
  end

  workflow_instance
end
new(namespace, name, crd_content) click to toggle source
# File lib/state/workflow_instance.rb, line 12
# Builds a workflow instance from the content of its CRD.
#
# Copies apiVersion, kind, metadata and references from +crd_content+, exposes
# the referenced assembly and workflow template, and takes attributes, status
# and workflow from +crd_content.spec+, each defaulting to an empty Hash when
# missing.
#
# @param namespace [String] namespace of the workflow instance
# @param name [String] name of the workflow instance
# @param crd_content [#apiVersion, #kind, #metadata, #references, #spec]
#   CRD object as returned by the kube client
def initialize(namespace, name, crd_content)
  @name      = name
  @namespace = namespace

  @api_version = crd_content.apiVersion
  @kind        = crd_content.kind
  @metadata    = crd_content.metadata

  references         = crd_content.references
  @references        = references
  @assembly          = references.assembly
  @workflow_template = references.workflow

  @attributes = crd_content.spec.attributes || {}
  @status     = crd_content.spec.status || {}
  @workflow   = crd_content.spec.workflow || {}
end
patchError!(patches, message, action_index_steps) click to toggle source
# File lib/state/workflow_instance.rb, line 111
# Appends a JSON-patch "add" operation that records an error message on a
# workflow step.
#
# @param patches [#<<] list of patch operations; it is modified in place
# @param message [String] error message to store
# @param action_index_steps [Integer] index of the step under /spec/status/steps
# @return the value of +patches << operation+ (the same list for an Array)
def self.patchError!(patches, message, action_index_steps)
  target_path = "/spec/status/steps/#{action_index_steps}/errorMsg"
  patches << { "op" => "add", "path" => target_path, "value" => message }
end
update_action_level_result_attributes(namespace, name, attributes, action_id, opts = {}) click to toggle source
# File lib/state/workflow_instance.rb, line 89
# Stores result (dynamic) attribute values on one action of a workflow
# instance and writes the instance back through the kube client.
#
# Attributes whose value is nil or blank are skipped; the caller's hash is
# left untouched. Attribute names are converted to symbols. An existing
# attribute flagged +:hidden+ is not overwritten. A Hash value contributes its
# +:value+ entry when that entry is truthy, otherwise the Hash itself is
# stored.
#
# Note: +opts+ is modified in place - its +:apiVersion+ entry is overwritten
# with WORKFLOW_INSTANCE_CRD_VERSION.
#
# @param namespace [String] namespace of the workflow instance
# @param name [String] name of the workflow instance
# @param attributes [Hash, nil] attribute name => value (or Hash with :value)
# @param action_id [#to_s] id of the action to update
# @param opts [Hash] options forwarded to +::DTK::CrdClient.get_kubeclient+
# @return [String] an explanatory message when +attributes+ is nil or empty;
#   otherwise whatever +update_workflow_instance+ returns
# @raise [ArgumentError] when no action with +action_id+ exists in the workflow
def self.update_action_level_result_attributes(namespace, name, attributes, action_id, opts = {})
  if attributes.nil? || attributes.empty?
    return "Dynamic attributes do not exist for action with id #{action_id}, nothing to update"
  end

  # Filter into a new hash so the caller's argument is not modified.
  present = attributes.reject { |_key, value| value.nil? || value.to_s.strip.empty? }

  opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION
  workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace)
  workflow = workflow_instance[:spec][:workflow]

  action = WorkflowInstance.find_action(action_id, workflow)
  if action.nil?
    raise ArgumentError, "Action with id #{action_id} not found in workflow instance #{namespace}/#{name}"
  end

  action[:attributes] = {} unless action[:attributes]
  present.each do |attr_name, attr_val|
    key = attr_name.to_sym
    action[:attributes][key] = {} unless action[:attributes][key]
    next if action[:attributes][key][:hidden]

    action[:attributes][key][:value] =
      if attr_val.is_a?(Hash)
        attr_val[:value] || attr_val
      else
        attr_val
      end
  end
  ::DTK::CrdClient.get_kubeclient(opts).update_workflow_instance(workflow_instance)
end
update_action_status(namespace, name, parent_id, action_id, status, error_message = "", opts = {}) click to toggle source
# File lib/state/workflow_instance.rb, line 120
# Sets the state of one step in a workflow instance's status via a JSON patch
# and, when an error message is given, records it on the same step.
#
# The step is located in +spec.status.steps+ by comparing ids in their string
# form, so Integer and String ids match.
#
# Note: +opts+ is modified in place - its +:apiVersion+ entry is overwritten
# with WORKFLOW_INSTANCE_CRD_VERSION.
#
# @param namespace [String] namespace of the workflow instance
# @param name [String] name of the workflow instance
# @param parent_id [Object] not used by this method; kept for the callers'
#   signature
# @param action_id [#to_s] id of the step to update
# @param status [Object] new value for the step's +state+
# @param error_message [String, nil] optional message; nil or empty means no
#   error patch is added
# @param opts [Hash] options forwarded to +::DTK::CrdClient.get_kubeclient+
# @return whatever +json_patch_workflow_instance+ returns
# @raise [ArgumentError] when no step with +action_id+ exists
def self.update_action_status(namespace, name, parent_id, action_id, status, error_message = "", opts = {})
  opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION
  workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace)
  steps = workflow_instance[:spec][:status][:steps] || []

  wanted = action_id.to_s
  action_index_steps = steps.find_index { |step| step[:id].to_s == wanted }
  # Without an index the patch path would be malformed
  # ("/spec/status/steps//state"), so fail with a clear message instead.
  if action_index_steps.nil?
    raise ArgumentError, "Step with id #{action_id} not found in status of workflow instance #{namespace}/#{name}"
  end

  patch = [{
    "op" => "replace",
    "path" => "/spec/status/steps/#{action_index_steps}/state",
    "value" => status
  }]
  # nil must be checked first: calling empty? on nil raises NoMethodError.
  patchError!(patch, error_message, action_index_steps) unless error_message.nil? || error_message.empty?
  ::DTK::CrdClient.get_kubeclient(opts).json_patch_workflow_instance(name, patch, namespace)
end

Public Instance Methods

attribute_metadata() click to toggle source
# File lib/state/workflow_instance.rb, line 153
# Combines the attribute type information of the workflow template with the
# values stored on this instance.
#
# For every type-info entry whose (symbolized) name has a truthy attribute on
# this instance, the entry's hash is merged with that attribute. An attribute
# that is not hash-like (a String, number, Array, ...) is treated as a bare
# value and merged as +{ value: attribute }+. Entries without a matching
# attribute are omitted.
#
# @return [Hash{Symbol => Hash}] attribute name => merged metadata
def attribute_metadata
  attributes     = @attributes.to_hash
  attr_type_info = get_workflow_template.attribute_type_info

  attr_type_info.each_with_object({}) do |attr_info, metadata|
    attr_info_hash = attr_info.to_hash
    attribute_name = attr_info_hash[:name].to_sym
    attribute      = attributes[attribute_name]
    next unless attribute

    # Hash#merge only accepts hash-like arguments; wrap bare values so that
    # non-String scalars no longer raise TypeError.
    attribute = { value: attribute } unless attribute.respond_to?(:to_hash)
    metadata[attribute_name] = attr_info_hash.merge(attribute)
  end
end
attribute_values() click to toggle source
# File lib/state/workflow_instance.rb, line 174
# Returns the plain value of every attribute on this instance.
#
# An attribute stored as a container contributes its +:value+ entry. An
# attribute stored directly as a scalar (String, Symbol, number, boolean or
# nil) contributes itself, matching the bare String values that
# +attribute_metadata+ accepts.
#
# @return [Hash] attribute name => value
def attribute_values
  attribute_with_values = {}
  @attributes.each_pair do |name, content|
    attribute_with_values[name] =
      case content
      when String, Symbol, Numeric, true, false, nil then content
      else content[:value]
      end
  end
  attribute_with_values
end
get_workflow_template(opts = {}) click to toggle source
# File lib/state/workflow_instance.rb, line 149
# Looks up the workflow template this instance references.
#
# @param opts [Hash] options forwarded to +Workflow.get+
# @return whatever +Workflow.get+ returns for the referenced namespace/name
def get_workflow_template(opts = {})
  template_ref = @workflow_template
  Workflow.get(template_ref.namespace, template_ref.name, opts)
end
to_hash() click to toggle source
# File lib/state/workflow_instance.rb, line 62
# Serializes this workflow instance into a CRD-shaped Hash.
#
# The metadata is passed through +filter_metadata+; references, attributes,
# status and workflow are converted with +to_hash+.
#
# @return [Hash] with keys :apiVersion, :kind, :metadata, :references, :spec
def to_hash
  crd = {
    apiVersion: @api_version,
    kind: @kind,
    metadata: filter_metadata(@metadata),
    references: @references.to_hash
  }
  # Built after the top-level entries so the conversions run in the same
  # order as the keys appear.
  crd[:spec] = {
    attributes: @attributes.to_hash,
    status: @status.to_hash,
    workflow: @workflow.to_hash
  }
  crd
end