class PeerCommander::ParallelExecutor

Executes a given set of commands with a specified parallelism

Constants

SLEEP_DURATION

Attributes

command_results[R]
future_command_map[R]
futures[R]
parallelism[R]

Public Class Methods

new() click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 8
def initialize
  @futures = []
  @command_results = []
end

Public Instance Methods

execute(commands, parallelism) click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 13
def execute(commands, parallelism)
  raise ArgumentError, "Parallelism must be at least 1" if parallelism < 1

  @parallelism = parallelism

  commands.each do |command|
    wait_for_slot if all_slots_filled?

    @futures << Concurrent::Future.execute { command.execute }
  end

  wait_for_all

  command_results
end

Private Instance Methods

all_slots_filled?() click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 55
def all_slots_filled?
  !slot_available?
end
extract_results_from(futures_to_extract) click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 59
def extract_results_from(futures_to_extract)
  futures_to_extract.map(&:value)
end
remove_completed_commands() click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 45
def remove_completed_commands
  completed = futures.select(&:complete?)
  @futures -= completed
  @command_results.push(*extract_results_from(completed))
end
slot_available?() click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 51
def slot_available?
  futures.size < parallelism
end
wait_for_all() click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 40
def wait_for_all
  sleep(SLEEP_DURATION) while futures.any?(&:incomplete?)
  @command_results.push(*extract_results_from(futures))
end
wait_for_slot() click to toggle source
# File lib/peer_commander/parallel_executor.rb, line 33
def wait_for_slot
  while all_slots_filled?
    sleep(SLEEP_DURATION)
    remove_completed_commands
  end
end