class TopologicalInventory::Providers::Common::CollectorsPool

Constants

COLLECTOR_POLL_TIME
SECRET_FILENAME

Attributes

collector_poll_time[RW]
collector_status[RW]
collectors[RW]
config_name[RW]
metrics[RW]
secrets[RW]
thread_pool[RW]
updated_at[RW]

Public Class Methods

new(config_name, metrics, collector_poll_time: COLLECTOR_POLL_TIME, thread_pool_size: 2) click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 10
def initialize(config_name, metrics, collector_poll_time: COLLECTOR_POLL_TIME, thread_pool_size: 2)
  self.config_name         = config_name
  self.collector_status    = Concurrent::Map.new
  self.metrics             = metrics
  self.collector_poll_time = collector_poll_time
  self.secrets             = nil
  self.thread_pool         = Concurrent::FixedThreadPool.new(thread_pool_size)
  self.updated_at          = {}
end

Public Instance Methods

run!() click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 20
def run!
  loop do
    reload_config
    reload_secrets

    # Secret is deployed just after config map, we should wait for it
    queue_collectors if secrets_newer_than_config?

    sleep(5)
  end
end
stop!() click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 32
def stop!
  collectors.each_value(&:stop)

  thread_pool.shutdown
  # Wait for end of collectors to ensure metrics are stopped after them
  thread_pool.wait_for_termination
end

Protected Instance Methods

new_collector(source, source_secret) click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 130
def new_collector(source, source_secret)
  raise NotImplementedError, "#{__method__} must be implemented in a subclass"
end
path_to_config() click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 117
def path_to_config
  raise NotImplementedError, "#{__method__} must be implemented in a subclass"
end
path_to_secrets() click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 121
def path_to_secrets
  raise NotImplementedError, "#{__method__} must be implemented in a subclass"
end
queue_collectors() click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 65
def queue_collectors
  ::Settings.sources.to_a.each do |source|
    # Skip if collector is running/queued or just finished
    next if queued_or_updated_recently?(source)

    # Check if secrets for this source are present
    next if (source_secret = secrets_for_source(source)).nil?

    # Check if necessary endpoint/auth data are not blank (provider specific)
    next unless source_valid?(source, source_secret)

    collector_status[source.source] = {:status => :queued}
    # Add source to collector's queue
    thread_pool.post do
      begin
        collector = new_collector(source, source_secret)
        collector.collect!
      ensure
        collector_status[source.source] = {:status => :ready, :last_updated_at => Time.now}
      end
    end
  end
end
queued_or_updated_recently?(source) click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 89
def queued_or_updated_recently?(source)
  return false if collector_status[source.source].nil?
  return true if collector_status[source.source][:status] == :queued

  if (last_updated_at = collector_status[source.source][:last_updated_at]).nil?
    # should never happen
    last_updated_at = Time.now
    collector_status[source.source] = {:status => :ready, :last_updated_at => last_updated_at}
  end

  last_updated_at > Time.now - collector_poll_time.to_i
end
reload_config() click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 45
def reload_config
  config_file = File.join(path_to_config, "#{sanitize_filename(config_name)}.yml")
  raise "Configuration file #{config_file} doesn't exist" unless File.exist?(config_file)

  ::Config.load_and_set_settings(config_file)
end
reload_secrets() click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 52
def reload_secrets
  path = File.join(path_to_secrets, SECRET_FILENAME)
  raise "Secrets file missing at #{path}" unless File.exists?(path)
  file         = File.read(path)
  self.secrets = JSON.parse(file)
end
sanitize_filename(filename) click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 125
def sanitize_filename(filename)
  # Remove any character that aren't 0-9, A-Z, or a-z, / or -
  filename.gsub(/[^0-9A-Z\/\-]/i, '_')
end
secrets_for_source(source) click to toggle source

@param [Hash] source from Settings @return [Hash|nil] {“username”:, “password”:}

# File lib/topological_inventory/providers/common/collectors_pool.rb, line 61
def secrets_for_source(source)
  secrets[source.source]
end
secrets_newer_than_config?() click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 102
def secrets_newer_than_config?
  return false if ::Settings.updated_at.nil? || secrets["updated_at"].nil?

  updated_at[:config] = Time.parse(::Settings.updated_at)
  updated_at[:secret] = Time.parse(secrets["updated_at"])

  logger.info("Reloading Sources data => Config [updated_at: #{updated_at[:config].to_s}], Secrets [updated at: #{updated_at[:secret]}]") if updated_at[:config] <= updated_at[:secret]

  updated_at[:config] <= updated_at[:secret]
end
source_valid?(source, secret) click to toggle source
# File lib/topological_inventory/providers/common/collectors_pool.rb, line 113
def source_valid?(source, secret)
  true
end