class ElasticAPM::Transport::Base
@api private (the source also carries a `rubocop:disable Metrics/ClassLength` lint directive on this class)
Constants
- WATCHER_EXECUTION_INTERVAL
- WATCHER_TIMEOUT_INTERVAL
- WORKER_JOIN_TIMEOUT
Attributes
config[R]
filters[R]
queue[R]
stopped[R]
watcher[R]
workers[R]
Public Class Methods
new(config)
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 21
# Sets up the transport's state from +config+. Only assigns instance
# variables; no watcher or worker threads are created here.
#
# @param config configuration object; this method reads +api_buffer_size+
#   and +pool_size+ from it and passes it to Serializers and Filters.
def initialize(config)
  @config = config

  # Bounded queue of pending resources, plus the collaborators that
  # boot_worker hands to each Worker.
  @queue = SizedQueue.new(config.api_buffer_size)
  @serializers = Serializers.new(config)
  @filters = Filters.new(config)

  # Shutdown flag checked by submit and ensure_worker_count.
  @stopped = Concurrent::AtomicBoolean.new

  # Separate locks for the watcher and for the worker pool.
  @watcher_mutex = Mutex.new
  @worker_mutex = Mutex.new

  # One slot per worker; slots start out as nil until a thread is booted.
  @workers = Array.new(config.pool_size)
end
Public Instance Methods
add_filter(key, callback)
click to toggle source
rubocop:enable Metrics/MethodLength
# File lib/elastic_apm/transport/base.rb, line 73
# Registers +callback+ under +key+ by delegating to the filters object
# created in the constructor.
#
# @param key identifier passed through to the filters object
# @param callback object passed through to the filters object
# @return whatever the filters object's +add+ returns
def add_filter(key, callback)
  registry = @filters
  registry.add(key, callback)
end
start()
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 37
# Starts the transport: logs a debug line, then makes sure the watcher is
# running and that the worker pool is populated.
#
# @return the result of ensure_worker_count
def start
  debug('%s: Starting Transport', pid_str)

  ensure_watcher_running
  ensure_worker_count
end
stop()
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 44
# Stops the transport. The stopped flag is raised first so that submit
# rejects new resources while the watcher and workers are shut down.
#
# @return the result of stop_workers
def stop
  debug('%s: Stopping Transport', pid_str)

  @stopped.make_true

  stop_watcher
  stop_workers
end
submit(resource)
click to toggle source
rubocop:disable Metrics/MethodLength
# File lib/elastic_apm/transport/base.rb, line 54
# Enqueues +resource+ for the workers without blocking the caller.
#
# @param resource the object to push onto the transport queue
# @return [true] when the resource was enqueued
# @return [false] when the transport has been stopped
# @return [nil] when the queue is full or pushing failed with a
#   StandardError (the failure is logged, not raised)
def submit(resource)
  if @stopped.true?
    warn '%s: Transport stopping, no new events accepted', pid_str
    return false
  end

  ensure_watcher_running

  # Non-blocking push: a full SizedQueue raises ThreadError instead of
  # making the caller wait.
  queue.push(resource, true)

  true
rescue ThreadError
  throttled_queue_full_warning
  nil
rescue StandardError => e
  # Only StandardError is swallowed, so Interrupt, SystemExit and similar
  # process-level exceptions still reach the host application. The
  # exception itself is passed to %p (which already calls #inspect) to
  # avoid logging a doubly-inspected, quoted string.
  error '%s: Failed adding to the transport queue: %p', pid_str, e
  nil
end
Private Instance Methods
all_workers_alive?()
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 111
# Reports whether every slot in the worker pool holds a live thread.
# A nil slot (no thread booted yet) counts as not alive.
#
# @return [Boolean] true when no slot is empty or dead (also true for an
#   empty pool)
def all_workers_alive?
  workers.none? { |slot| slot.nil? || !slot.alive? }
end
boot_worker()
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 115
# Spawns a thread that builds a Worker and runs its work_forever loop.
# The Worker is constructed inside the new thread, not in the caller.
#
# @return [Thread] the newly started thread
def boot_worker
  debug('%s: Booting worker...', pid_str)

  Thread.new do
    worker = Worker.new(
      config, queue,
      serializers: @serializers,
      filters: @filters
    )
    worker.work_forever
  end
end
ensure_watcher_running()
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 83
# Makes sure a watcher timer exists for the current process. The watcher
# periodically calls ensure_worker_count.
#
# @pid records the process the watcher was created in; a different
# Process.pid means we have forked and need a new watcher.
def ensure_watcher_running
  # Cheap check without taking the lock.
  return if @pid == Process.pid

  @watcher_mutex.synchronize do
    # Re-check under the lock: another thread may have created the
    # watcher while this one was waiting for the mutex.
    unless @pid == Process.pid
      @pid = Process.pid
      @watcher = Concurrent::TimerTask.execute(
        execution_interval: WATCHER_EXECUTION_INTERVAL,
        timeout_interval: WATCHER_TIMEOUT_INTERVAL
      ) { ensure_worker_count }
    end
  end
end
ensure_worker_count()
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 98
# Replaces every empty or dead slot in the worker pool with a freshly
# booted worker thread, holding the worker mutex while doing so.
#
# Does nothing when all workers are already alive or when the transport
# has been stopped.
#
# @return [Array, nil] the worker array after replacement, or nil when
#   nothing had to be done
def ensure_worker_count
  @worker_mutex.synchronize do
    return if all_workers_alive? || stopped.true?

    # Keep live threads in place; boot a new worker for anything else.
    @workers.map! { |worker| worker&.alive? ? worker : boot_worker }
  end
end
pid_str()
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 79
# Builds the "[PID:<pid>]" tag used as a prefix in this class's log lines.
#
# @return [String] the current process id wrapped as "[PID:…]"
def pid_str
  "[PID:#{Process.pid}]"
end
send_stop_messages()
click to toggle source
rubocop:enable Metrics/MethodLength
# File lib/elastic_apm/transport/base.rb, line 150
# Pushes one stop message per configured worker onto the queue, without
# blocking. If the queue is full the remaining messages are skipped and a
# warning is logged instead.
def send_stop_messages
  message_count = config.pool_size

  message_count.times do
    # Non-blocking push; raises ThreadError when the queue is full.
    queue.push(Worker::StopMessage.new, true)
  end
rescue ThreadError
  warn 'Cannot push stop messages to worker queue as it is full'
end
stop_watcher()
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 156
# Shuts the watcher down, holding the watcher mutex. Nothing happens when
# no watcher exists or when it was created under a different process id.
#
# @return the result of the watcher's shutdown, or nil when skipped
def stop_watcher
  @watcher_mutex.synchronize do
    watcher.shutdown unless watcher.nil? || @pid != Process.pid
  end
end
stop_workers()
click to toggle source
rubocop:disable Metrics/MethodLength
# File lib/elastic_apm/transport/base.rb, line 128
# Asks the workers to stop and waits for them. Each thread gets up to
# WORKER_JOIN_TIMEOUT seconds to finish; threads that do not finish in
# time are killed. The worker pool is emptied afterwards.
def stop_workers
  debug('%s: Stopping workers', pid_str)

  send_stop_messages

  @worker_mutex.synchronize do
    workers.each do |worker_thread|
      # Skip empty slots and threads that finished within the timeout
      # (Thread#join returns nil only when the timeout expires).
      next if worker_thread.nil? || worker_thread.join(WORKER_JOIN_TIMEOUT)

      debug(
        '%s: Worker did not stop in %ds, killing...',
        pid_str, WORKER_JOIN_TIMEOUT
      )
      worker_thread.kill
    end

    @workers.clear
  end
end
throttled_queue_full_warning()
click to toggle source
# File lib/elastic_apm/transport/base.rb, line 163
# Emits the "queue is full" warning through a Util::Throttle wrapper that
# is created on first use and reused on later calls.
#
# @return the result of calling the throttle
def throttled_queue_full_warning
  # NOTE(review): the argument 5 is presumably a throttle interval in
  # seconds — confirm against Util::Throttle.
  @queue_full_log ||= Util::Throttle.new(5) do
    warn(
      '%s: Queue is full (%i items), skipping…',
      pid_str, config.api_buffer_size
    )
  end

  @queue_full_log.call
end