class LaunchDarkly::EventProcessor

@private

Public Class Methods

new(sdk_key, config, client = nil, diagnostic_accumulator = nil, test_properties = nil) click to toggle source
# File lib/ldclient-rb/events.rb, line 93
def initialize(sdk_key, config, client = nil, diagnostic_accumulator = nil, test_properties = nil)
  raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil?  # see LDClient constructor comment on sdk_key
  @logger = config.logger
  @inbox = SizedQueue.new(config.capacity < 100 ? 100 : config.capacity)
  @flush_task = Concurrent::TimerTask.new(execution_interval: config.flush_interval) do
    post_to_inbox(FlushMessage.new)
  end
  @flush_task.execute
  @users_flush_task = Concurrent::TimerTask.new(execution_interval: config.user_keys_flush_interval) do
    post_to_inbox(FlushUsersMessage.new)
  end
  @users_flush_task.execute
  if !diagnostic_accumulator.nil?
    interval = test_properties && test_properties.has_key?(:diagnostic_recording_interval) ?
      test_properties[:diagnostic_recording_interval] :
      config.diagnostic_recording_interval
    @diagnostic_event_task = Concurrent::TimerTask.new(execution_interval: interval) do
      post_to_inbox(DiagnosticEventMessage.new)
    end
    @diagnostic_event_task.execute
  else
    @diagnostic_event_task = nil
  end
  @stopped = Concurrent::AtomicBoolean.new(false)
  @inbox_full = Concurrent::AtomicBoolean.new(false)

  event_sender = test_properties && test_properties.has_key?(:event_sender) ?
    test_properties[:event_sender] :
    Impl::EventSender.new(sdk_key, config, client ? client : Util.new_http_client(config.events_uri, config))

  EventDispatcher.new(@inbox, sdk_key, config, diagnostic_accumulator, event_sender)
end

Public Instance Methods

add_event(event) click to toggle source
# File lib/ldclient-rb/events.rb, line 126
def add_event(event)
  event[:creationDate] = Impl::Util.current_time_millis
  post_to_inbox(EventMessage.new(event))
end
flush() click to toggle source
# File lib/ldclient-rb/events.rb, line 131
def flush
  # flush is done asynchronously
  post_to_inbox(FlushMessage.new)
end
stop() click to toggle source
# File lib/ldclient-rb/events.rb, line 136
def stop
  # final shutdown, which includes a final flush, is done synchronously
  if @stopped.make_true
    @flush_task.shutdown
    @users_flush_task.shutdown
    @diagnostic_event_task.shutdown if !@diagnostic_event_task.nil?
    # Note that here we are not calling post_to_inbox, because we *do* want to wait if the inbox
    # is full; an orderly shutdown can't happen unless these messages are received.
    @inbox << FlushMessage.new
    stop_msg = StopMessage.new
    @inbox << stop_msg
    stop_msg.wait_for_completion
  end
end
wait_until_inactive() click to toggle source

exposed only for testing

# File lib/ldclient-rb/events.rb, line 152
def wait_until_inactive
  sync_msg = TestSyncMessage.new
  @inbox << sync_msg
  sync_msg.wait_for_completion
end

Private Instance Methods

post_to_inbox(message) click to toggle source
# File lib/ldclient-rb/events.rb, line 160
def post_to_inbox(message)
  begin
    @inbox.push(message, non_block=true)
  rescue ThreadError
    # If the inbox is full, it means the EventDispatcher thread is seriously backed up with not-yet-processed
    # events. This is unlikely, but if it happens, it means the application is probably doing a ton of flag
    # evaluations across many threads-- so if we wait for a space in the inbox, we risk a very serious slowdown
    # of the app. To avoid that, we'll just drop the event. The log warning about this will only be shown once.
    if @inbox_full.make_true
      @logger.warn { "[LDClient] Events are being produced faster than they can be processed; some events will be dropped" }
    end
  end
end