class Mmtrix::Agent::TransactionEventAggregator

Constants

APDEX_PERF_ZONE_KEY
CAT_ALTERNATE_PATH_HASHES_KEY
CAT_PATH_HASH_KEY
CAT_REFERRING_PATH_HASH_KEY
CAT_TRIP_ID_KEY
DURATION_KEY
EMPTY_HASH

To avoid allocations when we have empty custom or agent attributes

GUID_KEY
NAME_KEY
OVERVIEW_SPECS
REFERRING_TRANSACTION_GUID_KEY
SAMPLE_TYPE

The type field of the sample

SYNTHETICS_JOB_ID_KEY
SYNTHETICS_MONITOR_ID_KEY
SYNTHETICS_RESOURCE_ID_KEY
TIMESTAMP_KEY
TYPE_KEY

Strings for static keys of the sample structure

Public Class Methods

map_metric(metric_name, to_add={}) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 172
# Adds the mappings in +to_add+ to the OVERVIEW_SPECS entry for
# +metric_name+, creating the entry when there is none yet.
#
# Every value in +to_add+ is frozen before being merged. An existing entry
# is updated in place. Returns the resulting mappings hash.
def self.map_metric(metric_name, to_add={})
  to_add.each_value { |sample_key| sample_key.freeze }

  OVERVIEW_SPECS[metric_name] = OVERVIEW_SPECS.fetch(metric_name, {}).merge!(to_add)
end
new( event_listener ) click to toggle source
Calls superclass method Object::new
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 36
# Sets up an aggregator that starts out disabled, with one buffer for
# regular transaction events and one for synthetics events, each sized
# from the agent configuration.
#
# Subscribes #on_transaction_finished to the :transaction_finished event of
# +event_listener+ and then registers the configuration callbacks.
def initialize( event_listener )
  super()

  @enabled       = false
  @notified_full = false

  max_stored = Mmtrix::Agent.config[:'analytics_events.max_samples_stored']
  @samples   = ::Mmtrix::Agent::SampledBuffer.new(max_stored)

  synthetics_limit    = Mmtrix::Agent.config[:'synthetics.events_limit']
  @synthetics_samples = ::Mmtrix::Agent::SyntheticsEventBuffer.new(synthetics_limit)

  finished_handler = method(:on_transaction_finished)
  event_listener.subscribe(:transaction_finished, &finished_handler)
  register_config_callbacks
end

Public Instance Methods

append_cat_alternate_path_hashes(sample, payload) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 233
# Stores the payload's :cat_alternate_path_hashes in +sample+ as a single
# sorted, comma-separated string. Does nothing (and returns nil) when the
# payload has no such key.
def append_cat_alternate_path_hashes(sample, payload)
  return unless payload.include?(:cat_alternate_path_hashes)

  alternate_hashes = payload[:cat_alternate_path_hashes]
  sample[CAT_ALTERNATE_PATH_HASHES_KEY] = alternate_hashes.sort.join(',')
end
append_event(event) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 156
# Routes +event+ (an array whose first element is the main event hash) to
# the right buffer.
#
# Events without a synthetics resource id go straight to the main buffer.
# Synthetics events are offered to the synthetics buffer first; whatever it
# rejects gets a second chance in the main buffer, where it may be sampled.
def append_event(event)
  primary, = event
  return @samples.append(event) unless primary.include?(SYNTHETICS_RESOURCE_ID_KEY)

  _accepted, overflow = @synthetics_samples.append_with_reject(event)
  @samples.append(overflow) if overflow
end
append_metrics(txn_metrics, sample) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 199
# Copies selected stat values from +txn_metrics+ into +sample+.
#
# For each metric name in OVERVIEW_SPECS that +txn_metrics+ contains, every
# (reader, sample key) pair of that spec is applied: the reader is sent to
# the stat object and the result stored under the sample key. Does nothing
# when +txn_metrics+ is nil.
def append_metrics(txn_metrics, sample)
  return unless txn_metrics

  OVERVIEW_SPECS.each do |metric_name, extractions|
    next unless txn_metrics.has_key?(metric_name)

    stats = txn_metrics[metric_name]
    extractions.each { |reader, sample_key| sample[sample_key] = stats.send(reader) }
  end
end
create_agent_attributes(attributes) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 255
# Returns the frozen agent attributes destined for transaction events, or
# the shared EMPTY_HASH when +attributes+ is nil (so no hash is allocated
# for transactions that carry no attributes).
def create_agent_attributes(attributes)
  return EMPTY_HASH unless attributes

  attributes.agent_attributes_for(Mmtrix::Agent::AttributeFilter::DST_TRANSACTION_EVENTS).freeze
end
create_custom_attributes(attributes) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 245
# Returns the frozen custom attributes destined for transaction events, or
# the shared EMPTY_HASH when +attributes+ is nil (so no hash is allocated
# for transactions that carry no attributes).
def create_custom_attributes(attributes)
  return EMPTY_HASH unless attributes

  attributes.custom_attributes_for(Mmtrix::Agent::AttributeFilter::DST_TRANSACTION_EVENTS).freeze
end
create_main_event(payload) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 212
# Builds the main event hash for a finished transaction.
#
# The timestamp, name, duration and type are always present. Metric
# overview values and the optional identifiers (GUIDs, CAT and synthetics
# ids, apdex zone) are added only when the payload carries them.
def create_main_event(payload)
  sample = {}
  sample[TIMESTAMP_KEY] = float(payload[:start_timestamp])
  sample[NAME_KEY]      = string(payload[:name])
  sample[DURATION_KEY]  = float(payload[:duration])
  sample[TYPE_KEY]      = SAMPLE_TYPE

  append_metrics(payload[:metrics], sample)

  # Sample key / payload key pairs, in the order they are appended.
  [
    [GUID_KEY,                       :guid],
    [REFERRING_TRANSACTION_GUID_KEY, :referring_transaction_guid],
    [CAT_TRIP_ID_KEY,                :cat_trip_id],
    [CAT_PATH_HASH_KEY,              :cat_path_hash],
    [CAT_REFERRING_PATH_HASH_KEY,    :cat_referring_path_hash],
    [APDEX_PERF_ZONE_KEY,            :apdex_perf_zone],
    [SYNTHETICS_RESOURCE_ID_KEY,     :synthetics_resource_id],
    [SYNTHETICS_JOB_ID_KEY,          :synthetics_job_id],
    [SYNTHETICS_MONITOR_ID_KEY,      :synthetics_monitor_id]
  ].each do |sample_key, payload_key|
    optionally_append(sample_key, payload_key, sample, payload)
  end

  append_cat_alternate_path_hashes(sample, payload)
  sample
end
harvest!() click to toggle source

Clear any existing samples, record sampling statistics for the harvest period, and return the previous set of samples. (Synchronized)

# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 81
# Empties the buffers via #reset! and returns the samples they held.
#
# The sampling rate is recorded only while the aggregator is enabled;
# dropped synthetics events are always passed to
# #record_dropped_synthetics.
def harvest!
  harvested, kept, seen, dropped_synthetics = reset!

  if @enabled
    record_sampling_rate(seen, kept)
  end
  record_dropped_synthetics(dropped_synthetics)

  harvested
end
merge!(old_samples) click to toggle source

Merge samples back into the buffer, for example after a failed transmission to the collector. (Synchronized)

# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 90
# Re-appends each event in +old_samples+ through #append_event while
# holding the aggregator's lock.
def merge!(old_samples)
  synchronize do
    old_samples.each do |event|
      append_event(event)
    end
  end
end
notify_full() click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 138
# Logs (at debug level) that the main sample buffer has reached capacity
# and remembers that the notice was given, so callers can avoid repeating
# it.
def notify_full
  logger   = Mmtrix::Agent.logger
  capacity = @samples.capacity
  logger.debug("Transaction event capacity of #{capacity} reached, beginning sampling")

  @notified_full = true
end
on_transaction_finished(payload) click to toggle source

Event handler for the :transaction_finished event.

# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 144
# Handler for the :transaction_finished event.
#
# Ignored while the aggregator is disabled. Otherwise builds the
# [main event, custom attributes, agent attributes] triple from +payload+,
# appends it under the lock, and emits the "buffer full" notice the first
# time the main buffer is found full.
def on_transaction_finished(payload)
  return unless @enabled

  attributes = payload[:attributes]
  event = [
    create_main_event(payload),
    create_custom_attributes(attributes),
    create_agent_attributes(attributes)
  ]

  synchronize { append_event(event) }
  return if @notified_full

  notify_full if @samples.full?
end
optionally_append(sample_key, payload_key, sample, payload) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 239
# Copies payload[payload_key], coerced with #string, into
# sample[sample_key] -- but only when the payload actually has that key.
# Returns the stored value, or nil when nothing was copied.
def optionally_append(sample_key, payload_key, sample, payload)
  return unless payload.include?(payload_key)

  sample[sample_key] = string(payload[payload_key])
end
record_dropped_synthetics(synthetics_dropped) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 113
# Reports synthetics events dropped during the harvest period.
#
# Does nothing unless +synthetics_dropped+ is positive. Otherwise logs the
# synthetics buffer's limit at debug level and records the number of
# dropped events as a supportability metric.
def record_dropped_synthetics(synthetics_dropped)
  return unless synthetics_dropped > 0

  # The limit that was hit is the synthetics buffer's, not the main sample
  # buffer's.
  Mmtrix::Agent.logger.debug("Synthetics transaction event limit (#{@synthetics_samples.capacity}) reached. Further synthetics events this harvest period dropped.")

  engine = Mmtrix::Agent.instance.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/synthetics_events_dropped", synthetics_dropped)
end
record_sampling_rate(request_count, sample_count) click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 96
# Logs how many requests were sampled this cycle and since startup, then
# records the cycle's request and sample counts as supportability metrics.
#
# +request_count+ is the number of requests seen this cycle and
# +sample_count+ the number captured. Lifetime totals are read from the
# main sample buffer.
def record_sampling_rate(request_count, sample_count) #THREAD_LOCAL_ACCESS
  request_count_lifetime = @samples.seen_lifetime
  sample_count_lifetime = @samples.captured_lifetime

  # A cycle with no requests would otherwise divide by zero and log "NaN %";
  # report 0.0 % instead.
  percent = lambda do |captured, seen|
    seen.zero? ? 0.0 : captured.to_f / seen * 100.0
  end

  Mmtrix::Agent.logger.debug("Sampled %d / %d (%.1f %%) requests this cycle, %d / %d (%.1f %%) since startup" % [
    sample_count,
    request_count,
    percent.call(sample_count, request_count),
    sample_count_lifetime,
    request_count_lifetime,
    percent.call(sample_count_lifetime, request_count_lifetime)
  ])

  engine = Mmtrix::Agent.instance.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/requests", request_count)
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/samples", sample_count)
end
register_config_callbacks() click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 122
# Hooks the aggregator up to configuration changes:
#
# * analytics_events.max_samples_stored resizes the main sample buffer,
# * synthetics.events_limit resizes the synthetics buffer,
# * analytics_events.enabled switches the aggregator on or off.
#
# Buffer capacities are changed under the aggregator's lock.
def register_config_callbacks
  Mmtrix::Agent.config.register_callback(:'analytics_events.max_samples_stored') do |limit|
    Mmtrix::Agent.logger.debug("TransactionEventAggregator max_samples set to #{limit}")
    synchronize do
      @samples.capacity = limit
    end
  end

  Mmtrix::Agent.config.register_callback(:'synthetics.events_limit') do |limit|
    Mmtrix::Agent.logger.debug("TransactionEventAggregator limit for synthetics events set to #{limit}")
    synchronize do
      @synthetics_samples.capacity = limit
    end
  end

  Mmtrix::Agent.config.register_callback(:'analytics_events.enabled') { |flag| @enabled = flag }
end
reset!() click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 59
# Empties both buffers under the aggregator's lock and clears the
# "buffer full" notification flag.
#
# Returns [old_samples, sample_count, request_count, synthetics_dropped]:
# the combined contents of both buffers, the size and seen-count of the
# main buffer, and the synthetics buffer's dropped-count, all read before
# the buffers are reset.
def reset!
  # Declared outside the block so the values assigned inside it are visible
  # afterwards. Chained assignment gives every counter a 0 default (a
  # parallel "a, b, c = 0" would leave the last two nil).
  sample_count = request_count = synthetics_dropped = 0
  old_samples = nil

  synchronize do
    sample_count = @samples.size
    request_count = @samples.num_seen

    synthetics_dropped = @synthetics_samples.num_dropped

    old_samples = @samples.to_a + @synthetics_samples.to_a
    @samples.reset!
    @synthetics_samples.reset!

    @notified_full = false
  end

  [old_samples, sample_count, request_count, synthetics_dropped]
end
samples() click to toggle source

Fetch a copy of the sampler’s gathered samples. (Synchronized)

# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 55
# Returns a new array holding the main buffer's samples followed by the
# synthetics buffer's samples, read under the aggregator's lock.
#
# Uses non-mutating concatenation (as #reset! does), so the result is a
# copy even if a buffer's #to_a were to hand back its own storage.
def samples
  synchronize { @samples.to_a + @synthetics_samples.to_a }
end