class ParallelCucumber::WorkerManager

Public Class Methods

new(options, logger) click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 3
def initialize(options, logger)
  @options = options
  @batch_size = options[:batch_size]
  @logger = logger
  @queue_connection_params = options[:queue_connection_params]
  @backlog = ParallelCucumber::Helper::Queue.new(@queue_connection_params)
  @queue_tracker = Tracker.new(@backlog)
  @back_up_worker_size = options[:backup_worker_count]
  @directed_queues = Hash.new do |hash, key|
    hash[key] = ParallelCucumber::Helper::Queue.new(@queue_connection_params, "_#{key}")
  end
  @workers = {}
  @unchecked_workers = Queue.new
  @healthy_workers = Queue.new
end

Public Instance Methods

inform_healthy(worker) click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 29
def inform_healthy(worker)
  @healthy_workers.enq(worker)
end
inform_idle(worker) click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 33
def inform_idle(worker)
  @unchecked_workers.enq(worker)
end
kill() click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 25
def kill
  @current_thread.kill
end
start(number_of_workers) click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 19
def start(number_of_workers)
  create_workers(number_of_workers)
  start_managing
  start_workers
end

Private Instance Methods

any_worker_busy?() click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 117
def any_worker_busy?
  @workers.values.any?(&:busy_running_test?)
end
create_workers(number_of_workers) click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 39
def create_workers(number_of_workers)
  number_of_workers.times do |index|
    @workers["W#{index}"] =
      ParallelCucumber::Worker.new(options: @options, index: index, stdout_logger: @logger, manager: self)
  end
end
env_for_worker(env_variables, worker_number) click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 121
def env_for_worker(env_variables, worker_number)
  env = env_variables.map do |k, v|
    case v
    when String, Numeric, TrueClass, FalseClass
      [k, v]
    when Array
      [k, v[worker_number]]
    when Hash
      value = v[worker_number.to_s]
      [k, value] unless value.nil?
    when NilClass
    else
      raise("Don't know how to set '#{v}'<#{v.class}> to the environment variable '#{k}'")
    end
  end.compact.to_h

  # Defaults, if absent in env. Shame 'merge' isn't something non-commutative like 'adopts/defaults'.
  env = { TEST: 1, TEST_PROCESS_NUMBER: worker_number, WORKER_INDEX: worker_number }.merge(env)

  # Overwrite this if it exists in env.
  env.merge(PARALLEL_CUCUMBER_EXPORTS: env.keys.join(',')).map { |k, v| [k.to_s, v.to_s] }.to_h
end
give_job_to_healthy_worker() click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 102
def give_job_to_healthy_worker
  return if @healthy_workers.empty?

  worker_name = @healthy_workers.pop(true)
  worker = @workers[worker_name]
  batch = []
  directed_queue = @directed_queues[worker.index]
  @batch_size.times do
    batch << (directed_queue.empty? ? @backlog : directed_queue).dequeue
  end
  batch.compact!
  @logger.info("=== #{worker_name} was assigned #{batch.count} from the queue (#{@queue_tracker.status}): #{batch.join(' ')}")
  worker.assign_job(Job.new(Job::RUN_TESTS, batch))
end
kill_all_workers() click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 81
def kill_all_workers
  @logger.info('=== Killing All Workers')
  @workers.values.each { |w| w.assign_job(Job.new(Job::DIE)) }
end
kill_surplus_workers() click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 86
def kill_surplus_workers
  until (@unchecked_workers.size + @healthy_workers.size) <= @back_up_worker_size
    queue = !@unchecked_workers.empty? ? @unchecked_workers : @healthy_workers
    worker = queue.pop(true)
    @logger.info("Backup workers more than #{@back_up_worker_size}, killing #{worker}")
    @workers[worker].assign_job(Job.new(Job::DIE))
  end
end
pre_check_unchecked_workers() click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 95
def pre_check_unchecked_workers
  while !@unchecked_workers.empty? && worker = @unchecked_workers.pop(false)
    @logger.info("=== #{worker} was asked precheck")
    @workers[worker].assign_job(Job.new(Job::PRECHECK))
  end
end
start_managing() click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 46
def start_managing
  @current_thread = Thread.start do
    loop do
      if !@backlog.empty?
        pre_check_unchecked_workers
        give_job_to_healthy_worker
      elsif any_worker_busy?
        kill_surplus_workers
      else
        break
      end
      sleep 0.5
    end
  rescue StandardError => e
    puts "There was a FATAL ERROR with worker manager. #{e}"
    raise e
  ensure
    kill_all_workers
  end
end
start_workers() click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 67
def start_workers
  indices = (0...@workers.size).to_a
  @results = Parallel.map(indices.dup, in_threads: @workers.size,
                          finish: ->(_, ix, _) { @logger.synch { |l| l.info("Finished: #{ix} remaining: #{indices -= [ix]}") } }) do |index|
    puts "Starting W#{index}"
    @workers["W#{index}"].start(env_for_worker(@options[:env_variables], index))
  end
  @results.inject do |seed, result|
    seed.merge(result) do |_key, oldval, newval|
      (newval[:finish_time] > oldval[:finish_time]) ? newval : oldval
    end
  end
end