class DTK::State::Component::Attribute::Influxdb::SemanticType

Attributes

client[R]
content_to_write[RW]
crd_content[R]
expanded_semantictype_spec[R]
name[R]
namespace[R]

Public Class Methods

new(name, namespace) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 9
# Sets up the wrapper for one SemanticType resource: creates a Client, loads
# the resource's schema through #get and resolves its expanded spec through
# #expand.
#
# name      - name of the SemanticType resource
# namespace - namespace the resource is looked up in
#
# Raises RuntimeError (from #get) when the resource cannot be fetched.
def initialize(name, namespace)
  @name = name
  @namespace = namespace
  @content_to_write = []
  @client = Client.new
  @crd_content = get(name, namespace)
  # The resource is fetched a second time to read the expandedSpec it
  # currently carries; #expand computes and stores one when it is nil.
  semantictype = ::DTK::CrdClient.get_kubeclient({}).get_semantictype(name, namespace)
  @expanded_spec = semantictype.expandedSpec
  @expanded_semantictype_spec = expand(crd_content.to_h[:properties], @expanded_spec)
end

Public Instance Methods

partial_write_update(component_and_attribute, path, field_name, field_value) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 34
# Rewrites a single field of the latest point stored for one nested object.
#
# component_and_attribute - "<component_id>/<attribute_name>"; only the
#                           attribute name is used, to build the measurement
# path                    - object path in the form 'parent:type/child:type'
# field_name              - name of the field to write
# field_value             - new value for that field
#
# Returns whatever #update_current returns.
# Raises RuntimeError when the path is malformed, when the query fails, or
# when the query yields no previous point for the given parameters.
def partial_write_update(component_and_attribute, path, field_name, field_value)
  parent, child = validate_parameter(path)
  _component_id, attribute_name = component_and_attribute.split('/')

  # Look up the previous point for the given parameters.
  begin
    # Built with `+` on purpose: a nil segment raises here and is reported
    # through the rescue below instead of silently producing a bogus query.
    flux_query = 'from(bucket:"' + @client.connection_parameters[:bucket] + '") |> range(start:-5)' +
                 ' |> filter(fn:(r) => r._measurement == "' + attribute_name + '_' + child[:type] + '")' +
                 ' |> filter(fn: (r) => r.parent == "' + parent[:name] + '")' +
                 ' |> filter(fn: (r) => r.name == "' + child[:name] + '")|> last()'
    result = @client.query(query: flux_query)
    previous_points = result.values.map(&:records).flatten.map(&:values)
  rescue => e
    raise "Partial write could not be completed. Previous point for given parameters not found!. Error: #{e}"
  end

  # A successful query with no records must fail here, not later as a
  # NoMethodError on nil inside #update_current.
  if previous_points.empty?
    raise 'Partial write could not be completed. Previous point for given parameters not found!'
  end

  update_current(previous_points[0], get_path_to_object(path), field_name, field_value)
end
write_semantictype_inventory(inventory, component_id) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 19
# Validates +inventory+ against the SemanticType definition and writes one
# point per resolved object through the client.
#
# inventory    - nested hash describing the attribute value
# component_id - added to every point as the :component_id tag
#
# Returns the list of points that were written.
# Raises RuntimeError wrapping any validation or write failure.
def write_semantictype_inventory(inventory, component_id)
  # Start from an empty buffer: nothing else clears it, so without this a
  # second call would write every point collected by earlier calls again.
  @content_to_write = []
  get_influxdb_properties(inventory)
  content_to_write.each do |point|
    point[:tags].merge!({ component_id: component_id, attribute_name: @name })
    @client.write_point({
                          name: point[:name],
                          tags: point[:tags],
                          fields: point[:fields],
                          # current time as integer milliseconds since the epoch
                          time: (Time.new.to_f * 1000).to_i
                        })
  end
rescue => e
  raise "#{name} inventory write failed. Error: #{e}"
end

Private Instance Methods

expand(crd_content, expanded_spec) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 51
# Returns the expanded spec for this SemanticType, computing and storing it
# when none exists yet.
#
# crd_content   - properties of the SemanticType definition
# expanded_spec - expanded spec already stored on the resource, or nil
#
# When +expanded_spec+ is non-nil it is returned untouched. Otherwise a new
# spec is built with #populate_semantictypes, patched onto the resource as
# expandedSpec, and returned.
def expand(crd_content, expanded_spec)
  return expanded_spec unless expanded_spec.nil?

  fresh_spec = {}
  populate_semantictypes(crd_content, fresh_spec)
  kube = ::DTK::CrdClient.get_kubeclient({})
  kube.merge_patch_semantictype(name, { expandedSpec: fresh_spec }, namespace)
  fresh_spec
end
get(name, namespace, opts = {}) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 60
# Fetches the SemanticType resource +name+ in +namespace+ and returns the
# :openAPIV3Schema entry of its spec.
#
# opts - options handed to ::DTK::CrdClient.get_kubeclient
#
# Raises RuntimeError (embedding the original error's inspect output) when
# the lookup fails for any reason.
def get(name, namespace, opts = {})
  kube = ::DTK::CrdClient.get_kubeclient(opts)
  resource = kube.get_semantictype(name, namespace)
  resource.spec[:openAPIV3Schema]
rescue => e
  raise "SemanticType attribute with name '#{name}' not found  on the cluster!. Error: #{e.inspect}"
end
get_influxdb_properties(inventory, parent_type = [:top], parent_name = nil) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 107
# Walks +inventory+ depth-first and hands every object it contains to
# #resolve_property.
#
# Array values are treated as lists of nested objects: each element is
# processed recursively with the array's key appended to the type path and
# inventory[:name] as its parent name. All other values are collected as the
# current object's own properties. Nested objects are resolved before the
# object that contains them.
#
# inventory   - hash describing the current object
# parent_type - type path from the root to the current object
# parent_name - name of the enclosing object, nil at the root
#
# Returns the string "Attribute successfully validated!".
# Raises whatever #resolve_property raises for an invalid object.
def get_influxdb_properties(inventory, parent_type = [:top], parent_name = nil)
  properties = {}
  inventory.each_pair do |key, value|
    if value.is_a?(Array)
      # A fresh path per nesting level leaves the caller's array untouched
      # (no push/pop bookkeeping that an exception could leave unbalanced).
      child_path = parent_type + [key]
      value.each do |element|
        get_influxdb_properties(element, child_path, inventory[:name])
      end
    else
      properties[key] = value
    end
  end
  resolve_property(parent_type, parent_name, properties)
  'Attribute successfully validated!'
end
get_partial_definition(path) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 192
# Returns the property definitions that apply to the object addressed by
# +path+.
#
# path - list of segments (strings or symbols). A :top segment selects the
#        SemanticType's own properties, skipping those whose type is
#        'array'. Any other segment descends one level into the expanded
#        spec and selects that level's entries that carry a :type.
#
# A :required entry is copied through as is at every level.
#
# Returns a hash of property name => definition for the last segment.
# Raises RuntimeError when a segment is missing from the expanded spec.
def get_partial_definition(path)
  definition = {}
  top_properties = crd_content.to_h[:properties]
  expanded_spec = expanded_semantictype_spec.to_h
  path.each do |raw_segment|
    segment = raw_segment.to_sym
    if segment == :top
      top_properties.each_pair do |key, value|
        if key == :required
          definition[key] = value
        else
          basic, = value[:type].split(':')
          definition[key] = value if basic != 'array'
        end
      end
    else
      nested = expanded_spec[segment]
      raise "Path element '#{segment}' not found in the expanded SemanticType spec" if nested.nil?

      # Each nested level replaces what was collected for the level above.
      definition = {}
      nested.each_pair do |key, value|
        if key == :required
          # Not a property definition, so it has no :type to look up;
          # handled the same way as at the top level.
          definition[key] = value
        elsif !value[:type].nil?
          definition[key] = value
        end
      end
      expanded_spec = nested
    end
  end
  definition
end
get_path_to_object(parameter) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 219
# Converts a 'name:type/name:type' parameter into a type path.
#
# parameter - slash-separated list of 'name:type' segments
#
# Returns an array starting with 'top' followed by the part after the first
# ':' of every segment (nil for a segment without ':').
def get_path_to_object(parameter)
  ['top'] + parameter.split('/').map { |segment| segment.split(':')[1] }
end
get_tags_and_fields(partial_definition, properties) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 140
# Splits +properties+ into tags and fields according to the definition.
#
# partial_definition - property name => definition hash
# properties         - property name => value
#
# A property whose definition has :metric nil or false becomes a tag; any
# other :metric value makes it a field.
#
# Returns { tags: {...}, fields: {...} }.
# Raises RuntimeError for a property that has no definition.
def get_tags_and_fields(partial_definition, properties)
  split = { tags: {}, fields: {} }
  properties.each_pair do |key, value|
    spec = partial_definition[key]
    raise "Property '#{key}' not found in the definition of attribute" if spec.nil?

    metric = spec[:metric]
    bucket = (metric.nil? || metric == false) ? :tags : :fields
    split[bucket][key] = value
  end
  split
end
populate_semantictypes(crd, expanded_spec, parent = nil) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 67
# Recursively replaces references to other SemanticTypes among the
# properties in +crd+ with the referenced definitions' properties, recording
# the same expansion in +expanded_spec+.
#
# A property's type string is split on ':'; when a second part is present,
# the first part is used as the name passed to #get. For properties whose
# type is 'array' the same check is applied to value[:items][:type].
#
# crd           - hash of property definitions; entries are overwritten in place
# expanded_spec - hash that receives the expansions; modified in place
# parent        - name of the SemanticType whose properties are being walked
#                 (the recursion passes the referenced type's name); nil for
#                 the outermost call
#
# Returns +expanded_spec+.
def populate_semantictypes(crd, expanded_spec, parent = nil)
  crd.each_pair do |key, value|
    basic, semantictype = value[:type].split(':')
    if semantictype
      # #get is called once per target, so expanded_spec[key] and crd[key]
      # each receive their own result before the recursion modifies them.
      expanded_spec[key] = get(basic.to_s, namespace).to_h[:properties]
      crd[key] = get(basic.to_s, namespace).to_h[:properties]
      temporary_expanded_spec = populate_semantictypes(crd[key], expanded_spec[key], basic)
    end
    if basic == 'array'
      # Array property: inspect the element type instead.
      basic, semantictype = value[:items][:type].split(':')
      if semantictype
        expanded_spec[key] = get(basic.to_s, namespace).to_h[:properties]
        crd[key] = get(basic.to_s, namespace).to_h[:properties]
        temporary_expanded_spec = populate_semantictypes(crd[key], expanded_spec[key], basic)
      end
    end
    # temporary_expanded_spec is first assigned inside this block, so it is
    # unset at the start of every iteration; it stays unset when the key did
    # not reference another SemanticType.
    temporary_expanded_spec ||= {}
    # Store the nested expansion on the parent SemanticType. Skipped for the
    # outermost call (parent is nil) and when there is nothing to store.
    unless temporary_expanded_spec.empty? || parent.nil?
      ::DTK::CrdClient.get_kubeclient({}).merge_patch_semantictype(parent, { expandedSpec: { key => temporary_expanded_spec } }, namespace)
    end
  end
  expanded_spec
end
resolve_property(parent_type, parent_name, properties) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 124
# Validates one object's properties and queues the resulting point.
#
# parent_type - type path of the object; its last element names the object's type
# parent_name - name of the enclosing object, or nil when there is none
# properties  - property name => value for this object
#
# The queued point is named "<attribute name>_<last path element>" and gets
# a :parent tag when +parent_name+ is not nil.
#
# Returns the list of queued points.
# Raises whatever the definition lookup or validation raises.
def resolve_property(parent_type, parent_name, properties)
  partial = get_partial_definition(parent_type)
  point = get_tags_and_fields(partial, properties)
  validate_request(partial, point)
  point[:name] = name + '_' + parent_type.last.to_s
  point[:tags][:parent] = parent_name unless parent_name.nil?
  content_to_write << point
end
update_current(previous_value, path, field_name, field_value) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 91
# Writes a new point that reuses the tags of +previous_value+ and carries a
# single updated field.
#
# previous_value - key => value hash of the previous point
# path           - type path used to look up the definition for validation
# field_name     - name of the field to write (converted to a symbol)
# field_value    - value of the field
#
# Keys starting with '_' and the keys 'result' and 'table' are not copied as
# tags. The measurement name is taken from previous_value['_measurement'].
#
# Returns whatever the client's write_point returns.
# Raises whatever #validate_fields raises for an invalid field.
def update_current(previous_value, path, field_name, field_value)
  excluded = %w[result table]
  tags = previous_value.each_pair.each_with_object({}) do |(key, value), kept|
    kept[key] = value unless key[0..0] == '_' || excluded.include?(key)
  end
  fields = { field_name.to_sym => field_value }
  validate_fields(get_partial_definition(path), fields)
  @client.write_point({
                        name: previous_value['_measurement'],
                        tags: tags,
                        fields: fields,
                        # current time as integer milliseconds since the epoch
                        time: (Time.new.to_f * 1000).to_i
                      })
end
validate_fields(partial_definition, fields) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 160
# Checks that every metric property of the definition is present in
# +fields+ with the declared type.
#
# partial_definition - property name => definition hash
# fields             - field name => value
#
# Entries under :required and properties whose :metric is nil or false
# (those are tags, see #get_tags_and_fields) are not checked. true and false
# are both accepted for the declared type 'boolean', matching #validate_tags.
#
# Returns +partial_definition+.
# Raises RuntimeError when a metric field is missing or has the wrong type.
def validate_fields(partial_definition, fields)
  partial_definition.each_pair do |key, value|
    # `== (false || nil)` only ever compared against nil, so tags declared
    # with metric: false were wrongly required as fields.
    next if key == :required || !value[:metric]

    provided = fields[key]
    raise "Field #{key} is missing. Validation of request failed!" if provided.nil?

    type = provided.class
    type = 'Boolean' if type == TrueClass || type == FalseClass
    expected = value[:type].capitalize
    next if expected == type.to_s

    raise "Defined type for SemanticType attribute property '#{key}' is #{expected}, #{type} provided"
  end
end
validate_parameter(parameter) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 228
# Parses a 'name:type/name:type' parameter.
#
# parameter - slash-separated list of 'name:type' segments
#
# Returns an array with one { name:, type: } hash per segment, in order.
# Raises RuntimeError when the parameter cannot be split or a segment lacks
# either part.
def validate_parameter(parameter)
  parameter.split('/').map do |segment|
    object_name, object_type = segment.split(':')
    raise unless object_name && object_type

    { name: object_name, type: object_type }
  end
rescue StandardError
  raise "Could not resolve parameter '#{parameter}'. It should be in format: 'parent:type/child:type'"
end
validate_request(partial_definition, request) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 133
# Validates the tags, then the fields, of +request+ against the definition.
#
# partial_definition - property name => definition hash
# request            - hash with :tags and :fields
#
# Returns the result of #validate_fields.
# Any error raised by either validation propagates to the caller unchanged.
def validate_request(partial_definition, request)
  validate_tags(partial_definition, request[:tags])
  validate_fields(partial_definition, request[:fields])
end
validate_tags(partial_definition, tags) click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 171
# Checks that every non-metric property of the definition is present in
# +tags+ with the declared type, filling in defaults for missing ones.
#
# partial_definition - property name => definition hash
# tags               - tag name => value; missing tags that have a :default
#                      in the definition are added to this hash
#
# Entries under :required and properties with metric: true are not checked.
# true and false both count as the declared type 'boolean'.
#
# Returns +partial_definition+.
# Raises RuntimeError when a tag is missing without a default or has the
# wrong type.
def validate_tags(partial_definition, tags)
  partial_definition.each_pair do |key, spec|
    next if key == :required
    next if spec[:metric] == true

    provided = tags[key]
    if provided.nil?
      fallback = spec[:default]
      raise "Property #{key} is missing. Validation of request failed!" if fallback.nil?

      tags[key] = fallback
      next
    end

    actual = [TrueClass, FalseClass].include?(provided.class) ? 'Boolean' : provided.class
    expected = spec[:type].capitalize
    next if expected == actual.to_s

    raise "Defined type for SemanticType attribute property '#{key}' is #{expected}, #{actual} provided"
  end
end