class RSpec::MultiprocessRunner::FileCoordinator

Constants

COMMAND_FILE
COMMAND_FINISHED
COMMAND_PROCESS
COMMAND_RESULTS
COMMAND_START

Attributes

failed_workers[R]
options[RW]
results[R]

Public Class Methods

new(files, options=RSpec::MultiprocessRunner::CommandLineOptions.new) click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 18
# Builds the finalizer proc that closes +socket+.
#
# The proc is created here, at class level, so that its closure holds only
# +socket+ and not the coordinator instance. A finalizer proc that references
# the object it is attached to keeps that object reachable, so the object
# would never be collected and the finalizer would only run at process exit.
def self.node_socket_finalizer(socket) # :nodoc:
  proc { socket.close }
end

# Sets up the coordinator for +files+.
#
# On the head node (options.head_node) the file list is kept in the given
# order (options.use_given_order) or ordered with sort_files, a thread running
# run_tcp_server is started, and a UNIX socket pair links this object
# (@node_socket) to a thread running server_connection_established on the
# other end of the pair.
#
# On any other node it opens a TCP connection to options.hostname /
# options.port and performs the start? handshake. A BadStartStringError is
# re-raised immediately. Any other failure closes the socket, sleeps for
# 6 seconds and tries again until the retry counter (starting at 100) is
# used up, at which point the last error is re-raised.
#
# files   - collection of spec file paths; must respond to #to_set.
# options - object providing head_node, use_given_order, hostname and port.
def initialize(files, options=RSpec::MultiprocessRunner::CommandLineOptions.new)
  @spec_files = []
  @results = Set.new
  @threads = []
  @failed_workers = []
  self.options = options
  @spec_files_reference = files.to_set
  if options.head_node
    @spec_files = options.use_given_order ? files : sort_files(files)
    Thread.start { run_tcp_server }
    @node_socket, head_node_socket = Socket.pair(:UNIX, :STREAM)
    Thread.start { server_connection_established(head_node_socket) }
  else
    count = 100
    while @node_socket.nil? do
      begin
        @node_socket = TCPSocket.new options.hostname, options.port
        # A falsy handshake result is turned into an exception so that it
        # goes through the same retry path as a connection failure.
        raise unless start?
      rescue BadStartStringError
        @node_socket.close if @node_socket
        raise
      rescue
        @node_socket.close if @node_socket
        @node_socket = nil
        raise if count < 0
        count -= 1
        sleep(6)
      end
    end
    puts
  end
  # Hand the finalizer only the socket; see node_socket_finalizer.
  ObjectSpace.define_finalizer(self, self.class.node_socket_finalizer(@node_socket))
end

Public Instance Methods

finished() click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 87
# Signals that this node has nothing more to request.
#
# On a non-head node a COMMAND_FINISHED message is written to the node
# socket. On the head node, if the TCP server flag is set, the flag is
# cleared, every thread in @threads is joined, and whatever missing_files
# reports is appended to @spec_files.
def finished
  unless options.head_node
    return @node_socket.puts([COMMAND_FINISHED].to_json)
  end
  return unless @tcp_server_running

  @tcp_server_running = false
  @threads.each(&:join)
  @spec_files += missing_files.to_a
end
get_file() click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 64
# Asks the other end of the node socket for the next spec file.
#
# Sends a COMMAND_FILE request and reads one line back.
#
# Returns the file name when it is one of the files this coordinator was
# created with. Returns nil when the connection has reached end-of-file, when
# the reply is not a known file, or when any StandardError is raised while
# talking to the socket (the error is printed to standard output).
def get_file
  @node_socket.puts [COMMAND_FILE].to_json
  line = @node_socket.gets
  # gets returns nil at end-of-file: the peer closed the connection, so there
  # is no more work. Handle it here instead of letting nil.chomp raise.
  return nil unless line

  file = line.chomp
  # Malformed response, assume done, cease function
  @spec_files_reference.include?(file) ? file : nil
rescue StandardError => e
  puts("Got exception #{e} in get_file")
  nil # If Error, assume done, cease function
end
missing_files() click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 56
# Lists the files that are unaccounted for.
#
# On the head node this is the reference set of files minus every file that
# has a result, minus the current file of every failed worker, minus the
# files still waiting in @spec_files. On any other node it is an empty Array.
def missing_files
  return [] unless options.head_node

  reported = @results.map(&:filename)
  abandoned = @failed_workers.map(&:current_file)
  @spec_files_reference - reported - abandoned - @spec_files
end
remaining_files() click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 52
# Returns @spec_files as it currently stands (the object itself, not a copy).
def remaining_files
  @spec_files
end
send_results(results) click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 79
# Writes +results+ to the node socket as a single JSON line of the form
# [COMMAND_RESULTS, results].
def send_results(results)
  message = [COMMAND_RESULTS, results].to_json
  @node_socket.puts(message)
end
send_worker_status(worker) click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 83
# Writes +worker+ to the node socket as a single JSON line of the form
# [COMMAND_PROCESS, worker, <this machine's host name>].
def send_worker_status(worker)
  hostname = Socket.gethostname
  message = [COMMAND_PROCESS, worker, hostname].to_json
  @node_socket.puts(message)
end

Private Instance Methods

run_tcp_server() click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 110
# Listens on options.port and starts one handler thread per accepted
# connection.
#
# Sets @tcp_server_running, then keeps accepting connections until either
# options.max_nodes handler threads have been collected in @threads or the
# flag has been cleared. Each handler runs server_connection_established for
# its client, provided the flag is still set when the thread starts.
def run_tcp_server
  listener = TCPServer.new(options.port)
  @tcp_server_running = true
  ObjectSpace.define_finalizer(self, proc { listener.close })
  until @threads.size >= options.max_nodes || !@tcp_server_running
    # accept blocks until a node connects.
    connection = listener.accept
    handler = Thread.start(connection) do |client|
      server_connection_established(client) if @tcp_server_running
    end
    @threads << handler
  end
end
server_connection_established(socket) click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 121
# Serves one node connection until it ends.
#
# Reads JSON lines of the form [command, payload, node] from +socket+ and
# handles each command:
#
# COMMAND_START    - replies COMMAND_START when the payload equals
#                    options.run_identifier; otherwise replies
#                    COMMAND_FINISHED and stops serving.
# COMMAND_FILE     - replies with the next entry shifted off @spec_files.
# COMMAND_PROCESS  - records a MockWorker built from the payload in
#                    @failed_workers (node defaults to "unknown").
# COMMAND_RESULTS  - adds a Result for every payload entry to @results.
# COMMAND_FINISHED - stops serving.
#
# COMMAND_PROCESS and COMMAND_RESULTS messages without a payload are ignored.
# The loop also ends when the socket reaches end-of-file.
def server_connection_established(socket)
  loop do
    raw_response = socket.gets
    break unless raw_response

    command, results, node = JSON.parse(raw_response)
    if command == COMMAND_START
      if results == options.run_identifier
        socket.puts COMMAND_START
      else
        socket.puts COMMAND_FINISHED
        break
      end
    elsif command == COMMAND_FILE
      socket.puts @spec_files.shift
    elsif command == COMMAND_PROCESS
      if results
        @failed_workers << MockWorker.from_json_parse(results, node || "unknown")
      end
    elsif command == COMMAND_RESULTS
      # A message without a payload is skipped, as for COMMAND_PROCESS,
      # rather than raising on nil and ending this connection's thread.
      if results
        # NOTE(review): `+=` rebinds @results; confirm whether concurrent
        # connection threads need synchronization here.
        @results += results.map { |result| Result.from_json_parse(result) }
      end
    elsif command == COMMAND_FINISHED
      break
    end
  end
end
sort_files(files) click to toggle source

Sorts files by decreasing size, using file size as a rough proxy for run time, so that the slowest file is not handed to a worker right before all the other workers finish, leaving the whole run waiting for that one process. In the future it would be nice to log execution time and sort by that instead.

# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 105
# Orders +files+ from largest to smallest by size on disk.
#
# Returns a new Array; +files+ itself is left untouched.
def sort_files(files)
  # sort_by computes the key once per path, so File.size runs once per file.
  files.sort_by do |path|
    byte_count = File.size(path)
    -byte_count
  end
end
start?() click to toggle source
# File lib/rspec/multiprocess_runner/file_coordinator.rb, line 146
# Performs the start handshake over the node socket.
#
# Sends [COMMAND_START, options.run_identifier] as JSON and reads one line
# in reply.
#
# Returns true when the reply is COMMAND_START, false for any other reply,
# for end-of-file, or when the socket raises Errno::EPIPE.
# Raises BadStartStringError when the reply is COMMAND_FINISHED.
def start?
  handshake = [COMMAND_START, options.run_identifier].to_json
  @node_socket.puts(handshake)
  line = @node_socket.gets
  reply = line && line.chomp
  raise BadStartStringError if reply == COMMAND_FINISHED

  reply == COMMAND_START
rescue Errno::EPIPE
  false
end