class TopologicalInventory::Providers::Common::Collector
Attributes
Public Class Methods
@param poll_time [Integer] Waiting time between collecting loops. Irrelevant when standalone_mode is false (collect! then stops after a single pass instead of sleeping)
@param standalone_mode [Boolean] T/F if collector is created by collectors_pool
# File lib/topological_inventory/providers/common/collector.rb, line 15
# Builds a collector for the given source and prepares its bookkeeping state.
#
# @param source the source this collector gathers data for; stored as-is and
#   later passed along in the inventory payloads
# @param default_limit [Integer] value returned by +limits+ for any key that
#   has no explicit entry
# @param poll_time [Integer] value passed to +sleep+ between passes of
#   +collect!+ when +standalone_mode+ is truthy
# @param standalone_mode [Boolean] when truthy, +collect!+ keeps looping and
#   sleeps between passes; when falsy, it stops after one pass
def initialize(source, default_limit: 1_000, poll_time: 30, standalone_mode: true)
  # Settings handed in by the caller.
  self.source          = source
  self.poll_time       = poll_time
  self.standalone_mode = standalone_mode
  self.limits          = Hash.new(default_limit)

  # Runtime state shared with the collector threads.
  self.collector_threads = Concurrent::Map.new
  self.finished          = Concurrent::AtomicBoolean.new(false)
  self.queue             = Queue.new
end
Public Instance Methods
# File lib/topological_inventory/providers/common/collector.rb, line 25
# Main loop of the collector.
#
# Starts the collector threads, then repeats until the collector is flagged
# as finished: makes sure the threads are running, drains every notice
# currently waiting in the queue and hands the batch to +targeted_refresh+
# (skipped when nothing was queued). In standalone mode the loop then sleeps
# for +poll_time+; otherwise it calls +stop+, so only one pass is made.
def collect!
  start_collector_threads

  until finished?
    ensure_collector_threads

    # Take everything that is in the queue right now, without blocking.
    pending_notices = []
    pending_notices << queue.pop until queue.empty?
    targeted_refresh(pending_notices) unless pending_notices.empty?

    if standalone_mode
      sleep(poll_time)
    else
      stop
    end
  end
end
# File lib/topological_inventory/providers/common/collector.rb, line 40
# Flags the collector as finished, which ends the loop in +collect!+ at its
# next check of +finished?+.
def stop
  finished.value = true
end
Protected Instance Methods
Thread's main for collecting one entity type's data
# File lib/topological_inventory/providers/common/collector.rb, line 90
# Body executed inside the thread created by +start_collector_thread+ for a
# single entity type. This base implementation is only a placeholder.
#
# @param _connection value returned by +connection_for_entity_type+
# @param _entity_type the entity type the thread is responsible for
# @raise [NotImplementedError] always
def collector_thread(_connection, _entity_type)
  raise NotImplementedError
end
Connection to endpoint for each entity type
# File lib/topological_inventory/providers/common/collector.rb, line 85
# Returns the endpoint connection to use for the given entity type. This base
# implementation is only a placeholder. +start_collector_thread+ skips thread
# creation when the result is nil.
#
# @param _entity_type the entity type a connection is requested for
# @raise [NotImplementedError] always
def connection_for_entity_type(_entity_type)
  raise NotImplementedError
end
# File lib/topological_inventory/providers/common/collector.rb, line 172
# Refresh type used by +save_inventory+ and +sweep_inventory+ when the caller
# does not pass one.
#
# @return [String] always 'full-refresh'
def default_refresh_type
  'full-refresh'
end
Should be overridden by subclass. Entity types collected from endpoints.
# File lib/topological_inventory/providers/common/collector.rb, line 59
# Names of the endpoint types this collector handles. +entity_types+ calls
# "<endpoint>_entity_types" for each name returned here.
#
# @return [Array] a new empty array in this base implementation
def endpoint_types
  []
end
# File lib/topological_inventory/providers/common/collector.rb, line 71
# Called at the start of every pass of +collect!+; delegates to
# +start_collector_threads+, which only starts threads for entity types that
# have no live thread.
def ensure_collector_threads
  start_collector_threads
end
# File lib/topological_inventory/providers/common/collector.rb, line 53
# Collects the entity types of all endpoint types into one flat list.
#
# For every name in +endpoint_types+ the method "<name>_entity_types" is
# invoked on this object and the results are concatenated.
#
# @return [Array] concatenated results of the per-endpoint methods
def entity_types
  endpoint_types.flat_map do |endpoint_type|
    types_method = "#{endpoint_type}_entity_types"
    send(types_method)
  end
end
# File lib/topological_inventory/providers/common/collector.rb, line 49
# Tells whether the collector has been flagged as finished (see +stop+).
#
# @return the current value held by +finished+
def finished?
  finished.value
end
# File lib/topological_inventory/providers/common/collector.rb, line 176
# Creates the client handed to SaveInventory::Saver by +save_inventory+ and
# +sweep_inventory+. A new client object is built on every call.
#
# @return [TopologicalInventoryIngressApiClient::DefaultApi]
def ingress_api_client
  TopologicalInventoryIngressApiClient::DefaultApi.new
end
# File lib/topological_inventory/providers/common/collector.rb, line 164
# Name of the inventory handled by this collector.
#
# @return [String] always "Default" in this base implementation
def inventory_name
  "Default"
end
# File lib/topological_inventory/providers/common/collector.rb, line 180
# Writes the given URL to the log at info level, prefixed with
# "[EXTERNAL URL]".
#
# @param url the URL to log; interpolated into the message as-is
def log_external_url(url)
  message = "[EXTERNAL URL] #{url}"
  logger.info(message)
end
@param refresh_state_part_collected_at [Time] when this payload is collected (for [Core]:RefreshStatePart)
@param refresh_state_part_sent_at [Time] when this payload is sent (for [Core]:RefreshStatePart)
# File lib/topological_inventory/providers/common/collector.rb, line 101
# Sends one batch of collected data to the Ingress API.
#
# Nothing is sent when +collections+ is empty; the method returns 0 in that
# case. Otherwise the result of SaveInventory::Saver#save is returned.
#
# @param collections collected data to send; must respond to +empty?+
# @param inventory_name name placed in the inventory payload
# @param schema name used to build the payload's Schema object
# @param refresh_state_uuid identifier passed through to the payload
# @param refresh_state_part_uuid identifier passed through to the payload
# @param refresh_state_part_collected_at [Time] when this payload is collected
# @param refresh_state_part_sent_at [Time] when this payload is sent;
#   defaults to the current UTC time at the moment of the call
# @param refresh_type refresh type placed in the payload; defaults to
#   +default_refresh_type+
# @raise re-raises any StandardError raised in the method body after logging it
def save_inventory(collections,
                   inventory_name,
                   schema,
                   refresh_state_uuid = nil,
                   refresh_state_part_uuid = nil,
                   refresh_state_part_collected_at = nil,
                   refresh_state_part_sent_at = Time.now.utc,
                   refresh_type = default_refresh_type)
  return 0 if collections.empty?

  saver = SaveInventory::Saver.new(client: ingress_api_client, logger: logger)
  payload = TopologicalInventoryIngressApiClient::Inventory.new(
    name:                            inventory_name,
    schema:                          TopologicalInventoryIngressApiClient::Schema.new(name: schema),
    source:                          source,
    collections:                     collections,
    refresh_state_uuid:              refresh_state_uuid,
    refresh_state_part_uuid:         refresh_state_part_uuid,
    refresh_state_part_collected_at: refresh_state_part_collected_at,
    refresh_state_part_sent_at:      refresh_state_part_sent_at,
    refresh_type:                    refresh_type
  )
  saver.save(inventory: payload)
rescue => error
  # Not every error carries HTTP response details, so they are read only when available.
  response_body    = error.respond_to?(:response_body) ? error.response_body : nil
  response_headers = error.respond_to?(:response_headers) ? error.response_headers : nil
  logger.error("Error when sending payload to Ingress API. Error message: #{error.message}. Body: #{response_body}. Header: #{response_headers}")
  raise
end
# File lib/topological_inventory/providers/common/collector.rb, line 168
# Name of the schema handled by this collector.
#
# @return [String] always "Default" in this base implementation
def schema_name
  "Default"
end
# File lib/topological_inventory/providers/common/collector.rb, line 75
# Starts the thread that collects a single entity type.
#
# The connection is looked up before the thread is created; when there is no
# connection (nil) no thread is started.
#
# @param entity_type the entity type to collect
# @return [Thread, nil] the new thread running +collector_thread+, or nil when
#   +connection_for_entity_type+ returned nil
def start_collector_thread(entity_type)
  endpoint_connection = connection_for_entity_type(entity_type)

  Thread.new { collector_thread(endpoint_connection, entity_type) } unless endpoint_connection.nil?
end
# File lib/topological_inventory/providers/common/collector.rb, line 63
# Makes sure every entity type has a running collector thread.
#
# Entity types whose registered thread is still alive are left untouched. For
# all others the result of +start_collector_thread+ is stored in
# +collector_threads+ (this may be nil when no connection is available).
def start_collector_threads
  entity_types.each do |type|
    current_thread = collector_threads[type]
    collector_threads[type] = start_collector_thread(type) unless current_thread&.alive?
  end
end
@param refresh_state_started_at [Time] when collecting of this entity type is started (for [Core]:RefreshState)
@param refresh_state_sent_at [Time] when this payload is sent (for [Core]:RefreshState)
# File lib/topological_inventory/providers/common/collector.rb, line 133
# Sends a sweep request (a payload with no collections) to the Ingress API.
#
# Nothing is sent, and nil is returned, when +total_parts+ is falsy or
# +sweep_scope+ is empty. Otherwise the result of SaveInventory::Saver#save
# is returned.
#
# @param inventory_name name placed in the inventory payload
# @param schema name used to build the payload's Schema object
# @param refresh_state_uuid identifier passed through to the payload
# @param total_parts value passed through to the payload; falsy skips the request
# @param sweep_scope value passed through to the payload; must respond to +empty?+
# @param refresh_state_started_at [Time] when collecting of this entity type is started
# @param refresh_state_sent_at [Time] when this payload is sent; defaults to
#   the current UTC time at the moment of the call
# @param refresh_type refresh type placed in the payload; defaults to
#   +default_refresh_type+
# @raise re-raises any StandardError raised in the method body after logging it
def sweep_inventory(inventory_name,
                    schema,
                    refresh_state_uuid,
                    total_parts,
                    sweep_scope,
                    refresh_state_started_at = nil,
                    refresh_state_sent_at = Time.now.utc,
                    refresh_type = default_refresh_type)
  return if !total_parts || sweep_scope.empty?

  saver = SaveInventory::Saver.new(client: ingress_api_client, logger: logger)
  payload = TopologicalInventoryIngressApiClient::Inventory.new(
    name:                     inventory_name,
    schema:                   TopologicalInventoryIngressApiClient::Schema.new(name: schema),
    source:                   source,
    collections:              [],
    refresh_state_uuid:       refresh_state_uuid,
    total_parts:              total_parts,
    sweep_scope:              sweep_scope,
    refresh_state_started_at: refresh_state_started_at,
    refresh_state_sent_at:    refresh_state_sent_at,
    refresh_type:             refresh_type
  )
  saver.save(inventory: payload)
rescue => error
  # Not every error carries HTTP response details, so they are read only when available.
  response_body    = error.respond_to?(:response_body) ? error.response_body : nil
  response_headers = error.respond_to?(:response_headers) ? error.response_headers : nil
  logger.error("Error when sending payload to Ingress API. Error message: #{error.message}. Body: #{response_body}. Header: #{response_headers}")
  raise
end
@optional Listen to notices from threads
# File lib/topological_inventory/providers/common/collector.rb, line 96
# Optional hook that receives the notices drained from the queue by
# +collect!+. This base implementation does nothing.
#
# @param notices [Array] notices popped from the queue during one pass
# @return [nil]
def targeted_refresh(notices)
end