class LaunchDarkly::EventDispatcher
@private
Public Class Methods
new(inbox, sdk_key, config, diagnostic_accumulator, event_sender)
click to toggle source
# File lib/ldclient-rb/events.rb, line 177
#
# Sets up the dispatcher's state and starts a background thread that runs
# main_loop against the given inbox.
#
# @param inbox the queue that main_loop pops messages from
# @param sdk_key stored in @sdk_key
# @param config supplies diagnostic_opt_out?, user_keys_capacity, capacity and logger
# @param diagnostic_accumulator discarded (nil is stored instead) when config.diagnostic_opt_out? is true
# @param event_sender object used by the flush and diagnostic workers to deliver payloads
def initialize(inbox, sdk_key, config, diagnostic_accumulator, event_sender)
  @sdk_key = sdk_key
  @config = config
  @diagnostic_accumulator =
    if config.diagnostic_opt_out?
      nil
    else
      diagnostic_accumulator
    end
  @event_sender = event_sender
  @user_keys = SimpleLRUCacheSet.new(config.user_keys_capacity)
  @formatter = EventOutputFormatter.new(config)
  @disabled = Concurrent::AtomicBoolean.new(false)
  @last_known_past_time = Concurrent::AtomicReference.new(0)
  @deduplicated_users = 0
  @events_in_last_batch = 0

  outbox = EventBuffer.new(config.capacity, config.logger)
  flush_workers = NonBlockingThreadPool.new(MAX_FLUSH_WORKERS)

  # A diagnostic worker pool only exists when diagnostics are enabled; in that
  # case the init event is sent right away.
  diagnostic_event_workers = nil
  unless @diagnostic_accumulator.nil?
    diagnostic_event_workers = NonBlockingThreadPool.new(1)
    send_diagnostic_event(@diagnostic_accumulator.create_init_event(config), diagnostic_event_workers)
  end

  Thread.new do
    main_loop(inbox, outbox, flush_workers, diagnostic_event_workers)
  end
end
Private Instance Methods
dispatch_event(event, outbox)
click to toggle source
# File lib/ldclient-rb/events.rb, line 250
#
# Records one event in the outbox: always in the summary, and optionally as a
# full event, a debug copy, and/or a preceding index event for the user.
# Does nothing once the dispatcher has been disabled.
#
# @param event [Hash] the event; reads :kind, :trackEvents, :user, :creationDate
# @param outbox receives add_to_summary and add_event calls
def dispatch_event(event, outbox)
  return if @disabled.value

  # Always record the event in the summary.
  outbox.add_to_summary(event)

  # Decide whether to add the event to the payload. Feature events may be added twice, once for
  # the event (if tracked) and once for debugging.
  debug_event = nil
  if event[:kind] == "feature"
    will_add_full_event = event[:trackEvents]
    # merge returns a fresh, unfrozen hash, so a frozen event cannot make this raise
    # (clone would keep the frozen state and the :debug assignment would fail).
    debug_event = event.merge(debug: true) if should_debug_event(event)
  else
    will_add_full_event = true
  end

  # For each user we haven't seen before, we add an index event - unless this is already
  # an identify event for that user. notice_user is deliberately evaluated before the
  # kind check so that identify events still register the user as seen.
  users_inlined = will_add_full_event && @config.inline_users_in_events
  if !users_inlined && event.key?(:user) && !notice_user(event[:user]) && event[:kind] != "identify"
    outbox.add_event({ kind: "index", creationDate: event[:creationDate], user: event[:user] })
  end

  outbox.add_event(event) if will_add_full_event
  outbox.add_event(debug_event) unless debug_event.nil?
end
do_shutdown(flush_workers, diagnostic_event_workers)
click to toggle source
# File lib/ldclient-rb/events.rb, line 234
#
# Shuts down the worker pools, waiting for each to terminate, then stops the
# event sender if it responds to stop.
#
# @param flush_workers pool that is always shut down
# @param diagnostic_event_workers pool that is shut down only when it is not nil
def do_shutdown(flush_workers, diagnostic_event_workers)
  pools = [flush_workers]
  pools << diagnostic_event_workers unless diagnostic_event_workers.nil?

  pools.each do |pool|
    pool.shutdown
    pool.wait_for_termination
  end

  sender = @event_sender
  sender.stop if sender.respond_to?(:stop)
end
main_loop(inbox, outbox, flush_workers, diagnostic_event_workers)
click to toggle source
# File lib/ldclient-rb/events.rb, line 206
#
# Pops messages from the inbox and handles each according to its class, until
# a StopMessage has been processed. Any StandardError raised while handling a
# message is logged through Util.log_exception and the loop carries on.
#
# @param inbox queue-like object; pop is called once per iteration
# @param outbox passed through to the event, flush and diagnostic handlers
# @param flush_workers passed through to the flush, sync and shutdown handlers
# @param diagnostic_event_workers passed through to the diagnostic, sync and shutdown handlers
def main_loop(inbox, outbox, flush_workers, diagnostic_event_workers)
  running = true
  while running
    begin
      message = inbox.pop
      case message
      when EventMessage
        dispatch_event(message.event, outbox)
      when FlushMessage
        trigger_flush(outbox, flush_workers)
      when FlushUsersMessage
        @user_keys.clear
      when DiagnosticEventMessage
        send_and_reset_diagnostics(outbox, diagnostic_event_workers)
      when TestSyncMessage
        begin
          synchronize_for_testing(flush_workers, diagnostic_event_workers)
        ensure
          # Signal completion even if waiting failed, so the sender of the message is not left hanging.
          message.completed
        end
      when StopMessage
        begin
          do_shutdown(flush_workers, diagnostic_event_workers)
        ensure
          # A failed shutdown must still end the loop and signal the stopper; otherwise
          # the loop would block on the next inbox.pop and completion would never be sent.
          running = false
          message.completed
        end
      end
    rescue => e
      Util.log_exception(@config.logger, "Unexpected error in event processor", e)
    end
  end
end
notice_user(user)
click to toggle source
Add to the set of users we've noticed, and return true if the user was already known to us.
# File lib/ldclient-rb/events.rb, line 287
#
# Adds the user's key (as a string) to the set of noticed users and bumps the
# deduplication counter when the key was already present.
#
# @param user [Hash, nil] user properties; only :key is read
# @return true when the user is nil or has no :key entry; otherwise whatever
#   @user_keys.add returns for the key (truthy meaning already known)
def notice_user(user)
  # A missing user or key is reported as "already known".
  return true if user.nil? || !user.key?(:key)

  known = @user_keys.add(user[:key].to_s)
  @deduplicated_users += 1 if known
  known
end
send_and_reset_diagnostics(outbox, diagnostic_event_workers)
click to toggle source
# File lib/ldclient-rb/events.rb, line 335
#
# Builds a periodic diagnostic event from the current counters, resets those
# counters to zero, and hands the event to send_diagnostic_event.
# Does nothing when there is no diagnostic accumulator.
#
# @param outbox source of the dropped-event count (which it also clears)
# @param diagnostic_event_workers passed through to send_diagnostic_event
def send_and_reset_diagnostics(outbox, diagnostic_event_workers)
  accumulator = @diagnostic_accumulator
  return if accumulator.nil?

  periodic_event = accumulator.create_periodic_event_and_reset(
    outbox.get_and_clear_dropped_count,
    @deduplicated_users,
    @events_in_last_batch
  )
  @deduplicated_users = 0
  @events_in_last_batch = 0

  send_diagnostic_event(periodic_event, diagnostic_event_workers)
end
send_diagnostic_event(event, diagnostic_event_workers)
click to toggle source
# File lib/ldclient-rb/events.rb, line 344
#
# Posts a job to the diagnostic worker pool that serializes the event to JSON
# and passes it to the event sender. Errors raised inside the job are logged
# through Util.log_exception rather than propagated.
#
# @param event object responding to to_json
# @param diagnostic_event_workers pool responding to post; nothing is sent when nil
# @return nil when there is no pool, otherwise whatever the pool's post returns
def send_diagnostic_event(event, diagnostic_event_workers)
  return if diagnostic_event_workers.nil?

  diagnostic_event_workers.post do
    begin
      @event_sender.send_event_data(event.to_json, "diagnostic event", true)
    rescue => e
      Util.log_exception(@config.logger, "Unexpected error in event processor", e)
    end
  end
end
should_debug_event(event)
click to toggle source
# File lib/ldclient-rb/events.rb, line 297
#
# Tells whether a debug copy of the event should be produced.
#
# @param event [Hash] the event; only :debugEventsUntilDate is read
# @return [Boolean] true only when :debugEventsUntilDate is set and is later
#   than both @last_known_past_time and Impl::Util.current_time_millis
def should_debug_event(event)
  deadline = event[:debugEventsUntilDate]
  return false if deadline.nil?

  last_past = @last_known_past_time.value
  return false unless deadline > last_past

  deadline > Impl::Util.current_time_millis
end
synchronize_for_testing(flush_workers, diagnostic_event_workers)
click to toggle source
# File lib/ldclient-rb/events.rb, line 244
#
# Used only by unit tests. Waits until all active flush workers have finished,
# and likewise for the diagnostic workers when that pool exists.
#
# @param flush_workers pool responding to wait_all
# @param diagnostic_event_workers pool responding to wait_all, or nil
def synchronize_for_testing(flush_workers, diagnostic_event_workers)
  flush_workers.wait_all
  return if diagnostic_event_workers.nil?

  diagnostic_event_workers.wait_all
end
trigger_flush(outbox, flush_workers)
click to toggle source
# File lib/ldclient-rb/events.rb, line 307
#
# Takes the current payload from the outbox and, if it contains any events or
# summary counters, posts a job to the flush workers that formats and sends it.
# Does nothing once the dispatcher has been disabled.
#
# @param outbox supplies the payload and is cleared once a job has been queued
# @param flush_workers pool whose post returns a falsy value when no job was queued
def trigger_flush(outbox, flush_workers)
  return if @disabled.value

  payload = outbox.get_payload
  pending_events = payload.events
  has_summary = !payload.summary.counters.empty?

  if pending_events.empty? && !has_summary
    @events_in_last_batch = 0
  else
    # The summary, when present, counts as one extra event in the batch.
    @events_in_last_batch = pending_events.length + (has_summary ? 1 : 0)

    # If all available worker threads are busy, queued will be false and no job will be queued.
    queued = flush_workers.post do
      begin
        events_out = @formatter.make_output_events(payload.events, payload.summary)
        delivery = @event_sender.send_event_data(events_out.to_json, "#{events_out.length} events", false)
        @disabled.value = true if delivery.must_shutdown
        server_time = delivery.time_from_server
        unless server_time.nil?
          # Stored as integer milliseconds (seconds as float, times 1000).
          @last_known_past_time.value = (server_time.to_f * 1000).to_i
        end
      rescue => e
        Util.log_exception(@config.logger, "Unexpected error in event processor", e)
      end
    end

    # Reset our internal state, these events now belong to the flush worker.
    outbox.clear if queued
  end
end