class Librato::Collector::Aggregator

maintains storage of timing and measurement type measurements

Constants

SEPARATOR

Attributes

default_tags[R]

Public Class Methods

new(options={}) click to toggle source
# File lib/librato/collector/aggregator.rb, line 16
def initialize(options={})
  @cache = Librato::Metrics::Aggregator.new(prefix: options[:prefix])
  @percentiles = {}
  @lock = Mutex.new
  @default_tags = options.fetch(:default_tags, {})
end

Public Instance Methods

[](key) click to toggle source
# File lib/librato/collector/aggregator.rb, line 23
def [](key)
  fetch(key)
end
delete_all() click to toggle source

clear all stored values

# File lib/librato/collector/aggregator.rb, line 45
def delete_all
  @lock.synchronize { clear_storage }
end
fetch(key, options={}) click to toggle source

retrieve current value of a metric/source/percentage. this exists primarily for debugging/testing and isn't called routinely.

# File lib/librato/collector/aggregator.rb, line 29
def fetch(key, options={})
  return nil if @cache.empty?
  return fetch_percentile(key, options) if options[:percentile]
  measurements = nil
  tags = options[:tags] || @default_tags
  @lock.synchronize { measurements = @cache.queued[:measurements] }
  measurements.each do |metric|
    if metric[:name] == key.to_s
      return metric if !tags && !metric[:tags]
      return metric if tags == metric[:tags]
    end
  end
  nil
end
flush_to(queue, opts={}) click to toggle source

transfer all measurements to queue and reset internal status

# File lib/librato/collector/aggregator.rb, line 50
def flush_to(queue, opts={})
  queued = nil
  @lock.synchronize do
    return if @cache.empty?
    queued = @cache.queued
    flush_percentiles(queue, opts) unless @percentiles.empty?
    clear_storage unless opts[:preserve]
  end
  queue.merge!(queued) if queued
end
measure(*args) { || ... } click to toggle source

@example Simple measurement

measure 'sources_returned', sources.length

@example Simple timing in milliseconds

timing 'twitter.lookup', 2.31

@example Block-based timing

timing 'db.query' do
  do_my_query
end

@example Custom source

measure 'user.all_orders', user.order_count, :source => user.id
# File lib/librato/collector/aggregator.rb, line 75
def measure(*args, &block)
  options = {}
  event = args[0].to_s
  returned = nil

  # handle block or specified argument
  if block_given?
    start = Time.now
    returned = yield
    value = ((Time.now - start) * 1000.0).to_i
  elsif args[1]
    value = args[1]
  else
    raise "no value provided"
  end

  # detect options hash if present
  if args.length > 1 and args[-1].respond_to?(:each)
    options = args[-1]
  end

  percentiles = Array(options[:percentile])
  source = options[:source]
  tags_option = options[:tags]
  tags_option = { source: source } if source && !tags_option
  tags =
    if tags_option && options[:inherit_tags]
      @default_tags.merge(tags_option)
    elsif tags_option
      tags_option
    else
      @default_tags
    end

  @lock.synchronize do
    payload = { value: value }
    payload.merge!({ tags: tags }) if tags
    @cache.add event => payload

    percentiles.each do |perc|
      store = fetch_percentile_store(event, payload)
      store[:reservoir] << value
      track_percentile(store, perc)
    end
  end
  returned
end
Also aliased as: timing
timing(*args, &block)
Alias for: measure

Private Instance Methods

clear_storage() click to toggle source
# File lib/librato/collector/aggregator.rb, line 126
def clear_storage
  @cache.clear
  @percentiles.each do |key, val|
    val[:reservoir].clear
    val[:percs].clear
  end
end
fetch_percentile(key, options) click to toggle source
# File lib/librato/collector/aggregator.rb, line 134
def fetch_percentile(key, options)
  store = fetch_percentile_store(key, options)
  return nil unless store
  store[:reservoir].percentile(options[:percentile])
end
fetch_percentile_store(event, options) click to toggle source
# File lib/librato/collector/aggregator.rb, line 140
def fetch_percentile_store(event, options)
  keyname = event

  if options[:tags]
    keyname = Librato::Metrics::Util.build_key_for(keyname, options[:tags])
  end

  @percentiles[keyname] ||= {
    name: event,
    reservoir: Hetchy::Reservoir.new(size: 1000),
    percs: Set.new
  }
  @percentiles[keyname].merge!({ tags: options[:tags] }) if options && options[:tags]
  @percentiles[keyname]
end
flush_percentiles(queue, opts) click to toggle source
# File lib/librato/collector/aggregator.rb, line 156
def flush_percentiles(queue, opts)
  @percentiles.each do |key, val|
    val[:percs].each do |perc|
      perc_name = perc.to_s[0,5].gsub('.','')
      payload =
        if val[:tags]
          { value: val[:reservoir].percentile(perc), tags: val[:tags] }
        else
          val[:reservoir].percentile(perc)
        end
      queue.add "#{val[:name]}.p#{perc_name}" => payload
    end
  end
end
track_percentile(store, perc) click to toggle source
# File lib/librato/collector/aggregator.rb, line 171
def track_percentile(store, perc)
  if perc < 0.0 || perc > 100.0
    raise InvalidPercentile, "Percentiles must be between 0.0 and 100.0"
  end
  store[:percs].add(perc)
end