class Instrumental::Agent

Constants

BACKOFF
CONNECT_TIMEOUT
DEFAULT_FREQUENCY
EXIT_FLUSH_TIMEOUT
HOSTNAME
MAX_AGGREGATOR_SIZE
MAX_BUFFER
MAX_RECONNECT_DELAY
REPLY_TIMEOUT
RESOLUTION_FAILURES_BEFORE_WAITING
RESOLUTION_WAIT
RESOLVE_TIMEOUT
VALID_FREQUENCIES

Attributes

aggregator_queue[RW]
connection[R]
dns_resolutions[RW]
enabled[R]
frequency[RW]
host[RW]
last_connect_at[RW]
port[RW]
secure[R]
sender_queue[RW]
synchronous[RW]

Public Class Methods

logger() click to toggle source
# File lib/instrumental/agent.rb, line 37
# Returns the class-wide logger, creating it on first use.
#
# The lazily created default writes to STDERR at WARN level. Any logger that
# has already been stored in @logger is returned untouched.
def self.logger
  @logger ||= Logger.new(STDERR).tap { |default| default.level = Logger::WARN }
end
logger=(l) click to toggle source
# File lib/instrumental/agent.rb, line 33
# Replaces the class-wide logger with +l+. The class-level reader returns this
# object from then on instead of building its STDERR default.
def self.logger=(l)
  @logger = l
end
new(api_key, options = {}) click to toggle source

Sets up a connection to the collector.

Instrumental::Agent.new(API_KEY)
Instrumental::Agent.new(API_KEY, :collector => 'hostname:port')
# File lib/instrumental/agent.rb, line 49
# Records the agent configuration, creates the aggregator and sender queues,
# and (when enabled) registers the at_exit cleanup hook. No worker thread is
# created in this method.
#
# api_key - stored in @api_key and sent in the "authenticate" handshake.
# options - Hash of settings; keys are symbolized where possible.
#           NOTE: the caller's hash is rewritten in place by the key
#           symbolization below, and :enabled may be overwritten when a
#           secure connection was demanded but OpenSSL is missing.
def initialize(api_key, options = {})
  # symbolize options keys (keys that cannot be symbolized are kept as-is)
  options.replace(
    options.inject({}) { |m, (k, v)| m[(k.to_sym rescue k) || k] = v; m }
  )

  # option defaults
  # :collector   - "host" or "host:port"; host defaults to
  #                collector.instrumentalapp.com, port to 8001 (secure) or
  #                8000 (not secure)
  # :enabled     - true
  # :synchronous - false
  # :frequency   - DEFAULT_FREQUENCY
  # :secure      - whatever allows_secure? reports when not given
  # :verify_cert - true
  # :metrician   - true
  @api_key         = api_key
  @host, @port     = options[:collector].to_s.split(':')
  @host          ||= 'collector.instrumentalapp.com'
  # requested_secure is only true for an explicit `:secure => true`;
  # desired_secure also covers other truthy values and the nil default.
  requested_secure = options[:secure] == true
  desired_secure   = options[:secure].nil? ? allows_secure? : !!options[:secure]
  if !allows_secure? && desired_secure
    logger.warn "Cannot connect to Instrumental via encrypted transport, SSL not available"
    if requested_secure
      # An explicit request for encryption is never silently downgraded:
      # the agent is disabled instead.
      options[:enabled] = false
      logger.error "You requested secure protocol to connect to Instrumental, but it is not available on this system (OpenSSL is not defined). Connecting to Instrumental has been disabled."
    end
    desired_secure = false
  end
  @secure          = desired_secure
  @verify_cert     = options[:verify_cert].nil? ? true : !!options[:verify_cert]
  default_port     = @secure ? 8001 : 8000
  @port            = (@port || default_port).to_i
  @enabled         = options.has_key?(:enabled) ? !!options[:enabled] : true
  @synchronous     = !!options[:synchronous]

  # Goes through the frequency= writer, which reads @synchronous (set above).
  if options.has_key?(:frequency)
    self.frequency = options[:frequency]
  else
    self.frequency = DEFAULT_FREQUENCY
  end

  @metrician       = options[:metrician].nil? ? true : !!options[:metrician]
  @pid             = Process.pid
  @allow_reconnect = true
  @dns_resolutions = 0
  @last_connect_at = 0

  @start_worker_mutex = Mutex.new
  @aggregator_queue = Queue.new
  @sender_queue = Queue.new


  setup_cleanup_at_exit if @enabled

  if @metrician
    Metrician.activate(self)
  end
end

Public Instance Methods

cleanup() click to toggle source

Called when a process is exiting to give it some extra time to push events to the service. An at_exit handler is automatically registered for this method, but it can also be called manually in cases where at_exit is bypassed, such as Resque workers.

# File lib/instrumental/agent.rb, line 263
# Gives the worker threads up to EXIT_FLUSH_TIMEOUT to drain before the
# process exits. Does nothing unless the agent is running.
#
# Reconnects are disabled first, then an 'exit' message is queued for the
# aggregator and, once it has finished, for the sender. If the timeout fires,
# the number of metrics still pending is logged.
def cleanup
  return unless running?

  logger.info "Cleaning up agent, aggregator_size: #{@aggregator_queue.size}, thread_running: #{@aggregator_thread.alive?}"
  logger.info "Cleaning up agent, queue size: #{@sender_queue.size}, thread running: #{@sender_thread.alive?}"
  @allow_reconnect = false

  begin
    with_timeout(EXIT_FLUSH_TIMEOUT) do
      # The aggregator is stopped first so its pending data reaches the
      # sender queue before the sender is told to exit.
      @aggregator_queue << ['exit']
      @aggregator_thread.join
      @sender_queue << ['exit']
      @sender_thread.join
    end
  rescue Timeout::Error
    holders = [@sender_queue, @aggregator_queue, @event_aggregator]
    pending = holders.map { |holder| holder&.size.to_i }.inject(0, :+)

    if pending > 0
      logger.error "Timed out working agent thread on exit, dropping #{pending} metrics"
    else
      logger.error "Timed out Instrumental Agent, exiting"
    end
  end
end
connected?() click to toggle source
# File lib/instrumental/agent.rb, line 204
# Truthy only while a socket exists and has not been closed. When there is no
# socket the falsy @socket value itself is returned.
def connected?
  return @socket unless @socket

  !@socket.closed?
end
enabled?() click to toggle source
# File lib/instrumental/agent.rb, line 200
# Returns the @enabled flag computed in the constructor from the :enabled
# option.
def enabled?
  @enabled
end
flush(allow_reconnect = false) click to toggle source

Synchronously flush all pending metrics out to the server. By default this will not try to reconnect to the server if a connection failure happens during the flush, though you may optionally override this behavior by passing true.

agent.flush
# File lib/instrumental/agent.rb, line 193
# Queues a synchronous 'flush' message, which blocks the caller until the
# worker signals it. Returns nil without queueing when the agent is not
# running.
#
# allow_reconnect - forwarded as the :allow_reconnect option of the message.
def flush(allow_reconnect = false)
  return unless running?

  flush_options = {
    :synchronous => true,
    :allow_reconnect => allow_reconnect
  }
  queue_message('flush', flush_options)
end
frequency=(frequency) click to toggle source
# File lib/instrumental/agent.rb, line 216
# Sets the aggregation frequency.
#
# The argument is converted with to_i. A value outside VALID_FREQUENCIES is
# logged and rounded down to the largest valid frequency below it, or to 0
# when there is none (negative numbers, nil). In synchronous mode the stored
# frequency is always 0 and a warning is logged.
def frequency=(frequency)
  requested = frequency.to_i

  unless VALID_FREQUENCIES.include?(requested)
    logger.warn "Frequency must be a value that divides evenly into 60: 1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, or 60."
    # max of an empty selection is nil, which to_i turns into 0
    requested = VALID_FREQUENCIES.select { |candidate| candidate < requested }.max.to_i
  end

  if @synchronous
    logger.warn "Synchronous and Frequency should not be enabled at the same time! Defaulting to synchronous mode."
    @frequency = 0
  else
    @frequency = requested
  end
end
gauge(metric, value, time = Time.now, count = 1) click to toggle source

Store a gauge for a metric, optionally at a specific time.

agent.gauge('load', 1.23)
# File lib/instrumental/agent.rb, line 110
# Records a gauge for +metric+.
#
# Returns +value+ when the measurement passed validation and send_command
# returned a truthy result, nil otherwise. Any exception raised along the way
# is handed to report_exception and turned into nil.
def gauge(metric, value, time = Time.now, count = 1)
  return nil unless valid?(metric, value, time, count)

  # The command name stays a frozen string rather than a symbol: frozen
  # strings are fast and every command is to_s'd later anyway.
  command = Instrumental::Command.new("gauge".freeze, metric, value, time, count)
  send_command(command) ? value : nil
rescue Exception => e
  report_exception(e)
  nil
end
increment(metric, value = 1, time = Time.now, count = 1) click to toggle source

Increment a metric, optionally more than one or at a specific time.

agent.increment('users')
# File lib/instrumental/agent.rb, line 160
# Records an increment for +metric+.
#
# Returns +value+ when the measurement passed validation and send_command
# returned a truthy result, nil otherwise. Any exception raised along the way
# is handed to report_exception and turned into nil.
def increment(metric, value = 1, time = Time.now, count = 1)
  return nil unless valid?(metric, value, time, count)

  command = Instrumental::Command.new("increment".freeze, metric, value, time, count)
  send_command(command) ? value : nil
rescue Exception => e
  report_exception(e)
  nil
end
logger() click to toggle source
# File lib/instrumental/agent.rb, line 212
# Returns this agent's own logger when one has been assigned, otherwise the
# class-wide logger.
def logger
  @logger || self.class.logger
end
logger=(logger) click to toggle source
# File lib/instrumental/agent.rb, line 208
# Assigns a logger for this agent instance only; it takes precedence over the
# class-wide logger in the instance-level reader.
def logger=(logger)
  @logger = logger
end
notice(note, time = Time.now, duration = 0) click to toggle source

Send a notice to the server (deploys, downtime, etc.)

agent.notice('A notice')
# File lib/instrumental/agent.rb, line 175
# Sends a notice (deploys, downtime, etc.).
#
# Returns +note+ whenever it passed valid_note?, regardless of what
# send_command returned, and nil when the note is invalid. Any exception is
# handed to report_exception and turned into nil.
def notice(note, time = Time.now, duration = 0)
  return nil unless valid_note?(note)

  send_command(Instrumental::Notice.new(note, time, duration))
  note
rescue Exception => e
  report_exception(e)
  nil
end
stop() click to toggle source

Stopping the agent will immediately stop all communication to Instrumental. If you call this and submit another metric, the agent will start again.

Calling stop will cause all metrics waiting to be sent to be discarded. Don't call it unless you are expecting this behavior.

agent.stop

# File lib/instrumental/agent.rb, line 241
# Immediately halts the agent: disconnects, kills both worker threads and
# empties both queues. Everything still waiting to be sent is discarded.
def stop
  disconnect

  if (sender = @sender_thread)
    sender.kill
    @sender_thread = nil
  end

  if (aggregator = @aggregator_thread)
    aggregator.kill
    @aggregator_thread = nil
  end

  @sender_queue.clear if @sender_queue
  @aggregator_queue.clear if @aggregator_queue
end
time(metric, multiplier = 1) { || ... } click to toggle source

Store the duration of a block in a metric. multiplier can be used to scale the duration to desired unit or change the duration in some meaningful way.

agent.time('response_time') do
  # potentially slow stuff
end

agent.time('response_time_in_ms', 1000) do
  # potentially slow stuff
end

ids = [1, 2, 3]
agent.time('find_time_per_post', 1 / ids.size.to_f) do
  Post.find(ids)
end
# File lib/instrumental/agent.rb, line 140
# Runs the block and records how long it took as a gauge on +metric+.
#
# metric     - name of the gauge to record.
# multiplier - factor applied to the duration in seconds (e.g. 1000 for ms).
#
# The gauge is recorded even when the block raises; the exception then
# propagates to the caller. Returns the block's result.
#
# The duration is measured with the monotonic clock so that wall-clock
# adjustments (NTP steps, manual changes) during the block cannot produce a
# negative or inflated value. The wall-clock start time is still what gets
# passed to gauge as the measurement's timestamp.
def time(metric, multiplier = 1)
  start = Time.now
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  begin
    result = yield
  ensure
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    gauge(metric, duration * multiplier, start)
  end
  result
end
time_ms(metric, &block) click to toggle source

Calls time and changes durations into milliseconds.

# File lib/instrumental/agent.rb, line 153
# Same as time, with a fixed multiplier of 1000 so the recorded duration is in
# milliseconds. The block is forwarded unchanged.
def time_ms(metric, &block)
  time(metric, 1000, &block)
end

Private Instance Methods

allows_secure?() click to toggle source
# File lib/instrumental/agent.rb, line 668
# Truthy when the OpenSSL constant is defined in this process. Note that this
# returns the result of `defined?` (a String or nil), not true/false.
def allows_secure?
  defined?(OpenSSL)
end
disconnect() click to toggle source
# File lib/instrumental/agent.rb, line 650
# Flushes (bounded by EXIT_FLUSH_TIMEOUT) and closes the current socket, if
# one is connected. Errors while closing are logged, never raised, and
# @socket is always reset to nil afterwards.
def disconnect
  return unless connected?

  logger.info "Disconnecting..."
  begin
    with_timeout(EXIT_FLUSH_TIMEOUT) { flush_socket(@socket) }
  rescue Timeout::Error
    # A slow flush must not prevent the close below.
    logger.info "Timed out flushing socket..."
  end
  @socket.close
rescue Exception => e
  logger.error "Error closing socket, #{e.message}"
ensure
  @socket = nil
end
flush_socket(socket) click to toggle source
# File lib/instrumental/agent.rb, line 644
# Calls flush on +socket+; any exception is logged as an error instead of
# being raised.
def flush_socket(socket)
  socket.flush
rescue Exception => e
  logger.error "Error flushing socket, #{e.message}"
end
ipv4_address_for_host(host, port, moment_to_connect = Time.now.to_i) click to toggle source
# File lib/instrumental/agent.rb, line 325
# Resolves +host+ to its first IPv4 address, or returns nil.
#
# Every call bumps the dns_resolutions counter; a successful lookup resets it
# to 0. Once the counter has reached RESOLUTION_FAILURES_BEFORE_WAITING, the
# lookup is skipped (nil is returned) until RESOLUTION_WAIT seconds have
# passed since last_connect_at. Lookups are bounded by RESOLVE_TIMEOUT, and
# any exception is logged, reported and turned into nil.
#
# port              - only used in the log message.
# moment_to_connect - epoch seconds treated as "now" for the back-off check.
def ipv4_address_for_host(host, port, moment_to_connect = Time.now.to_i)
  self.dns_resolutions = dns_resolutions + 1
  elapsed = moment_to_connect - last_connect_at
  may_resolve = dns_resolutions < RESOLUTION_FAILURES_BEFORE_WAITING || elapsed >= RESOLUTION_WAIT
  return nil unless may_resolve

  self.last_connect_at = moment_to_connect
  with_timeout(RESOLVE_TIMEOUT) do
    resolved = Resolv.getaddresses(host).find { |candidate| candidate =~ Resolv::IPv4::Regex }
    self.dns_resolutions = 0
    resolved
  end
rescue Exception => e
  logger.warn "Couldn't resolve address for #{host}:#{port}"
  report_exception(e)
  nil
end
open_socket(sockaddr_in, secure, verify_cert) click to toggle source
# File lib/instrumental/agent.rb, line 461
# Opens a TCP connection to +sockaddr_in+ and, when +secure+ is truthy, wraps
# it in a TLS session.
#
# sockaddr_in - packed socket address to connect to.
# secure      - when truthy, perform a TLS handshake and return the SSL socket.
# verify_cert - when truthy, the peer certificate must be present and valid;
#               otherwise certificate verification is turned off.
#
# Returns the connected socket (plain or SSL). Any connect/handshake error is
# re-raised to the caller, but the underlying socket is closed first: the
# caller never receives a handle in that case, so leaving it open would leak
# a file descriptor on every failed (re)connect attempt.
def open_socket(sockaddr_in, secure, verify_cert)
  raw_socket = Socket.new(Socket::PF_INET, Socket::SOCK_STREAM, 0)
  established = false
  begin
    raw_socket.connect(sockaddr_in)
    sock = raw_socket
    if secure
      context = OpenSSL::SSL::SSLContext.new
      if verify_cert
        context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT)
      else
        context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_NONE)
      end
      ssl_socket = OpenSSL::SSL::SSLSocket.new(raw_socket, context)
      # Closing the SSL socket later also closes the underlying TCP socket.
      ssl_socket.sync_close = true
      ssl_socket.connect
      sock = ssl_socket
    end
    established = true
    sock
  ensure
    unless established
      begin
        raw_socket.close
      rescue IOError, SystemCallError
        # Best effort: the original connect/handshake error is the one that
        # must reach the caller.
        nil
      end
    end
  end
end
queue_message(message, options = {}) click to toggle source
# File lib/instrumental/agent.rb, line 360
# Places +message+ on the appropriate worker queue and returns +message+.
# Nothing is queued when the agent is disabled.
#
# options - Hash, modified in place:
#   :allow_reconnect - defaults to the agent-wide @allow_reconnect.
#   :synchronous     - removed from the hash; when truthy the caller blocks
#                      on a condition variable (:sync_resource) until it is
#                      signalled. 'flush' goes to the aggregator queue, every
#                      other synchronous message to the sender queue.
#
# Non-synchronous messages go to the sender queue when frequency is 0 and to
# the aggregator queue otherwise.
def queue_message(message, options = {})
  return message unless enabled?

  # imagine it's a reverse merge, but with fewer allocations
  options[:allow_reconnect] = @allow_reconnect unless options.has_key?(:allow_reconnect)

  if options.delete(:synchronous)
    options[:sync_resource] ||= ConditionVariable.new
    @sync_mutex.synchronize do
      target = message == "flush" ? @aggregator_queue : @sender_queue
      target << [message, options]
      # wait releases @sync_mutex, so the signal cannot be missed
      options[:sync_resource].wait(@sync_mutex)
    end
  else
    target = frequency.to_i == 0 ? @sender_queue : @aggregator_queue
    target << [message, options]
  end

  message
end
report_exception(e) click to toggle source
# File lib/instrumental/agent.rb, line 320
# Logs an exception (class, message and full backtrace) at error level.
def report_exception(e)
  # puts "--- Exception of type #{e.class} occurred:\n#{e.message}\n#{e.backtrace.join("\n")}"
  logger.error "Exception of type #{e.class} occurred:\n#{e.message}\n#{e.backtrace.join("\n")}"
end
report_invalid_metric(metric) click to toggle source
# File lib/instrumental/agent.rb, line 310
# Records a rejected metric name: bumps the "agent.invalid_metric" counter
# through the agent's own increment method and logs a warning.
def report_invalid_metric(metric)
  increment "agent.invalid_metric"
  logger.warn "Invalid metric #{metric}"
end
report_invalid_value(metric, value) click to toggle source
# File lib/instrumental/agent.rb, line 315
# Records a rejected metric value: bumps the "agent.invalid_value" counter
# through the agent's own increment method and logs a warning with the
# inspected value.
def report_invalid_value(metric, value)
  increment "agent.invalid_value"
  logger.warn "Invalid value #{value.inspect} for #{metric}"
end
run_aggregator_loop() click to toggle source
# File lib/instrumental/agent.rb, line 479
# Body of the aggregator thread. Pops messages from @aggregator_queue, folds
# ordinary commands into @event_aggregator, and hands the aggregate over to
# @sender_queue on 'forward', 'flush' and 'exit'.
#
# Returns true when an 'exit' message is processed. Any exception ends the
# loop after being passed to report_exception.
def run_aggregator_loop
  # if the sender queue is some level of full, should we keep aggregating until it empties out?
  # what does this mean for aggregation slices - aggregating to nearest frequency will
  # make the object needlessly larger, when minute resolution is what we have on the server
  begin
    loop do
      # Seconds until the next multiple of `frequency` on the epoch clock,
      # never negative; 0 when frequency is 0.
      now = Time.now.to_i
      time_to_wait = if frequency == 0
                       0
                     else
                       next_frequency = (now - (now % frequency)) + frequency
                       time_to_wait = [(next_frequency - Time.now.to_f), 0].max
                     end

      # Either force an early 'forward' because the aggregate grew past
      # MAX_AGGREGATOR_SIZE, or wait up to time_to_wait for the next message;
      # a timeout is treated as 'forward' as well.
      # NOTE(review): how with_timeout treats a 0 timeout depends on
      # InstrumentalTimeout, which is not visible here — confirm.
      command_and_args, command_options = if @event_aggregator&.size.to_i > MAX_AGGREGATOR_SIZE
                                            logger.info "Aggregator full, flushing early with #{MAX_AGGREGATOR_SIZE} metrics."
                                            command_and_args, command_options = ['forward', {}]
                                          else
                                            begin
                                              with_timeout(time_to_wait) do
                                                @aggregator_queue.pop
                                              end
                                            rescue Timeout::Error
                                              ['forward', {}]
                                            end
                                          end
      if command_and_args
        case command_and_args
        when 'exit'
          # Hand whatever has been aggregated so far to the sender, then stop.
          if !@event_aggregator.nil?
            @sender_queue << @event_aggregator
            @event_aggregator = nil
          end
          logger.info "Exiting, #{@aggregator_queue.size} commands remain"
          return true
        when 'flush'
          # The aggregate goes first so the sender sees it before the flush
          # message that follows.
          if !@event_aggregator.nil?
            @sender_queue << @event_aggregator
            @event_aggregator = nil
          end
          @sender_queue << ['flush', command_options]
        when 'forward'
          if !@event_aggregator.nil?
            # Keep aggregating while the sender queue still has items and no
            # thread is waiting on it.
            next if @sender_queue.size > 0 && @sender_queue.num_waiting < 1
            @sender_queue << @event_aggregator
            @event_aggregator = nil
          end
        when Notice
          # Notices are passed straight to the sender queue, not aggregated.
          @sender_queue << [command_and_args, command_options]
        else
          # Anything else is put into the (lazily created) event aggregator.
          @event_aggregator = EventAggregator.new(frequency: @frequency) if @event_aggregator.nil?

          logger.debug "Sending: #{command_and_args} to aggregator"
          @event_aggregator.put(command_and_args)
        end
        command_and_args = nil
        command_options = nil
      end
    end
  rescue Exception => err
    report_exception(err)
  end
end
run_sender_loop() click to toggle source
# File lib/instrumental/agent.rb, line 543
# Body of the sender thread. Connects to the collector, performs the
# hello/authenticate handshake, then writes everything popped from
# @sender_queue to the socket.
#
# Returns true when an 'exit' message is processed and nil when a failure is
# not retried. The socket is always disconnected on the way out.
#
# Failure handling:
# * Connection-level failures (refused, unreachable, address in use, timeout,
#   and SSL errors when OpenSSL is loaded) are logged and never retried.
# * Other failures are reported and retried with a growing delay, unless
#   reconnecting is disabled agent-wide or by the in-flight command's
#   :allow_reconnect option. The command that was in flight is put back on
#   the queue together with its options, so a synchronous caller waiting on
#   its :sync_resource is still signalled once the command is finally
#   processed.
def run_sender_loop
  @failures = 0
  begin
    logger.info "connecting to collector"
    command_and_args = nil
    command_options = nil
    with_timeout(CONNECT_TIMEOUT) do
      @socket = open_socket(@sockaddr_in, @secure, @verify_cert)
    end
    logger.info "connected to collector at #{host}:#{port}"
    hello_options = {
      "version" => "ruby/instrumental_agent/#{VERSION}",
      "hostname" => HOSTNAME,
      "pid" => Process.pid,
      "runtime" => "#{defined?(RUBY_ENGINE) ? RUBY_ENGINE : "ruby"}/#{RUBY_VERSION}p#{RUBY_PATCHLEVEL}",
      "platform" => RUBY_PLATFORM
    }.to_a.flatten.map { |v| v.to_s.gsub(/\s+/, "_") }.join(" ")

    send_with_reply_timeout "hello #{hello_options}"
    send_with_reply_timeout "authenticate #{@api_key}"

    loop do
      command_and_args, command_options = @sender_queue.pop
      if command_and_args
        sync_resource = command_options && command_options[:sync_resource]
        test_connection
        case command_and_args
        when 'exit'
          logger.info "Exiting, #{@sender_queue.size} commands remain"
          return true
        when 'flush'
          # Nothing is written for a flush; it only releases a waiting
          # caller through the sync_resource signal below.
        when EventAggregator
          command_and_args.values.values.each do |command|
            logger.debug "Sending: #{command}"
            @socket.puts command
          end
        else
          logger.debug "Sending: #{command_and_args}"
          @socket.puts command_and_args
        end
        # Cleared so the rescue below only requeues a command that was
        # actually in flight when the failure happened.
        command_and_args = nil
        command_options = nil
        if sync_resource
          @sync_mutex.synchronize do
            sync_resource.signal
          end
        end
      end
    end
  rescue Exception => err
    allow_reconnect = @allow_reconnect
    # OpenSSL is optional for this agent; referencing its error class
    # unconditionally would raise NameError from inside this handler.
    connection_errors = [Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::EADDRINUSE, Timeout::Error]
    connection_errors << OpenSSL::SSL::SSLError if defined?(OpenSSL::SSL::SSLError)
    case err
    when EOFError
      # nop
    when *connection_errors
      # If the connection has been refused by Instrumental
      # or we cannot reach the server
      # or the connection state of this socket is in a race
      # or SSL is not functioning properly for some reason
      logger.error "unable to connect to Instrumental, hanging up with #{@sender_queue.size} messages remaining"
      logger.debug "Exception: #{err.inspect}\n#{err.backtrace.join("\n")}"
      allow_reconnect = false
    else
      report_exception(err)
    end
    if allow_reconnect == false ||
       (command_options && command_options[:allow_reconnect] == false)
      logger.info "Not trying to reconnect"
      @failures = 0
      return
    end
    if command_and_args
      logger.debug "requeueing: #{command_and_args}"
      # Requeue in the same [command, options] shape it was popped in so the
      # options (notably :sync_resource) survive the retry.
      @sender_queue << [command_and_args, command_options]
    end
    disconnect
    @failures += 1
    delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min
    logger.error "disconnected, #{@failures} failures in a row, reconnect in #{delay}..."
    sleep delay
    retry
  ensure
    disconnect
  end
end
running?() click to toggle source
# File lib/instrumental/agent.rb, line 636
# True only when both worker threads exist and are alive, and they were
# started by the current process (@pid matches Process.pid).
def running?
  return false if @sender_thread.nil? || @aggregator_thread.nil?
  return false unless @pid == Process.pid

  @sender_thread.alive? && @aggregator_thread.alive?
end
send_command(command) click to toggle source
# File lib/instrumental/agent.rb, line 342
# Queues +command+ for delivery, starting the worker threads if needed.
#
# When the agent is disabled the command is only logged at debug level and
# the logger's return value is returned. Otherwise the command is queued
# through queue_message (whose result is returned) as long as the relevant
# queue holds fewer than MAX_BUFFER entries; beyond that the command is
# dropped and nil is returned. The "queue full" warning is logged once per
# full episode, not once per dropped command.
def send_command(command)
  unless enabled?
    return logger.debug(command.to_s)
  end

  start_workers
  # The sender queue is the bottleneck when nothing is aggregated.
  buffer = if frequency.to_i == 0 then @sender_queue else @aggregator_queue end
  has_room = buffer && buffer.size < MAX_BUFFER

  if has_room
    @queue_full_warning = false
    logger.debug "Queueing: #{command.to_s}"
    return queue_message(command, { :synchronous => @synchronous })
  end

  unless @queue_full_warning
    @queue_full_warning = true
    logger.warn "Queue full(#{buffer.size}), dropping commands..."
  end
  logger.debug "Dropping command, queue full(#{buffer.size}): #{command.to_s}"
  nil
end
send_with_reply_timeout(message) click to toggle source
# File lib/instrumental/agent.rb, line 451
# Writes +message+ to the socket and waits up to REPLY_TIMEOUT for the reply
# line. Raises a RuntimeError unless the reply (without its line ending) is
# exactly "ok".
def send_with_reply_timeout(message)
  @socket.puts message
  with_timeout(REPLY_TIMEOUT) do
    reply = @socket.gets
    raise "Bad Response #{reply.inspect} to #{message.inspect}" unless reply.to_s.chomp == "ok"
  end
end
setup_cleanup_at_exit() click to toggle source
# File lib/instrumental/agent.rb, line 630
# Registers an at_exit hook that calls cleanup when the process exits.
def setup_cleanup_at_exit
  at_exit do
    cleanup
  end
end
start_workers() click to toggle source
# File lib/instrumental/agent.rb, line 404
# Starts the aggregator and sender threads if they are not already running.
#
# Returns immediately when another thread currently holds the start lock,
# when the agent is already running, or when it is disabled. Threads are only
# created if the collector host resolves to an IPv4 address. When the process
# id has changed since the agent recorded @pid, the queues and the event
# aggregator are replaced with fresh ones before the threads are started.
def start_workers
  # NOTE: We need a mutex around both `running?` and thread creation,
  # otherwise we could create too many threads.
  # Return early and queue the message if another thread is
  # starting the worker.
  return if !@start_worker_mutex.try_lock
  begin
    return if running?
    return unless enabled?
    # Drop any socket left over from a previous run before starting again.
    disconnect
    address = ipv4_address_for_host(@host, @port)
    if address
      # new_pid is true when @pid did not match the current process id;
      # @pid is updated as a side effect.
      new_pid = if @pid != Process.pid
                  @pid = Process.pid
                  true
                else
                  false
                end

      @sync_mutex = Mutex.new
      @failures = 0
      @sockaddr_in = Socket.pack_sockaddr_in(@port, address)

      logger.info "Starting aggregator thread"
      if !@aggregator_thread&.alive?
        if new_pid
          @event_aggregator = nil
          @aggregator_queue = Queue.new
        end
        @aggregator_thread = Thread.new do
          run_aggregator_loop
        end
      end

      if !@sender_thread&.alive?
        logger.info "Starting sender thread"
        @sender_queue = Queue.new if new_pid
        @sender_thread = Thread.new do
          run_sender_loop
        end
      end
    end
  ensure
    # Only reached after try_lock succeeded, so the lock is held here.
    @start_worker_mutex.unlock
  end
end
test_connection() click to toggle source
# File lib/instrumental/agent.rb, line 396
# Probes the socket with a one-byte non-blocking read. "Nothing to read yet"
# (any class listed by wait_exceptions) is ignored and yields nil; every
# other error, such as EOFError on a closed connection, propagates.
def test_connection
  @socket.read_nonblock(1)
rescue *wait_exceptions
  # noop
end
valid?(metric, value, time, count) click to toggle source
# File lib/instrumental/agent.rb, line 299
# Checks that a measurement can be sent to the collector.
#
# metric - dot-separated segments of word characters, digits, '-' and '_'.
# value  - anything whose to_s is a plain decimal number, optionally with a
#          fraction and a negative exponent (e.g. 1, -2.5, 1.0e-05).
# time, count - accepted for signature compatibility; not validated here.
#
# Returns true when both metric and value are valid. Otherwise reports each
# invalid part through report_invalid_metric / report_invalid_value and
# returns false.
#
# The patterns are anchored with \A and \z rather than ^ and $: the latter
# match at line boundaries, which would accept input such as "ok\nanything"
# and let an embedded newline inject an extra line into the line-based
# collector protocol.
def valid?(metric, value, time, count)
  valid_metric = metric =~ /\A([\d\w\-_]+\.)*[\d\w\-_]+\z/i
  valid_value  = value.to_s =~ /\A-?\d+(\.\d+)?(e-\d+)?\z/

  return true if valid_metric && valid_value

  report_invalid_metric(metric) unless valid_metric
  report_invalid_value(metric, value) unless valid_value
  false
end
valid_note?(note) click to toggle source
# File lib/instrumental/agent.rb, line 295
# A note is valid as long as it contains no line feed or carriage return.
def valid_note?(note)
  note !~ /[\n\r]/
end
wait_exceptions() click to toggle source
# File lib/instrumental/agent.rb, line 381
# Lists the exception classes that mean "a non-blocking read has nothing to
# return yet". Errno::EAGAIN always comes first, followed by whichever of the
# IO wait classes exist on the running Ruby, in a fixed order.
def wait_exceptions
  optional_names = %w[EAGAINWaitReadable EWOULDBLOCKWaitReadable WaitReadable]
  available = optional_names.select { |name| IO.const_defined?(name) }

  [Errno::EAGAIN] + available.map { |name| IO.const_get(name) }
end
with_timeout(time) { || ... } click to toggle source
# File lib/instrumental/agent.rb, line 291
# Runs the given block through InstrumentalTimeout.timeout with a limit of
# +time+ and returns whatever that call returns.
# NOTE(review): callers in this class rescue Timeout::Error around this
# method, so InstrumentalTimeout presumably raises it on expiry — confirm.
def with_timeout(time, &block)
  InstrumentalTimeout.timeout(time) { yield }
end