class Triglav::Agent::Base::Processor

Triglav agent processor class.

One instance is created per `resource_uri_prefix`.

You usually do not need to customize this class, but if you want to provide your own implementation, configure

Triglav::Agent::Configuration.processor_class

Attributes

resource_uri_prefix[R]
worker[R]

Public Class Methods

max_consecuitive_error_count() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 23
# Threshold used by #process for back-to-back resource failures.
#
# #process raises TooManyError once its consecutive-error counter becomes
# strictly greater than this value, i.e. with the value 3 the fourth failure
# in a row aborts the run. The counter is reset by any successful resource.
#
# NOTE: the spelling "consecuitive" is part of the public method name and is
# referenced by #process, so it must be kept as-is.
#
# @return [Integer]
def self.max_consecuitive_error_count
  3
end
new(worker, resource_uri_prefix) click to toggle source
# File lib/triglav/agent/base/processor.rb, line 18
# Binds the processor to the worker that drives it and to the resource URI
# prefix whose resources it will handle.
#
# @param worker the owning worker; #stopped? consults it (when it responds to
#   +stopped?+) to decide whether processing should be interrupted
# @param resource_uri_prefix the prefix passed to the API when listing
#   resources and used to look up connection info
def initialize(worker, resource_uri_prefix)
  @worker, @resource_uri_prefix = worker, resource_uri_prefix
end

Public Instance Methods

process() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 27
# Runs one monitoring pass over every resource under +resource_uri_prefix+.
#
# For each resource a monitor (see #monitor_class) is built on a pooled
# connection; every batch of events the monitor yields is logged and sent
# through a pooled API client. A resource whose processing raises is logged
# and skipped, unless too many resources fail in a row.
#
# @return [Integer] the value of the local success counter, i.e. the number of
#   resources whose block finished without raising
# @raise [TooManyError] when the consecutive-error counter exceeds
#   +self.class.max_consecuitive_error_count+
def process
  # Creates @connection_pool, @api_client_pool and @mutex used below.
  before_process
  success_count = 0
  consecutive_error_count = 0
  # NOTE(review): parallel_opts may select in_processes; in that mode the
  # block presumably runs in separate worker processes, so these local
  # counters and @mutex would not be shared with the caller -- confirm
  # against the parallel gem before relying on the return value there.
  Parallel.each(resources, parallel_opts) do |resource|
    # Stop handing out work once the worker has been asked to stop
    # (Parallel::Break is the parallel gem's signal; presumably it ends the
    # iteration without raising to the caller -- TODO confirm).
    raise Parallel::Break if stopped?
    # Declared outside the begin block so the rescue clause can report the
    # events that were in flight when the failure happened.
    events = nil
    begin
      @connection_pool.with do |connection|
        monitor = monitor_class.new(connection, resource_uri_prefix, resource)
        monitor.process do |_events|
          # Remember the batch before sending so a failed send can log it.
          events = _events
          $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" }
          @api_client_pool.with {|api_client| api_client.send_messages(events) }
        end
      end
      # Counters are shared by all iterations, hence the mutex. Any success
      # resets the consecutive-error streak.
      @mutex.synchronize do
        success_count += 1
        consecutive_error_count = 0
      end
    rescue => e
      log_error(e)
      # events is only non-nil if the monitor yielded before the failure.
      $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" } if events
      @mutex.synchronize do
        # Raised from inside the rescue clause, so it is not caught here and
        # propagates out of the block.
        raise TooManyError if (consecutive_error_count += 1) > self.class.max_consecuitive_error_count
      end
    end
  end
  success_count
ensure
  # Method-level ensure: runs on normal return and on any exception raised
  # above, including one raised by before_process itself.
  after_process
end
total_count() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 60
# Number of resources this processor would handle.
#
# Delegates to #resources, so the first call may trigger the API listing that
# #resources performs and memoizes.
#
# @return [Integer] +resources.size+
def total_count
  resources.size
end

Private Instance Methods

after_process() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 78
# Shuts down the pools created by #before_process, closing every pooled object.
#
# This runs from the +ensure+ of #process, which also covers the call to
# #before_process. If #before_process raised before a pool was assigned, that
# instance variable is still nil here; each pool is therefore nil-guarded so
# that cleanup does not raise NoMethodError and mask the original exception.
#
# Errors from closing an individual connection are deliberately ignored
# (best-effort cleanup).
def after_process
  [@connection_pool, @api_client_pool].each do |pool|
    next unless pool
    pool.shutdown {|conn| conn.close rescue nil }
  end
end
before_process() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 66
# Prepares the per-run state that #process relies on:
#
# * @connection_pool -- pool whose factory builds a #connection_class instance
#   from the connection info registered for +resource_uri_prefix+
# * @api_client_pool -- pool whose factory builds a fresh ApiClient
# * @mutex           -- guards the counters shared by the parallel iterations
#
# It also passes the URIs (as symbols) of the current resources to
# Status.select_resource_uris! for this prefix.
def before_process
  @connection_pool = ConnectionPool.new(connection_pool_opts) do
    klass = connection_class
    klass.new(get_connection_info(resource_uri_prefix))
  end
  @api_client_pool = ConnectionPool.new(connection_pool_opts) do
    # A new client per pool entry rather than one shared instance.
    ApiClient.new
  end
  current_uris = resources.map do |res|
    res.uri.to_sym
  end
  Status.select_resource_uris!(resource_uri_prefix, current_uris)
  @mutex = Mutex.new
end
connection_class() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 83
# Class used to build the pooled connections in #before_process.
#
# @return whatever +Configuration.connection_class+ returns; #before_process
#   calls +.new+ on it with the connection info for the prefix
def connection_class
  Configuration.connection_class
end
connection_pool_opts() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 106
# Options hash handed to ConnectionPool.new for both pools.
#
# @return [Hash] +:size+ from #connection_pool_size and +:timeout+ from
#   #connection_pool_timeout
def connection_pool_opts
  pool_size = connection_pool_size
  pool_timeout = connection_pool_timeout
  {size: pool_size, timeout: pool_timeout}
end
connection_pool_size() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 118
# Size of each connection pool.
#
# Reads +:connection_pool_size+ from this agent's section of +$setting+ and
# falls back to #parallel_size when it is not set.
def connection_pool_size
  configured = $setting.dig(name, :connection_pool_size)
  configured || parallel_size
end
connection_pool_timeout() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 122
# Timeout passed to the connection pools.
#
# Reads +:connection_pool_timeout+ from this agent's section of +$setting+;
# defaults to 60 when it is not set.
def connection_pool_timeout
  configured = $setting.dig(name, :connection_pool_timeout)
  configured || 60
end
get_connection_info(resource_uri_prefix) click to toggle source
# File lib/triglav/agent/base/processor.rb, line 126
# Looks up the connection info registered for a resource URI prefix.
#
# Reads +:connection_info+ from this agent's section of +$setting+ and indexes
# it with the given prefix. If the +:connection_info+ entry itself is absent,
# the lookup returns nil and indexing it raises NoMethodError.
#
# @param resource_uri_prefix key into the +:connection_info+ collection
# @return the entry stored under that key (nil for a Hash without the key)
def get_connection_info(resource_uri_prefix)
  connection_infos = $setting.dig(name, :connection_info)
  connection_infos[resource_uri_prefix]
end
log_error(e) click to toggle source
# File lib/triglav/agent/base/processor.rb, line 130
# Logs an exception at error level as a single line: class, message, and the
# backtrace frames joined with a literal backslash-n (two characters, not a
# newline) so the entry stays on one log line.
#
# Exception#backtrace is nil for an exception object that was never raised;
# Array() turns that into an empty list so logging itself cannot fail with
# NoMethodError.
#
# @param e [Exception] the error to report
def log_error(e)
  $logger.error { "#{e.class} #{e.message} #{Array(e.backtrace).join("\\n")}" } # one line
end
monitor_class() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 87
# Class of the monitor that #process instantiates for each resource.
#
# @return whatever +Configuration.monitor_class+ returns; #process calls
#   +.new(connection, resource_uri_prefix, resource)+ on it and then +process+
#   on the instance
def monitor_class
  Configuration.monitor_class
end
name() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 91
# Agent name as reported by +Configuration.name+.
#
# The setting helpers in this class use it as the first key when digging into
# +$setting+ (e.g. +$setting.dig(name, :parallel_size)+).
def name
  Configuration.name
end
parallel_opts() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 102
# Options passed to Parallel.each in #process.
#
# @return [Hash] +{in_processes: parallel_size}+ when #parallel_type is the
#   string 'process'; +{in_threads: parallel_size}+ for any other value
def parallel_opts
  if parallel_type == 'process'
    {in_processes: parallel_size}
  else
    {in_threads: parallel_size}
  end
end
parallel_size() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 110
# Degree of parallelism used by #process.
#
# Reads +:parallel_size+ from this agent's section of +$setting+; defaults to
# 1 when it is not set.
def parallel_size
  configured = $setting.dig(name, :parallel_size)
  configured || 1
end
parallel_type() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 114
# Parallelism mode consulted by #parallel_opts.
#
# Reads +:parallel_type+ from this agent's section of +$setting+; defaults to
# the string 'thread' when it is not set.
def parallel_type
  configured = $setting.dig(name, :parallel_type)
  configured || 'thread'
end
resources() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 95
# Resources to process for +resource_uri_prefix+, fetched once and memoized.
#
# On the first call a new ApiClient lists the aggregated resources for the
# prefix; a nil/false answer is replaced by an empty array. The result is
# stored in @resources before the debug line is logged, and later calls return
# the stored object without contacting the API again.
def resources
  unless @resources
    @resources = ApiClient.new.list_aggregated_resources(resource_uri_prefix) || []
    $logger.debug { "resource_uri_prefix:#{resource_uri_prefix} resources.size:#{@resources.size}" }
  end
  @resources
end
stopped?() click to toggle source
# File lib/triglav/agent/base/processor.rb, line 134
# Whether the owning worker has been asked to stop.
#
# @return the result of +worker.stopped?+ when a worker is present and
#   responds to +stopped?+; nil otherwise
def stopped?
  return unless worker && worker.respond_to?(:stopped?)
  worker.stopped?
end