class Triglav::Agent::Base::Processor
Triglav
agent processor class.
An instance is created for a `resource_uri_prefix`.
You usually do not need to customize this class, but if you want to supply your own implementation, configure
Triglav::Agent::Configuration.processor_class
Attributes
resource_uri_prefix[R]
worker[R]
Public Class Methods
max_consecuitive_error_count()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 23
# Upper bound on back-to-back resource failures tolerated by #process.
# #process raises TooManyError once its consecutive error counter
# becomes greater than this value.
#
# @return [Integer] always 3
def self.max_consecuitive_error_count
  3
end
new(worker, resource_uri_prefix)
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 18
# Builds a processor bound to one +resource_uri_prefix+.
#
# @param worker the object that owns this processor; #stopped? asks it
#   whether processing should stop (when it responds to +stopped?+)
# @param resource_uri_prefix the prefix whose resources this instance handles
def initialize(worker, resource_uri_prefix)
  @worker, @resource_uri_prefix = worker, resource_uri_prefix
end
Public Instance Methods
process()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 27
# Runs one monitoring pass over every resource returned by #resources.
#
# For each resource a monitor (see #monitor_class) is built on a pooled
# connection; the events it yields are logged and sent through a pooled
# API client. A resource that raises is logged via #log_error and counted
# as a consecutive failure; any success resets that counter.
#
# #before_process is called first and #after_process always runs on exit
# (it is in the +ensure+ clause), including when an exception propagates.
#
# NOTE(review): +success_count+ and +consecutive_error_count+ are plain
# locals guarded by a Mutex; presumably they are only meaningful when
# parallel_type is 'thread' -- confirm behaviour for 'process'.
#
# @return [Integer] number of resources processed without raising
# @raise [TooManyError] when the consecutive failure counter exceeds
#   +self.class.max_consecuitive_error_count+
def process
  before_process
  success_count = 0
  consecutive_error_count = 0

  Parallel.each(resources, parallel_opts) do |resource|
    # Abort the parallel iteration as soon as a stop has been requested.
    raise Parallel::Break if stopped?

    events = nil
    begin
      @connection_pool.with do |connection|
        monitor = monitor_class.new(connection, resource_uri_prefix, resource)
        monitor.process do |monitored_events|
          # Keep a reference outside the block so a failure can report them.
          events = monitored_events
          $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" }
          @api_client_pool.with { |api_client| api_client.send_messages(events) }
        end
      end

      @mutex.synchronize do
        success_count += 1
        consecutive_error_count = 0
      end
    rescue => error
      log_error(error)
      if events
        $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" }
      end

      @mutex.synchronize do
        consecutive_error_count += 1
        raise TooManyError if consecutive_error_count > self.class.max_consecuitive_error_count
      end
    end
  end

  success_count
ensure
  after_process
end
total_count()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 60
# Number of resources this processor handles, i.e. the size of #resources.
#
# @return the result of +resources.size+
def total_count
  resources.size
end
Private Instance Methods
after_process()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 78
# Shuts down both pools created by #before_process, closing every pooled
# object on a best-effort basis.
#
# #process calls this from its +ensure+ clause, so it can run even when
# #before_process failed before a pool was assigned. Pools that are still
# nil are skipped so that this cleanup never raises NoMethodError and hides
# the original failure.
def after_process
  [@connection_pool, @api_client_pool].each do |pool|
    next unless pool

    pool.shutdown do |conn|
      begin
        conn.close
      rescue StandardError
        # Closing is best-effort: a connection that fails to close must not
        # prevent the remaining ones from being released.
        nil
      end
    end
  end
end
before_process()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 66
# Prepares the state #process relies on:
#
# * +@connection_pool+   -- pool of +connection_class+ instances built from
#   the connection info for +resource_uri_prefix+
# * +@api_client_pool+   -- pool of ApiClient instances
# * +@mutex+             -- guards the counters updated inside #process
#
# It also passes the symbolized URIs of the current resources to
# Status.select_resource_uris! (presumably to restrict stored status to the
# current resource set -- confirm against Status).
def before_process
  @connection_pool = ConnectionPool.new(connection_pool_opts) do
    connection_class.new(get_connection_info(resource_uri_prefix))
  end
  @api_client_pool = ConnectionPool.new(connection_pool_opts) do
    ApiClient.new # renew connection
  end

  uris = resources.map { |resource| resource.uri.to_sym }
  Status.select_resource_uris!(resource_uri_prefix, uris)

  @mutex = Mutex.new
end
connection_class()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 83
# Class used to build pooled connections in #before_process.
#
# @return the value of +Configuration.connection_class+
def connection_class
  Configuration.connection_class
end
connection_pool_opts()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 106
# Options passed to ConnectionPool.new for both pools in #before_process.
#
# @return [Hash] +:size+ from #connection_pool_size and +:timeout+ from
#   #connection_pool_timeout
def connection_pool_opts
  {
    size: connection_pool_size,
    timeout: connection_pool_timeout
  }
end
connection_pool_size()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 118
# Size of each connection pool.
#
# @return the +:connection_pool_size+ setting found under #name in
#   +$setting+, or #parallel_size when that setting is nil/false
def connection_pool_size
  configured = $setting.dig(name, :connection_pool_size)
  configured || parallel_size
end
connection_pool_timeout()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 122
# Timeout handed to ConnectionPool via #connection_pool_opts.
#
# @return the +:connection_pool_timeout+ setting found under #name in
#   +$setting+, or 60 when that setting is nil/false
def connection_pool_timeout
  configured = $setting.dig(name, :connection_pool_timeout)
  configured || 60
end
get_connection_info(resource_uri_prefix)
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 126
# Looks up the connection info configured for a resource URI prefix.
#
# Reads the +:connection_info+ section found under #name in +$setting+ and
# indexes it with +resource_uri_prefix+. No guard is applied: if the
# section is absent, indexing nil raises NoMethodError.
#
# @param resource_uri_prefix key into the +:connection_info+ section
# @return the entry stored for +resource_uri_prefix+
def get_connection_info(resource_uri_prefix)
  connection_infos = $setting.dig(name, :connection_info)
  connection_infos[resource_uri_prefix]
end
log_error(e)
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 130
# Logs an exception at error level as a single line: class, message and
# backtrace, with backtrace frames joined by a literal backslash-n so the
# entry does not span several log lines.
#
# Exception#backtrace is nil for an exception object that was never raised;
# Array() turns that into an empty list so logging itself cannot fail.
#
# @param e [Exception] the error to report
def log_error(e)
  $logger.error { "#{e.class} #{e.message} #{Array(e.backtrace).join("\\n")}" } # one line
end
monitor_class()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 87
# Class instantiated per resource inside #process.
#
# @return the value of +Configuration.monitor_class+
def monitor_class
  Configuration.monitor_class
end
name()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 91
# Agent name; the other helpers in this class use it as the first key when
# reading +$setting+.
#
# @return the value of +Configuration.name+
def name
  Configuration.name
end
parallel_opts()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 102
# Options passed to Parallel.each in #process.
#
# @return [Hash] +{in_processes: parallel_size}+ when #parallel_type is
#   the string 'process', otherwise +{in_threads: parallel_size}+
def parallel_opts
  if parallel_type == 'process'
    {in_processes: parallel_size}
  else
    {in_threads: parallel_size}
  end
end
parallel_size()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 110
# Degree of parallelism used by #parallel_opts (and, by default, the pool
# size -- see #connection_pool_size).
#
# @return the +:parallel_size+ setting found under #name in +$setting+,
#   or 1 when that setting is nil/false
def parallel_size
  configured = $setting.dig(name, :parallel_size)
  configured || 1
end
parallel_type()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 114
# Parallelism flavour consulted by #parallel_opts.
#
# @return the +:parallel_type+ setting found under #name in +$setting+,
#   or 'thread' when that setting is nil/false
def parallel_type
  configured = $setting.dig(name, :parallel_type)
  configured || 'thread'
end
resources()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 95
# Resources to process for +resource_uri_prefix+, memoized in +@resources+.
#
# On the first call the list is fetched with a fresh ApiClient via
# +list_aggregated_resources+; a nil/false result is replaced by an empty
# array, and the number of resources is logged at debug level. Later calls
# return the cached value without contacting the API again.
#
# @return the cached resource list (an empty Array when the API returned
#   nothing)
def resources
  unless @resources
    @resources = ApiClient.new.list_aggregated_resources(resource_uri_prefix) || []
    $logger.debug { "resource_uri_prefix:#{resource_uri_prefix} resources.size:#{@resources.size}" }
  end
  @resources
end
stopped?()
click to toggle source
# File lib/triglav/agent/base/processor.rb, line 134
# Whether the owning worker has been asked to stop.
#
# @return the result of +worker.stopped?+, or nil when there is no worker
#   or the worker does not respond to +stopped?+
def stopped?
  return unless worker && worker.respond_to?(:stopped?)

  worker.stopped?
end