class SimpleFuture

A container holding the (eventual) result of a forked child process once that process finishes. The child process executes the code block that must be passed to the constructor:

sf = SimpleFuture.new { do_slow_thing }
... do stuff ...
use(sf.value)

The code block must return a value that can be encoded by `Marshal` and **must not** exit prematurely.

Exceptions thrown inside the block will trigger a `SimpleFuture::ChildError` in the parent process but that exception will contain the original in its `cause` field.

Public Class Methods

all_done?() click to toggle source

Test if all instances created so far have run to completion. As a side effect, it will also call `wait` on instances whose child processes are running but have finished (i.e. their `check_if_ready` would return true.) This lets you use it as a non-blocking way to clean up the remaining children.

# File lib/simple-future.rb, line 206
def self.all_done?
  @@in_progress.select!{ |sp| !sp.check_if_ready }
  return @@in_progress.size == 0
end
max_tasks() click to toggle source

Return the maximum number of concurrent child processes allowed.

# File lib/simple-future.rb, line 190
def self.max_tasks()     return @@max_tasks; end
max_tasks=(value) click to toggle source

Set the maximum number of concurrent child processes allowed. If set to less than 1, it is interpreted as meaning no limit.

It is initially set to the number of available cores as provided by the `Etc` module.

# File lib/simple-future.rb, line 197
def self.max_tasks=(value)
  @@max_tasks = value
end
new(&action) click to toggle source

In addition to creating a new `SimpleFuture`, the constructor creates a child process and evaluates `action` in it. If the maximum number of child processes would be exceeded, it will block until a process finishes.

# File lib/simple-future.rb, line 88
def initialize(&action)
  @readPipe = nil
  @pid = nil
  @complete = false
  @result = nil

  self.class.all_done?       # Reclaim all completed children
  block_until_clear()
  launch(action)
end
wait_for_all() click to toggle source

Wait until all child processes have run to completion and recover their results. Programs should call this before exiting if there is a chance that an instance was created without having `wait` called on it.

# File lib/simple-future.rb, line 215
def self.wait_for_all
  @@in_progress.each{|sp| sp.wait}
  @@in_progress = []
  return
end

Public Instance Methods

check_if_ready() click to toggle source

Check if the child process has finished evaluating the block and has a result ready. If `check_if_ready` returns `true`, `wait` will not block when called.

Note: `check_if_ready` tests if there's data on the pipe to the child process to see if it has finished. A sufficiently evil child block might be able to cause a true result while still blocking `wait`.

Don't do that.

@return [Boolean]

# File lib/simple-future.rb, line 181
def check_if_ready
  return true if complete?
  return false unless @readPipe.ready?
  wait
  return true
end
complete?() click to toggle source

Test if the child process has finished and its result is available.

Note that this will only be true after a call to `wait` (i.e. the child process finished and its result has been retrieved.) If you want to see if the result is (probably) available, use `check_if_ready`.

# File lib/simple-future.rb, line 106
def complete?()   return @complete; end
value() click to toggle source

Return the result of the child process, blocking if it is not yet available. Blocking is done by calling `wait`, so the process will be cleaned up.

# File lib/simple-future.rb, line 111
def value
  wait
  return @result
end
wait() click to toggle source

Block until the child process finishes, recover its result and clean up the process. `wait` must be called for each `SimpleFuture` to prevent zombie processes. In practice, this is rarely a problem since `value` calls `wait` and you usually want to get all of the values. See `wait_for_all`.

It is safe to call `wait` multiple times on a `SimpleFuture`.

@raise [ChildError] The child process raised an uncaught exception. @raise [ResultTypeError] Marshal cannot encode the result @raise [Error] An error occurred in the IPC system or child process.

# File lib/simple-future.rb, line 127
def wait
  # Quit if the child has already exited
  return if complete?

  # Read the contents; this may block
  data = @readPipe.read

  # Reap the child process; this shouldn't block for long
  Process.wait(@pid)

  # And now we're complete, regardless of what happens next.  (We
  # set it early so that errors later on won't allow waiting again
  # and associated mystery errors.)
  @complete = true

  # Close and discard the pipe; we're done with it
  @readPipe.close
  @readPipe = nil

  # If the child process exited badly, this is an error
  raise Error.new("Error in child process #{@pid}!") unless
    $?.exitstatus == 0 && !data.empty?

  # Decode the result.  If it's an exception object, that's the
  # error that was thrown in the child and that means an error here
  # as well.
  rbox = Marshal.load(data)
  raise rbox if rbox.is_a? ResultTypeError
  raise ChildError.new("Child process failed with an exception.", rbox) if
    rbox.is_a? Exception

  # Ensure rbox is a ResultContainer. This *probably* can't happen.
  raise Error.new("Invalid result object type: #{rbox.class}") unless
    rbox.is_a? ResultContainer    

  # Aaaaaand, retrieve the value.
  @result = rbox.value
  
  return      # return nil
end

Private Instance Methods

block_until_clear() click to toggle source

If we're currently at maximum allowed processes, wait until the oldest of them finishes. (TO DO: if possible, make it wait until any process exits.)

# File lib/simple-future.rb, line 263
def block_until_clear
  return unless @@max_tasks > 0 && @@in_progress.size >= @@max_tasks

  @@in_progress.shift.wait()
end
launch(action) click to toggle source

Create a forked child process connected to this one with a pipe, eval `action` and return the marshalled result (or exception, in case of an error) via the pipe. Results are wrapped in a `ResultContainer` so that the parent can distinguish between exceptions and legitimately returned Exception objects.

# File lib/simple-future.rb, line 228
def launch(action)
  @readPipe, writePipe = IO.pipe
  @pid = Process.fork do
    @readPipe.close()

    result = nil
    begin
      result = ResultContainer.new( action.call() )
    rescue Exception => e
      result = e
    end

    rs = nil
    begin
      rs = Marshal.dump(result)
    rescue TypeError => e
      rv = result
      rv = rv.value if rv.class == ResultContainer
      rs = Marshal.dump(ResultTypeError.new("Type #{rv.class} " +
                                            "cannot be dumped."))
    end

    writePipe.write(rs)
    writePipe.close
    exit!(0)
  end

  writePipe.close

  @@in_progress.push self
end