class PeerCommander::ParallelExecutor
Executes a given set of commands with a specified parallelism
Constants
- SLEEP_DURATION
Attributes
command_results[R]
future_command_map[R]
futures[R]
parallelism[R]
Public Class Methods
new()
click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 8 def initialize @futures = [] @command_results = [] end
Public Instance Methods
execute(commands, parallelism)
click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 13 def execute(commands, parallelism) raise ArgumentError, "Parallelism must be at least 1" if parallelism < 1 @parallelism = parallelism commands.each do |command| wait_for_slot if all_slots_filled? @futures << Concurrent::Future.execute { command.execute } end wait_for_all command_results end
Private Instance Methods
all_slots_filled?()
click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 55 def all_slots_filled? !slot_available? end
extract_results_from(futures_to_extract)
click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 59 def extract_results_from(futures_to_extract) futures_to_extract.map(&:value) end
remove_completed_commands()
click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 45 def remove_completed_commands completed = futures.select(&:complete?) @futures -= completed @command_results.push(*extract_results_from(completed)) end
slot_available?()
click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 51 def slot_available? futures.size < parallelism end
wait_for_all()
click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 40 def wait_for_all sleep(SLEEP_DURATION) while futures.any?(&:incomplete?) @command_results.push(*extract_results_from(futures)) end
wait_for_slot()
click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 33 def wait_for_slot while all_slots_filled? sleep(SLEEP_DURATION) remove_completed_commands end end