class Mmtrix::Agent::MmtrixService

Constants

CONNECTION_ERRORS

These include Errno connection errors, and all indicate that the underlying TCP connection may be in a bad state.

PROTOCOL_VERSION

Specifies the version of the agent’s communication protocol with the Mmtrix hosted site.

Attributes

agent_id[RW]
collector[R]
marshaller[R]
metric_id_cache[R]
request_timeout[RW]

Public Class Methods

new(license_key=nil, collector=control.server) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 36
# Builds the service and registers the config callbacks it depends on.
#
# license_key - key sent to the collector; when nil, falls back to
#               Agent.config[:license_key].
# collector   - server object to talk to; defaults to control.server.
def initialize(license_key=nil, collector=control.server)
  @license_key     = license_key || Agent.config[:license_key]
  @collector       = collector
  @request_timeout = Agent.config[:timeout]
  @metric_id_cache = {}
  @audit_logger    = ::Mmtrix::Agent::AuditLogger.new

  # Keep the audit logger's enabled flag in step with the config value.
  Agent.config.register_callback(:'audit_log.enabled') { |enabled| @audit_logger.enabled = enabled }

  # Log which transport mode the agent has been configured with.
  Agent.config.register_callback(:ssl) do |ssl|
    if ssl
      ::Mmtrix::Agent.logger.debug("Agent is configured to use SSL")
    else
      ::Mmtrix::Agent.logger.warn("Agent is configured not to use SSL when communicating with Mmtrix's servers")
    end
  end

  # Pick the marshaller named by the config; if constructing it raises a
  # LoadError, fall back to the pruby marshaller.
  Agent.config.register_callback(:marshaller) do |marshaller|
    begin
      @marshaller = (marshaller == 'json') ? JsonMarshaller.new : PrubyMarshaller.new
    rescue LoadError
      ::Mmtrix::Agent.logger.warn("JSON marshaller requested, but the 'json' gem was not available, falling back to pruby. This will not be supported in future versions of the agent.")
      @marshaller = PrubyMarshaller.new
    end
  end
end

Public Instance Methods

agent_command_results(results) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 166
# Sends agent command results to the collector.
#
# results - the results payload; sent together with the current agent id.
#
# Returns whatever invoke_remote returns.
def agent_command_results(results)
  payload = [@agent_id, results]
  invoke_remote(:agent_command_results, payload)
end
analytic_event_data(data) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 174
# Posts analytic events to the collector.
#
# data - collection of events; its size is reported as :item_count.
#
# Returns whatever invoke_remote returns.
def analytic_event_data(data)
  options = { :item_count => data.size }
  invoke_remote(:analytic_event_data, [@agent_id, data], options)
end
build_metric_data_array(stats_hash) click to toggle source

The collector wants to receive metric data in a format that’s different from how we store it internally, so this method handles the translation. It also handles translating metric names to IDs using our metric ID cache.

# File lib/mmtrix/agent/mmtrix_service.rb, line 113
# Converts a stats hash into the array of MetricData objects that is posted
# to the collector.
#
# stats_hash - enumerable yielding [metric_spec, stats] pairs.
#
# Entries whose stats report is_reset? are skipped. When the metric ID cache
# has an id for the spec, the MetricData carries the id and no spec;
# otherwise it carries the spec and no id.
#
# Returns an Array of Mmtrix::MetricData.
def build_metric_data_array(stats_hash)
  entries = []
  stats_hash.each do |spec, stats|
    # Omit empty stats as an optimization
    next if stats.is_reset?

    cached_id = metric_id_cache[spec]
    entries << if cached_id
                 Mmtrix::MetricData.new(nil, stats, cached_id)
               else
                 Mmtrix::MetricData.new(spec, stats, nil)
               end
  end
  entries
end
cert_file_path() click to toggle source

The path to the certificate file used to verify the SSL connection if verify_peer is enabled

# File lib/mmtrix/agent/mmtrix_service.rb, line 332
# Path of the CA certificate file.
#
# Returns the configured :ca_bundle_path when one is set (a warning is
# logged if no file exists there, but the path is still returned);
# otherwise the cert/cacert.pem file under control.mmtrix_root.
def cert_file_path
  configured = Mmtrix::Agent.config[:ca_bundle_path]
  unless configured
    return File.expand_path(File.join(control.mmtrix_root, 'cert', 'cacert.pem'))
  end

  unless File.exist?(configured)
    Mmtrix::Agent.logger.warn("Couldn't find CA bundle from configured ca_bundle_path: #{configured}")
  end
  configured
end
close_shared_connection() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 240
# Closes and forgets the shared TCP connection, if there is one.
# The connection is only finished when it reports started?; the reference is
# cleared either way. Returns nil.
def close_shared_connection
  return unless @shared_tcp_connection

  conn = @shared_tcp_connection
  ::Mmtrix::Agent.logger.debug("Closing shared TCP connection to #{conn.address}:#{conn.port}")
  conn.finish if conn.started?
  @shared_tcp_connection = nil
end
compress_request_if_needed(data) click to toggle source

We do not compress if content is smaller than 64kb. There are problems with bugs in Ruby in some versions that expose us to a risk of segfaults if we compress aggressively.

# File lib/mmtrix/agent/mmtrix_service.rb, line 187
# Compresses the request body when it is larger than 64 * 1024 (per
# data.size), then validates the resulting body with check_post_size.
#
# data - the serialized request body.
#
# Returns a two-element Array: [body, encoding], where encoding is
# 'deflate' for a compressed body and 'identity' otherwise.
def compress_request_if_needed(data)
  if data.size > 64 * 1024
    body, encoding = Encoders::Compressed.encode(data), 'deflate'
  else
    body, encoding = data, 'identity'
  end
  check_post_size(body)
  [body, encoding]
end
connect(settings={}) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 68
# Performs the :connect call and remembers the agent run id from the reply.
# No redirect-host lookup is performed here.
#
# settings - Hash sent as the single element of the connect payload.
#
# Returns the response from invoke_remote; its 'agent_run_id' entry is
# stored in @agent_id.
def connect(settings={})
  invoke_remote(:connect, [settings]).tap do |reply|
    @agent_id = reply['agent_run_id']
  end
end
create_and_start_http_connection() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 324
# Creates a new HTTP connection handle and starts it.
# Returns the connection created by create_http_connection.
def create_and_start_http_connection
  create_http_connection.tap { |conn| start_connection(conn) }
end
create_http_connection() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 302
# Builds (but does not start) a Net::HTTP handle for @collector.
#
# When :proxy_host is configured, the handle is created from a
# Net::HTTP::Proxy class built from the proxy_* settings. SSL is set up when
# :ssl is configured, and timeouts are always applied.
#
# Returns the new connection object.
def create_http_connection
  http_class = Net::HTTP

  if Agent.config[:proxy_host]
    ::Mmtrix::Agent.logger.debug("Using proxy server #{Agent.config[:proxy_host]}:#{Agent.config[:proxy_port]}")
    http_class = Net::HTTP::Proxy(
      Agent.config[:proxy_host],
      Agent.config[:proxy_port],
      Agent.config[:proxy_user],
      Agent.config[:proxy_pass]
    )
  end

  conn = http_class.new(@collector.name, @collector.port)

  setup_connection_for_ssl(conn) if Agent.config[:ssl]
  setup_connection_timeouts(conn)

  ::Mmtrix::Agent.logger.debug("Created net/http handle to #{conn.address}:#{conn.port}")
  conn
end
custom_event_data(data) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 179
# Posts custom events to the collector.
#
# data - collection of events; its size is reported as :item_count.
#
# Returns whatever invoke_remote returns.
def custom_event_data(data)
  options = { :item_count => data.size }
  invoke_remote(:custom_event_data, [@agent_id, data], options)
end
error_data(unsent_errors) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 143
# Posts unsent errors to the collector.
#
# unsent_errors - collection of errors; its size is reported as :item_count.
#
# Returns whatever invoke_remote returns.
def error_data(unsent_errors)
  options = { :item_count => unsent_errors.size }
  invoke_remote(:error_data, [@agent_id, unsent_errors], options)
end
establish_shared_connection() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 233
# Returns the shared TCP connection, creating and starting one on first use.
def establish_shared_connection
  @shared_tcp_connection ||= create_and_start_http_connection
end
fill_metric_id_cache(pairs_of_specs_and_ids) click to toggle source

takes an array of arrays of spec and id, adds it into the metric cache so we can save the collector some work by sending integers instead of strings the next time around

# File lib/mmtrix/agent/mmtrix_service.rb, line 97
# Records spec -> id mappings in the metric ID cache.
#
# pairs_of_specs_and_ids - array of [spec_hash, id] pairs, where spec_hash
#                          has 'name' and 'scope' keys; nil is treated as an
#                          empty list.
#
# Any StandardError raised while filling the cache is logged and swallowed.
def fill_metric_id_cache(pairs_of_specs_and_ids)
  pairs = Array(pairs_of_specs_and_ids)
  pairs.each do |(spec_attrs, id)|
    spec = MetricSpec.new(spec_attrs['name'], spec_attrs['scope'])
    metric_id_cache[spec] = id
  end
rescue => e
  # If we've gotten this far, we don't want this error to propagate and
  # make this post appear to have been non-successful, which would trigger
  # re-aggregation of the same metric data into the next post, so just log
  Mmtrix::Agent.logger.error("Failed to fill metric ID cache from response, error details follow ", e)
end
force_restart() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 89
# Resets per-connection state: clears the metric ID cache, then closes the
# shared connection.
def force_restart
  reset_metric_id_cache()
  close_shared_connection()
end
get_agent_commands() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 162
# Asks the collector for pending agent commands for the current agent id.
# Returns whatever invoke_remote returns.
def get_agent_commands
  payload = [@agent_id]
  invoke_remote(:get_agent_commands, payload)
end
get_redirect_host() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 77
# Invokes the :get_redirect_host remote method with the default (empty)
# payload. Returns whatever invoke_remote returns.
def get_redirect_host
  remote_method = :get_redirect_host
  invoke_remote(remote_method)
end
get_xray_metadata(xray_ids) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 170
# Requests metadata for the given X-Ray ids.
#
# xray_ids - ids that are splatted into the payload after the agent id.
#
# Returns whatever invoke_remote returns.
def get_xray_metadata(xray_ids)
  payload = [@agent_id, *xray_ids]
  invoke_remote(:get_xray_metadata, payload)
end
has_shared_connection?() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 248
# True when a shared TCP connection reference is currently held (non-nil).
def has_shared_connection?
  return false if @shared_tcp_connection.nil?
  true
end
http_connection() click to toggle source

Return a Net::HTTP connection object to make a call to the collector. We’ll reuse the same handle for cases where we’re using keep-alive, or otherwise create a new one.

# File lib/mmtrix/agent/mmtrix_service.rb, line 266
# Connection handle to use for a request: the shared connection while inside
# a session (@in_session truthy), otherwise a newly created handle.
def http_connection
  return establish_shared_connection if @in_session
  create_http_connection
end
metric_data(stats_hash) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 130
# Posts one timeslice of metric data and feeds the reply into the metric ID
# cache.
#
# stats_hash - object responding to started_at and harvested_at, and
#              accepted by build_metric_data_array. When harvested_at is
#              nil/false the current time is used as the timeslice end.
#
# Returns the response from invoke_remote.
def metric_data(stats_hash)
  started   = stats_hash.started_at
  finished  = stats_hash.harvested_at || Time.now
  metrics   = build_metric_data_array(stats_hash)
  payload   = [@agent_id, started.to_f, finished.to_f, metrics]

  response = invoke_remote(:metric_data, payload, :item_count => metrics.size)
  fill_metric_id_cache(response)
  response
end
profile_data(profile) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 158
# Posts a profile to the collector with :skip_normalization enabled.
#
# Returns the response from invoke_remote, or '' when that response is
# nil/false.
def profile_data(profile)
  response = invoke_remote(:profile_data, [@agent_id, profile], :skip_normalization => true)
  response || ''
end
reset_metric_id_cache() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 85
# Replaces the metric ID cache with a new empty Hash.
def reset_metric_id_cache
  @metric_id_cache = Hash.new
end
session(&block) click to toggle source

One session with the service’s endpoint. In this case the session represents 1 tcp connection which may transmit multiple HTTP requests via keep-alive.

# File lib/mmtrix/agent/mmtrix_service.rb, line 200
# Runs the given block inside one session with the service's endpoint.
#
# While the block runs, @in_session is true; it is reset to false afterwards
# even if the block raises. Depending on the :aggressive_keepalive setting
# the block is run via session_with_keepalive or session_without_keepalive.
#
# Raises ArgumentError when no block is given.
# Raises Mmtrix::Agent::ServerConnectionException when one of
# CONNECTION_ERRORS is raised during the session.
#
# Returns the value of the keepalive helper that ran the block.
def session(&block)
  # The message names this method (#session) so the failure is traceable.
  raise ArgumentError, "#{self.class}#session must be passed a block" unless block_given?

  begin
    t0 = Time.now
    @in_session = true
    if Mmtrix::Agent.config[:aggressive_keepalive]
      session_with_keepalive(&block)
    else
      session_without_keepalive(&block)
    end
  rescue *CONNECTION_ERRORS => e
    elapsed = Time.now - t0
    raise Mmtrix::Agent::ServerConnectionException, "Recoverable error connecting to #{@collector} after #{elapsed} seconds: #{e}"
  ensure
    @in_session = false
  end
end
session_with_keepalive(&block) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 219
# Ensures the shared connection exists, then runs the block. The shared
# connection is not closed here. Returns the block's value.
def session_with_keepalive(&block)
  establish_shared_connection()
  block.call()
end
session_without_keepalive(&block) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 224
# Ensures the shared connection exists, runs the block, and always closes
# the shared connection afterwards, even when the block raises.
# Returns the block's value.
def session_without_keepalive(&block)
  establish_shared_connection
  block.call
ensure
  close_shared_connection
end
setup_connection_for_ssl(conn) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 274
# Configures a connection for SSL with peer verification, using the
# certificate store from ssl_cert_store.
#
# conn - connection object responding to use_ssl=, verify_mode= and
#        cert_store=.
#
# Raises UnrecoverableAgentException when any StandardError or LoadError is
# raised while configuring SSL. The message includes the class and message
# of the underlying error so the real cause (for example a certificate store
# problem rather than missing SSL support) is not lost.
def setup_connection_for_ssl(conn)
  # Jruby 1.6.8 requires a gem for full ssl support and will throw
  # an error when use_ssl=(true) is called and jruby-openssl isn't
  # installed
  conn.use_ssl     = true
  conn.verify_mode = OpenSSL::SSL::VERIFY_PEER
  conn.cert_store  = ssl_cert_store
rescue StandardError, LoadError => e
  msg = "Agent is configured to use SSL, but SSL is not available in the environment. " \
        "Either disable SSL in the agent configuration, or install SSL support. " \
        "(cause: #{e.class}: #{e.message})"
  raise UnrecoverableAgentException.new(msg)
end
setup_connection_timeouts(conn) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 293
# Applies timeout settings to a connection.
#
# The read timeout is cleared (set to nil). When the connection supports
# keep_alive_timeout and :aggressive_keepalive is configured, the configured
# :keep_alive_timeout is applied.
def setup_connection_timeouts(conn)
  # We use Timeout explicitly instead of this
  conn.read_timeout = nil

  return unless conn.respond_to?(:keep_alive_timeout) && Mmtrix::Agent.config[:aggressive_keepalive]
  conn.keep_alive_timeout = Mmtrix::Agent.config[:keep_alive_timeout]
end
shutdown(time) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 81
# Tells the collector the agent is shutting down. Does nothing (returns nil)
# when no agent id has been set.
#
# time - shutdown time; sent as an integer via to_i.
def shutdown(time)
  return unless @agent_id
  invoke_remote(:shutdown, [@agent_id, time.to_i])
end
sql_trace_data(sql_traces) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 153
# Posts SQL traces to the collector. The payload contains only the traces
# (no agent id).
#
# sql_traces - collection of traces; its size is reported as :item_count.
#
# Returns whatever invoke_remote returns.
def sql_trace_data(sql_traces)
  options = { :item_count => sql_traces.size }
  invoke_remote(:sql_trace_data, [sql_traces], options)
end
ssl_cert_store() click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 252
# Returns the OpenSSL certificate store built from cert_file_path.
#
# The store is cached and reused as long as cert_file_path keeps returning
# the path it was built from; a different path triggers a rebuild.
def ssl_cert_store
  path = cert_file_path
  return @ssl_cert_store if @ssl_cert_store && path == @cached_cert_store_path

  ::Mmtrix::Agent.logger.debug("Creating SSL certificate store from file at #{path}")
  @ssl_cert_store = OpenSSL::X509::Store.new
  @ssl_cert_store.add_file(path)
  @cached_cert_store_path = path
  @ssl_cert_store
end
start_connection(conn) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 287
# Starts the given connection, bounded by @request_timeout via
# Mmtrix::TimerLib.timeout.
#
# conn - connection responding to address, port and start.
#
# Returns conn.
def start_connection(conn)
  target = "#{conn.address}:#{conn.port}"
  Mmtrix::Agent.logger.debug("Opening TCP connection to #{target}")
  Mmtrix::TimerLib.timeout(@request_timeout) do
    conn.start
  end
  conn
end
transaction_sample_data(traces) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 148
# Posts transaction traces to the collector.
#
# traces - collection of traces; its size is reported as :item_count.
#
# Returns whatever invoke_remote returns.
def transaction_sample_data(traces)
  options = { :item_count => traces.size }
  invoke_remote(:transaction_sample_data, [@agent_id, traces], options)
end
valid_to_marshal?(data) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 341
# Checks whether data can be dumped by the current marshaller.
#
# Returns true when @marshaller.dump(data) succeeds; logs a warning and
# returns false when it raises a StandardError or SystemStackError.
def valid_to_marshal?(data)
  begin
    @marshaller.dump(data)
  rescue StandardError, SystemStackError => e
    Mmtrix::Agent.logger.warn("Unable to marshal environment report on connect.", e)
    return false
  end
  true
end

Private Instance Methods

check_post_size(post_string) click to toggle source

Raises an UnrecoverableServerException if the post_string is longer than the limit configured in the control object

# File lib/mmtrix/agent/mmtrix_service.rb, line 479
# Guards against posting more data than the configured limit.
#
# post_string - the request body about to be sent.
#
# The size is measured in bytes (String#bytesize) rather than characters, so
# bodies containing multibyte characters are measured the same way the log
# message reports them.
#
# Returns nil when the body is smaller than Agent.config[:post_size_limit].
# Raises UnrecoverableServerException when it is equal to or larger than the
# limit.
def check_post_size(post_string)
  byte_count = post_string.bytesize
  return if byte_count < Agent.config[:post_size_limit]
  ::Mmtrix::Agent.logger.debug "Tried to send too much data: #{byte_count} bytes"
  raise UnrecoverableServerException.new('413 Request Entity Too Large')
end
control() click to toggle source

A shorthand for Mmtrix::Control.instance

# File lib/mmtrix/agent/mmtrix_service.rb, line 352
# Shorthand accessor for the Mmtrix::Control singleton.
def control
  singleton = Mmtrix::Control.instance
  singleton
end
decompress_response(response) click to toggle source

Decompresses the response from the server, if it is gzip encoded, otherwise returns it verbatim

# File lib/mmtrix/agent/mmtrix_service.rb, line 555
# Returns the response body, gunzipped when the response's
# 'content-encoding' header is exactly 'gzip'; any other encoding is
# returned verbatim.
#
# response - object responding to [] (header lookup) and body.
def decompress_response(response)
  return response.body unless response['content-encoding'] == 'gzip'

  reader = Zlib::GzipReader.new(StringIO.new(response.body))
  begin
    reader.read
  ensure
    # Close explicitly so the reader's zlib resources are released promptly
    # instead of waiting for garbage collection.
    reader.close
  end
end
handle_serialization_error(method, e) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 439
# Records serialization-failure metrics and re-raises the failure as a
# SerializationError carrying the original backtrace.
#
# method - name of the remote method whose payload failed to serialize.
# e      - the exception raised by the marshaller.
#
# Always raises SerializationError.
def handle_serialization_error(method, e)
  ["Supportability/serialization_failure",
   "Supportability/serialization_failure/#{method}"].each do |metric_name|
    Mmtrix::Agent.increment_metric(metric_name)
  end

  wrapped = SerializationError.new(
    "Failed to serialize #{method} data using #{@marshaller.class.to_s}: #{e.inspect}"
  )
  wrapped.set_backtrace(e.backtrace)
  raise wrapped
end
invoke_remote(method, payload = [], options = {}) click to toggle source

send a message via post to the actual server. This attempts to automatically compress the data via zlib if it is large enough to be worth compressing, and handles any errors the server may return

# File lib/mmtrix/agent/mmtrix_service.rb, line 384
# Serializes a payload, posts it to the server and returns the decoded reply.
#
# method  - Symbol naming the remote method.
# payload - Array of arguments for the remote method (default []).
# options - Hash passed to the marshaller's dump; :item_count, when present,
#           is recorded with the size supportability metrics.
#
# Steps: dump the payload with @marshaller, compress it if needed, build the
# URI, write the request to the audit log, send it, then decompress and load
# the response. Timing metrics are always recorded on the way out; size
# metrics are recorded once the request size is known.
#
# Raises whatever handle_serialization_error raises when dumping fails, and
# propagates errors from compress_request_if_needed and send_request.
#
# Returns the value produced by @marshaller.load.
def invoke_remote(method, payload = [], options = {})
  start_ts = Time.now

  data, size, serialize_finish_ts = nil
  begin
    data = @marshaller.dump(payload, options)
  rescue StandardError, SystemStackError => e
    handle_serialization_error(method, e)
  end

  serialize_finish_ts = Time.now

  data, encoding = compress_request_if_needed(data)
  size = data.size

  # NOTE(review): the collector passed to send_request and written to the
  # audit log is hard-coded here for every remote method; @collector is not
  # consulted in this method. Confirm this is intended.
  mycollect = Mmtrix::Control::Server.new("mob.mmtrix.com", "8080")

  uri = remote_method_uri(method, @marshaller.format)
  full_uri = "#{mycollect}#{uri}"

  @audit_logger.log_request(full_uri, payload, @marshaller)

  response = send_request(:data      => data,
                          :uri       => uri,
                          :encoding  => encoding,
                          :collector => mycollect)

  @marshaller.load(decompress_response(response))
ensure
  record_timing_supportability_metrics(method, start_ts, serialize_finish_ts)
  if size
    record_size_supportability_metrics(method, size, options[:item_count])
  end
end
log_response(response) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 549
# Writes a debug log line with the response's status code and its
# 'content-encoding' header.
def log_response(response)
  status   = response.code
  encoding = response['content-encoding']
  ::Mmtrix::Agent.logger.debug "Received response, status: #{status}, encoding: '#{encoding}'"
end
record_size_supportability_metrics(method, size_bytes, item_count) click to toggle source

For these metrics, we use the following fields: call_count => the number of times this remote method was invoked; total_call_time => the total size in bytes of payloads across all invocations; total_exclusive_time => the total size in items (e.g. unique metrics, traces, events, etc.) across all invocations.

The last field doesn’t make sense for all methods (e.g. get_agent_commands), so we omit it for those methods that don’t really take collections of items as arguments.

# File lib/mmtrix/agent/mmtrix_service.rb, line 467
# Records payload-size supportability metrics for a remote call, under both
# the aggregate name and the per-method name.
#
# method     - name of the remote method.
# size_bytes - size of the request payload.
# item_count - number of items in the payload; nil/false is recorded as 0.
def record_size_supportability_metrics(method, size_bytes, item_count)
  base = "Supportability/invoke_remote_size"
  names = [base, "#{base}/#{method.to_s}"]
  # we may not have an item count, in which case, just record 0 for the exclusive time
  items = item_count || 0
  Mmtrix::Agent.agent.stats_engine.tl_record_unscoped_metrics(names, size_bytes, items)
end
record_timing_supportability_metrics(method, start_ts, serialize_finish_ts) click to toggle source
# File lib/mmtrix/agent/mmtrix_service.rb, line 448
# Records timing supportability metrics for a remote call.
#
# method              - name of the remote method.
# start_ts            - Time the call started.
# serialize_finish_ts - Time serialization finished, or nil when it never
#                       completed (serialize metrics are then skipped).
#
# The total duration is measured from start_ts to the current time.
def record_timing_supportability_metrics(method, start_ts, serialize_finish_ts)
  serialize_time = serialize_finish_ts && (serialize_finish_ts - start_ts)
  total = (Time.now - start_ts).to_f
  method_name = method.to_s

  Mmtrix::Agent.record_metric("Supportability/invoke_remote", total)
  Mmtrix::Agent.record_metric("Supportability/invoke_remote/#{method_name}", total)
  return unless serialize_time

  Mmtrix::Agent.record_metric("Supportability/invoke_remote_serialize", serialize_time)
  Mmtrix::Agent.record_metric("Supportability/invoke_remote_serialize/#{method_name}", serialize_time)
end
remote_method_uri(method, format) click to toggle source

The path on the server that we should post our data to

# File lib/mmtrix/agent/mmtrix_service.rb, line 357
# Builds the request path (with query string) for a remote method.
#
# method - Symbol naming the remote method. :get_redirect_host and :connect
#          go to /apmagent/rubyagent/<method>; everything else goes to
#          /workdone_rubysdk.php/<method>.
# format - marshal format name, sent as the marshal_format parameter.
#
# The query string carries license_key, run_id and marshal_format, in that
# order, skipping any whose value is nil or false. Values are not escaped.
#
# Returns the path as a String.
def remote_method_uri(method, format)
  base = if method == :get_redirect_host || method == :connect
           "/apmagent/rubyagent/#{method}"
         else
           "/workdone_rubysdk.php/#{method}"
         end

  candidates = [
    ['license_key', @license_key],
    ['run_id', @agent_id],
    ['marshal_format', format]
  ]
  present = candidates.select { |_key, value| value }
  query = present.map { |key, value| "#{key}=#{value}" }.join('&')

  "#{base}?#{query}"
end
send_request(opts) click to toggle source

Posts to the specified server

Options:

- :uri => the path to request on the server (a misnomer of
            course)
- :encoding => the encoding to pass to the server
- :collector => a URI object that responds to the 'name' method
                  and returns the name of the collector to
                  contact
- :data => the data to send as the body of the request
# File lib/mmtrix/agent/mmtrix_service.rb, line 495
# Posts a request and returns the HTTP response, raising on any
# non-success status.
#
# opts - Hash with:
#        :uri       - path (plus query string) to post to
#        :encoding  - value for the CONTENT-ENCODING header
#        :collector - object responding to name; used for the HOST header
#                     and in log messages
#        :data      - request body
#
# On one of CONNECTION_ERRORS the shared connection is closed and the
# request is retried; after max_attempts (2) attempts a
# ServerConnectionException is raised.
#
# Raises LicenseException for 401, ServerConnectionException for 503, 504 and
# any other unexpected status, and UnrecoverableServerException for 413 and
# 415.
#
# Returns the response object when it is a Net::HTTPSuccess.
def send_request(opts)
  request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'HOST' => opts[:collector].name)



  request['user-agent'] = user_agent
  request.content_type = "application/octet-stream"
  request.body = opts[:data]

  response     = nil
  attempts     = 0
  max_attempts = 2

  begin
    attempts += 1
    # NOTE(review): the connection is obtained from http_connection, which
    # takes no arguments; opts[:collector] is only used above for the HOST
    # header and below for logging. Confirm the two always agree.
    conn = http_connection
    ::Mmtrix::Agent.logger.debug "Sending request to #{opts[:collector]}#{opts[:uri]}"
    Mmtrix::TimerLib.timeout(@request_timeout) do
      response = conn.request(request)
    end
  rescue *CONNECTION_ERRORS => e
    # Drop the shared connection so the retry obtains a new one.
    close_shared_connection
    if attempts < max_attempts
      ::Mmtrix::Agent.logger.debug("Retrying request to #{opts[:collector]}#{opts[:uri]} after #{e}")
      retry
    else
      raise ServerConnectionException, "Recoverable error talking to #{@collector} after #{attempts} attempts: #{e}"
    end
  end

  log_response(response)

  # Translate the HTTP status class into the agent's exception types.
  case response
  when Net::HTTPSuccess
    true # do nothing
  when Net::HTTPUnauthorized
    raise LicenseException, 'Invalid license key, please visit support.mmtrix.com'
  when Net::HTTPServiceUnavailable
    raise ServerConnectionException, "Service unavailable (#{response.code}): #{response.message}"
  when Net::HTTPGatewayTimeOut
    raise ServerConnectionException, "Gateway timeout (#{response.code}): #{response.message}"
  when Net::HTTPRequestEntityTooLarge
    raise UnrecoverableServerException, '413 Request Entity Too Large'
  when Net::HTTPUnsupportedMediaType
    raise UnrecoverableServerException, '415 Unsupported Media Type'
  else
    raise ServerConnectionException, "Unexpected response from server (#{response.code}): #{response.message}"
  end
  response
end
user_agent() click to toggle source

Sets the user agent for connections to the server, to conform with the HTTP spec and allow for debugging. Includes the ruby version and also zlib version if available since that may cause corrupt compression if there is a problem.

# File lib/mmtrix/agent/mmtrix_service.rb, line 567
# Builds the User-Agent header value: the agent name and version, followed
# by the Ruby version/platform and the zlib version when those are available.
def user_agent
  ruby_part = ''
  if defined?(::RUBY_VERSION) && defined?(::RUBY_PLATFORM)
    # note the trailing space!
    ruby_part = "(ruby #{::RUBY_VERSION} #{::RUBY_PLATFORM}) "
  end

  zlib_part = ''
  if defined?(::Zlib) && Zlib.respond_to?(:zlib_version)
    zlib_part = "zlib/#{Zlib.zlib_version}"
  end

  "Mmtrix-RubyAgent/#{Mmtrix::VERSION::STRING} #{ruby_part}#{zlib_part}"
end