class LaunchDarkly::EventDispatcher

@private

Public Class Methods

new(inbox, sdk_key, config, diagnostic_accumulator, event_sender) click to toggle source
# File lib/ldclient-rb/events.rb, line 177
# Sets up the dispatcher's state and starts the thread that runs main_loop on +inbox+.
#
# inbox                  - message source handed to main_loop.
# sdk_key                - stored in @sdk_key.
# config                 - read for user_keys_capacity, capacity, logger and diagnostic_opt_out?.
# diagnostic_accumulator - kept only when config.diagnostic_opt_out? is false.
# event_sender           - stored in @event_sender for the flush and diagnostic workers.
def initialize(inbox, sdk_key, config, diagnostic_accumulator, event_sender)
  @sdk_key = sdk_key
  @config = config
  # Opting out of diagnostics is modelled as "no accumulator at all".
  @diagnostic_accumulator = config.diagnostic_opt_out? ? nil : diagnostic_accumulator
  @event_sender = event_sender

  @user_keys = SimpleLRUCacheSet.new(config.user_keys_capacity)
  @formatter = EventOutputFormatter.new(config)
  @disabled = Concurrent::AtomicBoolean.new(false)
  @last_known_past_time = Concurrent::AtomicReference.new(0)
  @deduplicated_users = 0
  @events_in_last_batch = 0

  outbox = EventBuffer.new(config.capacity, config.logger)
  flush_workers = NonBlockingThreadPool.new(MAX_FLUSH_WORKERS)

  # The diagnostic pool exists only when diagnostics are enabled; the init event is
  # posted to it right away.
  diagnostic_event_workers = nil
  unless @diagnostic_accumulator.nil?
    diagnostic_event_workers = NonBlockingThreadPool.new(1)
    send_diagnostic_event(@diagnostic_accumulator.create_init_event(config), diagnostic_event_workers)
  end

  Thread.new { main_loop(inbox, outbox, flush_workers, diagnostic_event_workers) }
end

Private Instance Methods

dispatch_event(event, outbox) click to toggle source
# File lib/ldclient-rb/events.rb, line 250
# Records +event+ in the outbox summary and decides which payload entries it produces:
# the event itself, a debug copy, and/or an index event for a user not seen before.
# Does nothing once the dispatcher has been disabled.
#
# event  - hash-like event; reads :kind, :trackEvents, :user and :creationDate.
# outbox - receives add_to_summary and add_event calls.
def dispatch_event(event, outbox)
  return if @disabled.value

  # Every event counts towards the summary, whether or not it is also sent in full.
  outbox.add_to_summary(event)

  # Feature events are sent in full only when tracked, and may additionally yield a
  # debug copy; every other kind of event is always sent in full.
  is_feature = event[:kind] == "feature"
  send_full = is_feature ? event[:trackEvents] : true
  debug_copy = nil
  if is_feature && should_debug_event(event)
    debug_copy = event.clone
    debug_copy[:debug] = true
  end

  # An index event is emitted for a user we have not seen before, unless the full event
  # already carries the user inline or the event is itself an identify event.
  users_inlined = send_full && @config.inline_users_in_events
  unless users_inlined
    first_sighting = event.has_key?(:user) && !notice_user(event[:user])
    if first_sighting && event[:kind] != "identify"
      index_event = {
        kind: "index",
        creationDate: event[:creationDate],
        user: event[:user]
      }
      outbox.add_event(index_event)
    end
  end

  outbox.add_event(event) if send_full
  outbox.add_event(debug_copy) unless debug_copy.nil?
end
do_shutdown(flush_workers, diagnostic_event_workers) click to toggle source
# File lib/ldclient-rb/events.rb, line 234
# Shuts down the worker pools and waits for each to terminate, then stops the event
# sender if it responds to +stop+.
#
# flush_workers            - pool that is always shut down.
# diagnostic_event_workers - optional pool; skipped when nil.
def do_shutdown(flush_workers, diagnostic_event_workers)
  pools = [flush_workers]
  pools << diagnostic_event_workers unless diagnostic_event_workers.nil?

  pools.each do |pool|
    pool.shutdown
    pool.wait_for_termination
  end

  @event_sender.stop if @event_sender.respond_to?(:stop)
end
main_loop(inbox, outbox, flush_workers, diagnostic_event_workers) click to toggle source
# File lib/ldclient-rb/events.rb, line 206
# Body of the dispatcher thread: pops messages from +inbox+ one at a time and handles
# each according to its class, until a StopMessage has been processed.
#
# Any StandardError raised while handling a message is logged and the loop carries on
# with the next message.
def main_loop(inbox, outbox, flush_workers, diagnostic_event_workers)
  finished = false
  until finished
    begin
      msg = inbox.pop
      case msg
      when EventMessage
        dispatch_event(msg.event, outbox)
      when FlushMessage
        trigger_flush(outbox, flush_workers)
      when FlushUsersMessage
        @user_keys.clear
      when DiagnosticEventMessage
        send_and_reset_diagnostics(outbox, diagnostic_event_workers)
      when TestSyncMessage
        synchronize_for_testing(flush_workers, diagnostic_event_workers)
        msg.completed
      when StopMessage
        do_shutdown(flush_workers, diagnostic_event_workers)
        # Mark the loop as done before signalling completion, so an error raised by
        # completed still ends the loop.
        finished = true
        msg.completed
      end
    rescue => e
      Util.log_exception(@config.logger, "Unexpected error in event processor", e)
    end
  end
end
notice_user(user) click to toggle source

Add to the set of users we've noticed, and return true if the user was already known to us.

# File lib/ldclient-rb/events.rb, line 287
# Adds the user's key to the set of users already seen.
#
# Returns true when the user was already known, or when +user+ is nil or has no :key
# (such users are treated as known). Returns false the first time a key is seen.
# Every repeat sighting also increments @deduplicated_users.
def notice_user(user)
  return true if user.nil? || !user.has_key?(:key)

  already_seen = @user_keys.add(user[:key].to_s)
  @deduplicated_users += 1 if already_seen
  already_seen
end
send_and_reset_diagnostics(outbox, diagnostic_event_workers) click to toggle source
# File lib/ldclient-rb/events.rb, line 335
# Builds a periodic diagnostic event from the current counters, resets those counters,
# and hands the event to send_diagnostic_event. Does nothing when there is no
# diagnostic accumulator.
#
# outbox                   - supplies (and clears) the dropped-event count.
# diagnostic_event_workers - passed through to send_diagnostic_event.
def send_and_reset_diagnostics(outbox, diagnostic_event_workers)
  accumulator = @diagnostic_accumulator
  return if accumulator.nil?

  periodic_event = accumulator.create_periodic_event_and_reset(
    outbox.get_and_clear_dropped_count,
    @deduplicated_users,
    @events_in_last_batch
  )
  @deduplicated_users = 0
  @events_in_last_batch = 0
  send_diagnostic_event(periodic_event, diagnostic_event_workers)
end
send_diagnostic_event(event, diagnostic_event_workers) click to toggle source
# File lib/ldclient-rb/events.rb, line 344
# Posts a job to +diagnostic_event_workers+ that delivers +event+, serialized as JSON,
# through the event sender. Does nothing when there is no diagnostic worker pool.
#
# event                    - object responding to to_json.
# diagnostic_event_workers - pool responding to post, or nil when diagnostics are off.
#
# Returns whatever the pool's post returns, or nil when the pool is nil.
def send_diagnostic_event(event, diagnostic_event_workers)
  return if diagnostic_event_workers.nil?

  # The destination is handled by the event sender, so no URI is built here.
  diagnostic_event_workers.post do
    begin
      @event_sender.send_event_data(event.to_json, "diagnostic event", true)
    rescue => e
      # Errors are logged inside the job rather than raised out of the worker.
      Util.log_exception(@config.logger, "Unexpected error in event processor", e)
    end
  end
end
should_debug_event(event) click to toggle source
# File lib/ldclient-rb/events.rb, line 297
# Tells whether a debug copy of +event+ should be produced.
#
# That is the case only when the event carries :debugEventsUntilDate and that value is
# later than both the time stored in @last_known_past_time and the current time in
# milliseconds. Returns false when the event has no :debugEventsUntilDate.
def should_debug_event(event)
  debug_until = event[:debugEventsUntilDate]
  return false if debug_until.nil?

  reference_time = @last_known_past_time.value
  debug_until > reference_time && debug_until > Impl::Util.current_time_millis
end
synchronize_for_testing(flush_workers, diagnostic_event_workers) click to toggle source
# File lib/ldclient-rb/events.rb, line 244
# Used only by unit tests: waits on the flush workers and, when a diagnostic pool
# exists, on that pool as well.
def synchronize_for_testing(flush_workers, diagnostic_event_workers)
  flush_workers.wait_all
  return if diagnostic_event_workers.nil?

  diagnostic_event_workers.wait_all
end
trigger_flush(outbox, flush_workers) click to toggle source
# File lib/ldclient-rb/events.rb, line 307
# Takes the current payload from +outbox+ and, if it holds any events or summary
# counters, posts a job to +flush_workers+ that formats and sends it. Does nothing
# once the dispatcher has been disabled.
#
# @events_in_last_batch is set to the number of output events in the payload (the
# summary counts as one), or to 0 when there was nothing to send.
def trigger_flush(outbox, flush_workers)
  return if @disabled.value

  payload = outbox.get_payload
  if payload.events.empty? && payload.summary.counters.empty?
    return @events_in_last_batch = 0
  end

  summary_count = payload.summary.counters.empty? ? 0 : 1
  @events_in_last_batch = payload.events.length + summary_count

  # If all available worker threads are busy, queued will be false and no job is queued.
  queued = flush_workers.post do
    begin
      output = @formatter.make_output_events(payload.events, payload.summary)
      response = @event_sender.send_event_data(output.to_json, "#{output.length} events", false)
      @disabled.value = true if response.must_shutdown
      server_time = response.time_from_server
      # Keep the server-reported time, converted to integer milliseconds.
      @last_known_past_time.value = (server_time.to_f * 1000).to_i unless server_time.nil?
    rescue => e
      Util.log_exception(@config.logger, "Unexpected error in event processor", e)
    end
  end

  # Once queued, the events belong to the flush worker, so the buffer can be reset.
  outbox.clear if queued
end