class FFWD::FlushingOutput

Attributes

log[R]
reporter_meta[R]

Public Class Methods

new(core, log, hook, config) click to toggle source
# File lib/ffwd/flushing_output.rb, line 32
# Builds a flushing output and registers its lifecycle hooks with +core+.
#
# core   - object providing +starting+ / +stopping+ hook registration and
#          +output.metric_subscribe+.
# log    - logger used for the "Started" / "Stopped" messages.
# hook   - backend hook; +reporter_meta+, +connect+ and +close+ are called on it.
# config - Hash read for +:flush_interval+ and +:buffer_limit+.
#
# On start the hook is connected and incoming metrics are appended to an
# in-memory buffer; once the buffer holds +:buffer_limit+ entries further
# metrics are counted as dropped instead. On stop the hook is closed, the
# subscription is released and any armed flush timer is cancelled.
def initialize(core, log, hook, config)
  @log = log
  @flush_interval = config[:flush_interval]
  @buffer_limit = config[:buffer_limit]
  @hook = hook
  @reporter_meta = @hook.reporter_meta

  @buffer = []
  @pending = nil
  @c = nil

  @sub = nil
  # Initialised explicitly: the stopping hook, check_timer! and flush! all
  # read @timer, so it should exist from construction like the other state.
  @timer = nil

  core.starting do
    @log.info "Started"
    @log.info "  config: #{config}"

    @hook.connect

    @sub = core.output.metric_subscribe do |metric|
      # Buffer is full: count the metric as dropped rather than growing.
      if @buffer.size >= @buffer_limit
        increment :dropped_metrics, 1
        next
      end

      @buffer << metric
      check_timer!
    end
  end

  core.stopping do
    @log.info "Stopped"

    @hook.close

    if @sub
      @sub.unsubscribe
      @sub = nil
    end

    if @timer
      @timer.cancel
      @timer = nil
    end
  end
end

Public Instance Methods

check_timer!() click to toggle source
# File lib/ffwd/flushing_output.rb, line 119
# Arms the flush timer if none is currently armed.
#
# When @timer is already set this is a no-op. Otherwise a debug line is
# logged and an EM::Timer is created with @flush_interval; its block calls
# flush!.
def check_timer!
  unless @timer
    @log.debug "Setting timer to #{@flush_interval}s"
    @timer = EM::Timer.new(@flush_interval) { flush! }
  end
end
flush!() click to toggle source
# File lib/ffwd/flushing_output.rb, line 79
# Hands the buffered metrics to the hook and empties the buffer.
#
# Any armed flush timer is cancelled first. The buffered metrics are counted
# as dropped (and nothing is sent) when a previous request is still pending
# or when the hook reports no active connection. Otherwise the buffer is
# passed to @hook.send and the returned deferrable is tracked in @pending
# until its callback or errback runs.
#
# Exceptions raised along the way are logged, not propagated, and the buffer
# is always cleared on exit.
def flush!
  if @timer
    @timer.cancel
    @timer = nil
  end

  # The drop branches rely on the method-level +ensure+ to clear the buffer.
  if @pending
    @log.info "Request already in progress, dropping metrics"
    increment :dropped_metrics, @buffer.size
    return
  end

  unless @hook.active?
    @log.error "Dropping metrics, no active connection available"
    increment :dropped_metrics, @buffer.size
    return
  end

  # Captured now because the buffer is cleared before the callbacks may run.
  buffer_size = @buffer.size

  # NOTE(review): the hook receives the live buffer, which is cleared as soon
  # as this method returns - presumably hooks consume it synchronously; confirm.
  request = @hook.send(@buffer)
  @pending = request

  # Register handlers on the local, not on @pending: if the deferrable has
  # already completed, the first handler runs immediately and resets @pending
  # to nil, so re-reading @pending here would raise NoMethodError.
  request.callback do
    increment :sent_metrics, buffer_size
    @pending = nil
  end

  request.errback do
    @log.error "Failed to submit metrics: #{request.error}"
    increment :failed_metrics, buffer_size
    @pending = nil
  end
rescue => e
  @log.error "Error during flush", e
ensure
  @buffer.clear
end