class FFWD::FlushingOutput
Attributes
log[R]
reporter_meta[R]
Public Class Methods
new(core, log, hook, config)
click to toggle source
# File lib/ffwd/flushing_output.rb, line 32
#
# Sets up a buffering output that collects metrics from +core+ and hands
# them to +hook+ in batches.
#
# core   - object providing +starting+/+stopping+ lifecycle hooks and an
#          +output+ that supports +metric_subscribe+.
# log    - logger used for lifecycle messages.
# hook   - backend that exposes +reporter_meta+, +connect+ and +close+.
# config - Hash read for +:flush_interval+ and +:buffer_limit+.
def initialize(core, log, hook, config)
  @log = log
  @flush_interval = config[:flush_interval]
  @buffer_limit = config[:buffer_limit]
  @hook = hook
  @reporter_meta = @hook.reporter_meta
  @buffer = []
  @pending = nil
  # Not referenced by any code visible here; kept in case other parts of
  # the class rely on it.
  @c = nil
  @sub = nil
  # Explicitly initialised because the timer is read before it is ever
  # assigned (when checking whether a flush is already scheduled and in
  # the stopping hook below).
  @timer = nil

  core.starting do
    @log.info "Started"
    @log.info "  config: #{config}"

    @hook.connect

    @sub = core.output.metric_subscribe do |metric|
      # Once the buffer is full, further metrics are counted as dropped
      # instead of being queued.
      if @buffer.size >= @buffer_limit
        increment :dropped_metrics, 1
        next
      end

      @buffer << metric
      check_timer!
    end
  end

  core.stopping do
    @log.info "Stopped"

    @hook.close

    if @sub
      @sub.unsubscribe
      @sub = nil
    end

    if @timer
      @timer.cancel
      @timer = nil
    end
  end
end
Public Instance Methods
check_timer!()
click to toggle source
# File lib/ffwd/flushing_output.rb, line 119
#
# Schedules a flush after @flush_interval seconds unless a timer is
# already set. Returns the new timer, or nil when one already existed.
def check_timer!
  unless @timer
    @log.debug "Setting timer to #{@flush_interval}s"
    @timer = EM::Timer.new(@flush_interval) { flush! }
  end
end
flush!()
click to toggle source
# File lib/ffwd/flushing_output.rb, line 79
#
# Cancels any scheduled flush timer and submits the buffered metrics
# through the hook. The buffer is always emptied when this method returns:
# metrics are counted as dropped if a request is still pending or the hook
# is not active, and otherwise as sent or failed depending on the outcome
# reported by the object returned from +@hook.send+.
def flush!
  if @timer
    @timer.cancel
    @timer = nil
  end

  # The buffer itself is cleared by the +ensure+ clause below on every exit
  # path, so the early returns only need to account for the dropped metrics.
  if @pending
    @log.info "Request already in progress, dropping metrics"
    increment :dropped_metrics, @buffer.size
    return
  end

  unless @hook.active?
    @log.error "Dropping metrics, no active connection available"
    increment :dropped_metrics, @buffer.size
    return
  end

  # Hand the hook its own copy: @buffer is cleared as soon as this method
  # returns, which would also empty the batch if the hook keeps a reference
  # to the array it was given.
  batch = @buffer.dup
  batch_size = batch.size

  # Keep a local reference to the request. The callbacks reset @pending,
  # and they may run immediately if the request has already completed, so
  # @pending must not be re-read while wiring them up.
  request = @pending = @hook.send(batch)

  request.callback do
    increment :sent_metrics, batch_size
    @pending = nil
  end

  request.errback do
    @log.error "Failed to submit metrics: #{request.error}"
    increment :failed_metrics, batch_size
    @pending = nil
  end
rescue => e
  @log.error "Error during flush", e
ensure
  @buffer.clear
end