class TopologicalInventory::Providers::Common::Operations::Source

Constants

AUTH_NOT_NECESSARY
ERROR_MESSAGES
EVENT_AVAILABILITY_STATUS
KAFKA_TOPIC_NAME
LAST_CHECKED_AT_THRESHOLD
STATUS_UNAVAILABLE

Attributes

account_number[RW]
identity[RW]
metrics[RW]
operation[RW]
params[RW]
request_context[RW]
source_id[RW]
updates_via_kafka[RW]

Public Class Methods

new(params = {}, request_context = nil, metrics = nil) click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 32
def initialize(params = {}, request_context = nil, metrics = nil)
  self.metrics           = metrics
  self.operation         = 'Source'
  self.params            = params
  self.request_context   = request_context
  self.source_id         = params['source_id']
  self.account_number    = params['external_tenant']
  self.updates_via_kafka = ENV['UPDATE_SOURCES_VIA_API'].blank?
  self.identity          = identity_by_account_number(account_number)
end

Public Instance Methods

availability_check() click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 43
def availability_check
  self.operation += '#availability_check'

  return operation_status[:error] if params_missing?

  return operation_status[:skipped] if checked_recently?

  status, error_message = connection_status

  update_source_and_subresources(status, error_message)

  logger.availability_check("Completed: Source #{source_id} is #{status}")

  operation_status[:success]
end

Private Instance Methods

application_connection_check() click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 110
def application_connection_check
  case application.availability_status
  when "available"
    [STATUS_AVAILABLE, nil]
  when "unavailable"
    [STATUS_UNAVAILABLE, "Application id #{application.id} unavailable"]
  end
end
availability_status_message(payload) click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 210
def availability_status_message(payload)
  messaging_client.publish_topic(
    :service => TopologicalInventory::Providers::Common::ClowderConfig.kafka_topic(KAFKA_TOPIC_NAME),
    :event   => EVENT_AVAILABILITY_STATUS,
    :payload => payload.to_json,
    :headers => identity
  )
rescue => err
  logger.availability_check("Failed to update #{payload[:resource_type]} id: #{payload[:resource_id]} - #{err.message}", :error)
ensure
  messaging_client&.close
end
check_time() click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 227
def check_time
  @check_time ||= Time.now.utc
end
checked_recently?() click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 79
def checked_recently?
  checked_recently = if endpoint.present?
                       endpoint.last_checked_at.present? && endpoint.last_checked_at >= LAST_CHECKED_AT_THRESHOLD.ago
                     elsif application.present?
                       application.last_checked_at.present? && application.last_checked_at >= LAST_CHECKED_AT_THRESHOLD.ago
                     end

  logger.availability_check("Skipping, last check at #{endpoint&.last_checked_at || application&.last_checked_at} [Source ID: #{source_id}] ") if checked_recently

  checked_recently
end
connection_check() click to toggle source

@return [Array<String, String|nil] - STATUS_AVAILABLE, error message

# File lib/topological_inventory/providers/common/operations/source.rb, line 120
def connection_check
  raise NotImplementedError, "#{__method__} must be implemented in a subclass"
end
connection_status() click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 91
def connection_status
  # we need either an endpoint or application to check the source.
  return [STATUS_UNAVAILABLE, ERROR_MESSAGES[:endpoint_or_application_not_found]] unless endpoint || application

  check_time
  if endpoint
    endpoint_connection_check
  elsif application
    application_connection_check
  end
end
endpoint_connection_check() click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 103
def endpoint_connection_check
  return [STATUS_UNAVAILABLE, ERROR_MESSAGES[:authentication_not_found]] unless authentication

  # call down into the operations pod implementation of `Source#connection_check`
  connection_check
end
messaging_client() click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 223
def messaging_client
  TopologicalInventory::Providers::Common::MessagingClient.default.client
end
params_missing?() click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 67
def params_missing?
  is_missing = false
  required_params.each do |attr|
    if (is_missing = params[attr].blank?)
      logger.availability_check("Missing #{attr} for the availability_check request [Source ID: #{source_id}]", :error)
      break
    end
  end

  is_missing
end
required_params() click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 63
def required_params
  %w[source_id]
end
update_application(status) click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 191
def update_application(status)
  application_update                     = ::SourcesApiClient::Application.new
  application_update.last_checked_at     = check_time
  application_update.last_available_at   = check_time if status == STATUS_AVAILABLE

  sources_api.update_application(application.id, application_update)
rescue ::SourcesApiClient::ApiError => e
  metrics&.record_error(:sources_api)
  logger.availability_check("Failed to update Application id: #{application.id} - #{e.message}", :error)
end
update_application_by_kafka(status) click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 202
def update_application_by_kafka(status)
  availability_status_message(
    :resource_type => "Application",
    :resource_id   => application.id,
    :status        => status
  )
end
update_endpoint(status, error_message) click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 158
def update_endpoint(status, error_message)
  if endpoint.nil?
    logger.availability_check("Failed to update Endpoint for Source id:#{source_id}. Endpoint not found", :error)
    return
  end

  endpoint_update = ::SourcesApiClient::Endpoint.new

  endpoint_update.availability_status       = status
  endpoint_update.availability_status_error = error_message.to_s
  endpoint_update.last_checked_at           = check_time
  endpoint_update.last_available_at         = check_time if status == STATUS_AVAILABLE

  sources_api.update_endpoint(endpoint.id, endpoint_update)
rescue ::SourcesApiClient::ApiError => e
  metrics&.record_error(:sources_api)
  logger.availability_check("Failed to update Endpoint(ID: #{endpoint.id}) - #{e.message}", :error)
end
update_endpoint_by_kafka(status, error_message) click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 177
def update_endpoint_by_kafka(status, error_message)
  if endpoint.nil?
    logger.availability_check("Failed to update Endpoint for Source id:#{source_id}. Endpoint not found", :error)
    return
  end

  availability_status_message(
    :resource_type => "Endpoint",
    :resource_id   => endpoint.id,
    :status        => status,
    :error         => error_message
  )
end
update_source(status) click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 138
def update_source(status)
  source                     = ::SourcesApiClient::Source.new
  source.availability_status = status
  source.last_checked_at     = check_time
  source.last_available_at   = check_time if status == STATUS_AVAILABLE

  sources_api.update_source(source_id, source)
rescue ::SourcesApiClient::ApiError => e
  metrics&.record_error(:sources_api)
  logger.availability_check("Failed to update Source id:#{source_id} - #{e.message}", :error)
end
update_source_and_subresources(status, error_message = nil) click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 124
def update_source_and_subresources(status, error_message = nil)
  logger.availability_check("Updating source [#{source_id}] status [#{status}] message [#{error_message}]")

  if updates_via_kafka
    update_source_by_kafka(status)
    update_endpoint_by_kafka(status, error_message) if endpoint
    update_application_by_kafka(status) if application
  else
    update_source(status)
    update_endpoint(status, error_message) if endpoint
    update_application(status) if application
  end
end
update_source_by_kafka(status) click to toggle source
# File lib/topological_inventory/providers/common/operations/source.rb, line 150
def update_source_by_kafka(status)
  availability_status_message(
    :resource_type => "Source",
    :resource_id   => source_id,
    :status        => status
  )
end