class ReadyPool

thread pool implementation

In the example 10 threads are prepared to run the Proc passed as the second argument. If more are needed they are spun up when required. When a thread completes its task it returns itself to the pool.

The start method passes its argument to the thread Proc.

Example

require 'ready_pool'
require 'async_emitter'

emitter = AsyncEmitter.new

emitter.on :data, lambda { |data| puts "emitted #{data}" }

rp = ReadyPool.new 10, Proc.new { |data| emitter.emit :data, data }

20.times do |i|
        rp.start i
end

gets #wait for user input

rp.kill_all

Public Class Methods

new(num_threads, procedure) click to toggle source

ReadyPool constructor

@param num_threads [FixedNum] initial number of threads @param procedure [Proc] called when start method is called

# File lib/ready_pool.rb, line 41
def initialize (num_threads, procedure)
        @procedure = procedure
        @pool_semaphore = Mutex.new
        @pool_condition = ConditionVariable.new
        @pool = []

        @pool_semaphore.synchronize do
                num_threads.times do |i|
                        @pool[i] = new_thread
                        @pool[i][:ready] = false
                        
                        @pool[i][:thread] = Thread.new do 
                                thread_proc @pool[i]
                        end

                        @pool_condition.wait @pool_semaphore

                end
        end
end

Public Instance Methods

kill_all() click to toggle source

kills all threads

# File lib/ready_pool.rb, line 94
def kill_all
        @pool.each do |th|
                Thread.kill th[:thread]
        end
        @pool = []
end
start(data) click to toggle source

starts the thread

@param data [Object] data passed to the thread Proc @return [Thread]

# File lib/ready_pool.rb, line 68
def start data
        th = nil
        @pool_semaphore.synchronize do
                th = @pool.shift
        end

        if th == nil
                th = new_thread
                th[:ready] = false
                th[:thread] = Thread.new do
                        thread_proc th
                end
                        
                @pool_semaphore.synchronize do
                        @pool_condition.wait @pool_semaphore
                end
        end

        th[:data] = data
        signal_thread th
        return th[:thread]
end

Protected Instance Methods

new_thread() click to toggle source
# File lib/ready_pool.rb, line 120
def new_thread
        th = {}
        th[:semaphore] = Mutex.new
        th[:cv] = ConditionVariable.new
        return th
end
signal_thread(th) click to toggle source
# File lib/ready_pool.rb, line 127
def signal_thread (th)
        th[:semaphore].synchronize do
                th[:cv].signal
        end
end
thread_proc(th) click to toggle source
# File lib/ready_pool.rb, line 102
def thread_proc (th)
        while true
                th[:semaphore].synchronize do

                        @pool_semaphore.synchronize do
                                @pool_condition.signal
                        end

                        th[:cv].wait th[:semaphore]
                        @procedure.call th[:data]
                        @pool_semaphore.synchronize do
                                @pool.push th
                        end 
                        
                end
        end
end