class DTK::State::Component::Attribute::Influxdb::Measurement

Constants

InfluxdbEncoding

Attributes

client[R]
name[R]

Public Class Methods

new(name, client) click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 9
# Stores the measurement name and the client used to talk to InfluxDB.
#
# @param name   the measurement name; other methods render it with +to_s+
# @param client the object that later receives +query+, +write_point+ and
#   +connection_parameters+ calls
def initialize(name, client)
  @name, @client = name, client
end

Public Instance Methods

flux_filter(params_hash) click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 14
# Builds the chain of Flux `filter` stages that restricts a query to rows
# whose columns equal the given values.
#
# Every pair in +params_hash+ produces one
# `|> filter(fn: (r) => r.<key> == "<value>")` stage; the stages are
# concatenated in iteration order with no separator between them.
#
# Values are rendered with +to_s+ and then escaped for a Flux string literal:
# a backslash, a double quote and the `${` interpolation opener are each
# prefixed with a backslash, so a value can neither terminate the literal
# early nor trigger Flux string interpolation. Keys are inserted verbatim
# after `r.`, so they are expected to be plain column identifiers.
#
# @param params_hash [#each_pair] column name => expected value
# @return [String] the concatenated filter stages, or '' when there are no pairs
def flux_filter(params_hash)
  stages = []
  params_hash.each_pair do |key, value|
    # Block form of gsub avoids the special meaning of backslashes in a
    # replacement string.
    escaped = value.to_s.gsub(/\\|"|\$\{/) { |token| "\\#{token}" }
    stages << "|> filter(fn: (r) => r.#{key} == \"#{escaped}\")"
  end
  stages.join
end
get_last_point(params_hash = {}) click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 22
# Fetches the most recent point(s) of this measurement that match the given
# parameters.
#
# The parameters are first passed to +check_params_hash+ (its return value is
# not used here). A Flux query is then assembled from: the bucket taken from
# the client's connection parameters, a fixed `range(start:-5)`, a filter on
# this measurement's name, the stages produced by +flux_filter+, `last()`,
# and a `drop` of bookkeeping columns. The query is sent through the client
# and the +values+ of every record of every returned table are collected.
#
# @param params_hash [Hash] filter parameters, forwarded unchanged to
#   +check_params_hash+ and +flux_filter+
# @return [Array] the +values+ of each record, flattened into one array
# @raise [RuntimeError] wrapping any StandardError raised along the way
def get_last_point(params_hash = {})
  check_params_hash(params_hash)

  # Pieces are evaluated in the same left-to-right order as they appear in
  # the final query; `+` is used for the bucket so a non-String bucket still
  # raises instead of being silently stringified.
  source_stage      = 'from(bucket:"' + client.connection_parameters[:bucket] + '") |> range(start:-5)'
  measurement_stage = ' |> filter(fn: (r) => r._measurement == "' + name.to_s + '")'
  tag_stages        = flux_filter(params_hash)

  dropped_columns = %w[
    _start _stop _field _measurement
    attribute_name assembly_name task_id component_name namespace
  ]
  quoted_columns = dropped_columns.map { |column| "\"#{column}\"" }.join(', ')
  drop_stage     = '|> drop(columns: [' + quoted_columns + '])'

  flux_query = source_stage + measurement_stage + tag_stages + ' |> last()' + drop_stage

  tables = client.query(query: flux_query)
  tables.values.map(&:records).flatten.map(&:values)
rescue => e
  raise "Failed while getting last attribute point. Error: #{e}"
end
get_required_tags(namespace, component_name, assembly_name, attribute_name) click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 31
# Packs the four identifying values into a tag hash and adds a fixed
# +task_id+ of "1".
#
# @param namespace      value stored under :namespace
# @param component_name value stored under :component_name
# @param assembly_name  value stored under :assembly_name
# @param attribute_name value stored under :attribute_name
# @return [Hash{Symbol => Object}] keys in the order :namespace,
#   :component_name, :assembly_name, :attribute_name, :task_id
def get_required_tags(namespace, component_name, assembly_name, attribute_name)
  {
    namespace:      namespace,
    component_name: component_name,
    assembly_name:  assembly_name,
    attribute_name: attribute_name,
    task_id:        "1"
  }
end

Private Instance Methods

check_params_hash(params_hash = {}) click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 67
# Validates that every required parameter is present and has a legal type,
# and returns only those parameters.
#
# The list of required names comes from +required_params+. A parameter whose
# value is +nil+ or +false+ is reported as missing. Each present value is
# passed to +fail_if_illegal_tag_value+. Entries of +params_hash+ that are
# not required are left out of the result.
#
# @param params_hash [Hash] candidate parameters, looked up by required name
# @return [Hash] required name => value, in +required_params+ order
# @raise [RuntimeError] "Missing parameter '<name>'" for an absent value, or
#   whatever +fail_if_illegal_tag_value+ raises
def check_params_hash(params_hash = {})
  self.required_params.each_with_object({}) do |param, checked|
    value = params_hash[param]
    fail "Missing parameter '#{param}'" unless value

    fail_if_illegal_tag_value(param, value)
    checked[param] = value
  end
end
fail_if_illegal_tag_value(name, value) click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 96
# Raises unless the exact class of +value+ is listed in LEGAL_TAG_CLASSES.
#
# The check compares +value.class+ against the list, so an instance of a
# subclass of a listed class is rejected.
#
# @param name  parameter name, used only in the error message
# @param value the value whose class is checked
# @return [nil] when the value's class is legal
# @raise [RuntimeError] naming the parameter and listing the legal classes
def fail_if_illegal_tag_value(name, value)
  return if LEGAL_TAG_CLASSES.include?(value.class)

  legal_types = LEGAL_TAG_CLASSES.join(', ')
  fail "Parameter '#{name}' has an illegal type, legal types are #{legal_types}"
end
get_string_params(array) click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 86
# Returns a new hash with the same values as the input but with every key
# converted through +to_s+.
#
# Despite the parameter name, the argument is iterated with +each_pair+, so
# it must be hash-like. If two keys stringify to the same text, the pair
# yielded later wins.
#
# @param array [#each_pair] key/value collection
# @return [Hash{String => Object}] stringified key => original value
def get_string_params(array)
  {}.tap do |stringified|
    array.each_pair { |key, value| stringified[key.to_s] = value }
  end
end
influxdb_encoding(params_hash, timestamp) click to toggle source

This default encoding can be overridden by its children; for example, attribute might make the measurement name be “#{self.name}_#{params” if that turns out to be more efficient for indexing

# File lib/state/component/providers/influxdb/measurement.rb, line 82
# Builds the InfluxdbEncoding for this measurement from its name, the given
# parameters and the timestamp. The arguments are forwarded unchanged.
#
# @param params_hash parameters handed to InfluxdbEncoding.new
# @param timestamp   timestamp handed to InfluxdbEncoding.new (may be nil)
# @return [InfluxdbEncoding]
def influxdb_encoding(params_hash, timestamp)
  encoding_args = [name, params_hash, timestamp]
  InfluxdbEncoding.new(*encoding_args)
end
write_point(value, params_hash = {}, timestamp) click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 49
# Writes a single point for this measurement through the client.
#
# The encoding object returned by +influxdb_encoding+ supplies the tags; it
# is built from the timestamp exactly as passed in, before any default is
# applied. A nil timestamp is then replaced with the current time. The point
# sent to the client has this measurement's name as a string, the tags with
# stringified keys, a single +value+ field, and a +time+ equal to
# +timestamp.to_f * 1000+ truncated to an integer.
#
# Note that +params_hash+ is an optional parameter placed before the required
# +timestamp+: a two-argument call binds the second argument to +timestamp+.
#
# @param value       stored under the +value+ field
# @param params_hash parameters passed to +influxdb_encoding+
# @param timestamp   anything responding to +to_f+, or nil for "now"
# @return whatever the client's +write_point+ returns
# @raise [RuntimeError] "write_point error: ..." wrapping any StandardError
def write_point(value, params_hash = {}, timestamp)
  # The encoding allows a different mapping of name and params_hash to the
  # actual InfluxDB measurement name and tags.
  encoding  = influxdb_encoding(params_hash, timestamp)
  timestamp = Time.now if timestamp.nil?

  point = {
    name:   name.to_s,
    tags:   get_string_params(encoding.tags),
    fields: { value: value },
    time:   (timestamp.to_f * 1000).to_i
  }
  client.write_point(point)
rescue => error
  fail "write_point error: #{error}"
end