class XCKnife::StreamParser
Constants
- DEFAULT_EXTRAPOLATED_DURATION
- MachineAssignment
- Options
- PartitionWithMachines
- ResultStats
Attributes
number_of_shards[R]
relevant_partitions[R]
stats[R]
test_partitions[R]
Public Class Methods
new(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false, on_extrapolation: nil)
  # File lib/xcknife/stream_parser.rb, line 15
  def initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false, on_extrapolation: nil)
    @number_of_shards = number_of_shards
    @test_partitions = test_partitions.map(&:to_set)
    @relevant_partitions = test_partitions.flatten.to_set
    @stats = ResultStats.new
    @options_for_metapartition = options_for_metapartition.map { |o| Options::DEFAULT.merge(o) }
    @allow_fewer_shards = allow_fewer_shards
    @on_extrapolation = on_extrapolation
    ResultStats.members.each { |k| @stats[k] = 0 }
  end
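A minimal construction sketch; the shard count, target names, and callback below are invented for illustration:

  require 'xcknife'

  # Two metapartitions (hypothetical target names): a shared target, plus two
  # device-specific targets that share a shard budget.
  parser = XCKnife::StreamParser.new(
    6,
    [['CommonTestTarget'], ['iPadTestTarget', 'iPhoneTestTarget']],
    allow_fewer_shards: true,
    on_extrapolation: ->(test_target:, test_class:) {
      warn "no historical timing for #{test_target}/#{test_class || 'ALL'}"
    }
  )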
Public Instance Methods
compute_shards_for_events(historical_events, current_events = nil)
  # File lib/xcknife/stream_parser.rb, line 85
  def compute_shards_for_events(historical_events, current_events = nil)
    compute_shards_for_partitions(test_time_for_partitions(historical_events, current_events))
  end
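When the events are already in memory (here assumed to be enumerables of parsed json-stream events), this is simply the composition of the two lower-level steps documented below:

  result = parser.compute_shards_for_events(historical_events, current_events)
  # equivalent to:
  result = parser.compute_shards_for_partitions(
    parser.test_time_for_partitions(historical_events, current_events)
  )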
compute_shards_for_file(historical_filename, current_test_filename = nil)
Parses the output of an xctool json-stream reporter and computes the shards based on it. See: github.com/facebook/xctool#included-reporters
@param historical_filename [String] the path of the (usually historical) test time performance data.
@param current_test_filename [String, nil] the path of the current test names and targets.
  # File lib/xcknife/stream_parser.rb, line 81
  def compute_shards_for_file(historical_filename, current_test_filename = nil)
    compute_shards_for_events(parse_json_stream_file(historical_filename), parse_json_stream_file(current_test_filename))
  end
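A typical call, assuming json-stream files produced by a previous run; the file names are hypothetical, and the test_maps accessor on the returned PartitionResult is an assumption rather than part of this method's contract:

  parser = XCKnife::StreamParser.new(4, [['CommonTestTarget', 'UnitTests']])
  result = parser.compute_shards_for_file('historical.json-stream', 'current.json-stream')
  # Assumed accessor: one test map per computed shard.
  result.test_maps.each_with_index { |test_map, shard| puts "shard #{shard}: #{test_map}" }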
compute_shards_for_partitions(test_time_for_partitions)
  # File lib/xcknife/stream_parser.rb, line 89
  def compute_shards_for_partitions(test_time_for_partitions)
    PartitionResult.new(@stats, split_machines_proportionally(test_time_for_partitions).map do |partition|
      compute_single_shards(partition.number_of_shards, partition.test_time_map, options: partition.options)
    end, test_time_for_partitions)
  end
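The argument has the shape produced by test_time_for_partitions: one hash per metapartition, mapping test target to per-class durations in milliseconds. An illustrative call with invented names and numbers, given a parser configured for these two metapartitions:

  test_time_for_partitions = [
    { 'CommonTestTarget' => { 'FooSpec' => 1_200, 'BarSpec' => 800 } },
    { 'iPhoneTestTarget' => { 'BazSpec' => 3_000 } }
  ]
  result = parser.compute_shards_for_partitions(test_time_for_partitions)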
compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT)
Computes a 2-approximation to the optimal partition time; the underlying problem is an instance of open-shop scheduling, which is NP-hard. See: en.wikipedia.org/wiki/Open-shop_scheduling
  # File lib/xcknife/stream_parser.rb, line 149
  def compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT)
    raise XCKnife::XCKnifeError, 'There are not enough workers provided' if number_of_shards <= 0
    raise XCKnife::XCKnifeError, 'Cannot shard an empty partition_time' if test_time_map.empty?

    assignements = Array.new(number_of_shards) { MachineAssignment.new(Hash.new { |k, v| k[v] = [] }, 0) }
    list_of_test_target_class_times = []
    test_time_map.each do |test_target, class_times|
      class_times.each do |class_name, duration_in_milliseconds|
        list_of_test_target_class_times << [test_target, class_name, duration_in_milliseconds]
      end
    end

    # This might seem like an unnecessary level of indirection, but it allows us to keep
    # logic consistent regardless of the `split_bundles_across_machines` option
    group = list_of_test_target_class_times.group_by do |test_target, class_name, _duration_in_milliseconds|
      options.split_bundles_across_machines ? [test_target, class_name] : test_target
    end

    list_of_test_target_classes_times = group.map do |(test_target, _), classes|
      [
        test_target,
        classes.map { |_test_target, class_name, _duration_in_milliseconds| class_name },
        classes.reduce(0) { |total_duration, (_test_target, _class_name, duration_in_milliseconds)| total_duration + duration_in_milliseconds }
      ]
    end

    list_of_test_target_classes_times.sort_by! { |_test_target, _class_names, duration_in_milliseconds| -duration_in_milliseconds }
    list_of_test_target_classes_times.each do |test_target, class_names, duration_in_milliseconds|
      assignemnt = assignements.min_by(&:total_time)
      assignemnt.test_time_map[test_target].concat class_names
      assignemnt.total_time += duration_in_milliseconds
    end

    if (empty_test_map_assignments = assignements.select { |a| a.test_time_map.empty? }) && !empty_test_map_assignments.empty? && !options.allow_fewer_shards
      test_grouping = options.split_bundles_across_machines ? 'classes' : 'targets'
      raise XCKnife::XCKnifeError, "Too many shards -- #{empty_test_map_assignments.size} of #{number_of_shards} assignments are empty," \
                                   " because there are not enough test #{test_grouping} for that many shards."
    end
    assignements.reject! { |a| a.test_time_map.empty? }

    assignements
  end
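The strategy is greedy longest-processing-time scheduling: work items (classes, or whole targets when bundles are kept together) are sorted by decreasing duration and each item goes to the machine with the least accumulated time. A small sketch with invented durations:

  assignments = parser.compute_single_shards(
    2,
    { 'CommonTestTarget' => { 'SlowSpec' => 9_000 },
      'iPhoneTestTarget' => { 'MediumSpec' => 5_000, 'FastSpec' => 4_000 } }
  )
  # Each MachineAssignment maps test target to the class names it should run;
  # with these inputs both machines end up with roughly 9_000ms of work.
  assignments.each { |a| puts "#{a.total_time}ms -> #{a.test_time_map}" }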
parse_json_stream_file(filename)
  # File lib/xcknife/stream_parser.rb, line 194
  def parse_json_stream_file(filename)
    return nil if filename.nil?
    return [] unless File.exist?(filename)

    lines = IO.readlines(filename)
    lines.lazy.map { |line| OpenStruct.new(JSON.parse(line)) }
  end
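Each line of the file must be a standalone JSON object; every line is wrapped in an OpenStruct and the result is a lazy enumerator (nil for a nil filename, [] for a missing file). A self-contained sketch; the two event lines only illustrate xctool's json-stream shape and are not validated by this method:

  require 'tempfile'

  file = Tempfile.new(['events', '.json-stream'])
  file.puts('{"event":"begin-ocunit","targetName":"CommonTestTarget"}')
  file.puts('{"event":"end-test","className":"FooSpec","totalDuration":0.42}')
  file.close

  events = parser.parse_json_stream_file(file.path)
  events.each { |e| p e.event }  # prints "begin-ocunit" then "end-test"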
split_machines_proportionally(partitions)
  # File lib/xcknife/stream_parser.rb, line 113
  def split_machines_proportionally(partitions)
    total = 0
    partitions.each do |test_time_map|
      each_duration(test_time_map) { |duration_in_milliseconds| total += duration_in_milliseconds }
    end

    used_shards = 0
    assignable_shards = number_of_shards - partitions.size
    partition_with_machines_list = partitions.each_with_index.map do |test_time_map, metapartition|
      options = @options_for_metapartition[metapartition]
      partition_time = 0
      max_shard_count = test_time_map.each_value.map(&:size).reduce(&:+) || 1
      max_shard_count = [max_shard_count, options.max_shard_count].min if options.max_shard_count
      each_duration(test_time_map) { |duration_in_milliseconds| partition_time += duration_in_milliseconds }
      n = [1 + (assignable_shards * partition_time.to_f / total).floor, max_shard_count].min
      used_shards += n
      PartitionWithMachines.new(test_time_map, n, partition_time, max_shard_count, options)
    end

    fifo_with_machines_who_can_use_more_shards = partition_with_machines_list.select { |x| x.number_of_shards < x.max_shard_count }.sort_by(&:partition_time)
    while number_of_shards > used_shards
      if fifo_with_machines_who_can_use_more_shards.empty?
        break if @allow_fewer_shards

        raise XCKnife::XCKnifeError, "There are #{number_of_shards - used_shards} extra machines"
      end
      machine = fifo_with_machines_who_can_use_more_shards.pop
      machine.number_of_shards += 1
      used_shards += 1
      fifo_with_machines_who_can_use_more_shards.unshift(machine) if machine.number_of_shards < machine.max_shard_count
    end
    partition_with_machines_list
  end
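Shards are handed out proportionally to each metapartition's total test time: every metapartition gets at least one shard, never more than it has schedulable work units, and leftover shards are then drained onto the metapartitions that can still use them. A worked example with invented durations (30s vs 10s of tests, 6 shards):

  partitions = [
    { 'CommonTestTarget' => { 'ASpec' => 10_000, 'BSpec' => 8_000, 'CSpec' => 7_000, 'DSpec' => 5_000 } },
    { 'iPhoneTestTarget' => { 'ESpec' => 6_000, 'FSpec' => 4_000 } }
  ]
  parser = XCKnife::StreamParser.new(6, [['CommonTestTarget'], ['iPhoneTestTarget']])
  parser.split_machines_proportionally(partitions).map(&:number_of_shards)
  # => [4, 2]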
test_time_for_partitions(historical_events, current_events = nil)
  # File lib/xcknife/stream_parser.rb, line 95
  def test_time_for_partitions(historical_events, current_events = nil)
    analyzer = EventsAnalyzer.for(current_events, relevant_partitions)
    @stats[:current_total_tests] = analyzer.total_tests
    times_for_target_class = Hash.new { |h, current_target| h[current_target] = Hash.new(0) }
    each_test_event(historical_events) do |target_name, result|
      next unless relevant_partitions.include?(target_name)

      inc_stat :historical_total_tests
      next unless analyzer.test_class?(target_name, result.className)

      times_for_target_class[target_name][result.className] += (result.totalDuration * 1000).ceil
    end

    extrapolate_times_for_current_events(analyzer, times_for_target_class) if current_events

    hash_partitions(times_for_target_class)
  end
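The return value is one hash per configured partition, mapping test target to per-class durations in integer milliseconds (the stream's seconds are multiplied by 1000 and rounded up). A sketch of the shape, with made-up numbers and pre-parsed events:

  parser.test_time_for_partitions(historical_events)
  # => [
  #      { 'CommonTestTarget' => { 'FooSpec' => 420, 'BarSpec' => 1_300 } },
  #      { 'iPhoneTestTarget' => { 'BazSpec' => 2_750 } }
  #    ]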
Private Instance Methods
each_duration(test_time_map) { |duration_in_milliseconds| ... }
  # File lib/xcknife/stream_parser.rb, line 208
  def each_duration(test_time_map)
    test_time_map.each do |_test_target, class_times|
      class_times.each do |_class_name, duration_in_milliseconds|
        yield(duration_in_milliseconds)
      end
    end
  end
extrapolate_times_for_current_events(analyzer, times_for_target_class)
  # File lib/xcknife/stream_parser.rb, line 217
  def extrapolate_times_for_current_events(analyzer, times_for_target_class)
    median_map = {}
    times_for_target_class.each do |test_target, class_times|
      median_map[test_target] = median(class_times.values)
    end

    all_times_for_all_classes = times_for_target_class.values.flat_map(&:values)
    median_of_targets = median(all_times_for_all_classes)
    analyzer.target_class_map.each do |test_target, class_set|
      if times_for_target_class.key?(test_target)
        class_set.each do |clazz|
          next if times_for_target_class[test_target].key?(clazz)

          inc_stat :class_extrapolations
          @on_extrapolation&.call(test_target: test_target, test_class: clazz)
          times_for_target_class[test_target][clazz] = median_map[test_target]
        end
      else
        inc_stat :target_extrapolations
        @on_extrapolation&.call(test_target: test_target, test_class: nil)
        class_set.each do |clazz|
          inc_stat :class_extrapolations
          @on_extrapolation&.call(test_target: test_target, test_class: clazz)
          times_for_target_class[test_target][clazz] = extrapolated_duration(median_of_targets, class_set)
        end
      end
    end
  end
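In short: a class with no historical timing inherits the median duration of its own target, while a target with no history at all gets the overall median split evenly across its classes (falling back to DEFAULT_EXTRAPOLATED_DURATION when there is no history whatsoever). A sketch of the effect, with invented numbers:

  # Historical data only for CommonTestTarget:
  #   { 'CommonTestTarget' => { 'FooSpec' => 400, 'BarSpec' => 1_000 } }
  # If the current run also contains CommonTestTarget's NewSpec plus a brand-new
  # iPhoneTestTarget with two classes, the extrapolated times become:
  #   'CommonTestTarget' => { 'FooSpec' => 400, 'BarSpec' => 1_000, 'NewSpec' => 1_000 }  # target median
  #   'iPhoneTestTarget' => { 'ASpec' => 500, 'BSpec' => 500 }  # overall median 1_000 / 2 classes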
extrapolated_duration(median_of_targets, class_set)
  # File lib/xcknife/stream_parser.rb, line 247
  def extrapolated_duration(median_of_targets, class_set)
    return DEFAULT_EXTRAPOLATED_DURATION if median_of_targets.nil?

    median_of_targets / class_set.size
  end
hash_partitions(times)
  # File lib/xcknife/stream_parser.rb, line 257
  def hash_partitions(times)
    ret = Array.new(test_partitions.size) { {} }
    times.each do |test_target, times_map|
      test_partitions.each_with_index do |partition, i|
        ret[i][test_target] = times_map if partition.include?(test_target)
      end
    end
    ret.each_with_index do |partition, index|
      raise XCKnife::XCKnifeError, "The following partition has no tests: #{test_partitions[index].to_a.inspect}" if partition.empty?
    end
  end
inc_stat(name)
  # File lib/xcknife/stream_parser.rb, line 204
  def inc_stat(name)
    @stats[name] += 1
  end
median(array)
  # File lib/xcknife/stream_parser.rb, line 253
  def median(array)
    array.sort[array.size / 2]
  end
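Note this picks the upper median of the sorted values (no averaging for even-sized arrays); illustrated as if called directly:

  median([5, 1, 3])     # => 3
  median([4, 1, 3, 2])  # => 3 (upper median, not 2.5)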