class ParallelCucumber::WorkerManager
Public Class Methods
new(options, logger)
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 3
# Sets up the manager's state from the run options. No workers are created and
# no thread is started here.
#
# options - Hash read for :batch_size, :queue_connection_params and
#           :backup_worker_count; it is also kept whole in @options.
# logger  - object stored in @logger and used for the manager's log output.
def initialize(options, logger)
  @logger = logger
  @options = options
  @batch_size = options[:batch_size]
  @back_up_worker_size = options[:backup_worker_count]
  @queue_connection_params = options[:queue_connection_params]

  # The shared backlog queue, and a tracker built on top of it.
  @backlog = ParallelCucumber::Helper::Queue.new(@queue_connection_params)
  @queue_tracker = Tracker.new(@backlog)

  # Per-key queues, created on first access and given a "_<key>" suffix.
  @directed_queues = Hash.new do |queues, queue_key|
    queues[queue_key] = ParallelCucumber::Helper::Queue.new(@queue_connection_params, "_#{queue_key}")
  end

  # Workers by name, plus the two queues fed by inform_healthy / inform_idle.
  @workers = {}
  @healthy_workers = Queue.new
  @unchecked_workers = Queue.new
end
Public Instance Methods
inform_healthy(worker)
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 29
# Adds +worker+ to the @healthy_workers queue, from which
# give_job_to_healthy_worker later pops a worker name to hand a batch to.
#
# worker - the value to enqueue; it is later used as a key into @workers.
def inform_healthy(worker)
  @healthy_workers.push(worker)
end
inform_idle(worker)
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 33
# Adds +worker+ to the @unchecked_workers queue, which
# pre_check_unchecked_workers drains by assigning each entry a PRECHECK job.
#
# worker - the value to enqueue; it is later used as a key into @workers.
def inform_idle(worker)
  @unchecked_workers.push(worker)
end
kill()
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 25
# Kills the managing thread held in @current_thread.
#
# @current_thread is only assigned by start_managing, so it is nil until the
# manager has been started. In that case this is a no-op instead of raising
# NoMethodError on nil.
#
# Returns the killed Thread, or nil when no managing thread exists.
def kill
  @current_thread&.kill
end
start(number_of_workers)
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 19
# Runs the whole life cycle in order: creates the workers, starts the managing
# thread, then starts the workers.
#
# number_of_workers - how many workers create_workers should build.
#
# Returns whatever start_workers returns.
def start(number_of_workers)
  create_workers(number_of_workers)
  start_managing
  outcome = start_workers
  outcome
end
Private Instance Methods
any_worker_busy?()
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 117
# Tells whether at least one worker in @workers answers true to
# busy_running_test?.
#
# Returns true or false (false when there are no workers).
def any_worker_busy?
  @workers.values.any? { |worker| worker.busy_running_test? }
end
create_workers(number_of_workers)
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 39
# Builds +number_of_workers+ workers and stores them in @workers under the
# names "W0", "W1", ... Each worker receives the run options, its index, the
# manager's logger and the manager itself.
#
# number_of_workers - Integer count of workers to create.
def create_workers(number_of_workers)
  number_of_workers.times do |worker_index|
    worker_name = "W#{worker_index}"
    new_worker = ParallelCucumber::Worker.new(
      options: @options,
      index: worker_index,
      stdout_logger: @logger,
      manager: self
    )
    @workers[worker_name] = new_worker
  end
end
env_for_worker(env_variables, worker_number)
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 121
# Builds the environment for one worker as a Hash of String names to String
# values.
#
# env_variables - Hash of name => value; nil is treated as an empty Hash.
#                 Each value is resolved by its class:
#                 * String, Numeric, true, false: used as-is
#                 * Array: the element at index worker_number
#                 * Hash: the entry under worker_number.to_s; the variable is
#                   skipped when that entry is nil
#                 * nil: the variable is skipped
# worker_number - Integer index of the worker.
#
# Returns the resolved variables, with TEST, TEST_PROCESS_NUMBER and
# WORKER_INDEX filled in when the caller did not supply them, plus
# PARALLEL_CUCUMBER_EXPORTS holding the comma-joined names of all of those.
#
# Raises RuntimeError when a value has any other class.
def env_for_worker(env_variables, worker_number)
  # Names are normalised to Strings up front so that a caller-supplied "TEST"
  # (e.g. from parsed JSON) really replaces the default instead of living next
  # to a :TEST symbol key and being listed twice in the exports.
  overrides = {}
  (env_variables || {}).each do |name, value|
    key = name.to_s
    case value
    when String, Numeric, TrueClass, FalseClass
      overrides[key] = value
    when Array
      overrides[key] = value[worker_number]
    when Hash
      per_worker = value[worker_number.to_s]
      overrides[key] = per_worker unless per_worker.nil?
    when NilClass
      next
    else
      raise("Don't know how to set '#{value}'<#{value.class}> to the environment variable '#{name}'")
    end
  end

  # Defaults apply only where the caller supplied nothing.
  defaults = {
    'TEST' => 1,
    'TEST_PROCESS_NUMBER' => worker_number,
    'WORKER_INDEX' => worker_number
  }
  env = defaults.merge(overrides)

  # The exports entry is always overwritten, even if the caller supplied one.
  exported_names = env.keys.join(',')
  env.merge('PARALLEL_CUCUMBER_EXPORTS' => exported_names).map { |key, value| [key, value.to_s] }.to_h
end
give_job_to_healthy_worker()
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 102
# Pops one worker name off @healthy_workers and assigns that worker a
# RUN_TESTS job holding up to @batch_size entries. Does nothing when no
# healthy worker is waiting.
#
# Each entry is dequeued from the worker's directed queue while that queue is
# non-empty, otherwise from the shared backlog; nil results are dropped.
def give_job_to_healthy_worker
  return if @healthy_workers.empty?

  worker_name = @healthy_workers.pop(true)
  worker = @workers[worker_name]
  directed_queue = @directed_queues[worker.index]

  tests = @batch_size.times.map do
    source = directed_queue.empty? ? @backlog : directed_queue
    source.dequeue
  end.compact

  @logger.info("=== #{worker_name} was assigned #{tests.count} from the queue (#{@queue_tracker.status}): #{tests.join(' ')}")
  worker.assign_job(Job.new(Job::RUN_TESTS, tests))
end
kill_all_workers()
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 81
# Logs the shutdown and assigns a DIE job to every worker in @workers.
def kill_all_workers
  @logger.info('=== Killing All Workers')
  @workers.values.each do |worker|
    die_job = Job.new(Job::DIE)
    worker.assign_job(die_job)
  end
end
kill_surplus_workers()
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 86
# Trims the waiting workers down to @back_up_worker_size. While the
# @unchecked_workers and @healthy_workers queues together hold more than that,
# one name is popped (unchecked workers first) and that worker gets a DIE job.
def kill_surplus_workers
  while (@unchecked_workers.size + @healthy_workers.size) > @back_up_worker_size
    source = @unchecked_workers.empty? ? @healthy_workers : @unchecked_workers
    worker_name = source.pop(true)
    @logger.info("Backup workers more than #{@back_up_worker_size}, killing #{worker_name}")
    @workers[worker_name].assign_job(Job.new(Job::DIE))
  end
end
pre_check_unchecked_workers()
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 95
# Drains @unchecked_workers, assigning a PRECHECK job to each worker named in
# it. Stops early if a popped entry is nil or false.
def pre_check_unchecked_workers
  until @unchecked_workers.empty?
    worker_name = @unchecked_workers.pop(false)
    break unless worker_name

    @logger.info("=== #{worker_name} was asked precheck")
    @workers[worker_name].assign_job(Job.new(Job::PRECHECK))
  end
end
start_managing()
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 46
# Starts the managing thread and stores it in @current_thread.
#
# The thread loops, sleeping 0.5 seconds between rounds:
# * backlog not empty: precheck the unchecked workers, then hand a batch to a
#   healthy worker;
# * backlog empty but some worker still busy: kill surplus waiting workers;
# * backlog empty and nobody busy: leave the loop.
#
# Any StandardError is printed and re-raised inside the thread. Whether the
# loop ended normally or not, every worker is finally told to die.
def start_managing
  @current_thread = Thread.start do
    loop do
      if @backlog.empty?
        break unless any_worker_busy?

        kill_surplus_workers
      else
        pre_check_unchecked_workers
        give_job_to_healthy_worker
      end
      sleep 0.5
    end
  rescue StandardError => error
    puts "There was a FATAL ERROR with worker manager. #{error}"
    raise error
  ensure
    kill_all_workers
  end
end
start_workers()
click to toggle source
# File lib/parallel_cucumber/worker_manager.rb, line 67
# Starts every worker through Parallel.map, one thread per worker, passing
# each its own environment from env_for_worker. The per-worker results are
# stored in @results.
#
# The finish callback logs its second argument as the finished index, along
# with the indices still outstanding.
#
# Returns the per-worker result Hashes merged into one. When two results share
# a key, the entry with the greater :finish_time wins, and the earlier one is
# kept on a tie.
def start_workers
  worker_count = @workers.size
  remaining = (0...worker_count).to_a

  report_finished = lambda do |_item, finished_ix, _result|
    @logger.synch do |log|
      log.info("Finished: #{finished_ix} remaining: #{remaining -= [finished_ix]}")
    end
  end

  @results = Parallel.map(remaining.dup, in_threads: worker_count, finish: report_finished) do |index|
    puts "Starting W#{index}"
    worker_env = env_for_worker(@options[:env_variables], index)
    @workers["W#{index}"].start(worker_env)
  end

  @results.reduce do |merged, result|
    merged.merge(result) do |_key, current, candidate|
      if candidate[:finish_time] > current[:finish_time]
        candidate
      else
        current
      end
    end
  end
end