class ThreadExecutor::Executor

A threaded executor.

Blocks given to this executor are queued and run as worker threads become available.

Use

# Make an executor.
executor = ThreadExecutor::Executor.new 10

begin

  # Dispatch 100 jobs across the 10 threads.
  futures = 100.times.map { executor.call { do_long_running_work } }

  # Collect the results.
  results = futures.map { |future| future.value }

ensure
  # Clean up the threads.
  executor.finish
end

Public Class Methods

new(size=2)

Build a new executor with size Processor objects. The default size is 2.

Each Processor contains a work queue and a running Ruby Thread that processes the elements in that queue.

This Executor will insert elements into the work queue.
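To make the queue-plus-thread arrangement concrete, here is a minimal sketch of what such a processor could look like. It is not the library's Processor source: SketchProcessor, SketchFuture, fulfill and STOP are hypothetical names chosen for illustration, and error handling for blocks that raise is left out.

```ruby
require 'thread'

# Hypothetical future: #value blocks until the worker stores a result.
class SketchFuture
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @done = false
  end

  # Called by the worker thread once the block has run.
  def fulfill(result)
    @lock.synchronize do
      @result = result
      @done = true
      @cond.broadcast
    end
  end

  # Called by the submitting thread; waits for the result.
  def value
    @lock.synchronize do
      @cond.wait(@lock) until @done
      @result
    end
  end
end

# Hypothetical processor: one work queue drained by one worker thread.
class SketchProcessor
  STOP = Object.new # sentinel that tells the worker to exit

  def initialize
    @queue = Queue.new
    @thread = Thread.new do
      loop do
        job = @queue.pop # blocks while the queue is empty
        break if job.equal?(STOP)
        block, future = job
        future.fulfill(block.call)
      end
    end
  end

  # Enqueue a block and return a future for its result.
  def call(&block)
    future = SketchFuture.new
    @queue << [block, future]
    future
  end

  # Number of jobs still waiting in the queue.
  def size
    @queue.size
  end

  # Let queued jobs drain, then stop and join the worker.
  def finish
    @queue << STOP
    @thread.join
  end
end
```

In this sketch the sentinel is queued behind any pending jobs, so finish waits for work already submitted before joining the thread.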

Use of the Executor is not thread-safe. If more than one thread submits work to Processor objects through this Executor, the Executor must be protected by a lock of some sort.
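One way to provide such a lock is a thin wrapper that holds a Mutex around every call into the executor. The sketch below is an assumption about how a caller might do this; LockedExecutor is a hypothetical class, not part of the library, and it relies only on the wrapped object responding to call, size and finish.

```ruby
# Hypothetical wrapper: serializes all access to a wrapped executor so that
# several threads can submit work through it.
class LockedExecutor
  def initialize(executor)
    @executor = executor
    @lock = Mutex.new
  end

  # Submit a block while holding the lock. Returns whatever the wrapped
  # executor's call returns (a future, per the documentation).
  def call(&block)
    @lock.synchronize { @executor.call(&block) }
  end

  # Total queue depth, read under the lock.
  def size
    @lock.synchronize { @executor.size }
  end

  # Shut the wrapped executor down under the lock.
  def finish
    @lock.synchronize { @executor.finish }
  end
end
```

The lock is held only while the block is being enqueued, not while it runs, so submitting threads should contend only briefly. A caller would wrap the executor once, for example LockedExecutor.new(ThreadExecutor::Executor.new(10)), and share the wrapper between threads.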

# File lib/thread_executor/executor.rb, line 68
def initialize size=2
  @processors = []

  size.times do
    @processors << Processor.new
  end
end

Public Instance Methods

call(&t)

Enqueues the block on the processor whose queue currently holds the fewest tasks.

Returns a future for the result.
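The selection step can be pictured in isolation. The following is a minimal sketch, assuming only that each processor responds to size; FakeProcessor and least_loaded are hypothetical names used for illustration and do not appear in the library.

```ruby
# Hypothetical stand-in for a Processor: only the queue depth matters here.
FakeProcessor = Struct.new(:name, :size)

# Pick a processor with the smallest queue depth.
def least_loaded(processors)
  processors.min_by(&:size)
end

processors = [
  FakeProcessor.new("a", 3),
  FakeProcessor.new("b", 1),
  FakeProcessor.new("c", 2)
]

least_loaded(processors).name # => "b"
```

Enumerable#min_by expresses the same scan for the smallest queue as an explicit loop over the processors.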

# File lib/thread_executor/executor.rb, line 79
def call(&t)
  min_processor = @processors[0]
  min_size = min_processor.size
  @processors.each do |p|
    min_size2 = p.size
    if min_size > min_size2
      min_processor = p
      min_size = min_size2
    end
  end

  # Forward the user's block to the processor.
  min_processor.call &t
end

finish()

Shut down and join all worker threads.

# File lib/thread_executor/executor.rb, line 100
def finish
  @processors.each do |p|
    p.finish
  end
end

size()

Sum of all queue depths.

# File lib/thread_executor/executor.rb, line 95
def size
  @processors.reduce(0) {|x,y| x + y.size}
end