class FFWD::ProducingClient

A client implementation that delegates all work to other threads.

Public Class Methods

new(channel, producer, flush_period, event_limit, metric_limit) click to toggle source
# File lib/ffwd/producing_client.rb, line 67
# Builds a client that buffers events and metrics received from +channel+
# and periodically hands them to +producer+.
#
# channel      - object whose +starting+ and +stopping+ hooks are used to
#                wire up and tear down the client, and whose
#                +event_subscribe+ / +metric_subscribe+ deliver input.
# producer     - object receiving +setup+, +teardown+ and (through flush!)
#                +produce+ calls.
# flush_period - interval handed to EM::PeriodicTimer; must be positive.
# event_limit  - buffer size at which further events are dropped.
# metric_limit - buffer size at which further metrics are dropped.
#
# Raises RuntimeError when +flush_period+ is zero or negative.
def initialize(channel, producer, flush_period, event_limit, metric_limit)
  @flush_period = flush_period
  @event_limit = event_limit
  @metric_limit = metric_limit

  raise "Invalid flush period: #{flush_period}" if @flush_period <= 0

  @producer = producer
  @producer_is_reporter = FFWD.is_reporter? producer

  # Buffers filled by the channel subscriptions, emptied by flush!.
  @events = []
  @metrics = []

  # The request currently in flight (if any) and the periodic flush timer.
  @request = nil
  @timer = nil

  # Subscription handles, retained so they can be released on stop.
  @subs = []

  channel.starting do
    @timer = EM::PeriodicTimer.new(@flush_period) { safer_flush! }

    event_sub = channel.event_subscribe do |event|
      # A full buffer drops the incoming event instead of growing.
      buffer_full = @events.size >= @event_limit
      increment :dropped_events if buffer_full
      next if buffer_full

      @events << event
    end
    @subs << event_sub

    metric_sub = channel.metric_subscribe do |metric|
      # A full buffer drops the incoming metric instead of growing.
      buffer_full = @metrics.size >= @metric_limit
      increment :dropped_metrics if buffer_full
      next if buffer_full

      @metrics << metric
    end
    @subs << metric_sub

    @producer.setup
  end

  channel.stopping do
    if @timer
      @timer.cancel
      @timer = nil
    end

    # One last flush of whatever is buffered before shutting down.
    flush!

    @subs.each { |sub| sub.unsubscribe }
    @subs.clear

    @metrics.clear
    @events.clear

    @producer.teardown
  end
end

Public Instance Methods

flush!() click to toggle source
# File lib/ffwd/producing_client.rb, line 148
# Hands the buffered events and metrics to the producer and empties the
# buffers.
#
# The batch is counted as dropped when a previous request is still pending
# or when the producer returns a falsy value instead of a request. When a
# request is returned, its callback/errback record the batch as sent or
# failed and clear the pending request. Any StandardError raised here is
# logged and the batch is counted as failed. The buffers are cleared in
# every case.
def flush!
  in_flight = @request
  # Only ask the producer for a new request when none is outstanding.
  @request = @producer.produce(@events, @metrics) unless in_flight

  if in_flight || !@request
    increment :dropped_events, @events.size
    increment :dropped_metrics, @metrics.size
    return
  end

  # The buffers are cleared before the callbacks run, so capture the
  # batch sizes now.
  event_count = @events.size
  metric_count = @metrics.size

  @request.callback do
    increment :sent_events, event_count
    increment :sent_metrics, metric_count
    @request = nil
  end

  @request.errback do |reason|
    log.error "Failed to produce", reason
    increment :failed_events, event_count
    increment :failed_metrics, metric_count
    @request = nil
  end
rescue => error
  increment :failed_events, @events.size
  increment :failed_metrics, @metrics.size
  log.error "Failed to produce", error
ensure
  @events.clear
  @metrics.clear
end
report!(diff) { |m| ... } click to toggle source
Calls superclass method FFWD::Reporter#report!
# File lib/ffwd/producing_client.rb, line 55
# Yields the metrics reported by the superclass implementation and then,
# when the producer was detected as a reporter at construction time, the
# metrics reported by the producer as well.
#
# diff - value forwarded unchanged to both report! calls.
def report!(diff)
  super(diff) { |metric| yield metric }

  if @producer_is_reporter
    @producer.report!(diff) { |metric| yield metric }
  end
end
reporter_meta() click to toggle source
# File lib/ffwd/producing_client.rb, line 50
# Returns the reporter metadata of the wrapped producer: the producer
# class's +reporter_meta+ merged with the producer instance's
# +reporter_meta+ (instance values win on key collisions).
#
# Returns an empty Hash when the producer is not a reporter, since such a
# producer is not expected to respond to +reporter_meta+.
def reporter_meta
  # Only a reporter producer exposes metadata; the guard must bail out for
  # non-reporters, not for reporters.
  return {} unless @producer_is_reporter

  @producer.class.reporter_meta.merge(@producer.reporter_meta)
end
safer_flush!() click to toggle source

Apply some heuristics to determine if we can ‘ignore’ the current flush to prevent loss of data.

If a request is still pending, the flush is skipped as long as at least one of the event or metric buffers is still below its limit.

# File lib/ffwd/producing_client.rb, line 134
# Periodic flush entry point that avoids discarding buffered data while a
# previous request is still outstanding.
#
# With no pending request this simply flushes. With a pending request it
# records a slow request and skips the flush while either the event or the
# metric buffer is still below its limit; only when neither has room left
# does it go ahead and flush.
def safer_flush!
  return flush! unless @request

  increment :slow_requests

  # Room in at least one buffer means the flush can be postponed.
  has_room = @events.size < @event_limit || @metrics.size < @metric_limit
  flush! unless has_room
end