class XCKnife::StreamParser

Constants

DEFAULT_EXTRAPOLATED_DURATION
MachineAssignment
Options
PartitionWithMachines
ResultStats

Attributes

number_of_shards[R]
relevant_partitions[R]
stats[R]
test_partitions[R]

Public Class Methods

new(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false, on_extrapolation: nil) click to toggle source
# File lib/xcknife/stream_parser.rb, line 15
# Builds a parser for the given shard count and test partitions.
#
# @param number_of_shards [Integer] total number of shards to distribute
# @param test_partitions [Array<Enumerable>] one collection of test targets per partition
# @param options_for_metapartition [Array] per-partition overrides merged onto Options::DEFAULT
# @param allow_fewer_shards [Boolean] stored flag read later when shards cannot all be used
# @param on_extrapolation [#call, nil] optional callback stored for extrapolation events
def initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false, on_extrapolation: nil)
  # Plain values are stored as given.
  @number_of_shards = number_of_shards
  @allow_fewer_shards = allow_fewer_shards
  @on_extrapolation = on_extrapolation

  # Each partition becomes a Set; the union of all targets is kept separately.
  @test_partitions = test_partitions.map(&:to_set)
  @relevant_partitions = test_partitions.flatten.to_set

  # Every per-partition override is layered on top of the defaults.
  @options_for_metapartition = options_for_metapartition.map { |overrides| Options::DEFAULT.merge(overrides) }

  # All counters start at zero.
  @stats = ResultStats.new
  ResultStats.members.each { |member| @stats[member] = 0 }
end

Public Instance Methods

compute_shards_for_events(historical_events, current_events = nil) click to toggle source
# File lib/xcknife/stream_parser.rb, line 85
# Computes shards from already-parsed event collections.
#
# @param historical_events events passed on to test_time_for_partitions
# @param current_events [Object, nil] optional current events, passed on unchanged
# @return whatever compute_shards_for_partitions returns
def compute_shards_for_events(historical_events, current_events = nil)
  partition_times = test_time_for_partitions(historical_events, current_events)
  compute_shards_for_partitions(partition_times)
end
compute_shards_for_file(historical_filename, current_test_filename = nil) click to toggle source

Parses the output of an xctool json-stream reporter and computes the shards based on it. See: github.com/facebook/xctool#included-reporters

@param historical_filename: [String] the path of the (usually historical) test time performance file. @param current_test_filename: [String, nil] the path of the file with the current test names and targets.

# File lib/xcknife/stream_parser.rb, line 81
# Parses the given JSON-stream files and computes shards from their events.
#
# @param historical_filename [String] path handed to parse_json_stream_file
# @param current_test_filename [String, nil] optional second path, parsed the same way
# @return whatever compute_shards_for_events returns
def compute_shards_for_file(historical_filename, current_test_filename = nil)
  historical_events = parse_json_stream_file(historical_filename)
  current_events = parse_json_stream_file(current_test_filename)
  compute_shards_for_events(historical_events, current_events)
end
compute_shards_for_partitions(test_time_for_partitions) click to toggle source
# File lib/xcknife/stream_parser.rb, line 89
# Splits the shards across partitions, shards every partition individually
# and wraps the outcome in a PartitionResult.
#
# @param test_time_for_partitions [Array] per-partition test time maps
# @return [PartitionResult] built from @stats, the per-partition shards and the input
def compute_shards_for_partitions(test_time_for_partitions)
  partitions_with_machines = split_machines_proportionally(test_time_for_partitions)
  shards_per_partition = partitions_with_machines.map do |partition|
    compute_single_shards(partition.number_of_shards, partition.test_time_map, options: partition.options)
  end
  PartitionResult.new(@stats, shards_per_partition, test_time_for_partitions)
end
compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT) click to toggle source

Computes a 2-approximation to the optimal partition_time, which is an instance of the Open-shop scheduling problem (which is NP-hard). See: en.wikipedia.org/wiki/Open-shop_scheduling

# File lib/xcknife/stream_parser.rb, line 149
# Greedily distributes the tests of one partition over +number_of_shards+
# machines: work items are taken longest first and each one goes to the
# machine with the smallest accumulated time so far.
#
# @param number_of_shards [Integer] how many machines to fill; must be positive
# @param test_time_map [Hash] test target => { class name => duration }
# @param options [#split_bundles_across_machines, #allow_fewer_shards] sharding options
# @return [Array<MachineAssignment>] the non-empty machine assignments
# @raise [XCKnife::XCKnifeError] when there are no shards, no tests, or (unless
#   options.allow_fewer_shards) some machine ends up with nothing to run
def compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT)
  raise XCKnife::XCKnifeError, 'There are not enough workers provided' if number_of_shards <= 0
  raise XCKnife::XCKnifeError, 'Cannot shard an empty partition_time' if test_time_map.empty?

  machines = Array.new(number_of_shards) do
    MachineAssignment.new(Hash.new { |hash, key| hash[key] = [] }, 0)
  end

  # One [target, class, duration] triple per test class.
  triples = test_time_map.flat_map do |test_target, class_times|
    class_times.map { |class_name, duration| [test_target, class_name, duration] }
  end

  # Grouping even in the per-class case keeps a single code path for both
  # settings of `split_bundles_across_machines`.
  grouped = triples.group_by do |test_target, class_name, _duration|
    options.split_bundles_across_machines ? [test_target, class_name] : test_target
  end

  work_items = grouped.map do |(test_target, _), members|
    class_names = members.map { |member| member[1] }
    total_duration = members.inject(0) { |accumulated, member| accumulated + member[2] }
    [test_target, class_names, total_duration]
  end

  # Longest work items first.
  longest_first = work_items.sort_by { |_test_target, _class_names, duration| -duration }
  longest_first.each do |test_target, class_names, duration|
    least_loaded = machines.min_by(&:total_time)
    least_loaded.test_time_map[test_target].concat(class_names)
    least_loaded.total_time += duration
  end

  idle_machines, busy_machines = machines.partition { |machine| machine.test_time_map.empty? }
  unless idle_machines.empty? || options.allow_fewer_shards
    test_grouping = options.split_bundles_across_machines ? 'classes' : 'targets'
    raise XCKnife::XCKnifeError, "Too many shards -- #{idle_machines.size} of #{number_of_shards} assignments are empty," \
                                 " because there are not enough test #{test_grouping} for that many shards."
  end

  busy_machines
end
parse_json_stream_file(filename) click to toggle source

rubocop:enable Metrics/CyclomaticComplexity

# File lib/xcknife/stream_parser.rb, line 194
# Reads a file holding one JSON document per line and wraps every parsed
# document in an OpenStruct.
#
# @param filename [String, nil] path of the JSON-stream file
# @return [nil] when +filename+ is nil
# @return [Array] an empty array when the file does not exist
# @return [Enumerator::Lazy<OpenStruct>] otherwise; each line is parsed only
#   when the enumerator is consumed, so a JSON syntax error surfaces at that point
def parse_json_stream_file(filename)
  return nil if filename.nil?
  return [] unless File.exist?(filename)

  # File.readlines, unlike IO.readlines, never interprets a leading "|" in the
  # name as a command to execute.
  File.readlines(filename)
      .lazy
      .reject { |line| line.strip.empty? } # blank lines carry no event; JSON.parse would raise on them
      .map { |line| OpenStruct.new(JSON.parse(line)) }
end
split_machines_proportionally(partitions) click to toggle source

rubocop:disable Metrics/CyclomaticComplexity

# File lib/xcknife/stream_parser.rb, line 113
# Decides how many shards each partition receives.
#
# Every partition starts with one shard plus a share of the remaining shards
# proportional to its total duration, capped by the number of test classes it
# contains (and by options.max_shard_count when set). Shards still left over
# are then handed out one at a time to partitions below their cap, starting
# with the partition that has the largest total duration.
#
# @param partitions [Array<Hash>] one test time map (target => { class => duration }) per partition
# @return [Array<PartitionWithMachines>] one entry per partition, in input order
# @raise [XCKnife::XCKnifeError] when shards remain that no partition can take
#   and @allow_fewer_shards is not set
def split_machines_proportionally(partitions)
  total = 0
  partitions.each do |test_time_map|
    each_duration(test_time_map) { |duration_in_milliseconds| total += duration_in_milliseconds }
  end

  used_shards = 0
  # One shard per partition is reserved up front; only the rest is split proportionally.
  assignable_shards = number_of_shards - partitions.size
  partition_with_machines_list = partitions.each_with_index.map do |test_time_map, metapartition|
    options = @options_for_metapartition[metapartition]
    partition_time = 0
    max_shard_count = test_time_map.each_value.map(&:size).reduce(&:+) || 1
    max_shard_count = [max_shard_count, options.max_shard_count].min if options.max_shard_count
    each_duration(test_time_map) { |duration_in_milliseconds| partition_time += duration_in_milliseconds }
    # With a zero total the ratio would be NaN and `floor` would raise
    # FloatDomainError, so no proportional share is granted in that case.
    proportional_share = total.zero? ? 0 : (assignable_shards * partition_time.to_f / total).floor
    n = [1 + proportional_share, max_shard_count].min
    used_shards += n
    PartitionWithMachines.new(test_time_map, n, partition_time, max_shard_count, options)
  end

  # Sorted ascending by duration: `pop` takes the longest partition, `unshift`
  # re-queues it behind all the others.
  fifo_with_machines_who_can_use_more_shards = partition_with_machines_list.select { |x| x.number_of_shards < x.max_shard_count }.sort_by(&:partition_time)
  while number_of_shards > used_shards
    if fifo_with_machines_who_can_use_more_shards.empty?
      break if @allow_fewer_shards

      raise XCKnife::XCKnifeError, "There are #{number_of_shards - used_shards} extra machines"
    end
    machine = fifo_with_machines_who_can_use_more_shards.pop
    machine.number_of_shards += 1
    used_shards += 1
    fifo_with_machines_who_can_use_more_shards.unshift(machine) if machine.number_of_shards < machine.max_shard_count
  end
  partition_with_machines_list
end
test_time_for_partitions(historical_events, current_events = nil) click to toggle source
# File lib/xcknife/stream_parser.rb, line 95
# Accumulates per-class durations (in milliseconds, rounded up) from the
# historical events and groups them by partition.
#
# @param historical_events events iterated through each_test_event
# @param current_events [Object, nil] when truthy, missing durations are extrapolated
# @return the value of hash_partitions for the accumulated durations
def test_time_for_partitions(historical_events, current_events = nil)
  analyzer = EventsAnalyzer.for(current_events, relevant_partitions)
  @stats[:current_total_tests] = analyzer.total_tests

  # target => { class => accumulated milliseconds }, both levels auto-created.
  durations_by_target = Hash.new { |by_target, target| by_target[target] = Hash.new(0) }

  each_test_event(historical_events) do |target_name, result|
    if relevant_partitions.include?(target_name)
      inc_stat :historical_total_tests
      if analyzer.test_class?(target_name, result.className)
        # Durations are multiplied by 1000 and rounded up to whole numbers.
        durations_by_target[target_name][result.className] += (result.totalDuration * 1000).ceil
      end
    end
  end

  if current_events
    extrapolate_times_for_current_events(analyzer, durations_by_target)
  end
  hash_partitions(durations_by_target)
end

Private Instance Methods

each_duration(test_time_map) { |duration_in_milliseconds| ... } click to toggle source
# File lib/xcknife/stream_parser.rb, line 208
# Yields every duration stored in a two-level test time map.
#
# @param test_time_map [Hash] test target => { class name => duration }
# @yieldparam duration_in_milliseconds the innermost value of the map
# @return [Hash] the map that was passed in
def each_duration(test_time_map)
  test_time_map.each_value do |class_times|
    class_times.each_value { |duration_in_milliseconds| yield(duration_in_milliseconds) }
  end
end
extrapolate_times_for_current_events(analyzer, times_for_target_class) click to toggle source

rubocop:disable Metrics/CyclomaticComplexity

# File lib/xcknife/stream_parser.rb, line 217
# Fills in durations for classes and targets that the analyzer knows about but
# that have no recorded time, mutating +times_for_target_class+ in place.
#
# A missing class of a known target gets that target's median; every class of
# an unknown target gets extrapolated_duration of the median over all recorded
# classes. Each filled-in value bumps the matching counter and notifies
# @on_extrapolation when it is set.
#
# @param analyzer [#target_class_map] maps test targets to collections of classes
# @param times_for_target_class [Hash] test target => { class name => duration }
def extrapolate_times_for_current_events(analyzer, times_for_target_class)
  median_by_target = times_for_target_class.each_with_object({}) do |(test_target, class_times), medians|
    medians[test_target] = median(class_times.values)
  end
  overall_median = median(times_for_target_class.values.flat_map(&:values))

  analyzer.target_class_map.each do |test_target, class_set|
    unless times_for_target_class.key?(test_target)
      # Unknown target: report the target itself (test_class: nil), then each class.
      inc_stat :target_extrapolations
      @on_extrapolation&.call(test_target: test_target, test_class: nil)
      class_set.each do |clazz|
        inc_stat :class_extrapolations
        @on_extrapolation&.call(test_target: test_target, test_class: clazz)
        times_for_target_class[test_target][clazz] = extrapolated_duration(overall_median, class_set)
      end
      next
    end

    # Known target: only classes without a recorded time are filled in.
    recorded = times_for_target_class[test_target]
    class_set.each do |clazz|
      unless recorded.key?(clazz)
        inc_stat :class_extrapolations
        @on_extrapolation&.call(test_target: test_target, test_class: clazz)
        recorded[clazz] = median_by_target[test_target]
      end
    end
  end
end
extrapolated_duration(median_of_targets, class_set) click to toggle source
# File lib/xcknife/stream_parser.rb, line 247
# Duration assigned to each class of a target that has no recorded times.
#
# @param median_of_targets [Numeric, nil] median over all recorded classes, if any
# @param class_set [#size] the classes of the target being extrapolated
# @return DEFAULT_EXTRAPOLATED_DURATION when there is no median, otherwise the
#   median divided by the number of classes
def extrapolated_duration(median_of_targets, class_set)
  if median_of_targets.nil?
    DEFAULT_EXTRAPOLATED_DURATION
  else
    median_of_targets / class_set.size
  end
end
hash_partitions(times) click to toggle source
# File lib/xcknife/stream_parser.rb, line 257
# Distributes the per-target time maps into one hash per test partition.
#
# @param times [Hash] test target => { class name => duration }
# @return [Array<Hash>] for each partition, the entries of +times+ whose target
#   belongs to that partition
# @raise [XCKnife::XCKnifeError] when a partition ends up without any target
def hash_partitions(times)
  buckets = test_partitions.map do |partition|
    times.each_with_object({}) do |(test_target, times_map), bucket|
      bucket[test_target] = times_map if partition.include?(test_target)
    end
  end

  buckets.each_with_index do |bucket, index|
    next unless bucket.empty?

    raise XCKnife::XCKnifeError, "The following partition has no tests: #{test_partitions[index].to_a.inspect}"
  end
end
inc_stat(name) click to toggle source
# File lib/xcknife/stream_parser.rb, line 204
# Adds one to the counter stored under +name+ in @stats.
#
# @param name the key of the counter to bump
# @return the counter's new value
def inc_stat(name)
  @stats[name] = @stats[name] + 1
end
median(array) click to toggle source
# File lib/xcknife/stream_parser.rb, line 253
# Picks the middle element of the sorted values; for an even number of values
# this is the upper of the two middle elements.
#
# @param array [Array] comparable values
# @return the middle element, or nil when +array+ is empty
def median(array)
  ordered = array.sort
  ordered[ordered.length / 2]
end