class RSpec::MultiprocessRunner::FileCoordinator
Constants
- COMMAND_FILE
- COMMAND_FINISHED
- COMMAND_PROCESS
- COMMAND_RESULTS
- COMMAND_START
Attributes
failed_workers[R]
options[RW]
results[R]
Public Class Methods
new(files, options=RSpec::MultiprocessRunner::CommandLineOptions.new)
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 18
# Sets up the coordinator for either the head node or a worker node.
#
# files   - the spec files of this run; also kept as a set
#           (@spec_files_reference) that replies from the head node are
#           checked against.
# options - settings object; this method reads head_node, use_given_order,
#           hostname and port from it.
#
# Head node: the files are queued (ordered by sort_files unless
# options.use_given_order is set), run_tcp_server is started on its own
# thread, and the coordinator talks to its own queue through a UNIX socket
# pair whose other end is served by server_connection_established on a
# second thread.
#
# Any other node: a TCP connection to options.hostname / options.port is
# attempted until the start handshake (start?) succeeds, sleeping 6 seconds
# between attempts.
#
# Raises BadStartStringError as soon as start? raises it (no retry), and
# re-raises the last connection/handshake error once the retries are used up.
def initialize(files, options=RSpec::MultiprocessRunner::CommandLineOptions.new)
  @spec_files = []
  @results = Set.new
  @threads = []
  @failed_workers = []
  self.options = options
  @spec_files_reference = files.to_set
  if options.head_node
    @spec_files = options.use_given_order ? files : sort_files(files)
    Thread.start { run_tcp_server }
    @node_socket, head_node_socket = Socket.pair(:UNIX, :STREAM)
    Thread.start { server_connection_established(head_node_socket) }
  else
    count = 100
    while @node_socket.nil? do
      begin
        @node_socket = TCPSocket.new options.hostname, options.port
        # A named error instead of a bare `raise`, so the failure that is
        # finally re-raised explains what went wrong.
        unless start?
          raise "Head node #{options.hostname}:#{options.port} did not acknowledge the start handshake"
        end
      rescue BadStartStringError
        # The head node rejected this run: give up immediately.
        @node_socket.close if @node_socket
        raise
      rescue
        @node_socket.close if @node_socket
        @node_socket = nil
        raise if count < 0
        count -= 1
        sleep(6)
      end
    end
    puts
  end
  # The finalizer proc is built in a class-level helper: a proc created here
  # would capture this instance in its binding, and an object referenced by
  # its own finalizer is never garbage collected.
  ObjectSpace.define_finalizer(self, self.class.send(:socket_closer, @node_socket))
end

class << self
  private

  # Returns a finalizer proc that closes +socket+ without referencing the
  # coordinator instance it is registered for.
  def socket_closer(socket)
    proc { socket.close }
  end
end
Public Instance Methods
finished()
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 87
# Marks the end of this process's work.
#
# A non-head node sends COMMAND_FINISHED over its socket. The head node, if
# its TCP server is flagged as running, clears that flag, joins every
# connection thread and appends the files reported by missing_files back
# onto the queue; if the flag is not set it does nothing.
def finished
  return @node_socket.puts([COMMAND_FINISHED].to_json) unless options.head_node
  return unless @tcp_server_running

  @tcp_server_running = false
  @threads.each { |connection_thread| connection_thread.join }
  @spec_files += missing_files.to_a
end
get_file()
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 64
# Requests the next spec file over @node_socket.
#
# Returns the file name when the reply is one of the files this coordinator
# was constructed with. Returns nil when the connection has been closed, when
# the reply is not a known file (malformed response, assume done), or when a
# StandardError is raised while talking to the socket; in that last case a
# message is printed as well.
def get_file
  @node_socket.puts [COMMAND_FILE].to_json
  reply = @node_socket.gets
  # gets returns nil at end of stream: the other side closed the connection,
  # which is a normal "no more work" outcome rather than an error to report.
  return nil unless reply

  file = reply.chomp
  @spec_files_reference.include?(file) ? file : nil
rescue StandardError => e
  puts("Got exception #{e} in get_file")
  nil # If Error, assume done, cease function
end
missing_files()
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 56
# Files that are unaccounted for on the head node: everything in the original
# file set minus files that have a result, files that were current on a
# failed worker, and files still waiting in the queue.
#
# Always returns an empty array on a non-head node.
def missing_files
  return [] unless options.head_node

  reported = @results.map(&:filename)
  lost_with_worker = @failed_workers.map(&:current_file)
  @spec_files_reference - reported - lost_with_worker - @spec_files
end
remaining_files()
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 52
# Returns the queue of spec files that have not been handed out yet.
# This is the coordinator's own array, not a copy.
def remaining_files
  @spec_files
end
send_results(results)
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 79
# Writes +results+ to @node_socket as one JSON line of the form
# [COMMAND_RESULTS, results].
def send_results(results)
  message = [COMMAND_RESULTS, results].to_json
  @node_socket.puts(message)
end
send_worker_status(worker)
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 83
# Writes +worker+ to @node_socket as one JSON line of the form
# [COMMAND_PROCESS, worker, hostname], where hostname is this machine's
# Socket.gethostname.
def send_worker_status(worker)
  message = [COMMAND_PROCESS, worker, Socket.gethostname].to_json
  @node_socket.puts(message)
end
Private Instance Methods
run_tcp_server()
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 110
# Listens on options.port and serves each accepted connection on its own
# thread (collected in @threads) via server_connection_established.
#
# Accepting stops once options.max_nodes connections have been taken or
# @tcp_server_running has been cleared. Note that server.accept blocks, so
# the flag is only re-checked after the next connection arrives.
def run_tcp_server
  server = TCPServer.new options.port
  @tcp_server_running = true
  # NOTE(review): this proc is created inside the instance, so it holds a
  # reference to self; the listening socket is effectively closed at exit.
  ObjectSpace.define_finalizer( self, proc { server.close } )
  while @threads.size < options.max_nodes && @tcp_server_running
    @threads << Thread.start(server.accept) do |client|
      begin
        server_connection_established(client) if @tcp_server_running
      ensure
        # Always release the accepted socket. This frees the descriptor and
        # gives the node an EOF instead of leaving it blocked on a read when
        # the connection was accepted after shutdown had begun.
        client.close
      end
    end
  end
end
server_connection_established(socket)
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 121
# Serves one node connection until it ends.
#
# Each line read from +socket+ is parsed as a JSON array of
# [command, payload, node] and handled as follows:
#
# COMMAND_START    - replies COMMAND_START when payload equals
#                    options.run_identifier; otherwise replies
#                    COMMAND_FINISHED and stops serving.
# COMMAND_FILE     - replies with the next entry shifted off @spec_files.
# COMMAND_PROCESS  - records a MockWorker built from payload in
#                    @failed_workers (node defaults to "unknown").
# COMMAND_RESULTS  - adds a Result for every entry of payload to @results.
# COMMAND_FINISHED - stops serving.
#
# Serving also stops when the socket reaches end of stream. Unknown commands
# and messages without a usable payload are ignored. A line that is not valid
# JSON makes JSON.parse raise, and that error is not rescued here.
def server_connection_established(socket)
  loop do
    raw_response = socket.gets
    break unless raw_response # end of stream: the peer is gone

    command, payload, node = JSON.parse(raw_response)
    case command
    when COMMAND_START
      if payload == options.run_identifier
        socket.puts COMMAND_START
      else
        socket.puts COMMAND_FINISHED
        break
      end
    when COMMAND_FILE
      socket.puts @spec_files.shift
    when COMMAND_PROCESS
      @failed_workers << MockWorker.from_json_parse(payload, node || "unknown") if payload
    when COMMAND_RESULTS
      # Guard the payload like the COMMAND_PROCESS branch does: a results
      # message without a list must not take this connection's thread down.
      if payload.is_a?(Array)
        # merge updates the existing set in place; reassigning with += copied
        # the whole set per message and could drop results when two
        # connection threads reassigned @results at the same time.
        @results.merge(payload.map { |result| Result.from_json_parse(result) })
      end
    when COMMAND_FINISHED
      break
    end
  end
end
sort_files(files)
click to toggle source
Sorting by decreasing size attempts to ensure we don't send the slowest file to a worker right before all the other workers finish and then end up waiting for that one process to finish. In the future it would be nice to log execution time and sort by that.
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 105
# Returns a new array of +files+ ordered from largest to smallest on-disk
# size. The argument itself is left untouched.
def sort_files(files)
  # sort_by evaluates its block once per element, so every file's size is
  # looked up a single time.
  files.sort_by do |path|
    -File.size(path)
  end
end
start?()
click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 146
# Performs the start handshake over @node_socket: sends COMMAND_START with
# options.run_identifier and reads one reply line.
#
# Returns true when the reply is COMMAND_START, and false for any other reply,
# for no reply at all, or when the write/read fails with Errno::EPIPE.
# Raises BadStartStringError when the reply is COMMAND_FINISHED.
def start?
  @node_socket.puts [COMMAND_START, options.run_identifier].to_json
  reply = @node_socket.gets
  reply = reply.chomp unless reply.nil?
  raise BadStartStringError if reply == COMMAND_FINISHED

  reply == COMMAND_START
rescue Errno::EPIPE
  false
end