class Sqreen::Kit::Signals::BatchCollector::ProcessingLoop

Public Class Methods

new(collector) click to toggle source

@param [BatchCollector] collector

# File lib/sqreen/kit/signals/batch_collector.rb, line 66
# Builds a processing loop bound to the given collector.
#
# The loop starts with an empty pending batch and with no deadline set.
#
# @param collector [BatchCollector] object this loop delegates to
def initialize(collector)
  @deadline = nil
  @next_batch = []
  @collector = collector
end

Public Instance Methods

flush_size() click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 80
# Batch size at which the pending batch gets submitted; the value is read
# from the collector on every call.
#
# @return whatever the collector's +flush_size+ returns
def flush_size
  owner = @collector
  owner.flush_size
end
max_batch_size() click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 76
# Upper bound used when draining the queue into the pending batch; the value
# is read from the collector on every call.
#
# @return whatever the collector's +max_batch_size+ returns
def max_batch_size
  owner = @collector
  owner.max_batch_size
end
queue() click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 72
# The queue this loop consumes from; it is owned by the collector and looked
# up on every call.
#
# @return whatever the collector's +queue+ returns
def queue
  owner = @collector
  owner.queue
end
run() click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 84
# Runs the processing loop until a single iteration reports that it should
# stop (i.e. +run_loop_once+ returns a falsy value), then logs the exit.
def run
  keep_going = true
  keep_going = run_loop_once while keep_going
  logger.info 'Collector thread exiting'
end

Private Instance Methods

run_loop_once() click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 91
# Performs one iteration of the processing loop.
#
# Waits on the queue (bounded by the current deadline, if one is set) and
# then does one of three things:
#
# * nothing arrived before the deadline: submits the pending batch;
# * the exit sentinel arrived: returns +false+ right away, without
#   submitting whatever is still in the pending batch;
# * a signal or a trace arrived: appends it to the pending batch, drains
#   further queued elements without blocking, and submits once the batch
#   has reached +flush_size+.
#
# @return [Boolean] +false+ when the loop must stop, +true+ otherwise
def run_loop_once
  item = queue.pop(@deadline)

  # nil from the blocking pop means the deadline passed
  if item.nil?
    submit
    return true
  end

  return false if item.equal?(EXIT_SENTINEL)

  # a signal or a trace; the first object of a batch starts the deadline clock
  @deadline = Time.now.to_f + @collector.max_delay_s if @next_batch.empty?
  @next_batch << item

  # drain the queue completely, but never beyond max_batch_size
  until @next_batch.size >= max_batch_size
    item = queue.pop_nb
    break if item.nil? # nothing more queued right now

    if item.equal?(EXIT_SENTINEL)
      # push it back so the next blocking pop sees it and stops the loop
      queue << EXIT_SENTINEL
      break
    end

    @next_batch << item
  end

  submit if @next_batch.size >= flush_size
  true
end
submit() click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 119
# Hands the pending batch over to the collector's client and starts a fresh,
# empty batch.
#
# The deadline is cleared on every call, even when there is nothing to send.
# With an empty pending batch nothing is reported and +nil+ is returned.
#
# @return the result of +report_batch+, or +nil+ if the batch was empty
def submit
  logger.debug { "Batch submit. Batch size: #{@next_batch.size}" }
  @deadline = nil
  return if @next_batch.empty?

  # swap in a new array first so the reported batch is no longer appended to
  outgoing, @next_batch = @next_batch, []
  @collector.auth_sig_client.report_batch(outgoing)
end