module Mmtrix::Agent::Agent::InstanceMethods
Holds all the methods defined on Mmtrix::Agent::Agent instances.
Attributes
manages agent commands we receive from the collector, and the handlers
cross application tracing ids and encoding
error collector is a simple collection of recorded errors
Global events dispatcher. This provides our primary mechanism for agent-wide events, such as finishing configuration, error notification, and request before/after from Rack.
Responsible for restarting the harvest thread
builder for JS agent scripts to inject
GC::Profiler.total_time is not monotonic so we wrap it.
holds a proc that is used to obfuscate sql statements
whether we should record raw, obfuscated, or no sql
service for communicating with collector
the statistics engine that holds all the timeslice data
Transaction and metric renaming rules as provided by the collector on connect. The former are applied during transactions, the latter during harvest.
the transaction sampler that handles recording transactions
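The attribute note that GC::Profiler.total_time is not monotonic suggests a wrapper that accumulates deltas between readings. The following is a minimal sketch of that idea, assuming the raw reading can drop back toward zero when the profiler is cleared. The class name MonotonicTotal and its update method are hypothetical and are not taken from the agent's own wrapper.

```ruby
# Hypothetical wrapper: turns a reading that may be reset into a
# running total that never decreases.
class MonotonicTotal
  def initialize
    @total = 0.0
    @last_reading = 0.0
  end

  # Feed in the latest raw reading; returns the accumulated total.
  def update(reading)
    # A reading lower than the previous one means the source was reset,
    # so the whole reading counts as new time.
    delta = reading >= @last_reading ? reading - @last_reading : reading
    @total += delta
    @last_reading = reading
    @total
  end
end

wrapper = MonotonicTotal.new
totals = [1.0, 3.0, 0.5, 2.0].map { |reading| wrapper.update(reading) }
```

In real use the readings would come from GC::Profiler.total_time rather than a fixed array.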
Public Instance Methods
This method should be called in a forked process after a fork. It assumes the parent process initialized the agent, but does not assume the agent started.
The call is idempotent, but not re-entrant.
- It clears any metrics carried over from the parent process
- Restarts the sampler thread if necessary
- Initiates a new agent run and worker loop unless that was done in the parent process and :force_reconnect is not true
Options:
- :force_reconnect => true to force the spawned process to establish a new connection, such as when forking a long-running process. The default is false: it will only connect to the server if the parent had not connected.
- :keep_retrying => false if we try to initiate a new connection, this tells the agent to try only once, so this method returns quickly if there is some kind of latency with the server.
# File lib/mmtrix/agent/agent.rb, line 161
def after_fork(options={})
  needs_restart = false
  @after_fork_lock.synchronize do
    needs_restart = @harvester.needs_restart?
    @harvester.mark_started
  end

  return if !needs_restart ||
    !Agent.config[:agent_enabled] ||
    !Agent.config[:monitor_mode] ||
    disconnected?

  ::Mmtrix::Agent.logger.debug "Starting the worker thread in #{Process.pid} (parent #{Process.ppid}) after forking."

  channel_id = options[:report_to_channel]
  install_pipe_service(channel_id) if channel_id

  # Clear out locks and stats left over from parent process
  reset_objects_with_locks
  drop_buffered_data

  setup_and_start_agent(options)
end
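The idempotence claimed for after_fork comes from checking and marking the harvester under a single lock. The sketch below illustrates that pattern in isolation. ToyHarvester and ToyAgent are hypothetical stand-ins, and the pid comparison inside needs_restart? is an assumption about how such a check could work, not a description of the agent's harvester.

```ruby
# Hypothetical harvester: remembers which pid last started it.
class ToyHarvester
  def initialize
    @starting_pid = nil
  end

  # Assumed rule: a restart is needed when the current pid differs
  # from the pid that last marked the harvester as started.
  def needs_restart?(pid = Process.pid)
    @starting_pid != pid
  end

  def mark_started(pid = Process.pid)
    @starting_pid = pid
  end
end

class ToyAgent
  attr_reader :restarts

  def initialize
    @harvester = ToyHarvester.new
    @lock = Mutex.new
    @restarts = 0
  end

  # Check and mark inside one critical section so that a repeated
  # call in the same process sees the mark and does nothing.
  def after_fork(pid = Process.pid)
    needs_restart = @lock.synchronize do
      needed = @harvester.needs_restart?(pid)
      @harvester.mark_started(pid)
      needed
    end
    return false unless needs_restart

    @restarts += 1 # stands in for resetting state and starting the worker
    true
  end
end

agent = ToyAgent.new
first  = agent.after_fork(100) # simulated child pid
second = agent.after_fork(100) # same pid again: no-op
third  = agent.after_fork(200) # a different child pid
```

The pid is passed in explicitly here only so the behavior can be exercised without actually forking.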
Check to see if the agent should start, returning true if it should.
# File lib/mmtrix/agent/agent.rb, line 509
def agent_should_start?
  return false if already_started? || disabled?

  if defer_for_delayed_job?
    ::Mmtrix::Agent.logger.debug "Deferring startup for DelayedJob"
    return false
  end

  if defer_for_resque?
    ::Mmtrix::Agent.logger.debug "Deferring startup for Resque in case it daemonizes"
    return false
  end

  unless app_name_configured?
    Mmtrix::Agent.logger.error "No application name configured.",
      "The Agent cannot start without at least one. Please check your ",
      "mmtrix.yml and ensure that it is valid and has at least one ",
      "value set for app_name in the #{Mmtrix::Control.instance.env} ",
      "environment."
    return false
  end

  return true
end
# File lib/mmtrix/agent/agent.rb, line 503
def defer_for_delayed_job?
  Mmtrix::Agent.config[:dispatcher] == :delayed_job &&
    !Mmtrix::DelayedJobInjection.worker_name
end
Clear out the metric data, errors, and transaction traces, etc.
# File lib/mmtrix/agent/agent.rb, line 548
def drop_buffered_data
  @stats_engine.reset!
  @error_collector.reset!
  @transaction_sampler.reset!
  @transaction_event_aggregator.reset!
  @custom_event_aggregator.reset!
  @sql_sampler.reset!
end
# File lib/mmtrix/agent/agent.rb, line 569
def flush_pipe_data
  if connected? && @service.is_a?(::Mmtrix::Agent::PipeService)
    transmit_data
    transmit_event_data
  end
end
# File lib/mmtrix/agent/agent.rb, line 185
def install_pipe_service(channel_id)
  @service = Mmtrix::Agent::PipeService.new(channel_id)
  if connected?
    @connected_pid = Process.pid
  else
    ::Mmtrix::Agent.logger.debug("Child process #{Process.pid} not reporting to non-connected parent (process #{Process.ppid}).")
    @service.shutdown(Time.now)
    disconnect
  end
end
# File lib/mmtrix/agent/agent.rb, line 901
def merge_data_for_endpoint(endpoint, data)
  if data && !data.empty?
    container_for_endpoint(endpoint).merge!(data)
  end
rescue => e
  Mmtrix::Agent.logger.error("Error while merging #{endpoint} data from child: ", e)
end
Pop the current trace execution status. Restore trace execution status to what it was before we pushed the current flag.
# File lib/mmtrix/agent/agent.rb, line 274
def pop_trace_execution_flag #THREAD_LOCAL_ACCESS
  TransactionState.tl_get.pop_traced
end
Push a flag indicating whether we should be tracing in this thread. This uses a stack, which allows us to disable tracing children of a transaction without affecting the tracing of the whole transaction.
# File lib/mmtrix/agent/agent.rb, line 268
def push_trace_execution_flag(should_trace=false) #THREAD_LOCAL_ACCESS
  TransactionState.tl_get.push_traced(should_trace)
end
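The push and pop methods above delegate to a per-thread state object. As a rough illustration of why a stack is used, here is a minimal sketch, assuming tracing counts as enabled when the stack is empty or its top entry is not false. ToyTraceState and traced? are hypothetical names; only push_traced and pop_traced appear in the source, and their real implementation may differ.

```ruby
# Hypothetical per-thread state holding a stack of tracing flags.
class ToyTraceState
  def initialize
    @flags = []
  end

  def push_traced(should_trace)
    @flags << should_trace
  end

  # Removing the top flag restores whatever status was in effect before.
  def pop_traced
    @flags.pop
  end

  # Assumed rule: tracing is on unless the most recent flag is false.
  def traced?
    @flags.empty? || @flags.last != false
  end
end

state = ToyTraceState.new
before = state.traced?

state.push_traced(false) # disable tracing for a nested section
during = state.traced?

state.pop_traced         # leave the nested section
after = state.traced?
```

Because each push is matched by a pop, nested sections can disable tracing without having to know what the enclosing code had set.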
Clear out state for any objects that we know lock from our parents. This is necessary for cases where we’re in a forked child and Ruby might be holding locks for background threads that aren’t there anymore.
# File lib/mmtrix/agent/agent.rb, line 565
def reset_objects_with_locks
  @stats_engine = Mmtrix::Agent::StatsEngine.new
end
Deprecated, and not part of the public API, but here for backwards compatibility because some 3rd-party gems call it.
# File lib/mmtrix/agent/agent.rb, line 560
def reset_stats; drop_buffered_data; end
# File lib/mmtrix/agent/agent.rb, line 216
def revert_to_default_configuration
  Mmtrix::Agent.config.remove_config_type(:manual)
  Mmtrix::Agent.config.remove_config_type(:server)
end
Sets a thread-local variable as to whether we should or should not record SQL in the current thread. Returns the previous value, or true if no value had been set.
# File lib/mmtrix/agent/agent.rb, line 247
def set_record_sql(should_record) #THREAD_LOCAL_ACCESS
  state = TransactionState.tl_get
  prev = state.record_sql
  state.record_sql = should_record
  prev.nil? || prev
end
Sets a thread-local variable as to whether we should or should not record transaction traces in the current thread. Returns the previous value, or true if no value had been set.
# File lib/mmtrix/agent/agent.rb, line 257
def set_record_tt(should_record) #THREAD_LOCAL_ACCESS
  state = TransactionState.tl_get
  prev = state.record_tt
  state.record_tt = should_record
  prev.nil? || prev
end
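Both setters end with the expression prev.nil? || prev, so an unset previous value is reported as true. The sketch below reproduces just that return logic on a plain Struct to make the three cases visible. ToyState and the free-standing method are hypothetical; the real methods operate on the thread-local TransactionState.

```ruby
# Hypothetical stand-in for the thread-local state object.
ToyState = Struct.new(:record_sql)

# Same return logic as in the source: nil is treated as "was enabled".
def toy_set_record_sql(state, should_record)
  prev = state.record_sql
  state.record_sql = should_record
  prev.nil? || prev
end

state = ToyState.new
r1 = toy_set_record_sql(state, false) # nothing set before
r2 = toy_set_record_sql(state, true)  # previous value was false
r3 = toy_set_record_sql(state, false) # previous value was true
```

A caller can keep the returned value and pass it back later to restore the earlier setting.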
Attempt a graceful shutdown of the agent, flushing any remaining data.
# File lib/mmtrix/agent/agent.rb, line 203
def shutdown
  return unless started?
  ::Mmtrix::Agent.logger.info "Starting Agent shutdown"

  stop_event_loop
  trap_signals_for_litespeed
  untraced_graceful_disconnect
  revert_to_default_configuration

  @started = nil
  Control.reset
end
Logs a bunch of data and starts the agent, if needed
# File lib/mmtrix/agent/agent.rb, line 535
def start
  return unless agent_should_start?

  log_startup
  check_config_and_start_agent
  log_version_and_pid

  events.subscribe(:finished_configuring) do
    log_ignore_url_regexes
  end
end
True if we have initialized and completed ‘start’
# File lib/mmtrix/agent/agent.rb, line 197
def started?
  @started
end
# File lib/mmtrix/agent/agent.rb, line 221
def stop_event_loop
  @event_loop.stop if @event_loop
end
# File lib/mmtrix/agent/agent.rb, line 225
def trap_signals_for_litespeed
  # if litespeed, then ignore all future SIGUSR1 - it's
  # litespeed trying to shut us down
  if Agent.config[:dispatcher] == :litespeed
    Signal.trap("SIGUSR1", "IGNORE")
    Signal.trap("SIGTERM", "IGNORE")
  end
end
# File lib/mmtrix/agent/agent.rb, line 234
def untraced_graceful_disconnect
  begin
    Mmtrix::Agent.disable_all_tracing do
      graceful_disconnect
    end
  rescue => e
    ::Mmtrix::Agent.logger.error e
  end
end
Private Instance Methods
# File lib/mmtrix/agent/agent.rb, line 1061
def check_for_and_handle_agent_commands
  begin
    @agent_command_router.check_for_and_handle_agent_commands
  rescue ForceRestartException, ForceDisconnectException
    raise
  rescue ServerConnectionException => e
    log_remote_unavailable(:get_agent_commands, e)
  rescue => e
    Mmtrix::Agent.logger.info("Error during check_for_and_handle_agent_commands, will retry later: ", e)
  end
end
Connect to the server and validate the license. If successful, connected? returns true when finished. If not successful, you can keep calling this. Returns false if we could not establish a connection with the server and we should not retry, such as if there’s a bad license key.
Set keep_retrying=false to disable retrying and return as soon as possible, such as when invoked in the foreground. Otherwise this runs until a successful connection is made, or the server rejects us.
- :keep_retrying => false to only try to connect once, and return with the connection set to nil. This ensures we may try again later (default true).
- :force_reconnect => true if you want to establish a new connection to the server before running the worker loop. This means you get a separate agent run and Mmtrix sees it as a separate instance (default is false).
# File lib/mmtrix/agent/agent.rb, line 927
def connect(options={})
  defaults = {
    :keep_retrying => Agent.config[:keep_retrying],
    :force_reconnect => Agent.config[:force_reconnect]
  }
  opts = defaults.merge(options)

  return unless should_connect?(opts[:force_reconnect])

  ::Mmtrix::Agent.logger.debug "Connecting Process to Mmtrix: #$0"
  query_server_for_configuration
  @connected_pid = $$
  @connect_state = :connected
rescue Mmtrix::Agent::ForceDisconnectException => e
  handle_force_disconnect(e)
rescue Mmtrix::Agent::LicenseException => e
  handle_license_error(e)
rescue Mmtrix::Agent::UnrecoverableAgentException => e
  handle_unrecoverable_agent_error(e)
rescue StandardError, Timeout::Error, Mmtrix::Agent::ServerConnectionException => e
  log_error(e)
  if opts[:keep_retrying]
    note_connect_failure
    ::Mmtrix::Agent.logger.info "Will re-attempt in #{connect_retry_period} seconds"
    sleep connect_retry_period
    retry
  else
    disconnect
  end
rescue Exception => e
  ::Mmtrix::Agent.logger.error "Exception of unexpected type during Agent#connect():", e

  raise
end
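The :keep_retrying behavior in connect relies on Ruby's retry keyword inside a rescue clause. The following is a simplified sketch of that control flow only. connect_with_retry is a hypothetical helper, and its max_attempts cap is an assumption added so the example always terminates; the connect method above has no such cap.

```ruby
# Hypothetical helper: runs the block, retrying after a failure
# when keep_retrying is true.
def connect_with_retry(keep_retrying:, retry_period: 0, max_attempts: 5)
  attempts = 0
  begin
    attempts += 1
    yield attempts
    :connected
  rescue StandardError
    if keep_retrying && attempts < max_attempts
      sleep retry_period
      retry # re-runs the begin block from the top
    else
      :disconnected
    end
  end
end

# With keep_retrying false, one failure ends the attempt.
once = connect_with_retry(keep_retrying: false) { |_n| raise "server down" }

# With keep_retrying true, the block is retried until it succeeds.
seen = []
eventually = connect_with_retry(keep_retrying: true) do |n|
  seen << n
  raise "server down" if n < 3
end
```

Passing keep_retrying: false is what lets a foreground caller return quickly instead of sleeping between attempts.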
# File lib/mmtrix/agent/agent.rb, line 890
def container_for_endpoint(endpoint)
  case endpoint
  when :metric_data             then @stats_engine
  when :transaction_sample_data then @transaction_sampler
  when :error_data              then @error_collector
  when :analytic_event_data     then @transaction_event_aggregator
  when :custom_event_data       then @custom_event_aggregator
  when :sql_trace_data          then @sql_sampler
  end
end
A shorthand for Mmtrix::Control.instance
# File lib/mmtrix/agent/agent.rb, line 709
def control
  Mmtrix::Control.instance
end
Delegates to the control class to determine the root directory of this project
# File lib/mmtrix/agent/agent.rb, line 973
def determine_home_directory
  control.root
end
Who am I? Well, this method can tell you your hostname.
# File lib/mmtrix/agent/agent.rb, line 963
def determine_host
  Mmtrix::Agent::Hostname.get
end
This method contacts the server to send remaining data and let the server know that the agent is shutting down. This allows us to do things like accurately set the end of the lifetime of the process.
If this process comes from a parent process, it will not disconnect, so that the parent process can continue to send data.
# File lib/mmtrix/agent/agent.rb, line 1124
def graceful_disconnect
  if connected?
    begin
      @service.request_timeout = 10

      @events.notify(:before_shutdown)
      transmit_data
      transmit_event_data

      if @connected_pid == $$ && !@service.kind_of?(Mmtrix::Agent::MmtrixService)
        ::Mmtrix::Agent.logger.debug "Sending Mmtrix service agent run shutdown message"
        @service.shutdown(Time.now.to_f)
      else
        ::Mmtrix::Agent.logger.debug "This agent connected from parent process #{@connected_pid}--not sending shutdown"
      end
      ::Mmtrix::Agent.logger.debug "Graceful disconnect complete"
    rescue Timeout::Error, StandardError => e
      ::Mmtrix::Agent.logger.debug "Error when disconnecting #{e.class.name}: #{e.message}"
    end
  else
    ::Mmtrix::Agent.logger.debug "Bypassing graceful disconnect - agent not connected"
  end
end
# File lib/mmtrix/agent/agent.rb, line 1056
def harvest_and_send_analytic_event_data
  harvest_and_send_from_container(@transaction_event_aggregator, :analytic_event_data)
  harvest_and_send_from_container(@custom_event_aggregator, :custom_event_data)
end
# File lib/mmtrix/agent/agent.rb, line 1052
def harvest_and_send_errors
  harvest_and_send_from_container(@error_collector, :error_data)
end
# File lib/mmtrix/agent/agent.rb, line 1048
def harvest_and_send_for_agent_commands
  harvest_and_send_from_container(@agent_command_router, :profile_data)
end
Harvests data from the given container, sends it to the named endpoint on the service, and automatically merges back in upon a recoverable failure.
The given container should respond to:
- #harvest! returns an enumerable collection of data items to be sent to the collector.
- #reset! drops any stored data and resets to a clean state.
- #merge!(items) merges the given items back into the internal buffer of the container, so that they may be harvested again later.
# File lib/mmtrix/agent/agent.rb, line 994
def harvest_and_send_from_container(container, endpoint)
  items = harvest_from_container(container, endpoint)
  send_data_to_endpoint(endpoint, items, container) unless items.empty?
end
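The container contract listed above (harvest!, reset!, merge!) can be illustrated with a small in-memory buffer. This is a sketch under stated assumptions: ToyContainer, its record method, and toy_harvest_and_send are hypothetical, and the sketch merges items back on any StandardError, whereas the agent distinguishes recoverable failures from ones where data is discarded.

```ruby
# Hypothetical container satisfying the three-method contract.
class ToyContainer
  def initialize
    @items = []
  end

  def record(item)
    @items << item
  end

  # Hand over everything buffered so far and start a fresh buffer.
  def harvest!
    harvested = @items
    @items = []
    harvested
  end

  # Drop any stored data.
  def reset!
    @items = []
  end

  # Put previously harvested items back, ahead of anything newer.
  def merge!(items)
    @items = items + @items
  end

  def size
    @items.size
  end
end

# Harvest, pass the items to the block, and merge them back on failure.
def toy_harvest_and_send(container)
  items = container.harvest!
  return if items.empty?

  begin
    yield items
  rescue StandardError
    container.merge!(items)
  end
end

container = ToyContainer.new
container.record(:a)
container.record(:b)

# First attempt fails, so the items return to the buffer.
toy_harvest_and_send(container) { |_items| raise IOError, "collector unavailable" }
size_after_failure = container.size

# Second attempt succeeds and drains the buffer.
sent = []
toy_harvest_and_send(container) { |items| sent.concat(items) }
size_after_success = container.size
```

Merging back rather than dropping is what allows a later harvest cycle to retry the same data.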
# File lib/mmtrix/agent/agent.rb, line 1034
def harvest_and_send_slowest_sql
  harvest_and_send_from_container(@sql_sampler, :sql_trace_data)
end
# File lib/mmtrix/agent/agent.rb, line 1029
def harvest_and_send_timeslice_data
  Mmtrix::Agent::BusyCalculator.harvest_busy
  harvest_and_send_from_container(@stats_engine, :metric_data)
end
This handles getting the transaction traces and then sending them across the wire. This includes gathering SQL explanations, stripping out stack traces, and normalizing SQL. Note that we explain only the SQL statements whose nodes’ execution times exceed our threshold, to avoid the unnecessary overhead of running explains on fast queries.
# File lib/mmtrix/agent/agent.rb, line 1044
def harvest_and_send_transaction_traces
  harvest_and_send_from_container(@transaction_sampler, :transaction_sample_data)
end
# File lib/mmtrix/agent/agent.rb, line 999
def harvest_from_container(container, endpoint)
  items = []
  begin
    items = container.harvest!
  rescue => e
    Mmtrix::Agent.logger.error("Failed to harvest #{endpoint} data, resetting. Error: ", e)
    container.reset!
  end
  items
end
# File lib/mmtrix/agent/agent.rb, line 967
def local_host
  @local_host ||= determine_host
end
# File lib/mmtrix/agent/agent.rb, line 1010
def send_data_to_endpoint(endpoint, items, container)
  Mmtrix::Agent.logger.debug("Sending #{items.size} items to #{endpoint}")
  begin
    @service.send(endpoint, items)
  rescue ForceRestartException, ForceDisconnectException
    raise
  rescue SerializationError => e
    Mmtrix::Agent.logger.warn("Failed to serialize data for #{endpoint}, discarding. Error: ", e)
  rescue UnrecoverableServerException => e
    Mmtrix::Agent.logger.warn("#{endpoint} data was rejected by remote service, discarding. Error: ", e)
  rescue ServerConnectionException => e
    log_remote_unavailable(endpoint, e)
    container.merge!(items)
  rescue => e
    Mmtrix::Agent.logger.info("Unable to send #{endpoint} data, will try again later. Error: ", e)
    container.merge!(items)
  end
end
Try to launch the worker thread and connect to the server.
See connect for a description of connection_options.
# File lib/mmtrix/agent/agent.rb, line 696
def start_worker_thread(connection_options = {})
  if disable = Mmtrix::Agent.config[:disable_harvest_thread]
    Mmtrix::Agent.logger.info "Not starting Ruby Agent worker thread because :disable_harvest_thread is #{disable}"
    return
  end

  ::Mmtrix::Agent.logger.debug "Creating Ruby Agent worker thread."
  @worker_thread = Mmtrix::Agent::Threading::AgentThread.create('Worker Loop') do
    deferred_work!(connection_options)
  end
end
# File lib/mmtrix/agent/agent.rb, line 1097
def transmit_data
  now = Time.now
  ::Mmtrix::Agent.logger.debug "Sending data to Mmtrix Service"

  @events.notify(:before_harvest)
  @service.session do # use http keep-alive
    harvest_and_send_errors
    harvest_and_send_transaction_traces
    harvest_and_send_slowest_sql
    harvest_and_send_timeslice_data

    check_for_and_handle_agent_commands
    harvest_and_send_for_agent_commands
  end
ensure
  Mmtrix::Agent::Database.close_connections
  duration = (Time.now - now).to_f
  Mmtrix::Agent.record_metric('Supportability/Harvest', duration)
end
# File lib/mmtrix/agent/agent.rb, line 1079
def transmit_event_data
  transmit_single_data_type(:harvest_and_send_analytic_event_data, "TransactionEvent")
end
# File lib/mmtrix/agent/agent.rb, line 1083
def transmit_single_data_type(harvest_method, supportability_name)
  now = Time.now

  msg = "Sending #{harvest_method.to_s.gsub("harvest_and_send_", "")} to Mmtrix Service"
  ::Mmtrix::Agent.logger.debug msg

  @service.session do # use http keep-alive
    self.send(harvest_method)
  end
ensure
  duration = (Time.now - now).to_f
  Mmtrix::Agent.record_metric("Supportability/#{supportability_name}Harvest", duration)
end
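Both transmit methods record their duration in an ensure clause, so the supportability metric is written even when the harvest raises. A minimal sketch of that pattern follows. The timed helper and the plain Hash used as a metric store are hypothetical, as is the "Supportability/FailedHarvest" name; the agent itself records through Mmtrix::Agent.record_metric.

```ruby
# Hypothetical helper: runs the block and always records how long it took.
def timed(metrics, name)
  started = Time.now
  yield
ensure
  # Runs on both normal return and exception; does not alter the result.
  metrics[name] = (Time.now - started).to_f
end

metrics = {}

# Normal case: the block's value is returned and a duration is stored.
result = timed(metrics, "Supportability/Harvest") { :sent }

# Failure case: the exception still propagates, but a duration is stored.
failed = false
begin
  timed(metrics, "Supportability/FailedHarvest") { raise "boom" }
rescue RuntimeError
  failed = true
end
```

Recording in ensure means slow or failing harvests still show up in the timing data.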