class StatsD::Instrument::BatchedUDPSink::Dispatcher

Constants

BUFFER_CLASS
NEWLINE

Public Class Methods

new(host, port, flush_interval) click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 53
def initialize(host, port, flush_interval)
  @host = host
  @port = port
  @interrupted = false
  @flush_interval = flush_interval
  @buffer = BUFFER_CLASS.new
  @dispatcher_thread = Thread.new { dispatch }
end

Public Instance Methods

<<(datagram) click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 62
def <<(datagram)
  unless @dispatcher_thread&.alive?
    # If the dispatcher thread is dead, we assume it is because
    # the process was forked. So to avoid ending datagrams twice
    # we clear the buffer.
    @buffer.clear
    @dispatcher_thread = Thread.new { dispatch }
  end
  @buffer << datagram
  self
end
shutdown(wait = @flush_interval * 2) click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 74
def shutdown(wait = @flush_interval * 2)
  @interrupted = true
  if @dispatcher_thread&.alive?
    @dispatcher_thread.join(wait)
  else
    flush
  end
end

Private Instance Methods

dispatch() click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 102
def dispatch
  until @interrupted
    begin
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      flush
      next_sleep_duration = @flush_interval - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start)

      sleep(next_sleep_duration) if next_sleep_duration > 0
    rescue => error
      report_error(error)
    end
  end

  flush
  invalidate_socket
end
flush() click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 86
def flush
  return if @buffer.empty?

  datagrams = @buffer.shift(@buffer.size)

  until datagrams.empty?
    packet = String.new(datagrams.pop, encoding: Encoding::BINARY, capacity: MAX_PACKET_SIZE)

    until datagrams.empty? || packet.bytesize + datagrams.first.bytesize + 1 > MAX_PACKET_SIZE
      packet << NEWLINE << datagrams.shift
    end

    send_packet(packet)
  end
end
invalidate_socket() click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 151
def invalidate_socket
  @socket&.close
ensure
  @socket = nil
end
report_error(error) click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 119
def report_error(error)
  StatsD.logger.error do
    "[#{self.class.name}] The dispatcher thread encountered an error #{error.class}: #{error.message}"
  end
end
send_packet(packet) click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 125
def send_packet(packet)
  retried = false
  socket.send(packet, 0)
rescue SocketError, IOError, SystemCallError => error
  StatsD.logger.debug do
    "[#{self.class.name}] Resetting connection because of #{error.class}: #{error.message}"
  end
  invalidate_socket
  if retried
    StatsD.logger.warning do
      "[#{self.class.name}] Events were dropped because of #{error.class}: #{error.message}"
    end
  else
    retried = true
    retry
  end
end
socket() click to toggle source
# File lib/statsd/instrument/batched_udp_sink.rb, line 143
def socket
  @socket ||= begin
    socket = UDPSocket.new
    socket.connect(@host, @port)
    socket
  end
end