class DTK::State::Component::Attribute::Influxdb::SemanticType
Attributes
client[R]
content_to_write[RW]
crd_content[R]
expanded_semantictype_spec[R]
name[R]
namespace[R]
Public Class Methods
new(name, namespace)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 9 def initialize(name, namespace) @name = name @namespace = namespace @client = Client.new @crd_content = get(name, namespace) @content_to_write = [] @expanded_spec = ::DTK::CrdClient.get_kubeclient({}).get_semantictype(name, namespace).expandedSpec @expanded_semantictype_spec = expand(crd_content.to_h[:properties], @expanded_spec) end
Public Instance Methods
partial_write_update(component_and_attribute, path, field_name, field_value)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 34 def partial_write_update(component_and_attribute, path, field_name, field_value) parent, child = validate_parameter(path) component_id, attribute_name = component_and_attribute.split('/') # getting previous value for given parameters previous_value = {} begin flux_query = 'from(bucket:"' + @client.connection_parameters[:bucket] + '") |> range(start:-5) |> filter(fn:(r) => r._measurement == "' + attribute_name + "_" + child[:type] + '") |> filter(fn: (r) => r.parent == "' + parent[:name] + '") |> filter(fn: (r) => r.name == "' + child[:name] + '")|> last()' result = @client.query(query: flux_query) previous_value = result.values.map(&:records).flatten.map(&:values) rescue => e raise "Partial write could not be completed. Previous point for given parameters not found!. Error: #{e}" end update_current(previous_value[0], get_path_to_object(path), field_name, field_value) end
write_semantictype_inventory(inventory, component_id)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 19 def write_semantictype_inventory(inventory, component_id) get_influxdb_properties(inventory) content_to_write.each do |point| point[:tags].merge!({ component_id: component_id, attribute_name: @name }) @client.write_point({ name: point[:name], tags: point[:tags], fields: point[:fields], time: (Time.new.to_f * 1000).to_i }) end rescue => e raise "#{name} inventory write failed. Error: #{e}" end
Private Instance Methods
expand(crd_content, expanded_spec)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 51 def expand(crd_content, expanded_spec) if expanded_spec.nil? expanded_spec = {} populate_semantictypes(crd_content, expanded_spec) ::DTK::CrdClient.get_kubeclient({}).merge_patch_semantictype(name, { expandedSpec: expanded_spec }, namespace) end expanded_spec end
get(name, namespace, opts = {})
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 60 def get(name, namespace, opts = {}) semantictype = ::DTK::CrdClient.get_kubeclient(opts).get_semantictype(name, namespace) semantictype.spec[:openAPIV3Schema] rescue => e raise "SemanticType attribute with name '#{name}' not found on the cluster!. Error: #{e.inspect}" end
get_influxdb_properties(inventory, parent_type = [:top], parent_name = nil)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 107 def get_influxdb_properties(inventory, parent_type = [:top], parent_name = nil) content_to_write = [] properties = { } inventory.each_pair do |key, value| if value.class.to_s == 'Array' inventory[key].each do |element| get_influxdb_properties(element, parent_type.push(key), inventory[:name]) end else properties[key] = value end end resolve_property(parent_type, parent_name, properties) parent_type.pop "Attribute successfully validated!" end
get_partial_definition(path)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 192 def get_partial_definition(path) i = 0 definition = {} semantictype_crd = crd_content.to_h[:properties] expanded_spec = expanded_semantictype_spec.to_h while i < path.length if path[i].to_sym == :top semantictype_crd.each_pair do |key, value| if key == :required definition[key] = value else basic, semantictype = value[:type].split(':') semantictype.nil? ? definition[key] = value : definition[key] = semantictype_crd[key] if basic != 'array' end end else definition = {} expanded_spec[path[i].to_sym].each_pair do |key, value| definition[key] = value unless value[:type].nil? end expanded_spec = expanded_spec[path[i].to_sym] end i+=1 end definition end
get_path_to_object(parameter)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 219 def get_path_to_object(parameter) path = ['top'] array = parameter.split('/') array.each do |element| path.push(element.split(':')[1]) end path end
populate_semantictypes(crd, expanded_spec, parent = nil)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 67 def populate_semantictypes(crd, expanded_spec, parent = nil) crd.each_pair do |key, value| basic, semantictype = value[:type].split(':') if semantictype expanded_spec[key] = get(basic.to_s, namespace).to_h[:properties] crd[key] = get(basic.to_s, namespace).to_h[:properties] temporary_expanded_spec = populate_semantictypes(crd[key], expanded_spec[key], basic) end if basic == 'array' basic, semantictype = value[:items][:type].split(':') if semantictype expanded_spec[key] = get(basic.to_s, namespace).to_h[:properties] crd[key] = get(basic.to_s, namespace).to_h[:properties] temporary_expanded_spec = populate_semantictypes(crd[key], expanded_spec[key], basic) end end temporary_expanded_spec ||= {} unless temporary_expanded_spec.empty? || parent.nil? ::DTK::CrdClient.get_kubeclient({}).merge_patch_semantictype(parent, { expandedSpec: { key => temporary_expanded_spec } }, namespace) end end expanded_spec end
resolve_property(parent_type, parent_name, properties)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 124 def resolve_property(parent_type, parent_name, properties) definition = get_partial_definition(parent_type) request = get_tags_and_fields(definition, properties) validate_request(definition, request) request[:name] = name + "_" + parent_type.last.to_s request[:tags][:parent] = parent_name unless parent_name.nil? content_to_write.push(request) end
update_current(previous_value, path, field_name, field_value)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 91 def update_current(previous_value, path, field_name, field_value) tags = {} previous_value.each_pair do |key, value| tags[key] = value if key[0..0] != '_' && key != 'result' && key != 'table' end fields = Hash.new fields[field_name.to_sym] = field_value validate_fields(get_partial_definition(path), fields) @client.write_point({ name: previous_value['_measurement'], tags: tags, fields: fields, time: (Time.new.to_f * 1000).to_i }) end
validate_fields(partial_definition, fields)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 160 def validate_fields(partial_definition, fields) partial_definition.each_pair do |key, value| next if key == :required || value[:metric] == (false || nil) if fields[key].nil? raise "Field #{key} is missing. Validation of request failed!" elsif value[:type].capitalize != fields[key].class.to_s raise "Defined type for SemanticType attribute property '#{key}' is #{value[:type].capitalize}, #{fields[key].class} provided" end end end
validate_parameter(parameter)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 228 def validate_parameter(parameter) array_of_parameters = [] parameter.split('/').each_with_index do |param, index| name, type = param.split(':') raise unless name && type array_of_parameters.push({ name: name, type: type }) end array_of_parameters rescue => e raise "Could not resolve parameter '#{parameter}'. It should be in format: 'parent:type/child:type'" end
validate_request(partial_definition, request)
click to toggle source
# File lib/state/component/providers/influxdb/semantictype.rb, line 133 def validate_request(partial_definition, request) validate_tags(partial_definition, request[:tags]) validate_fields(partial_definition, request[:fields]) rescue => e raise e end