class ParallelCucumber::Worker
Attributes
index[R]
Public Class Methods
new(options:, index:, stdout_logger:, manager:)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 9 def initialize(options:, index:, stdout_logger:, manager:) @group_by = options[:group_by] @batch_timeout = options[:batch_timeout] @batch_error_timeout = options[:batch_error_timeout] @setup_timeout = options[:setup_timeout] @cucumber_options = options[:cucumber_options] @test_command = options[:test_command] @index = index @name = "W#{@index}" @setup_worker = options[:setup_worker] @teardown_worker = options[:teardown_worker] @worker_delay = options[:worker_delay] @debug = options[:debug] @log_decoration = options[:log_decoration] @log_dir = options[:log_dir] @log_file = "#{@log_dir}/worker_#{index}.log" @stdout_logger = stdout_logger # .sync writes only. @is_busy_running_test = false @jobs_queue = Queue.new @manager = manager end
Public Instance Methods
assign_job(instruction)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 33 def assign_job(instruction) @jobs_queue.enq(instruction) end
autoshutting_file()
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 41 def autoshutting_file file_handle = { log_file: @log_file } def file_handle.write(message) File.open(self[:log_file], 'a') { |f| f << message } rescue => e STDERR.puts "Log failure: #{e} writing '#{message.to_s.chomp}' to #{self[:log_file]}" end def file_handle.close end def file_handle.fsync end def file_handle.path self[:log_file] end file_handle end
busy_running_test?()
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 37 def busy_running_test? @is_busy_running_test && @current_thread.alive? end
parse_results(f, tests)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 273 def parse_results(f, tests) unless File.file?(f) @logger.error("Results file does not exist: #{f}") return Helper::Cucumber.unknown_result(tests) end json_report = File.read(f) if json_report.empty? @logger.error("Results file is empty: #{f}") return Helper::Cucumber.unknown_result(tests) end Helper::Cucumber.parse_json_report(json_report) rescue => e trace = e.backtrace.join("\n\t").sub("\n\t", ": #{$ERROR_INFO}#{e.class ? " (#{e.class})" : ''}\n\t") @logger.error("Threw: JSON parse of results caused #{trace}") Helper::Cucumber.unknown_result(tests) end
process_results(batch_results, tests)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 170 def process_results(batch_results, tests) batch_keys = batch_results.keys test_syms = tests.map(&:to_sym) unrun = test_syms - batch_keys surfeit = batch_keys - test_syms unrun.each { |test| batch_results[test][:status] = Status::UNKNOWN } surfeit.each { |test| batch_results.delete(test) } @logger.error("Did not run #{unrun.count}/#{tests.count}: #{unrun.join(' ')}") unless unrun.empty? @logger.error("Extraneous runs (#{surfeit.count}): #{surfeit.join(' ')}") unless surfeit.empty? return if surfeit.empty? # Don't see how this can happen, but... @logger.error("Tests/result mismatch: #{tests.count}!=#{batch_results.count}: #{tests}/#{batch_keys}") end
run_batch(env, results, running_total, tests)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 127 def run_batch(env, results, running_total, tests) @is_busy_running_test = true batch_id = "#{Time.now.to_i}-#{@index}" @logger.debug("Batch ID is #{batch_id}") batch_mm, batch_ss = time_it do begin Hooks.fire_before_batch_hooks(tests, batch_id, env) rescue StandardError => e trace = e.backtrace.join("\n\t") @logger.warn("There was exception in before_batch hook #{e.message} \n #{trace}") end batch_results = test_batch(batch_id, env, running_total, tests) begin Hooks.fire_after_batch_hooks(batch_results, batch_id, env) rescue StandardError => e trace = e.backtrace.join("\n\t") @logger.warn("There was exception in after_batch hook #{e.message} \n #{trace}") end process_results(batch_results, tests) running_totals(batch_results, running_total) results.merge!(batch_results) @is_busy_running_test = false end ensure @logger.debug("Batch #{batch_id} took #{batch_mm} minutes #{batch_ss} seconds") @logger.update_into(@stdout_logger) end
running_totals(batch_results, running_total)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 157 def running_totals(batch_results, running_total) batch_info = Status.constants.map do |status| status = Status.const_get(status) [status, batch_results.select { |_t, s| s[:status] == status }.keys] end.to_h batch_info.each do |s, tt| @logger.info("#{s.to_s.upcase} #{tt.count} tests: #{tt.join(' ')}") unless tt.empty? running_total[s] += tt.count unless tt.empty? end running_total[:batches] += 1 @logger.info(running_total.sort.to_s + ' t=' + Time.now.to_s) end
setup(env)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 256 def setup(env) return unless @setup_worker mm, ss = time_it do @logger.info('Setup running') begin Helper::Command.exec_command(env, 'setup', @setup_worker, @logger, @log_decoration, timeout: @setup_timeout) rescue @logger.warn("Setup failed: #{@index} quitting immediately") raise 'Setup failed: quitting immediately' end end ensure @logger.debug("Setup took #{mm} minutes #{ss} seconds") @logger.update_into(@stdout_logger) end
start(env)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 63 def start(env) @current_thread = Thread.current @manager.inform_idle(@name) env = env.dup.merge!('WORKER_LOG' => @log_file) File.delete(@log_file) if File.exist?(@log_file) @logger = ParallelCucumber::CustomLogger.new(autoshutting_file) @logger.progname = "Worker-#{@index}" @logger.level = @debug ? ParallelCucumber::CustomLogger::DEBUG : ParallelCucumber::CustomLogger::INFO results = {} begin @logger.info("Logging to #{@log_file}") unless @worker_delay.zero? @logger.info("Waiting #{@worker_delay * @index} seconds before start") sleep(@worker_delay * @index) end @logger.debug(<<-LOG) Additional environment variables: #{env.map { |k, v| "#{k}=#{v}" }.join(' ')} LOG @logger.update_into(@stdout_logger) # TODO: Replace running total with queues for passed, failed, unknown, skipped. running_total = Hash.new(0) # Default new keys to 0 running_total[:group] = env[@group_by] if @group_by begin setup(env) loop_mm, loop_ss = time_it do loop do job = @jobs_queue.pop(false) case job.type when Job::PRECHECK Hooks.fire_worker_health_check(env) @manager.inform_healthy(@name) when Job::RUN_TESTS run_batch(env, results, running_total, job.details) @manager.inform_idle(@name) when Job::DIE break else raise("Invalid job #{job.inspect}") end end end @logger.debug("Loop took #{loop_mm} minutes #{loop_ss} seconds") @logger.update_into(@stdout_logger) rescue StandardError => e trace = e.backtrace.join("\n\t").sub("\n\t", ": #{$ERROR_INFO}#{e.class ? " (#{e.class})" : ''}\n\t") @logger.error("Threw: #{e.inspect} #{trace}") ensure results[":worker-#{@index}"] = running_total teardown(env) end ensure @logger.update_into(@stdout_logger) end results end
teardown(env)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 238 def teardown(env) return unless @teardown_worker mm, ss = time_it do @logger.info("\nTeardown running at #{Time.now}\n") begin Helper::Command.exec_command( env, 'teardown', @teardown_worker, @logger, @log_decoration, timeout: @setup_timeout ) rescue @logger.warn('Teardown finished with error') end end ensure @logger.debug("Teardown took #{mm} minutes #{ss} seconds") @logger.update_into(@stdout_logger) end
test_batch(batch_id, env, running_total, tests)
click to toggle source
# File lib/parallel_cucumber/worker.rb, line 184 def test_batch(batch_id, env, running_total, tests) # Prefer /tmp to Mac's brain-dead /var/folders/y8/8kqjszcs2slchjx2z5lrw2t80000gp/T/w-1497514590-0 nonsense prefer_tmp = ENV.fetch('PREFER_TMP', Dir.tmpdir) test_batch_dir = "#{Dir.exist?(prefer_tmp) ? prefer_tmp : Dir.tmpdir}/w-#{batch_id}" FileUtils.rm_rf(test_batch_dir) FileUtils.mkpath(test_batch_dir) test_state = "#{test_batch_dir}/test_state.json" cmd = "#{@test_command} --format ParallelCucumber::Helper::Cucumber::JsonStatusFormatter --out #{test_state} #{@cucumber_options} " batch_env = { :TEST_BATCH_ID.to_s => batch_id, :TEST_BATCH_DIR.to_s => test_batch_dir, :BATCH_NUMBER.to_s => running_total[:batches].to_s }.merge(env) mapped_batch_cmd, file_map = Helper::Cucumber.batch_mapped_files(cmd, test_batch_dir, batch_env) file_map.each { |_user, worker| FileUtils.mkpath(worker) if worker =~ %r{\/$} } mapped_batch_cmd += ' ' + tests.join(' ') begin ParallelCucumber::Helper::Command.exec_command( batch_env, 'batch', mapped_batch_cmd, @logger, @log_decoration, timeout: @batch_timeout, capture: true, return_script_error: true, return_on_timeout: true, collect_stacktrace: true ) rescue => e @logger << "ERROR #{e} #{e.backtrace.first(5)}" begin Hooks.fire_on_batch_error(tests: tests, batch_id: batch_id, env: batch_env, exception: e) rescue StandardError => exc trace = exc.backtrace.join("\n\t") @logger.warn("There was exception in on_batch_error hook #{exc.message} \n #{trace}") end return Helper::Cucumber.unknown_result(tests) end parse_results(test_state, tests) ensure Helper::Command.wrap_block(@log_decoration, "file copy #{Time.now}", @logger) do # Copy files we might have renamed or moved file_map.each do |user, worker| next if worker == user Helper::Processes.cp_rv(worker, user, @logger) end @logger << "\nCopied files in map: #{file_map.first(5)}...#{file_map.count} #{Time.now}\n" # Copy everything else too, in case it's interesting. Helper::Processes.cp_rv("#{test_batch_dir}/*", @log_dir, @logger) @logger << "\nCopied everything else #{Time.now} #{Time.now}\n" end @logger.update_into(@stdout_logger) FileUtils.rm_rf(test_batch_dir) @logger << "\nRemoved all files #{Time.now}\n" # Tracking down 30 minute pause! @logger.update_into(@stdout_logger) end