class StatfulClient

Statful Client Instance

@attr_reader config [Hash] Current client config

Attributes

buffer[RW]
config[R]
logger[RW]

Public Class Methods

new(config = {}) click to toggle source

Initialize the client

@param [Hash] config Client bootstrap configuration @option config [String] :host Destination host required @option config [String] :port Destination port required @option config [String] :transport Transport protocol, one of (udp or http) required @option config [Integer] :timeout Timeout for http transport @option config [String] :token Authentication account token for http transport @option config [String] :app Global metric app tag @option config [TrueClass/FalseClass] :dryrun Enable dry-run mode @option config [Object] :logger Logger instance that supports debug (if dryrun is enabled) and error methods @option config [Hash] :tags Global list of metric tags @option config [Integer] :sample_rate Global sample rate (as a percentage), between: (1-100) @option config [String] :namespace Global default namespace @option config [Integer] :flush_size Buffer flush upper size limit @option config [Integer] :thread_pool_size Thread pool upper size limit @return [Object] The Statful client

# File lib/client.rb, line 33
def initialize(config = {})
  user_config = MyHash[config].symbolize_keys

  if !user_config.has_key?(:transport) || !%w(udp http).include?(user_config[:transport])
    raise ArgumentError.new('Transport is missing or invalid')
  end

  if user_config[:transport] == 'http'
    raise ArgumentError.new('Token is missing') if user_config[:token].nil?
  end

  if user_config.has_key?(:sample_rate) && !user_config[:sample_rate].between?(1, 100)
    raise ArgumentError.new('Sample rate is not within range (1-100)')
  end

  default_config = {
    :host => 'api.statful.com',
    :port => 443,
    :transport => 'http',
    :tags => {},
    :sample_rate => 100,
    :namespace => 'application',
    :flush_size => 5,
    :thread_pool_size => 5
  }

  @config = default_config.merge(user_config)
  @logger = @config[:logger]

  @buffer = MyQueue.new
  @pool = Concurrent::FixedThreadPool.new(@config[:thread_pool_size])

  @http = Net::HTTP.new(@config[:host], @config[:port])
  @http.use_ssl = true # must enforce use of ssl, otherwise it will raise EOFError: end of file reached

  self
end

Public Instance Methods

counter(name, value, options = {}) click to toggle source

Sends a counter

@param name [String] Name of the counter @param value [Numeric] Increment/Decrement value, this will be truncated with `to_int` @param [Hash] options The options to apply to the metric @option options [Hash] :tags Tags to associate to the metric @option options [Array<String>] :agg List of aggregations to be applied by the Statful @option options [Integer] :agg_freq Aggregation frequency in seconds @option options [String] :namespace Namespace of the metric

# File lib/client.rb, line 107
def counter(name, value, options = {})
  tags = @config[:tags]
  tags = tags.merge(options[:tags]) unless options[:tags].nil?

  aggregations = %w(sum count)
  aggregations.concat(options[:agg]) unless options[:agg].nil?

  opts = {
    :agg_freq => 10,
    :namespace => 'application'
  }.merge(MyHash[options].symbolize_keys)

  opts[:tags] = tags
  opts[:agg] = aggregations

  _put("counter.#{name}", opts[:tags], value.to_i, opts[:agg], opts[:agg_freq], @config[:sample_rate], opts[:namespace])
end
flush_metrics() click to toggle source

Flush metrics buffer

# File lib/client.rb, line 154
def flush_metrics
  flush
end
gauge(name, value, options = {}) click to toggle source

Sends a gauge

@param name [String] Name of the gauge @param value [Numeric] Value of the metric @param [Hash] options The options to apply to the metric @option options [Hash] :tags Tags to associate to the metric @option options [Array<String>] :agg List of aggregations to be applied by Statful @option options [Integer] :agg_freq Aggregation frequency in seconds @option options [String] :namespace Namespace of the metric

# File lib/client.rb, line 134
def gauge(name, value, options = {})
  tags = @config[:tags]
  tags = tags.merge(options[:tags]) unless options[:tags].nil?

  aggregations = %w(last)
  aggregations.concat(options[:agg]) unless options[:agg].nil?

  opts = {
    :agg_freq => 10,
    :namespace => 'application'
  }.merge(MyHash[options].symbolize_keys)

  opts[:tags] = tags
  opts[:agg] = aggregations

  _put("gauge.#{name}", opts[:tags], value, opts[:agg], opts[:agg_freq], @config[:sample_rate], opts[:namespace])
end
new() click to toggle source
# File lib/client.rb, line 12
def new
  self
end
put(metric, value, options = {}) click to toggle source

Adds a new metric to the buffer

@private @param metric [String] Name of the metric, ex: `response_time` @param value [Numeric] Value of the metric @param [Hash] options The options to apply to the metric @param options tags [Hash] Tags to associate to the metric @param options agg [Array<String>] List of aggregations to be applied by Statful @param options agg_freq [Integer] Aggregation frequency in seconds @param options sample_rate [Integer] Sampling rate, between: (1-100) @param options namespace [String] Namespace of the metric @param options timestamp [Integer] Timestamp of the metric

# File lib/client.rb, line 170
def put(metric, value, options = {})
  tags = @config[:tags]
  tags = tags.merge(options[:tags]) unless options[:tags].nil?

  agg = options[:agg].nil? ? [] : options[:agg]

  sample_rate = options[:sample_rate].nil? ? 100 : options[:sample_rate]
  namespace = options[:namespace].nil? ? 'application' : options[:namespace]

  _put(metric, tags, value, agg, options[:agg_freq], sample_rate, namespace, options[:timestamp])
end
timer(name, value, options = {}) click to toggle source

Sends a timer

@param name [String] Name of the timer @param value [Numeric] Value of the metric @param [Hash] options The options to apply to the metric @option options [Hash] :tags Tags to associate to the metric @option options [Array<String>] :agg List of aggregations to be applied by Statful @option options [Integer] :agg_freq Aggregation frequency in seconds @option options [String] :namespace Namespace of the metric

# File lib/client.rb, line 80
def timer(name, value, options = {})
  tags = @config[:tags].merge({:unit => 'ms'})
  tags = tags.merge(options[:tags]) unless options[:tags].nil?

  aggregations = %w(avg p90 count)
  aggregations.concat(options[:agg]) unless options[:agg].nil?

  opts = {
    :agg_freq => 10,
    :namespace => 'application'
  }.merge(MyHash[options].symbolize_keys)

  opts[:tags] = tags
  opts[:agg] = aggregations

  _put("timer.#{name}", opts[:tags], value, opts[:agg], opts[:agg_freq], @config[:sample_rate], opts[:namespace])
end

Private Instance Methods

_put(metric, tags, value, agg = [], agg_freq = 10, sample_rate = 100, namespace = 'application', timestamp = nil) click to toggle source

Adds a new metric to the buffer

@private @param metric [String] Name of the metric, ex: `response_time` @param value [Numeric] Value of the metric @param tags [Hash] Tags to associate to the metric @param agg [Array<String>] List of aggregations to be applied by Statful @param agg_freq [Integer] Aggregation frequency in seconds @param sample_rate [Integer] Sampling rate, between: (1-100) @param namespace [String] Namespace of the metric @param timestamp [Integer] Timestamp of the metric

# File lib/client.rb, line 198
def _put(metric, tags, value, agg = [], agg_freq = 10, sample_rate = 100, namespace = 'application', timestamp = nil)
  metric_name = "#{namespace}.#{metric}"
  sample_rate_normalized = sample_rate / 100
  timestamp = Time.now.to_i if timestamp.nil?

  @config.has_key?(:app) ? merged_tags = tags.merge({:app => @config[:app]}) : merged_tags = tags

  if Random.new.rand(1..100)*0.01 <= sample_rate_normalized
    flush_line = merged_tags.keys.inject(metric_name) { |previous, tag|
      "#{previous},#{tag.to_s}=#{merged_tags[tag]}"
    }

    flush_line += " #{value} #{timestamp}"

    unless agg.empty?
      agg.push(agg_freq)
      flush_line += " #{agg.join(',')}"
    end

    flush_line += sample_rate ? " #{sample_rate}" : ''

    put_raw(flush_line)
  end
end
flush() click to toggle source

Flushed the metrics to the Statful via UDP

@private

# File lib/client.rb, line 237
def flush
  unless @buffer.empty?
    message = @buffer.to_a.join("\n")

    # Handle socket errors by just logging if we have a logger instantiated
    # We could eventually save the buffer state but that would require us to manage the buffer size etc.
    begin
      @logger.debug("Flushing metrics: #{message}") unless @logger.nil?

      if !@config.has_key?(:dryrun) || !@config[:dryrun]
        transport_send(message)
      end
    rescue SocketError => ex
      @logger.debug("Statful: #{ex} on #{@config[:host]}:#{@config[:port]}") unless @logger.nil?
      false
    ensure
      @buffer.clear
    end
  end
end
http_transport(message) click to toggle source

Sends message via http transport

@private @param message :nocov:

# File lib/client.rb, line 279
def http_transport(message)
  headers = {
    'Content-Type' => 'application/json',
    'M-Api-Token' => @config[:token]
  }

  @pool.post do
    begin
      response = @http.send_request('PUT', '/tel/v2.0/metrics', message, headers)

      if response.code != '201'
        @logger.debug("Failed to flush message via http with: #{response.code} - #{response.msg}") unless @logger.nil?
      end
    rescue StandardError => ex
      @logger.debug("Statful: #{ex} on #{@config[:host]}:#{@config[:port]}") unless @logger.nil?
      false
    end
  end
end
put_raw(metric_lines) click to toggle source

Add raw metrics directly into the flush buffer

@private @param metric_lines

# File lib/client.rb, line 227
def put_raw(metric_lines)
  @buffer.push(metric_lines)
  if @buffer.size >= @config[:flush_size]
    flush
  end
end
transport_send(message) click to toggle source

Delegate flushing messages to the proper transport

@private @param message

# File lib/client.rb, line 262
def transport_send(message)
  case @config[:transport]
    when 'http'
      http_transport(message)
    when 'udp'
      udp_transport(message)
    else
      @logger.debug("Failed to flush message due to invalid transport: #{@config[:transport]}") unless @logger.nil?
  end
end
udp_socket() click to toggle source

Return a new or existing udp socket

@private :nocov:

# File lib/client.rb, line 312
def udp_socket
  Thread.current[:statful_socket] ||= UDPSocket.new(Addrinfo.udp(@config[:host], @config[:port]).afamily)
end
udp_transport(message) click to toggle source

Sends message via udp transport

@private @param message :nocov:

# File lib/client.rb, line 304
def udp_transport(message)
  udp_socket.send(message)
end