class FFWD::TCP::FlushingConnect

A TCP connection implementation that buffers events and metrics in batches over a time window and calls ‘send_all’ on the connection.

Constants

DEFAULT_EVENT_LIMIT

Defaults for buffered connections. Maximum number of events to buffer up.

DEFAULT_FORCED_FLUSH_FACTOR

Fraction of the maximum number of events/metrics that, once buffered, forces a flush (the limit is multiplied by this factor).

DEFAULT_METRIC_LIMIT

Maximum number of metrics to buffer up.

Attributes

log[R]

Public Class Methods

new(core, log, connection, config) click to toggle source
# File lib/ffwd/protocol/tcp/flushing_connect.rb, line 58
# Sets up a buffering layer in front of +connection+ and registers start and
# stop hooks on +core+.
#
# core       - object providing +starting+ / +stopping+ hook registration and
#              an +output+ offering +event_subscribe+ / +metric_subscribe+.
# log        - logger, stored for the +log+ reader.
# connection - wrapped connection; +connect+ is called from the starting
#              hook and +disconnect+ from the stopping hook.
# config     - Hash read for :flush_period, :ignored, :forced_flush_factor,
#              :event_limit and :metric_limit.
def initialize(core, log, connection, config)
  @log = log
  @c = connection

  flush_period = config[:flush_period]
  # A missing :ignored means "ignore nothing". Without this default the
  # starting hook would raise NoMethodError on nil *after* the connection
  # had been opened and the flush timer created.
  ignored = config[:ignored] || []
  forced_flush_factor = config[:forced_flush_factor]
  event_limit = config[:event_limit]
  metric_limit = config[:metric_limit]

  @event_buffer = []
  @metric_buffer = []
  @timer = nil
  @subs = []

  core.starting do
    @c.connect

    # Periodic flush; consumers may additionally force a flush when a
    # buffer fills up.
    @timer = EM::PeriodicTimer.new(flush_period){flush!}

    unless ignored.include? :events
      event_consumer = setup_consumer(
        @event_buffer, event_limit, forced_flush_factor, :dropped_events)
      @subs << core.output.event_subscribe(&event_consumer)
    end

    unless ignored.include? :metrics
      metric_consumer = setup_consumer(
        @metric_buffer, metric_limit, forced_flush_factor, :dropped_metrics)
      @subs << core.output.metric_subscribe(&metric_consumer)
    end
  end

  core.stopping do
    @c.disconnect

    if @timer
      @timer.cancel
      @timer = nil
    end

    # Drop every subscription made in the starting hook.
    @subs.each(&:unsubscribe).clear
  end
end
prepare(opts) click to toggle source
# File lib/ffwd/protocol/tcp/flushing_connect.rb, line 51
# Fills in the buffering defaults on +opts+ in place and returns the same
# Hash. A key is only assigned when its current value is nil or false.
def self.prepare(opts)
  opts[:forced_flush_factor] = DEFAULT_FORCED_FLUSH_FACTOR unless opts[:forced_flush_factor]
  opts[:event_limit] = DEFAULT_EVENT_LIMIT unless opts[:event_limit]
  opts[:metric_limit] = DEFAULT_METRIC_LIMIT unless opts[:metric_limit]
  opts
end

Public Instance Methods

flush!() click to toggle source
# File lib/ffwd/protocol/tcp/flushing_connect.rb, line 103
# Hands everything currently buffered to the connection in one +send_all+
# call.
#
# Nothing happens when both buffers are empty. When the connection is not
# writable the buffered items are counted as dropped. If anything raises,
# the error and every pending item are logged and the items are counted as
# failed. In all cases both buffers are emptied before returning.
def flush!
  return if @event_buffer.empty? && @metric_buffer.empty?

  if @c.writable?
    @c.send_all(@event_buffer, @metric_buffer)
    increment(:sent_events, @event_buffer.size)
    increment(:sent_metrics, @metric_buffer.size)
  else
    increment(:dropped_events, @event_buffer.size)
    increment(:dropped_metrics, @metric_buffer.size)
    # The drop path yields nil, like a bare return.
    nil
  end
rescue => e
  log.error "Failed to flush buffers", e

  log.error "The following data could not be flushed:"

  # Events first, then metrics; each list is numbered from zero.
  [@event_buffer, @metric_buffer].each do |pending|
    pending.each_with_index do |item, index|
      log.error "##{index}: #{item.to_h}"
    end
  end

  increment(:failed_events, @event_buffer.size)
  increment(:failed_metrics, @metric_buffer.size)
ensure
  [@event_buffer, @metric_buffer].each(&:clear)
end
reporter_meta() click to toggle source
# File lib/ffwd/protocol/tcp/flushing_connect.rb, line 47
# Reporter metadata, taken unchanged from the wrapped connection.
def reporter_meta
  connection = @c
  connection.reporter_meta
end

Private Instance Methods

setup_consumer(buffer, drop_limit, forced_flush_factor, statistics_key) click to toggle source
# File lib/ffwd/protocol/tcp/flushing_connect.rb, line 139
# Builds the proc that feeds incoming items into +buffer+.
#
# buffer              - Array collecting items until the next flush.
# drop_limit          - once the buffer holds this many items, new items are
#                       discarded and counted under +statistics_key+.
# forced_flush_factor - multiplied with +drop_limit+ to get the buffer size
#                       at which a flush is triggered immediately.
# statistics_key      - counter name passed to +increment+ for discarded
#                       items.
#
# Returns a (non-lambda) Proc taking a single item.
def setup_consumer(buffer, drop_limit, forced_flush_factor, statistics_key)
  flush_threshold = drop_limit * forced_flush_factor

  proc do |item|
    if buffer.size < drop_limit
      buffer << item

      if buffer.size >= flush_threshold
        increment :forced_flush, 1
        flush!
      end
    else
      increment statistics_key, 1
      # A discarded item yields nil.
      nil
    end
  end
end