class TopologicalInventory::Providers::Common::Collector

Attributes

collector_threads[RW]
finished[RW]
limits[RW]
poll_time[RW]
queue[RW]
source[RW]
standalone_mode[RW]

Public Class Methods

new(source, default_limit: 1_000, poll_time: 30, standalone_mode: true) click to toggle source

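@param poll_time [Integer] Time to wait (passed to sleep) between collecting loops. Only used when standalone_mode is true; irrelevant for standalone_mode: false. @param standalone_mode [Boolean] T/F flag related to whether the collector is created by collectors_pool. When true, collect! keeps looping and sleeps poll_time between passes; when false, collect! makes a single pass and then stops.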

# File lib/topological_inventory/providers/common/collector.rb, line 15
# Builds a collector and sets up its initial state through the attribute writers.
#
# @param source [Object] stored unchanged in +source+
# @param default_limit [Integer] value +limits+ returns for any key that was never assigned
# @param poll_time [Integer] stored in +poll_time+; collect! passes it to sleep in standalone mode
# @param standalone_mode [Boolean] stored in +standalone_mode+; selects looping vs. single pass in collect!
def initialize(source, default_limit: 1_000, poll_time: 30, standalone_mode: true)
  # Values supplied by the caller.
  self.source          = source
  self.poll_time       = poll_time
  self.standalone_mode = standalone_mode
  self.limits          = Hash.new(default_limit)

  # Fresh runtime state: no threads yet, not finished, empty notice queue.
  self.collector_threads = Concurrent::Map.new
  self.finished          = Concurrent::AtomicBoolean.new(false)
  self.queue             = Queue.new
end

Public Instance Methods

collect!() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 25
# Main loop: starts the collector threads, then repeats until +finished?+ is true.
# Each pass re-checks the threads, drains every notice currently in +queue+ and hands
# a non-empty batch to +targeted_refresh+.
#
# In standalone mode the loop sleeps +poll_time+ between passes; otherwise it calls
# +stop+ after the first pass, so the loop ends.
#
# @return [nil]
def collect!
  start_collector_threads

  until finished?
    ensure_collector_threads

    # Drain whatever is queued right now; the emptiness check comes before each pop.
    notices = []
    until queue.empty?
      notices << queue.pop
    end

    targeted_refresh(notices) unless notices.empty?

    if standalone_mode
      sleep(poll_time)
    else
      stop
    end
  end
end
stop() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 40
# Marks the collector as finished by setting the +finished+ flag to true.
# collect! checks this flag at the top of every pass.
#
# @return [true]
def stop
  finished.value = true
end

Protected Instance Methods

collector_thread(_connection, _entity_type) click to toggle source

Thread's main for collecting one entity type's data

# File lib/topological_inventory/providers/common/collector.rb, line 90
# Thread body for collecting the data of one entity type. start_collector_thread
# runs it inside a new Thread. This base implementation always raises.
#
# @param _connection [Object] value returned by connection_for_entity_type
# @param _entity_type [Object] entity type to collect
# @raise [NotImplementedError] always
def collector_thread(_connection, _entity_type)
  raise NotImplementedError
end
connection_for_entity_type(_entity_type) click to toggle source

Connection to endpoint for each entity type

# File lib/topological_inventory/providers/common/collector.rb, line 85
# Connection to the endpoint serving the given entity type. start_collector_thread
# skips the entity type when this returns nil. This base implementation always raises.
#
# @param _entity_type [Object] entity type a connection is requested for
# @raise [NotImplementedError] always
def connection_for_entity_type(_entity_type)
  raise NotImplementedError
end
default_refresh_type() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 172
# Refresh type used by save_inventory and sweep_inventory when the caller passes none.
#
# @return [String] "full-refresh"
def default_refresh_type
  "full-refresh"
end
endpoint_types() click to toggle source

Should be overridden by subclass. Entity types collected from endpoints.

# File lib/topological_inventory/providers/common/collector.rb, line 59
# Endpoint type names. entity_types calls "<name>_entity_types" for each of them.
# The base class has none.
#
# @return [Array<String>] a new empty array on every call
def endpoint_types
  []
end
ensure_collector_threads() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 71
# Called at the top of every collect! pass. Delegates to start_collector_threads,
# which only starts threads for entity types without a live one.
#
# @return [Object] whatever start_collector_threads returns
def ensure_collector_threads
  start_collector_threads
end
entity_types() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 53
# All entity types across endpoints: for every name in endpoint_types, calls the
# "<name>_entity_types" method on self and concatenates the results in order.
#
# @return [Array] flattened (one level) list of entity types
def entity_types
  endpoint_types.flat_map do |endpoint_type|
    reader = "#{endpoint_type}_entity_types"
    # send rather than public_send, so non-public readers are reachable too.
    send(reader)
  end
end
finished?() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 49
# Whether the collector has been told to stop.
#
# @return [Boolean] current value of the +finished+ flag
def finished?
  finished.value
end
ingress_api_client() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 176
# Client handed to SaveInventory::Saver by save_inventory and sweep_inventory.
# Not memoized: every call builds a new instance.
#
# @return [TopologicalInventoryIngressApiClient::DefaultApi]
def ingress_api_client
  TopologicalInventoryIngressApiClient::DefaultApi.new
end
inventory_name() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 164
# Inventory name of this collector.
# NOTE(review): presumably the value callers pass as +inventory_name+ to
# save_inventory / sweep_inventory - confirm against the callers.
#
# @return [String] "Default"
def inventory_name
  'Default'
end
log_external_url(url) click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 180
# Writes the given URL to the logger at info level, tagged with "[EXTERNAL URL]".
#
# @param url [#to_s] URL to record
# @return [Object] whatever logger.info returns
def log_external_url(url)
  message = "[EXTERNAL URL] #{url}"
  logger.info(message)
end
save_inventory(collections, inventory_name, schema, refresh_state_uuid = nil, refresh_state_part_uuid = nil, refresh_state_part_collected_at = nil, refresh_state_part_sent_at = Time.now.utc, refresh_type = default_refresh_type) click to toggle source

@param refresh_state_part_collected_at [Time] when this payload is collected (for [Core]:RefreshStatePart) @param refresh_state_part_sent_at [Time] when this payload is sent (for [Core]:RefreshStatePart)

# File lib/topological_inventory/providers/common/collector.rb, line 101
# Sends one part of collected data to the Ingress API through SaveInventory::Saver.
# Nothing is sent when +collections+ is empty.
#
# @param collections [#empty?] collections to send
# @param inventory_name [String] value for the Inventory +name+ attribute
# @param schema [String] name wrapped into an Ingress API Schema object
# @param refresh_state_uuid [String, nil] forwarded unchanged
# @param refresh_state_part_uuid [String, nil] forwarded unchanged
# @param refresh_state_part_collected_at [Time, nil] when this payload was collected
# @param refresh_state_part_sent_at [Time] when this payload is sent; defaults to the current UTC time
# @param refresh_type [String] defaults to default_refresh_type
# @return [Object] 0 when +collections+ is empty, otherwise the result of Saver#save
# @raise [StandardError] any error raised in the method body is logged and re-raised
def save_inventory(collections,
                   inventory_name,
                   schema,
                   refresh_state_uuid = nil,
                   refresh_state_part_uuid = nil,
                   refresh_state_part_collected_at = nil,
                   refresh_state_part_sent_at = Time.now.utc,
                   refresh_type = default_refresh_type)
  return 0 if collections.empty?

  saver = SaveInventory::Saver.new(client: ingress_api_client, logger: logger)

  schema_model = TopologicalInventoryIngressApiClient::Schema.new(name: schema)
  inventory    = TopologicalInventoryIngressApiClient::Inventory.new(
    name:                            inventory_name,
    schema:                          schema_model,
    source:                          source,
    collections:                     collections,
    refresh_state_uuid:              refresh_state_uuid,
    refresh_state_part_uuid:         refresh_state_part_uuid,
    refresh_state_part_collected_at: refresh_state_part_collected_at,
    refresh_state_part_sent_at:      refresh_state_part_sent_at,
    refresh_type:                    refresh_type
  )

  saver.save(inventory: inventory)
rescue StandardError => error
  # Only some errors carry an HTTP response; the others log empty body/header.
  body    = error.respond_to?(:response_body) ? error.response_body : nil
  headers = error.respond_to?(:response_headers) ? error.response_headers : nil
  logger.error("Error when sending payload to Ingress API. Error message: #{error.message}. Body: #{body}. Header: #{headers}")
  raise
end
schema_name() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 168
# Schema name of this collector.
# NOTE(review): presumably the value callers pass as +schema+ to
# save_inventory / sweep_inventory - confirm against the callers.
#
# @return [String] "Default"
def schema_name
  'Default'
end
start_collector_thread(entity_type) click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 75
# Starts a thread running collector_thread for one entity type.
#
# @param entity_type [Object] entity type to collect
# @return [Thread, nil] the new thread, or nil when connection_for_entity_type returned nil
def start_collector_thread(entity_type)
  endpoint = connection_for_entity_type(entity_type)
  # No connection means nothing to collect for this entity type.
  return if endpoint.nil?

  Thread.new(endpoint, entity_type) { |conn, type| collector_thread(conn, type) }
end
start_collector_threads() click to toggle source
# File lib/topological_inventory/providers/common/collector.rb, line 63
# Makes sure every entity type has a running collector thread. Entity types whose
# registered thread is alive are left untouched; for all others the result of
# start_collector_thread (possibly nil) is stored in +collector_threads+.
#
# @return [Array] the list returned by entity_types
def start_collector_threads
  entity_types.each do |entity_type|
    current = collector_threads[entity_type]

    unless current&.alive?
      collector_threads[entity_type] = start_collector_thread(entity_type)
    end
  end
end
sweep_inventory(inventory_name, schema, refresh_state_uuid, total_parts, sweep_scope, refresh_state_started_at = nil, refresh_state_sent_at = Time.now.utc, refresh_type = default_refresh_type) click to toggle source

@param refresh_state_started_at [Time] when collecting of this entity type is started (for [Core]:RefreshState) @param refresh_state_sent_at [Time] when this payload is sent (for [Core]:RefreshState)

# File lib/topological_inventory/providers/common/collector.rb, line 133
# Sends a sweep request (an Inventory with no collections, plus +total_parts+ and
# +sweep_scope+) to the Ingress API through SaveInventory::Saver.
# Nothing is sent when +total_parts+ is nil/false or +sweep_scope+ is empty.
#
# @param inventory_name [String] value for the Inventory +name+ attribute
# @param schema [String] name wrapped into an Ingress API Schema object
# @param refresh_state_uuid [String] forwarded unchanged
# @param total_parts [Integer, nil] forwarded unchanged
# @param sweep_scope [#empty?] forwarded unchanged
# @param refresh_state_started_at [Time, nil] when collecting of this entity type started
# @param refresh_state_sent_at [Time] when this payload is sent; defaults to the current UTC time
# @param refresh_type [String] defaults to default_refresh_type
# @return [Object, nil] nil when skipped, otherwise the result of Saver#save
# @raise [StandardError] any error raised in the method body is logged and re-raised
def sweep_inventory(inventory_name,
                    schema,
                    refresh_state_uuid,
                    total_parts,
                    sweep_scope,
                    refresh_state_started_at = nil,
                    refresh_state_sent_at = Time.now.utc,
                    refresh_type = default_refresh_type)
  # total_parts is checked first, so sweep_scope is not touched when it is missing.
  return unless total_parts
  return if sweep_scope.empty?

  saver = SaveInventory::Saver.new(client: ingress_api_client, logger: logger)

  schema_model = TopologicalInventoryIngressApiClient::Schema.new(name: schema)
  inventory    = TopologicalInventoryIngressApiClient::Inventory.new(
    name:                     inventory_name,
    schema:                   schema_model,
    source:                   source,
    collections:              [],
    refresh_state_uuid:       refresh_state_uuid,
    total_parts:              total_parts,
    sweep_scope:              sweep_scope,
    refresh_state_started_at: refresh_state_started_at,
    refresh_state_sent_at:    refresh_state_sent_at,
    refresh_type:             refresh_type
  )

  saver.save(inventory: inventory)
rescue StandardError => error
  # Only some errors carry an HTTP response; the others log empty body/header.
  body    = error.respond_to?(:response_body) ? error.response_body : nil
  headers = error.respond_to?(:response_headers) ? error.response_headers : nil
  logger.error("Error when sending payload to Ingress API. Error message: #{error.message}. Body: #{body}. Header: #{headers}")
  raise
end
targeted_refresh(notices) click to toggle source

@optional Listen to notices from threads

# File lib/topological_inventory/providers/common/collector.rb, line 96
# Optional hook: collect! calls it with the batch of notices drained from +queue+
# whenever that batch is not empty. The base implementation ignores the notices.
#
# @param notices [Array] notices popped from the queue during one collect! pass
# @return [nil]
def targeted_refresh(notices)
  # Intentionally a no-op.
  nil
end