class FFWD::ProducingClient
A client implementation that delegates all work to other threads.
Public Class Methods
new(channel, producer, flush_period, event_limit, metric_limit)
click to toggle source
# File lib/ffwd/producing_client.rb, line 67
# Builds a client that buffers events and metrics received from +channel+
# and periodically passes them on to +producer+.
#
# channel       - object providing +starting+, +stopping+, +event_subscribe+
#                 and +metric_subscribe+, each taking a block.
# producer      - object providing +setup+ and +teardown+ (called from the
#                 channel hooks registered here).
# flush_period  - interval handed to EM::PeriodicTimer; must be greater
#                 than zero (presumably seconds -- confirm against
#                 EventMachine).
# event_limit   - maximum number of buffered events; further events are
#                 counted as :dropped_events.
# metric_limit  - maximum number of buffered metrics; further metrics are
#                 counted as :dropped_metrics.
#
# Raises RuntimeError when +flush_period+ is zero or negative.
def initialize(channel, producer, flush_period, event_limit, metric_limit)
  @flush_period = flush_period
  @event_limit = event_limit
  @metric_limit = metric_limit

  raise "Invalid flush period: #{flush_period}" if @flush_period <= 0

  @producer = producer
  @producer_is_reporter = FFWD.is_reporter?(producer)

  # Buffers filled by the channel subscriptions below.
  @events = []
  @metrics = []

  # Request currently in flight, if any.
  @request = nil
  @timer = nil
  @subs = []

  channel.starting do
    @timer = EM::PeriodicTimer.new(@flush_period) { safer_flush! }

    @subs << channel.event_subscribe do |event|
      if @events.size < @event_limit
        @events << event
      else
        increment :dropped_events
        nil
      end
    end

    @subs << channel.metric_subscribe do |metric|
      if @metrics.size < @metric_limit
        @metrics << metric
      else
        increment :dropped_metrics
        nil
      end
    end

    @producer.setup
  end

  channel.stopping do
    @timer.cancel if @timer
    @timer = nil

    # One last flush before the buffers and subscriptions are torn down.
    flush!

    @subs.each(&:unsubscribe).clear
    @metrics.clear
    @events.clear

    @producer.teardown
  end
end
Public Instance Methods
flush!()
click to toggle source
# File lib/ffwd/producing_client.rb, line 148
# Hands the buffered events and metrics to the producer and empties both
# buffers, whatever the outcome.
#
# * While an earlier request is still pending, or when the producer returns
#   no request object, the buffered items are counted as dropped.
# * Otherwise the batch is counted as sent or failed once the returned
#   request runs its callback or errback (presumably an EM deferrable --
#   confirm against the producer implementations).
# * An exception raised while producing is logged and the buffered items are
#   counted as failed.
#
# The new request is held in a local variable in addition to @request. A
# request that completes while its callback is being registered resets
# @request to nil; the errback must still be attached to the request itself,
# not to whatever @request holds at that point.
def flush!
  request = nil
  request = @producer.produce(@events, @metrics) unless @request

  unless request
    increment :dropped_events, @events.size
    increment :dropped_metrics, @metrics.size
    return
  end

  # Publish the request before registering handlers, so that a handler that
  # runs immediately is not overwritten afterwards.
  @request = request

  # store buffer sizes for use in callbacks.
  events_size = @events.size
  metrics_size = @metrics.size

  request.callback do
    increment :sent_events, events_size
    increment :sent_metrics, metrics_size
    # Only release the slot if it still belongs to this request.
    @request = nil if @request.equal?(request)
  end

  request.errback do |e|
    log.error "Failed to produce", e
    increment :failed_events, events_size
    increment :failed_metrics, metrics_size
    @request = nil if @request.equal?(request)
  end
rescue => e
  # A request whose handlers could not be registered would otherwise stay
  # "pending" forever and cause every later flush to be dropped.
  @request = nil if request && @request.equal?(request)
  increment :failed_events, @events.size
  increment :failed_metrics, @metrics.size
  log.error "Failed to produce", e
ensure
  @events.clear
  @metrics.clear
end
report!(diff) { |m| ... }
click to toggle source
Calls superclass method
FFWD::Reporter#report!
# File lib/ffwd/producing_client.rb, line 55
# Yields this client's own report entries (via the superclass
# implementation) and then, when the producer is itself a reporter, the
# producer's entries as well.
#
# diff - passed through unchanged to both report! implementations.
#
# Returns nil when the producer is not a reporter, otherwise whatever the
# producer's report! returns.
def report!(diff)
  super(diff) { |m| yield m }

  @producer.report!(diff) { |m| yield m } if @producer_is_reporter
end
reporter_meta()
click to toggle source
# File lib/ffwd/producing_client.rb, line 50
# Reporter metadata derived from the wrapped producer.
#
# Returns an empty Hash when the producer is not a reporter. Otherwise
# returns the producer's class-level reporter_meta merged with its
# instance-level reporter_meta; instance values win on duplicate keys.
#
# The producer is only asked for metadata when it is a reporter, matching
# report!, which likewise delegates to the producer only in that case.
# NOTE(review): confirm against FFWD::Reporter that reporters are the
# producers expected to provide both reporter_meta methods.
def reporter_meta
  return {} unless @producer_is_reporter

  @producer.class.reporter_meta.merge(@producer.reporter_meta)
end
safer_flush!()
click to toggle source
Apply some heuristics to determine if we can ‘ignore’ the current flush to prevent loss of data.
If a request is still pending, the flush is skipped as long as either the event buffer or the metric buffer is below its limit; otherwise flush! is called.
# File lib/ffwd/producing_client.rb, line 134
# Timer entry point: flushes unless doing so can be postponed.
#
# With no request pending this simply calls flush!. With a request pending
# it counts a :slow_requests occurrence and skips the flush while either
# buffer is still below its limit; only when both buffers have reached
# their limits is flush! called.
def safer_flush!
  return flush! unless @request

  increment :slow_requests

  # Room left in either buffer means this flush can wait.
  has_room = @events.size < @event_limit || @metrics.size < @metric_limit
  return if has_room

  flush!
end