class Thread::Pool

A pool is a container of a limited amount of threads to which you can add tasks to run.

This is usually more performant and less memory intensive than creating a new thread for every task.

Attributes

abort_on_exception[RW]

If true, tasks will allow raised exceptions to pass through.

Similar to Thread.abort_on_exception

max[R]
min[R]
spawned[R]
waiting[R]

Public Class Methods

new(min, max = nil, &block) click to toggle source

Create the pool with minimum and maximum threads.

The pool will start with the minimum amount of threads created and will spawn new threads until the max is reached in case of need.

A default block can be passed, which will be used to {#process} the passed data.

# File lib/thread/pool.rb, line 124
    def initialize(min, max = nil, &block)
            @min   = min
            @max   = max || min
            @block = block

            @cond  = ConditionVariable.new
            @mutex = Mutex.new

            @done       = ConditionVariable.new
            @done_mutex = Mutex.new

            @todo     = []
            @workers  = []
            @timeouts = {}

            @spawned       = 0
            @waiting       = 0
            @shutdown      = false
            @trim_requests = 0
            @auto_trim     = false
            @idle_trim     = nil

@mutex.synchronize {
  min.times.with_index { |index|
    begin
      spawn_thread
    rescue
      puts "can't create more than #{index} threads"
      break
    end
  }
}
    end

Public Instance Methods

<<(*args, &block)
Alias for: process
auto_trim!() click to toggle source

Enable auto trimming, unneeded threads will be deleted until the minimum is reached.

# File lib/thread/pool.rb, line 170
def auto_trim!
        @auto_trim = true

        self
end
auto_trim?() click to toggle source

Check if auto trimming is enabled.

# File lib/thread/pool.rb, line 164
def auto_trim?
        @auto_trim
end
backlog() click to toggle source

Get the amount of tasks that still have to be run.

# File lib/thread/pool.rb, line 217
def backlog
        @mutex.synchronize {
                @todo.length
        }
end
done?() click to toggle source

Are all tasks consumed?

# File lib/thread/pool.rb, line 224
def done?
        @mutex.synchronize {
                _done?
        }
end
future(&block) click to toggle source
# File lib/thread/future.rb, line 161
def future(&block)
        Thread.future self, &block
end
idle?() click to toggle source

Check if there are idle workers.

# File lib/thread/pool.rb, line 256
def idle?
        @mutex.synchronize {
                _idle?
        }
end
idle_trim!(timeout) click to toggle source

Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. The minimum number of threads is respeced.

# File lib/thread/pool.rb, line 190
def idle_trim!(timeout)
        @idle_trim = timeout

        self
end
idle_trim?() click to toggle source

Check if idle trimming is enabled.

# File lib/thread/pool.rb, line 184
def idle_trim?
        !@idle_trim.nil?
end
no_auto_trim!() click to toggle source

Disable auto trimming.

# File lib/thread/pool.rb, line 177
def no_auto_trim!
        @auto_trim = false

        self
end
no_idle_trim!() click to toggle source

Turn of idle trimming.

# File lib/thread/pool.rb, line 197
def no_idle_trim!
        @idle_trim = nil

        self
end
process(*args, &block) click to toggle source

Add a task to the pool which will execute the block with the given argument.

If no block is passed the default block will be used if present, an ArgumentError will be raised otherwise.

# File lib/thread/pool.rb, line 267
def process(*args, &block)
        unless block || @block
                raise ArgumentError, 'you must pass a block'
        end

        task = Task.new(self, *args, &(block || @block))

        @mutex.synchronize {
                raise 'unable to add work while shutting down' if shutdown?

                @todo << task

                if @waiting == 0 && @spawned < @max
                        spawn_thread
                end

                @cond.signal
        }

        task
end
Also aliased as: <<
resize(min, max = nil) click to toggle source

Resize the pool with the passed arguments.

# File lib/thread/pool.rb, line 209
def resize(min, max = nil)
        @min = min
        @max = max || min

        trim!
end
shutdown() click to toggle source

Shut down the pool, it will block until all tasks have finished running.

# File lib/thread/pool.rb, line 322
def shutdown
        @mutex.synchronize {
                @shutdown = :nicely
                @cond.broadcast
        }

        until @workers.empty?
                if worker = @workers.first
                        worker.join
                end
        end

        if @timeout
                @shutdown = :now

                wake_up_timeout

                @timeout.join
        end
end
shutdown!() click to toggle source

Shut down the pool instantly without finishing to execute tasks.

# File lib/thread/pool.rb, line 310
def shutdown!
        @mutex.synchronize {
                @shutdown = :now
                @cond.broadcast
        }

        wake_up_timeout

        self
end
shutdown?() click to toggle source

Check if the pool has been shut down.

# File lib/thread/pool.rb, line 159
def shutdown?
        !!@shutdown
end
shutdown_after(timeout) click to toggle source

Shutdown the pool after a given amount of time.

# File lib/thread/pool.rb, line 344
def shutdown_after(timeout)
        Thread.new {
                sleep timeout

                shutdown
        }
end
size() click to toggle source

Returns the actual amount of workers

# File lib/thread/pool.rb, line 204
def size
  @workers.size
end
trim(force = false) click to toggle source

Trim the unused threads, if forced threads will be trimmed even if there are tasks waiting.

# File lib/thread/pool.rb, line 293
def trim(force = false)
        @mutex.synchronize {
                if (force || @waiting > 0) && @spawned - @trim_requests > @min
                        @trim_requests += 1
                        @cond.signal
                end
        }

        self
end
trim!() click to toggle source

Force #{trim}.

# File lib/thread/pool.rb, line 305
def trim!
        trim true
end
wait(what = :idle) click to toggle source

Wait until all tasks are consumed. The caller will be blocked until then.

# File lib/thread/pool.rb, line 231
def wait(what = :idle)
        case what
        when :done
                until done?
                        @done_mutex.synchronize {
                                break if _done?

                                @done.wait @done_mutex
                        }
                end

        when :idle
                until idle?
                        @done_mutex.synchronize {
                                break if _idle?

                                @done.wait @done_mutex
                        }
                end
        end

        self
end

Private Instance Methods

_done?() click to toggle source
# File lib/thread/pool.rb, line 468
def _done?
        @todo.empty? and @waiting == @spawned
end
_idle?() click to toggle source
# File lib/thread/pool.rb, line 472
def _idle?
        @todo.length < @waiting
end
done!() click to toggle source
# File lib/thread/pool.rb, line 476
def done!
        @done_mutex.synchronize {
                @done.broadcast if _done? or _idle?
        }
end
spawn_thread() click to toggle source
# File lib/thread/pool.rb, line 378
def spawn_thread
        @spawned += 1

        thread = Thread.new {
                loop do
                        task = @mutex.synchronize {
                                if @todo.empty?
                                        while @todo.empty?
                                                if @trim_requests > 0
                                                        @trim_requests -= 1

                                                        break
                                                end

                                                break if shutdown?

                                                @waiting += 1

                                                done!

                                                if @idle_trim and @spawned > @min
                                                        check_time = Time.now + @idle_trim
                                                        @cond.wait @mutex, @idle_trim
                                                        @trim_requests += 1 if Time.now >= check_time && @spawned - @trim_requests > @min
                                                else
                                                        @cond.wait @mutex
                                                end

                                                @waiting -= 1
                                        end

                                        break if @todo.empty? && shutdown?
                                end

                                @todo.shift
                        } or break

                        task.execute

                        break if @shutdown == :now

                        trim if auto_trim? && @spawned > @min
                end

                @mutex.synchronize {
                        @spawned -= 1
                        @workers.delete thread
                }
        }

        @workers << thread

        thread
end
spawn_timeout_thread() click to toggle source
# File lib/thread/pool.rb, line 433
def spawn_timeout_thread
        @pipes   = IO.pipe
        @timeout = Thread.new {
                loop do
                        now     = Time.now
                        timeout = @timeouts.map {|task, time|
                                next unless task.started_at

                                now - task.started_at + task.timeout
                        }.compact.min unless @timeouts.empty?

                        readable, = IO.select([@pipes.first], nil, nil, timeout)

                        break if @shutdown == :now

                        if readable && !readable.empty?
                                readable.first.read_nonblock 1024
                        end

                        now = Time.now
                        @timeouts.each {|task, time|
                                next if !task.started_at || task.terminated? || task.finished?

                                if now > task.started_at + task.timeout
                                        task.timeout!
                                end
                        }

                        @timeouts.reject! { |task, _| task.terminated? || task.finished? }

                        break if @shutdown == :now
                end
        }
end
timeout_for(task, timeout) click to toggle source
# File lib/thread/pool.rb, line 360
def timeout_for(task, timeout)
        unless @timeout
                spawn_timeout_thread
        end

        @mutex.synchronize {
                @timeouts[task] = timeout

                wake_up_timeout
        }
end
wake_up_timeout() click to toggle source
# File lib/thread/pool.rb, line 372
def wake_up_timeout
        if defined? @pipes
                @pipes.last.write_nonblock 'x' rescue nil
        end
end