class StackifyRubyAPM::Worker
@api private
Attributes
config[R]
messages[R]
pending_transactions[R]
Public Class Methods
new(config, messages, pending_transactions, trace_logger)
click to toggle source
# File lib/stackify_apm/worker.rb, line 29
# Stores the worker's collaborators and builds its serializers.
#
# @param config the agent configuration; also handed to both serializers
# @param messages the queue of control messages consumed by the worker
# @param pending_transactions the queue of transactions awaiting delivery
# @param trace_logger the object whose +post+ method receives each batch
def initialize(config, messages, pending_transactions, trace_logger)
  @trace_logger = trace_logger
  @pending_transactions = pending_transactions
  @messages = messages
  @config = config

  # Anonymous two-field struct holding one serializer per payload type.
  serializer_set = Struct.new(:transactions, :errors)
  @serializers = serializer_set.new(
    Serializers::Transactions.new(config),
    Serializers::Errors.new(config)
  )
end
Public Instance Methods
run_forever()
click to toggle source
Collects and sends the transactions
# File lib/stackify_apm/worker.rb, line 45
# Starts the flush timer and then processes messages popped from
# +messages+ until a falsy value is popped or a StopMsg stops the worker.
#
# * ErrorMsg - forwarded to +post_error+
# * FlushMsg - triggers +collect_and_send_transactions+
# * StopMsg  - sends whatever is still pending, then calls +stop!+
def run_forever
  @timer_task = build_timer_task.execute

  while (message = messages.pop)
    case message
    when ErrorMsg then post_error(message)
    when FlushMsg then collect_and_send_transactions
    when StopMsg
      # Empty the collected transactions before exiting.
      collect_and_send_transactions
      stop!
    end
  end
end
Private Instance Methods
build_timer_task()
click to toggle source
Builds the recurring timer task that triggers the sending of transactions. flush_interval_seconds is the interval, in seconds, at which transactions are sent to the APM.
# File lib/stackify_apm/worker.rb, line 72
# Builds (but does not start) a timer task that pushes a FlushMsg onto
# +messages+ every +config.flush_interval_seconds+.
#
# @return [Concurrent::TimerTask] the task; the caller must execute it
def build_timer_task
  interval = config.flush_interval_seconds
  Concurrent::TimerTask.new(execution_interval: interval) { messages.push(FlushMsg.new) }
end
collect_and_send_transactions()
click to toggle source
Collects, builds, and sends transactions via HTTP/TraceLogger. (Source directive preceding this method: rubocop:disable Lint/RescueException)
# File lib/stackify_apm/worker.rb, line 81
# Drains a batch of pending transactions and posts it through the trace
# logger. Does nothing when no transactions are pending.
#
# Every exception raised while posting, including non-StandardError ones,
# is reported through +error+ and then swallowed.
#
# @return the result of +@trace_logger.post+, or nil when nothing was
#   pending or when posting raised
def collect_and_send_transactions
  return if pending_transactions.empty?

  batch = collect_batched_transactions
  if ENV['STACKIFY_TRANSPORT_LOG_LEVEL'] == '0'
    debug('[Worker] collect_and_send_transactions() transactions : ' + batch.inspect.to_s)
  end

  begin
    @trace_logger.post(batch)
  rescue ::Exception => ex
    # Deliberately broad: a failed post is logged, never propagated.
    error('[Worker] Collect_and_send_transactions error:')
    error(ex.inspect)
    error(ex.backtrace.join("\n"))
    nil
  end
end
collect_batched_transactions()
click to toggle source
rubocop:disable Lint/HandleExceptions
# File lib/stackify_apm/worker.rb, line 104
# Pops pending transactions, without blocking, into a single batch.
#
# The batch size is checked BEFORE popping, so a transaction is only taken
# off the queue when there is room for it in the batch. Anything that does
# not fit stays queued for the next call instead of being popped and
# discarded.
#
# The size bound is inclusive: a batch holds at most
# +config.max_queue_size + 1+ transactions. Collection also stops early if
# a falsy value is popped.
#
# @return [Array] the collected transactions; empty when the queue was empty
def collect_batched_transactions
  batch = []
  begin
    while batch.length <= config.max_queue_size
      transaction = pending_transactions.pop(true)
      break unless transaction

      batch << transaction
    end
  rescue ThreadError
    # Non-blocking pop raises ThreadError once the queue is empty.
  end
  batch
end
post_error(_msg)
click to toggle source
rubocop:enable Lint/RescueException
# File lib/stackify_apm/worker.rb, line 96
# Handler for ErrorMsg messages. Error posting is currently disabled: the
# method checks the pending queue and returns nil without sending anything.
#
# @param _msg the error message; unused while posting is disabled
# @return [nil]
def post_error(_msg)
  return nil if pending_transactions.empty?

  # Disabled implementation, kept for reference:
  # transactions = collect_batched_transactions
  # payload = @serializers.errors.build_all([msg.error])
  # @trace_logger.post(payload, transactions[0])
  nil
end
stop!()
click to toggle source
# File lib/stackify_apm/worker.rb, line 64
# Shuts down the flush timer, if one was started, and terminates the
# current thread. Does not return.
def stop!
  @timer_task.shutdown if @timer_task
  Thread.exit
end