class Librato::Metrics::Queue

Attributes

skip_measurement_times[RW]

Public Class Methods

new(opts={}) click to toggle source

@option opts [Integer] :autosubmit_count If set the queue will auto-submit any time it hits this number of measurements. @option opts [Integer] :autosubmit_interval If set the queue will auto-submit if the given number of seconds has passed when a new metric is added. @option opts [Boolean] :clear_failures Should the queue remove any queued measurements from its queue if it runs into problems with a request? (default: false) @option opts [Client] :client The client object to use to connect to Metrics. (default: Librato::Metrics.client) @option opts [Time|Integer] :measure_time A default measure_time to use for measurements added. @option opts [String] :prefix If set will apply the given prefix to all metric names of measurements added. @option opts [Boolean] :skip_measurement_times If true will not assign measurement_time to each measure as they are added. @option opts [String] :source The default source to use for measurements added.

# File lib/librato/metrics/queue.rb, line 18
def initialize(opts={})
  @queued = {}
  @autosubmit_count = opts[:autosubmit_count]
  @skip_measurement_times = opts[:skip_measurement_times]
  setup_common_options(opts)
end

Public Instance Methods

add(measurements) click to toggle source

Add a metric entry to the metric set:

@param [Hash] measurements measurements to add @return [Queue] returns self

# File lib/librato/metrics/queue.rb, line 29
def add(measurements)
  measurements.each do |key, value|
    multidimensional = has_tags?
    if value.respond_to?(:each)
      validate_parameters(value)
      metric = value
      metric[:name] = key.to_s
      type = metric.delete(:type) || metric.delete('type') || 'gauge'
    else
      metric = {name: key.to_s, value: value}
      type = :gauge
    end
    if @prefix
      metric[:name] = "#{@prefix}.#{metric[:name]}"
    end
    multidimensional = true if metric[:tags] || metric[:time]
    type = ("#{type}s").to_sym
    time_key = multidimensional ? :time : :measure_time
    metric[:time] = metric.delete(:measure_time) if multidimensional && metric[:measure_time]

    if metric[time_key]
      metric[time_key] = metric[time_key].to_i
      check_measure_time(metric)
    elsif !skip_measurement_times
      metric[time_key] = epoch_time
    end
    if multidimensional
      @queued[:measurements] ||= []
      @queued[:measurements] << metric
    else
      @queued[type] ||= []
      @queued[type] << metric
    end
  end
  submit_check
  self
end
clear() click to toggle source

Remove all queued metrics

# File lib/librato/metrics/queue.rb, line 83
def clear
  @queued = {}
end
Also aliased as: flush
counters() click to toggle source

Currently queued counters

@return [Array]

# File lib/librato/metrics/queue.rb, line 70
def counters
  @queued[:counters] || []
end
empty?() click to toggle source

Are any metrics currently queued?

@return Boolean

# File lib/librato/metrics/queue.rb, line 77
def empty?
  gauges.empty? && counters.empty? && measurements.empty?
end
flush()
Alias for: clear
gauges() click to toggle source

Currently queued gauges

@return Array

# File lib/librato/metrics/queue.rb, line 91
def gauges
  @queued[:gauges] || []
end
length()
Alias for: size
measurements() click to toggle source
# File lib/librato/metrics/queue.rb, line 95
def measurements
  @queued[:measurements] || []
end
merge!(mergeable) click to toggle source

Combines queueable measures from the given object into this queue.

@example Merging queues for more performant submission

queue1.merge!(queue2)
queue1.submit  # submits combined contents

@return self

# File lib/librato/metrics/queue.rb, line 107
def merge!(mergeable)
  if mergeable.respond_to?(:queued)
    to_merge = mergeable.queued
  elsif mergeable.respond_to?(:has_key?)
    to_merge = mergeable
  else
    raise NotMergeable
  end
  Metrics::PLURAL_TYPES.each do |type|
    if to_merge[type]
      payload = reconcile(to_merge[type], to_merge[:source])
      if @queued[type]
        @queued[type] += payload
      else
        @queued[type] = payload
      end
    end
  end

  if to_merge[:measurements]
    payload = reconcile(to_merge[:measurements], to_merge[:tags])
    if @queued[:measurements]
      @queued[:measurements] += payload
    else
      @queued[:measurements] = payload
    end
  end

  submit_check
  self
end
queued() click to toggle source

All currently queued metrics

@return Hash

# File lib/librato/metrics/queue.rb, line 142
def queued
  return {} if @queued.empty?
  globals = {}
  time = has_tags? ? :time : :measure_time
  globals[time] = @time if @time
  globals[:source] = @source if @source
  globals[:tags] = @tags if has_tags?
  @queued.merge(globals)
end
size() click to toggle source

Count of metrics currently queued

@return Integer

# File lib/librato/metrics/queue.rb, line 155
def size
  self.queued.inject(0) { |result, data| result + data.last.size }
end
Also aliased as: length

Private Instance Methods

check_measure_time(data) click to toggle source
# File lib/librato/metrics/queue.rb, line 162
def check_measure_time(data)
  time_keys = [:measure_time, :time]

  if time_keys.any? { |key| data[key] && data[key] < Metrics::MIN_MEASURE_TIME }
    raise InvalidMeasureTime, "Measure time for submitted metric (#{data}) is invalid."
  end
end
reconcile(measurements, val) click to toggle source
# File lib/librato/metrics/queue.rb, line 170
def reconcile(measurements, val)
  arr = val.is_a?(Hash) ? [@tags, :tags] : [@source, :source]
  return measurements if !val || val == arr.first
  measurements.map! do |measurement|
    unless measurement[arr.last]
      measurement[arr.last] = val
    end
    measurement
  end
  measurements
end
submit_check() click to toggle source
# File lib/librato/metrics/queue.rb, line 182
def submit_check
  autosubmit_check # in Processor
  if @autosubmit_count && self.length >= @autosubmit_count
    self.submit
  end
end