class Optimizely::BatchEventProcessor

Constants

DEFAULT_BATCH_INTERVAL
DEFAULT_BATCH_SIZE
DEFAULT_QUEUE_CAPACITY
DEFAULT_TIMEOUT_INTERVAL
FLUSH_SIGNAL
SHUTDOWN_SIGNAL

Attributes

batch_size[R]

BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to an event queue. The BatchEventProcessor maintains a single consumer thread that pulls events off of that queue and buffers them until either the configured batch size is reached or the flush interval elapses, at which point the resulting LogEvent is handed to the event dispatcher and reported to the NotificationCenter.

current_batch[R]

BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to an event queue. The BatchEventProcessor maintains a single consumer thread that pulls events off of that queue and buffers them until either the configured batch size is reached or the flush interval elapses, at which point the resulting LogEvent is handed to the event dispatcher and reported to the NotificationCenter.

event_dispatcher[R]

BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to an event queue. The BatchEventProcessor maintains a single consumer thread that pulls events off of that queue and buffers them until either the configured batch size is reached or the flush interval elapses, at which point the resulting LogEvent is handed to the event dispatcher and reported to the NotificationCenter.

event_queue[R]

BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to an event queue. The BatchEventProcessor maintains a single consumer thread that pulls events off of that queue and buffers them until either the configured batch size is reached or the flush interval elapses, at which point the resulting LogEvent is handed to the event dispatcher and reported to the NotificationCenter.

flush_interval[R]

BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to an event queue. The BatchEventProcessor maintains a single consumer thread that pulls events off of that queue and buffers them until either the configured batch size is reached or the flush interval elapses, at which point the resulting LogEvent is handed to the event dispatcher and reported to the NotificationCenter.

started[R]

BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to an event queue. The BatchEventProcessor maintains a single consumer thread that pulls events off of that queue and buffers them until either the configured batch size is reached or the flush interval elapses, at which point the resulting LogEvent is handed to the event dispatcher and reported to the NotificationCenter.

Public Class Methods

new( event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: nil, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil ) click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 38
# Builds a processor in the not-started state; no consumer thread is created here.
#
# event_queue         - queue that buffers incoming events.
# event_dispatcher    - dispatcher for log events; when nil an EventDispatcher is
#                       built with the given logger.
# batch_size          - used only when it is a positive Integer, otherwise
#                       DEFAULT_BATCH_SIZE is used and a DEBUG message is logged.
# flush_interval      - used only when positive_number? accepts it, otherwise
#                       DEFAULT_BATCH_INTERVAL is used and a DEBUG message is logged.
# logger              - receives diagnostic messages.
# notification_center - optional; stored as given.
def initialize(
  event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
  event_dispatcher: nil,
  batch_size: DEFAULT_BATCH_SIZE,
  flush_interval: DEFAULT_BATCH_INTERVAL,
  logger: NoOpLogger.new,
  notification_center: nil
)
  @logger = logger
  @event_queue = event_queue
  @notification_center = notification_center
  @event_dispatcher = event_dispatcher || EventDispatcher.new(logger: @logger)

  # Validate first, log the fallback, then pick the effective value.
  usable_batch_size = batch_size.is_a?(Integer) && positive_number?(batch_size)
  unless usable_batch_size
    @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
  end
  @batch_size = usable_batch_size ? batch_size : DEFAULT_BATCH_SIZE

  usable_flush_interval = positive_number?(flush_interval)
  unless usable_flush_interval
    @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
  end
  @flush_interval = usable_flush_interval ? flush_interval : DEFAULT_BATCH_INTERVAL

  @current_batch = []
  @started = false
  @stopped = false
end

Public Instance Methods

flush() click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 83
# Asks the consumer to flush the current batch by enqueueing FLUSH_SIGNAL,
# then wakes the consumer thread if it is waiting on the condition variable.
#
# The mutex and condition variable are only created by start!, so calling
# flush on a processor that has never been started must not dereference
# them: in that case the signal is simply left on the queue for the
# consumer to pick up once it runs.
def flush
  @event_queue << FLUSH_SIGNAL
  # Safe navigation: @wait_mutex is nil until start! has run at least once.
  @wait_mutex&.synchronize { @resource.signal }
end
process(user_event) click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 88
# Accepts a user event for batching.
#
# A stopped processor logs a warning and ignores the event. Otherwise the
# processor is started on demand, the event is pushed onto the queue and the
# consumer thread is signalled. Any StandardError raised while enqueueing or
# signalling is logged as a warning instead of being raised to the caller.
def process(user_event)
  @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

  # An explicitly stopped processor rejects new work.
  if @stopped
    @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
    return
  end

  # Start the consumer on first use.
  start! unless @started

  begin
    # The second argument requests a non-blocking push; with the default
    # SizedQueue a full queue raises rather than stalling the caller.
    @event_queue.push(user_event, true)
    @wait_mutex.synchronize do
      @resource.signal
    end
  rescue StandardError => enqueue_error
    reason = enqueue_error.message
    @logger.log(Logger::WARN, 'Payload not accepted by the queue: ' + reason)
    nil
  end
end
start!() click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 67
# Starts the consumer thread that runs run_queue.
#
# Logs a warning and returns early when the processor is already started.
# Otherwise it sets the first flush deadline, creates the mutex and condition
# variable if they do not exist yet, spawns the thread and marks the
# processor as started (and no longer stopped).
def start!
  already_started = (@started == true)
  if already_started
    @logger.log(Logger::WARN, 'Service already started.')
    return
  end

  @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
  @logger.log(Logger::INFO, 'Starting scheduler.')

  # Synchronisation primitives are created once and reused across restarts.
  @wait_mutex, @resource = Mutex.new, ConditionVariable.new if @wait_mutex.nil?

  @thread = Thread.new do
    run_queue
  end
  @started = true
  @stopped = false
end
stop!() click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 109
# Stops a started processor; does nothing when it is not started.
#
# Enqueues SHUTDOWN_SIGNAL, wakes the consumer thread, waits for the thread
# with a DEFAULT_TIMEOUT_INTERVAL join limit, then marks the processor as
# stopped so that process rejects further events.
def stop!
  if @started
    @logger.log(Logger::INFO, 'Stopping scheduler.')

    @event_queue << SHUTDOWN_SIGNAL
    @wait_mutex.synchronize do
      @resource.signal
    end

    # Bounded wait: the flags below are updated even if the join times out.
    @thread.join(DEFAULT_TIMEOUT_INTERVAL)

    @started = false
    @stopped = true
  end
end

Private Instance Methods

add_to_batch(user_event) click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 191
# Appends a user event to the current batch.
#
# The existing batch is flushed first when should_split? says the event does
# not belong with it. When the batch is empty at the time of insertion the
# flush deadline is restarted. Reaching @batch_size triggers an immediate flush.
def add_to_batch(user_event)
  if should_split?(user_event)
    flush_queue!
    @current_batch = []
  end

  # A new batch gets a fresh flush deadline.
  starting_new_batch = @current_batch.empty?
  if starting_new_batch
    @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
  end

  @logger.log(Logger::DEBUG, "Adding user event: #{user_event} to batch.")
  @current_batch.push(user_event)

  if @current_batch.size >= @batch_size
    @logger.log(Logger::DEBUG, 'Flushing on max batch size.')
    flush_queue!
  end
end
flush_queue!() click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 170
# Sends the buffered events as a single log event and empties the batch.
#
# Does nothing when the batch is empty. Errors raised while dispatching or
# while notifying the notification center are logged, not raised, and the
# batch is cleared afterwards either way. Building the log event happens
# outside that rescue, so an error from the factory propagates to the caller.
def flush_queue!
  return if @current_batch.empty?

  log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)

  begin
    @logger.log(Logger::INFO, 'Flushing Queue.')
    @event_dispatcher.dispatch_event(log_event)

    # The notification center is optional.
    unless @notification_center.nil?
      log_event_type = NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT]
      @notification_center.send_notifications(log_event_type, log_event)
    end
  rescue StandardError => dispatch_error
    @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{dispatch_error.message}.")
  end

  @current_batch = []
end
positive_number?(value) click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 228
# Tells whether value is accepted by Helpers::Validator.finite_number? and
# is greater than zero. The positivity check is skipped when the validator
# rejects the value.
def positive_number?(value)
  finite = Helpers::Validator.finite_number?(value)
  finite && value.positive?
end
process_queue() click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 122
# Drains the event queue, handling one item at a time.
#
# SHUTDOWN_SIGNAL stops draining and returns false. FLUSH_SIGNAL flushes the
# current batch. Optimizely::UserEvent instances are added to the batch and
# anything else is discarded. Returns true once the queue is empty.
def process_queue
  until @event_queue.length.zero?
    entry = @event_queue.pop

    if entry == SHUTDOWN_SIGNAL
      @logger.log(Logger::DEBUG, 'Received shutdown signal.')
      return false
    elsif entry == FLUSH_SIGNAL
      @logger.log(Logger::DEBUG, 'Received flush signal.')
      flush_queue!
    elsif entry.is_a?(Optimizely::UserEvent)
      add_to_batch(entry)
    end
  end

  true
end
run_queue() click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 141
# Body of the consumer thread.
#
# Each pass flushes the batch when the flush deadline has passed, drains the
# queue, then waits on the condition variable until the deadline or until it
# is signalled. The loop ends when process_queue returns false. Whatever the
# exit path, a final flush of pending events is attempted.
def run_queue
  loop do
    deadline_reached = Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline
    if deadline_reached
      @logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.')

      # Pick up anything already queued before flushing.
      keep_running = process_queue
      break unless keep_running

      flush_queue!
      @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
    end

    keep_running = process_queue
    break unless keep_running

    # Time left until the next flush, scaled by 0.001 to get seconds for the wait.
    remaining = @flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp
    wait_seconds = remaining * 0.001

    if wait_seconds.positive?
      @wait_mutex.synchronize do
        @resource.wait(@wait_mutex, wait_seconds)
      end
    end
  end
rescue SignalException
  @logger.log(Logger::ERROR, 'Interrupted while processing buffer.')
rescue StandardError => loop_error
  @logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{loop_error.message}")
ensure
  @logger.log(Logger::INFO, 'Exiting processing loop. Attempting to flush pending events.')
  flush_queue!
end
should_split?(user_event) click to toggle source
# File lib/optimizely/event/batch_event_processor.rb, line 208
# Decides whether user_event must start a new batch.
#
# Compares the event context of the last buffered event with that of the
# incoming one. Returns true, after logging the reason, when the :revision or
# the :project_id differs, and false when both match or the batch is empty.
def should_split?(user_event)
  return false if @current_batch.empty?

  batch_context = @current_batch.last.event_context
  event_context = user_event.event_context

  # Checked in order; the first differing field decides which message is logged.
  mismatch_messages = {
    revision: 'Revisions mismatched: Flushing current batch.',
    project_id: 'Project Ids mismatched: Flushing current batch.'
  }

  mismatch_messages.each do |field, message|
    next if batch_context[field] == event_context[field]

    @logger.log(Logger::DEBUG, message)
    return true
  end

  false
end