class TestQueue::Runner
Constants
- TOKEN_REGEX
Attributes
Public Class Methods
# File lib/test_queue/runner.rb, line 36 def initialize(test_framework, concurrency=nil, socket=nil, relay=nil) @test_framework = test_framework @stats = Stats.new(stats_file) if ENV['TEST_QUEUE_EARLY_FAILURE_LIMIT'] begin @early_failure_limit = Integer(ENV['TEST_QUEUE_EARLY_FAILURE_LIMIT']) rescue ArgumentError raise ArgumentError, 'TEST_QUEUE_EARLY_FAILURE_LIMIT could not be parsed as an integer' end end @procline = $0 @whitelist = if forced = ENV['TEST_QUEUE_FORCE'] forced.split(/\s*,\s*/) else [] end @whitelist.freeze all_files = @test_framework.all_suite_files.to_set @queue = @stats.all_suites .select { |suite| all_files.include?(suite.path) } .sort_by { |suite| -suite.duration } .map { |suite| [suite.name, suite.path] } if @whitelist.any? @queue.select! { |suite_name, path| @whitelist.include?(suite_name) } @queue.sort_by! { |suite_name, path| @whitelist.index(suite_name) } end @awaited_suites = Set.new(@whitelist) @original_queue = Set.new(@queue).freeze @workers = {} @completed = [] @concurrency = concurrency || (ENV['TEST_QUEUE_WORKERS'] && ENV['TEST_QUEUE_WORKERS'].to_i) || if File.exist?('/proc/cpuinfo') File.read('/proc/cpuinfo').split("\n").grep(/processor/).size elsif RUBY_PLATFORM =~ /darwin/ `/usr/sbin/sysctl -n hw.activecpu`.to_i else 2 end unless @concurrency > 0 raise ArgumentError, "Worker count (#{@concurrency}) must be greater than 0" end @relay_connection_timeout = (ENV['TEST_QUEUE_RELAY_TIMEOUT'] && ENV['TEST_QUEUE_RELAY_TIMEOUT'].to_i) || 30 @run_token = ENV['TEST_QUEUE_RELAY_TOKEN'] || SecureRandom.hex(8) @socket = socket || ENV['TEST_QUEUE_SOCKET'] || "/tmp/test_queue_#{$$}_#{object_id}.sock" @relay = relay || ENV['TEST_QUEUE_RELAY'] @remote_master_message = ENV["TEST_QUEUE_REMOTE_MASTER_MESSAGE"] if ENV.has_key?("TEST_QUEUE_REMOTE_MASTER_MESSAGE") if @relay == @socket STDERR.puts "*** Detected TEST_QUEUE_RELAY == TEST_QUEUE_SOCKET. Disabling relay mode." @relay = nil elsif @relay @queue = [] end @discovered_suites = Set.new @assignments = {} @exit_when_done = true @aborting = false end
Public Instance Methods
Stop the test run immediately.
message - String message to print to the console when exiting.
Doesn't return.
# File lib/test_queue/runner.rb, line 606 def abort(message) @aborting = true kill_subprocesses Kernel::abort("Aborting: #{message}") end
Prepare a worker for executing jobs after a fork.
# File lib/test_queue/runner.rb, line 410 def after_fork(num) end
# File lib/test_queue/runner.rb, line 382 def after_fork_internal(num, iterator) srand output = File.open("/tmp/test_queue_worker_#{$$}_output", 'w') $stdout.reopen(output) $stderr.reopen($stdout) $stdout.sync = $stderr.sync = true $0 = "test-queue worker [#{num}]" puts puts "==> Starting #$0 (#{Process.pid} on #{Socket.gethostname}) - iterating over #{iterator.sock}" puts after_fork(num) end
# File lib/test_queue/runner.rb, line 405 def around_filter(suite) yield end
# File lib/test_queue/runner.rb, line 340 def awaiting_suites? case when @awaited_suites.any? # We're waiting to find all the whitelisted suites so we can run them # in the correct order. true when @queue.empty? && !!@discovering_suites_pid # We don't have any suites yet, but we're working on it. true else # It's fine to run any queued suites now. false end end
# File lib/test_queue/runner.rb, line 425 def cleanup_worker end
# File lib/test_queue/runner.rb, line 450 def collect_worker_data(worker) if File.exist?(file = "/tmp/test_queue_worker_#{worker.pid}_output") worker.output = IO.binread(file) FileUtils.rm(file) end if File.exist?(file = "/tmp/test_queue_worker_#{worker.pid}_suites") worker.suites.replace(Marshal.load(IO.binread(file))) FileUtils.rm(file) end end
# File lib/test_queue/runner.rb, line 545 def connect_to_relay sock = nil start = Time.now puts "Attempting to connect for #{@relay_connection_timeout}s..." while sock.nil? begin sock = TCPSocket.new(*@relay.split(':')) rescue Errno::ECONNREFUSED => e raise e if Time.now - start > @relay_connection_timeout puts "Master not yet available, sleeping..." sleep 0.5 end end sock end
# File lib/test_queue/runner.rb, line 310 def discover_suites # Remote masters don't discover suites; the central master does and # distributes them to remote masters. return if relay? # No need to discover suites if all whitelisted suites are already # queued. return if @whitelist.any? && @awaited_suites.empty? @discovering_suites_pid = fork do terminate = false Signal.trap("INT") { terminate = true } $0 = "test-queue suite discovery process" @test_framework.all_suite_files.each do |path| @test_framework.suites_from_file(path).each do |suite_name, suite| Kernel.exit!(0) if terminate @server.connect_address.connect do |sock| sock.puts("TOKEN=#{@run_token}") sock.puts("NEW SUITE #{Marshal.dump([suite_name, path])}") end end end Kernel.exit! 0 end end
# File lib/test_queue/runner.rb, line 468 def distribute_queue return if relay? remote_workers = 0 until !awaiting_suites? && @queue.empty? && remote_workers == 0 queue_status(@start_time, @queue.size, @workers.size, remote_workers) if status = reap_suite_discovery_process(false) abort("Discovering suites failed.") unless status.success? abort("Failed to discover #{@awaited_suites.sort.join(", ")} specified in TEST_QUEUE_FORCE") if @awaited_suites.any? end if IO.select([@server], nil, nil, 0.1).nil? reap_workers(false) # check for worker deaths else sock = @server.accept token = sock.gets.strip cmd = sock.gets.strip token = token[TOKEN_REGEX, 1] # If we have a remote master from a different test run, respond with "WRONG RUN", and it will consider the test run done. if token != @run_token message = token.nil? ? "Worker sent no token to master" : "Worker from run #{token} connected to master" STDERR.puts "*** #{message} for run #{@run_token}; ignoring." sock.write("WRONG RUN\n") next end case cmd when /^POP (\S+) (\d+)/ hostname = $1 pid = Integer($2) if awaiting_suites? sock.write(Marshal.dump("WAIT")) elsif obj = @queue.shift data = Marshal.dump(obj) sock.write(data) @assignments[obj] = [hostname, pid] end when /^REMOTE MASTER (\d+) ([\w\.-]+)(?: (.+))?/ num = $1.to_i remote_master = $2 remote_master_message = $3 sock.write("OK\n") remote_workers += num message = "*** #{num} workers connected from #{remote_master} after #{Time.now-@start_time}s" message << " " + remote_master_message if remote_master_message STDERR.puts message when /^WORKER (\d+)/ data = sock.read($1.to_i) worker = Marshal.load(data) worker_completed(worker) remote_workers -= 1 when /^NEW SUITE (.+)/ suite_name, path = Marshal.load($1) enqueue_discovered_suite(suite_name, path) when /^KABOOM/ # worker reporting an abnormal number of test failures; # stop everything immediately and report the results. break else STDERR.puts("Ignoring unrecognized command: \"#{cmd}\"") end sock.close end end ensure stop_master reap_workers end
# File lib/test_queue/runner.rb, line 355 def enqueue_discovered_suite(suite_name, path) if @whitelist.any? && !@whitelist.include?(suite_name) return end @discovered_suites << [suite_name, path] if @original_queue.include?([suite_name, path]) # This suite was already added to the queue some other way. @awaited_suites.delete(suite_name) return end # We don't know how long new suites will take to run, so we put them at # the front of the queue. It's better to run a fast suite early than to # run a slow suite late. @queue.unshift [suite_name, path] if @awaited_suites.delete?(suite_name) && @awaited_suites.empty? # We've found all the whitelisted suites. Sort the queue to match the # whitelist. @queue.sort_by! { |suite_name, path| @whitelist.index(suite_name) } kill_suite_discovery_process("INT") end end
Run the tests.
If exit_when_done
is true, exit! will be called before this method completes. If exit_when_done
is false, this method will return an Integer number of failures.
# File lib/test_queue/runner.rb, line 125 def execute $stdout.sync = $stderr.sync = true @start_time = Time.now execute_internal exitstatus = summarize_internal if exit_when_done exit! exitstatus else exitstatus end end
# File lib/test_queue/runner.rb, line 232 def execute_internal start_master prepare(@concurrency) @prepared_time = Time.now start_relay if relay? discover_suites spawn_workers distribute_queue ensure stop_master kill_subprocesses end
# File lib/test_queue/runner.rb, line 573 def kill_subprocesses kill_workers kill_suite_discovery_process end
# File lib/test_queue/runner.rb, line 586 def kill_suite_discovery_process(signal="KILL") return unless @discovering_suites_pid Process.kill signal, @discovering_suites_pid reap_suite_discovery_process end
# File lib/test_queue/runner.rb, line 578 def kill_workers @workers.each do |pid, worker| Process.kill 'KILL', pid end reap_workers end
Run in the master before the fork. Used to create concurrency copies of any databases required by the test workers.
# File lib/test_queue/runner.rb, line 402 def prepare(concurrency) end
Subclasses can override to monitor the status of the queue.
For example, you may want to record metrics about how quickly remote workers connect, or abort the build if not enough connect.
This method is called very frequently during the test run, so don't do anything expensive/blocking.
This method is not called on remote masters when using remote workers, only on the central master.
start_time - Time when the test run began queue_size - Integer number of suites left in the queue local_worker_count - Integer number of active local workers remote_worker_count - Integer number of active remote workers
Returns nothing.
# File lib/test_queue/runner.rb, line 629 def queue_status(start_time, queue_size, local_worker_count, remote_worker_count) end
# File lib/test_queue/runner.rb, line 592 def reap_suite_discovery_process(blocking=true) return unless @discovering_suites_pid _, status = Process.waitpid2(@discovering_suites_pid, blocking ? 0 : Process::WNOHANG) return unless status @discovering_suites_pid = nil status end
# File lib/test_queue/runner.rb, line 433 def reap_workers(blocking=true) @workers.delete_if do |_, worker| if Process.waitpid(worker.pid, blocking ? 0 : Process::WNOHANG).nil? next false end worker.status = $? worker.end_time = Time.now collect_worker_data(worker) relay_to_master(worker) if relay? worker_completed(worker) true end end
# File lib/test_queue/runner.rb, line 541 def relay? !!@relay end
# File lib/test_queue/runner.rb, line 561 def relay_to_master(worker) worker.host = Socket.gethostname data = Marshal.dump(worker) sock = connect_to_relay sock.puts("TOKEN=#{@run_token}") sock.puts("WORKER #{data.bytesize}") sock.write(data) ensure sock.close if sock end
Entry point for internal runner implementations. The iterator will yield jobs from the shared queue on the master.
Returns an Integer number of failures.
# File lib/test_queue/runner.rb, line 417 def run_worker(iterator) iterator.each do |item| puts " #{item.inspect}" end return 0 # exit status end
# File lib/test_queue/runner.rb, line 292 def spawn_workers @concurrency.times do |i| num = i+1 pid = fork do @server.close if @server iterator = Iterator.new(@test_framework, relay?? @relay : @socket, method(:around_filter), early_failure_limit: @early_failure_limit, run_token: @run_token) after_fork_internal(num, iterator) ret = run_worker(iterator) || 0 cleanup_worker Kernel.exit! ret end @workers[pid] = Worker.new(pid, num) end end
# File lib/test_queue/runner.rb, line 246 def start_master if !relay? if @socket =~ /^(?:(.+):)?(\d+)$/ address = $1 || '0.0.0.0' port = $2.to_i @socket = "#$1:#$2" @server = TCPServer.new(address, port) else FileUtils.rm(@socket) if File.exist?(@socket) @server = UNIXServer.new(@socket) end end desc = "test-queue master (#{relay?? "relaying to #{@relay}" : @socket})" puts "Starting #{desc}" $0 = "#{desc} - #{@procline}" end
# File lib/test_queue/runner.rb, line 264 def start_relay return unless relay? sock = connect_to_relay message = @remote_master_message ? " #{@remote_master_message}" : "" message.gsub!(/(\r|\n)/, "") # Our "protocol" is newline-separated sock.puts("TOKEN=#{@run_token}") sock.puts("REMOTE MASTER #{@concurrency} #{Socket.gethostname} #{message}") response = sock.gets.strip unless response == "OK" STDERR.puts "*** Got non-OK response from master: #{response}" sock.close exit! 1 end sock.close rescue Errno::ECONNREFUSED STDERR.puts "*** Unable to connect to relay #{@relay}. Aborting.." exit! 1 end
# File lib/test_queue/runner.rb, line 227 def stats_file ENV['TEST_QUEUE_STATS'] || '.test_queue_stats' end
# File lib/test_queue/runner.rb, line 284 def stop_master return if relay? FileUtils.rm_f(@socket) if @socket && @server.is_a?(UNIXServer) @server.close rescue nil if @server @socket = @server = nil end
# File lib/test_queue/runner.rb, line 224 def summarize end
# File lib/test_queue/runner.rb, line 139 def summarize_internal puts puts "==> Summary (#{@completed.size} workers in %.4fs)" % (Time.now-@start_time) puts estatus = 0 misrun_suites = [] unassigned_suites = [] @failures = '' @completed.each do |worker| estatus += (worker.status.exitstatus || 1) @stats.record_suites(worker.suites) worker.suites.each do |suite| assignment = @assignments.delete([suite.name, suite.path]) host = worker.host || Socket.gethostname if assignment.nil? unassigned_suites << [suite.name, suite.path] elsif assignment != [host, worker.pid] misrun_suites << [suite.name, suite.path] + assignment + [host, worker.pid] end @discovered_suites.delete([suite.name, suite.path]) end summarize_worker(worker) @failures << worker.failure_output if worker.failure_output puts " [%2d] %60s %4d suites in %.4fs (%s %s)" % [ worker.num, worker.summary, worker.suites.size, worker.end_time - worker.start_time, worker.status.to_s, worker.host && " on #{worker.host.split('.').first}" ] end unless @failures.empty? puts puts "==> Failures" puts puts @failures end if !relay? unless @discovered_suites.empty? estatus += 1 puts puts "The following suites were discovered but were not run:" puts @discovered_suites.sort.each do |suite_name, path| puts "#{suite_name} - #{path}" end end unless unassigned_suites.empty? estatus += 1 puts puts "The following suites were not discovered but were run anyway:" puts unassigned_suites.sort.each do |suite_name, path| puts "#{suite_name} - #{path}" end end unless misrun_suites.empty? estatus += 1 puts puts "The following suites were run on the wrong workers:" puts misrun_suites.each do |suite_name, path, target_host, target_pid, actual_host, actual_pid| puts "#{suite_name} - #{path}: #{actual_host} (#{actual_pid}) - assigned to #{target_host} (#{target_pid})" end end end puts @stats.save summarize estatus = @completed.inject(0){ |s, worker| s + (worker.status.exitstatus || 1)} [estatus, 255].min end
# File lib/test_queue/runner.rb, line 428 def summarize_worker(worker) worker.summary = '' worker.failure_output = '' end
# File lib/test_queue/runner.rb, line 462 def worker_completed(worker) return if @aborting @completed << worker puts worker.output if ENV['TEST_QUEUE_VERBOSE'] || worker.status.exitstatus != 0 end