class Mmtrix::Agent::TransactionEventAggregator
Constants
- APDEX_PERF_ZONE_KEY
- CAT_ALTERNATE_PATH_HASHES_KEY
- CAT_PATH_HASH_KEY
- CAT_REFERRING_PATH_HASH_KEY
- CAT_TRIP_ID_KEY
- DURATION_KEY
- EMPTY_HASH
To avoid allocations when we have empty custom or agent attributes
- GUID_KEY
- NAME_KEY
- OVERVIEW_SPECS
- REFERRING_TRANSACTION_GUID_KEY
- SAMPLE_TYPE
The type field of the sample
- SYNTHETICS_JOB_ID_KEY
- SYNTHETICS_MONITOR_ID_KEY
- SYNTHETICS_RESOURCE_ID_KEY
- TIMESTAMP_KEY
- TYPE_KEY
Strings for static keys of the sample structure
Public Class Methods
map_metric(metric_name, to_add={})
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 172
# Adds stat-to-sample-key mappings for a metric to OVERVIEW_SPECS.
#
# metric_name - key under which the mappings are stored in OVERVIEW_SPECS.
# to_add      - hash of mappings to merge in; each of its values is frozen
#               in place before merging.
#
# Returns the merged mapping hash now stored under metric_name.
def self.map_metric(metric_name, to_add={})
  to_add.each_value(&:freeze)

  spec = OVERVIEW_SPECS.fetch(metric_name) { {} }
  spec.merge!(to_add)
  OVERVIEW_SPECS[metric_name] = spec
end
new( event_listener )
click to toggle source
Calls superclass method
Object::new
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 36
# Sets up a disabled aggregator with two buffers sized from agent config:
# a SampledBuffer limited by 'analytics_events.max_samples_stored' and a
# SyntheticsEventBuffer limited by 'synthetics.events_limit'.
#
# event_listener - object whose #subscribe is used to register
#                  on_transaction_finished for :transaction_finished.
def initialize( event_listener )
  super()

  config = Mmtrix::Agent.config

  @enabled       = false
  @notified_full = false

  @samples            = ::Mmtrix::Agent::SampledBuffer.new(config[:'analytics_events.max_samples_stored'])
  @synthetics_samples = ::Mmtrix::Agent::SyntheticsEventBuffer.new(config[:'synthetics.events_limit'])

  event_listener.subscribe(:transaction_finished, &method(:on_transaction_finished))
  register_config_callbacks
end
Public Instance Methods
append_cat_alternate_path_hashes(sample, payload)
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 233
# Stores the payload's :cat_alternate_path_hashes in the sample as a single
# sorted, comma-joined string. Does nothing (and returns nil) when the
# payload has no such key.
def append_cat_alternate_path_hashes(sample, payload)
  return unless payload.include?(:cat_alternate_path_hashes)

  path_hashes = payload[:cat_alternate_path_hashes]
  sample[CAT_ALTERNATE_PATH_HASHES_KEY] = path_hashes.sort.join(',')
end
append_event(event)
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 156
# Routes an event to the appropriate buffer.
#
# event - array-like whose first element is the main event hash.
#
# Events whose main hash carries SYNTHETICS_RESOURCE_ID_KEY are offered to
# the synthetics buffer first; whatever that buffer rejects is passed on to
# the main buffer. All other events go straight to the main buffer.
def append_event(event)
  main_event, _rest = event

  unless main_event.include?(SYNTHETICS_RESOURCE_ID_KEY)
    return @samples.append(event)
  end

  # append_with_reject returns a pair; only the rejected half matters here.
  _accepted, rejected = @synthetics_samples.append_with_reject(event)
  @samples.append(rejected) if rejected
end
append_metrics(txn_metrics, sample)
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 199
# Copies selected stat values from the transaction's metrics into the sample.
#
# For every metric named in OVERVIEW_SPECS that is present in txn_metrics,
# each (reader method => sample key) pair is applied: the reader is sent to
# the stat object and the result is stored in the sample under that key.
#
# Returns nil when txn_metrics is nil/false, otherwise OVERVIEW_SPECS.
def append_metrics(txn_metrics, sample)
  return unless txn_metrics

  OVERVIEW_SPECS.each do |metric_name, mappings|
    next unless txn_metrics.has_key?(metric_name)

    stats = txn_metrics[metric_name]
    mappings.each { |reader, sample_key| sample[sample_key] = stats.send(reader) }
  end
end
create_agent_attributes(attributes)
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 255
# Returns the agent attributes destined for transaction events, frozen.
#
# attributes - object responding to #agent_attributes_for, or nil/false.
#
# When no attributes object is given, the shared EMPTY_HASH constant is
# returned instead of building a new hash.
def create_agent_attributes(attributes)
  return EMPTY_HASH unless attributes

  attributes.agent_attributes_for(Mmtrix::Agent::AttributeFilter::DST_TRANSACTION_EVENTS).freeze
end
create_custom_attributes(attributes)
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 245
# Returns the custom attributes destined for transaction events, frozen.
#
# attributes - object responding to #custom_attributes_for, or nil/false.
#
# When no attributes object is given, the shared EMPTY_HASH constant is
# returned instead of building a new hash.
def create_custom_attributes(attributes)
  return EMPTY_HASH unless attributes

  attributes.custom_attributes_for(Mmtrix::Agent::AttributeFilter::DST_TRANSACTION_EVENTS).freeze
end
create_main_event(payload)
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 212
# Builds the main event hash for a finished transaction.
#
# payload - hash describing the transaction; :start_timestamp, :name and
#           :duration are always read, :metrics is passed to append_metrics,
#           and the remaining keys are copied only when present.
#
# Returns the populated event hash.
def create_main_event(payload)
  event = {
    TIMESTAMP_KEY => float(payload[:start_timestamp]),
    NAME_KEY      => string(payload[:name]),
    DURATION_KEY  => float(payload[:duration]),
    TYPE_KEY      => SAMPLE_TYPE
  }

  append_metrics(payload[:metrics], event)

  # Transaction identity and cross-application tracing fields.
  optionally_append(GUID_KEY,                       :guid,                       event, payload)
  optionally_append(REFERRING_TRANSACTION_GUID_KEY, :referring_transaction_guid, event, payload)
  optionally_append(CAT_TRIP_ID_KEY,                :cat_trip_id,                event, payload)
  optionally_append(CAT_PATH_HASH_KEY,              :cat_path_hash,              event, payload)
  optionally_append(CAT_REFERRING_PATH_HASH_KEY,    :cat_referring_path_hash,    event, payload)
  optionally_append(APDEX_PERF_ZONE_KEY,            :apdex_perf_zone,            event, payload)

  # Synthetics fields.
  optionally_append(SYNTHETICS_RESOURCE_ID_KEY,     :synthetics_resource_id,     event, payload)
  optionally_append(SYNTHETICS_JOB_ID_KEY,          :synthetics_job_id,          event, payload)
  optionally_append(SYNTHETICS_MONITOR_ID_KEY,      :synthetics_monitor_id,      event, payload)

  append_cat_alternate_path_hashes(event, payload)
  event
end
harvest!()
click to toggle source
Clear any existing samples, record sampling statistics for the finished harvest cycle, and return the previous set of samples. (Synchronized)
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 81
# Empties the buffers via reset! and returns the events they held.
#
# The sampling rate is recorded only while the aggregator is enabled;
# dropped synthetics events are always passed to record_dropped_synthetics.
def harvest!
  harvested, captured, seen, dropped_synthetics = reset!

  record_sampling_rate(seen, captured) if @enabled
  record_dropped_synthetics(dropped_synthetics)

  harvested
end
merge!(old_samples)
click to toggle source
Merge samples back into the buffer, for example after a failed transmission to the collector. (Synchronized)
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 90
# Re-appends previously harvested events, one by one, inside a single
# synchronized section.
#
# old_samples - enumerable of events in the form accepted by append_event.
def merge!(old_samples)
  synchronize do
    old_samples.each do |event|
      append_event(event)
    end
  end
end
notify_full()
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 138
# Logs (at debug level) that the main buffer has reached capacity and
# remembers that the notice was given by setting @notified_full.
#
# Returns true.
def notify_full
  capacity = @samples.capacity
  Mmtrix::Agent.logger.debug("Transaction event capacity of #{capacity} reached, beginning sampling")
  @notified_full = true
end
on_transaction_finished(payload)
click to toggle source
Event handler for the :transaction_finished event.
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 144
# Handler for the :transaction_finished event. Ignored while disabled.
#
# Builds a three-element event (main event, custom attributes, agent
# attributes) from the payload, appends it inside a synchronized section,
# and calls notify_full the first time the main buffer reports full.
def on_transaction_finished(payload)
  return unless @enabled

  attributes = payload[:attributes]
  event = [
    create_main_event(payload),
    create_custom_attributes(attributes),
    create_agent_attributes(attributes)
  ]

  synchronize { append_event(event) }

  notify_full if !@notified_full && @samples.full?
end
optionally_append(sample_key, payload_key, sample, payload)
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 239
# Copies payload[payload_key], passed through string(), into
# sample[sample_key] — but only when the payload contains payload_key.
#
# Returns the stored value, or nil when the key is absent.
def optionally_append(sample_key, payload_key, sample, payload)
  return unless payload.include?(payload_key)

  sample[sample_key] = string(payload[payload_key])
end
record_dropped_synthetics(synthetics_dropped)
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 113
# Reports synthetics events that were dropped during the harvest period.
#
# synthetics_dropped - number of dropped synthetics events; nothing is
#                      logged or recorded unless it is greater than zero.
#
# Logs a debug message and records the count under the
# "TransactionEventAggregator/synthetics_events_dropped" supportability metric.
def record_dropped_synthetics(synthetics_dropped)
  return unless synthetics_dropped > 0

  # The limit that was hit is the synthetics buffer's capacity, not the
  # capacity of the main sample buffer.
  Mmtrix::Agent.logger.debug("Synthetics transaction event limit (#{@synthetics_samples.capacity}) reached. Further synthetics events this harvest period dropped.")

  engine = Mmtrix::Agent.instance.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/synthetics_events_dropped", synthetics_dropped)
end
record_sampling_rate(request_count, sample_count)
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 96
# Logs how many requests were sampled in this cycle and since startup, and
# records the cycle's request and sample counts as supportability metrics.
#
# request_count - number of requests seen this cycle.
# sample_count  - number of those requests that were captured.
def record_sampling_rate(request_count, sample_count) #THREAD_LOCAL_ACCESS
  request_count_lifetime = @samples.seen_lifetime
  sample_count_lifetime  = @samples.captured_lifetime

  # With a zero denominator the Float division yields NaN (or Infinity) and
  # the log line would read "NaN %"; report 0.0 for an idle period instead.
  cycle_pct =
    request_count == 0 ? 0.0 : (sample_count.to_f / request_count * 100.0)
  lifetime_pct =
    request_count_lifetime == 0 ? 0.0 : (sample_count_lifetime.to_f / request_count_lifetime * 100.0)

  Mmtrix::Agent.logger.debug("Sampled %d / %d (%.1f %%) requests this cycle, %d / %d (%.1f %%) since startup" % [
    sample_count,
    request_count,
    cycle_pct,
    sample_count_lifetime,
    request_count_lifetime,
    lifetime_pct
  ])

  engine = Mmtrix::Agent.instance.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/requests", request_count)
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/samples", sample_count)
end
register_config_callbacks()
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 122
# Registers callbacks on the agent config so this aggregator follows
# changes to three settings:
#
# 'analytics_events.max_samples_stored' - resizes the main buffer.
# 'synthetics.events_limit'             - resizes the synthetics buffer.
# 'analytics_events.enabled'            - sets the @enabled flag.
#
# Buffer resizes run inside a synchronized section and are logged at
# debug level.
def register_config_callbacks
  config = Mmtrix::Agent.config

  config.register_callback(:'analytics_events.max_samples_stored') do |max_samples|
    Mmtrix::Agent.logger.debug "TransactionEventAggregator max_samples set to #{max_samples}"
    synchronize { @samples.capacity = max_samples }
  end

  config.register_callback(:'synthetics.events_limit') do |limit|
    Mmtrix::Agent.logger.debug "TransactionEventAggregator limit for synthetics events set to #{limit}"
    synchronize { @synthetics_samples.capacity = limit }
  end

  config.register_callback(:'analytics_events.enabled') do |flag|
    @enabled = flag
  end
end
reset!()
click to toggle source
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 59
# Empties both buffers and clears the "notified full" flag, all inside one
# synchronized section.
#
# Returns a four-element array:
#   [old_samples, sample_count, request_count, synthetics_dropped]
# where old_samples is the main buffer's events followed by the synthetics
# buffer's events, as they were before the reset.
def reset!
  # Chained assignment so that every counter starts at 0. The form
  # `a, b, c = 0` would set only the first and leave the others nil.
  sample_count = request_count = synthetics_dropped = 0
  old_samples = nil

  synchronize do
    sample_count       = @samples.size
    request_count      = @samples.num_seen
    synthetics_dropped = @synthetics_samples.num_dropped

    # Snapshot the contents before the buffers are cleared.
    old_samples = @samples.to_a + @synthetics_samples.to_a

    @samples.reset!
    @synthetics_samples.reset!
    @notified_full = false
  end

  [old_samples, sample_count, request_count, synthetics_dropped]
end
samples()
click to toggle source
Fetch a copy of the sampler’s gathered samples. (Synchronized)
# File lib/mmtrix/agent/transaction_event_aggregator.rb, line 55
# Returns a new array holding the main buffer's events followed by the
# synthetics buffer's events, read inside a synchronized section.
def samples
  # Array#+ builds a fresh array, so the result is a copy regardless of
  # whether the buffers' #to_a hands back internal storage; #concat would
  # have appended the synthetics events onto whatever #to_a returned.
  synchronize { @samples.to_a + @synthetics_samples.to_a }
end