class Thread::Pool

A pool is a container for a limited number of threads to which you can add tasks to run.

This is usually more performant and less memory intensive than creating a new thread for every task.

Attributes

abort_on_exception[RW]

If true, tasks will allow raised exceptions to pass through.

Similar to Thread.abort_on_exception

max[R]
min[R]
spawned[R]
waiting[R]

Public Class Methods

cpu_count() click to toggle source
# File lib/balotelli/thread/pool.rb, line 341
# Suggested thread count for this machine: twice the processor count
# reported by Etc.nprocessors, or 16 when that lookup raises.
def cpu_count
  begin
    2 * Etc.nprocessors
  rescue
    16
  end
end
new(min, max = nil, &block) click to toggle source

Create the pool with minimum and maximum threads.

The pool starts with the minimum number of threads and spawns new threads as needed until the maximum is reached.

A default block can be passed; it will be used by #process for tasks that are added without a block of their own.

# File lib/balotelli/thread/pool.rb, line 116
# Build a pool holding between +min+ and +max+ worker threads.
#
# min   - number of workers spawned immediately.
# max   - upper bound on workers; falls back to +min+ when omitted.
# block - optional default block, stored for tasks added without one.
def initialize(min, max = nil, &block)
  @block = block
  @min   = min
  @max   = max || min

  # Pending tasks, live worker threads and registered per-task timeouts.
  @todo     = []
  @workers  = []
  @timeouts = {}

  # Lock and condition variable used to hand tasks over to workers.
  @mutex = Mutex.new
  @cond  = ConditionVariable.new

  # Separate lock and condition variable used by #wait and #done!.
  @done_mutex = Mutex.new
  @done       = ConditionVariable.new

  # Counters and flags, all starting in their "off" state.
  @spawned       = 0
  @waiting       = 0
  @trim_requests = 0
  @shutdown      = false
  @auto_trim     = false
  @idle_trim     = nil
  @timeout       = nil

  # Workers are spawned while holding the pool lock.
  @mutex.synchronize do
    min.times { spawn_thread }
  end
end

Public Instance Methods

<<(*args, &block)
Alias for: process
auto_trim!() click to toggle source

Enable auto trimming: unneeded threads will be deleted until the minimum is reached.

# File lib/balotelli/thread/pool.rb, line 158
# Switch auto trimming on by setting the @auto_trim flag.
#
# Returns the receiver so calls can be chained.
def auto_trim!
  tap { @auto_trim = true }
end
auto_trim?() click to toggle source

Check if auto trimming is enabled.

# File lib/balotelli/thread/pool.rb, line 152
# Returns the current value of the @auto_trim flag.
def auto_trim?
  @auto_trim
end
backlog() click to toggle source

Get the number of tasks that still have to be run.

# File lib/balotelli/thread/pool.rb, line 200
# Number of tasks currently queued in @todo, read while holding @mutex.
def backlog
  @mutex.synchronize { @todo.size }
end
done?() click to toggle source

Are all tasks consumed?

# File lib/balotelli/thread/pool.rb, line 207
# Locked wrapper around _done?: evaluates it while holding @mutex and
# returns its result.
def done?
  @mutex.synchronize do
    _done?
  end
end
idle?() click to toggle source

Check if there are idle workers.

# File lib/balotelli/thread/pool.rb, line 239
# Locked wrapper around _idle?: evaluates it while holding @mutex and
# returns its result.
def idle?
  @mutex.synchronize do
    _idle?
  end
end
idle_trim!(timeout) click to toggle source

Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. The minimum number of threads is respected.

# File lib/balotelli/thread/pool.rb, line 178
# Switch idle trimming on by storing +timeout+ in @idle_trim.
#
# timeout - idle period, passed by workers as the wait timeout on @cond.
#
# Returns the receiver so calls can be chained.
def idle_trim!(timeout)
  tap { @idle_trim = timeout }
end
idle_trim?() click to toggle source

Check if idle trimming is enabled.

# File lib/balotelli/thread/pool.rb, line 172
# True when an idle-trim timeout is set, i.e. @idle_trim is anything other
# than nil.
def idle_trim?
  !@idle_trim.nil?
end
no_auto_trim!() click to toggle source

Disable auto trimming.

# File lib/balotelli/thread/pool.rb, line 165
# Switch auto trimming off by clearing the @auto_trim flag.
#
# Returns the receiver so calls can be chained.
def no_auto_trim!
  tap { @auto_trim = false }
end
no_idle_trim!() click to toggle source

Turn off idle trimming.

# File lib/balotelli/thread/pool.rb, line 185
# Switch idle trimming off by resetting @idle_trim to nil.
#
# Returns the receiver so calls can be chained.
def no_idle_trim!
  tap { @idle_trim = nil }
end
process(*args, &block) click to toggle source

Add a task to the pool which will execute the block with the given argument.

If no block is passed, the default block is used if present; otherwise an ArgumentError is raised.

# File lib/balotelli/thread/pool.rb, line 250
# Queue a new task and wake one worker.
#
# args  - arguments forwarded to Task.new after the pool itself.
# block - code for the task; the pool's default block is used when omitted.
#
# Returns the created Task.
# Raises ArgumentError when neither a block nor a default block exists.
# Raises RuntimeError when the pool is shutting down.
def process(*args, &block)
  handler = block || @block
  raise ArgumentError, 'you must pass a block' unless handler

  task = Task.new(self, *args, &handler)

  @mutex.synchronize do
    raise 'unable to add work while shutting down' if shutdown?

    @todo << task

    # Grow the pool only when nobody is idle and there is still headroom.
    spawn_thread if @waiting == 0 && @spawned < @max

    @cond.signal
  end

  task
end
Also aliased as: <<
resize(min, max = nil) click to toggle source

Resize the pool with the passed arguments.

# File lib/balotelli/thread/pool.rb, line 192
# Change the pool bounds and then force a trim.
#
# min - new minimum number of workers.
# max - new maximum; falls back to +min+ when omitted.
#
# Returns whatever trim! returns.
def resize(min, max = nil)
  @min, @max = min, (max || min)

  trim!
end
shutdown() click to toggle source

Shut down the pool. The call blocks until all tasks have finished running.

# File lib/balotelli/thread/pool.rb, line 305
# Graceful shutdown: flag the pool as shutting down, wake every worker and
# join them until @workers is empty. If a timeout thread exists, it is then
# told to stop and joined as well.
def shutdown
  @mutex.synchronize do
    @shutdown = :nicely
    @cond.broadcast
  end

  # Workers remove themselves from @workers when they exit, so keep joining
  # the head of the list until it drains.
  until @workers.empty?
    worker = @workers.first
    worker.join if worker
  end

  return unless @timeout

  @shutdown = :now
  wake_up_timeout
  @timeout.join
end
shutdown!() click to toggle source

Shut down the pool instantly without finishing to execute tasks.

# File lib/balotelli/thread/pool.rb, line 293
# Immediate shutdown: set @shutdown to :now, wake every worker waiting on
# @cond and poke the timeout thread. Does not join any thread.
#
# Returns the receiver.
def shutdown!
  tap do
    @mutex.synchronize do
      @shutdown = :now
      @cond.broadcast
    end

    wake_up_timeout
  end
end
shutdown?() click to toggle source

Check if the pool has been shut down.

# File lib/balotelli/thread/pool.rb, line 147
# Boolean view of @shutdown: true once it holds any truthy value
# (:nicely or :now), false otherwise.
def shutdown?
  @shutdown ? true : false
end
shutdown_after(timeout) click to toggle source

Shutdown the pool after a given amount of time.

# File lib/balotelli/thread/pool.rb, line 327
# Schedule a graceful shutdown: a helper thread sleeps for +timeout+ seconds
# and then calls #shutdown.
#
# Returns the helper Thread.
def shutdown_after(timeout)
  Thread.new(timeout) do |delay|
    sleep delay

    shutdown
  end
end
trim(force = false) click to toggle source

Trim the unused threads; if forced, threads will be trimmed even if there are tasks waiting.

# File lib/balotelli/thread/pool.rb, line 276
# Ask one worker to exit, as long as doing so keeps the pool above @min.
#
# force - when true, request the trim even if no worker is currently waiting.
#
# Returns the receiver.
def trim(force = false)
  @mutex.synchronize do
    above_minimum = @spawned - @trim_requests > @min

    if above_minimum && (force || @waiting > 0)
      @trim_requests += 1
      @cond.signal
    end
  end

  self
end
trim!() click to toggle source

Force #trim.

# File lib/balotelli/thread/pool.rb, line 288
# Forced variant of #trim: delegates with force set to true and returns
# its result.
def trim!
  trim(true)
end
wait(what = :idle) click to toggle source

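Wait until the pool reaches the requested state: with :done, until all tasks are consumed; with :idle (the default), until there are idle workers. The caller will be blocked until then.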

# File lib/balotelli/thread/pool.rb, line 214
# Block the caller until the pool reaches the requested state.
#
# what - :done to wait for done?, :idle (default) to wait for idle?.
#        Any other value returns immediately.
#
# Returns the receiver.
def wait(what = :idle)
  case what
  when :done
    until done?
      # Re-check under @done_mutex and sleep on @done only if still not done.
      @done_mutex.synchronize { @done.wait(@done_mutex) unless _done? }
    end

  when :idle
    until idle?
      # Re-check under @done_mutex and sleep on @done only if still not idle.
      @done_mutex.synchronize { @done.wait(@done_mutex) unless _idle? }
    end
  end

  self
end

Private Instance Methods

_done?() click to toggle source
# File lib/balotelli/thread/pool.rb, line 457
# Unlocked check: no queued tasks and every spawned worker is waiting.
def _done?
  @todo.empty? && @waiting == @spawned
end
_idle?() click to toggle source
# File lib/balotelli/thread/pool.rb, line 461
# Unlocked check: more workers are waiting than there are queued tasks.
def _idle?
  @waiting > @todo.size
end
done!() click to toggle source
# File lib/balotelli/thread/pool.rb, line 465
# Wake every thread sleeping on @done when the pool is done or idle.
# The check and the broadcast both happen while holding @done_mutex.
def done!
  @done_mutex.synchronize do
    @done.broadcast if _done? || _idle?
  end
end
spawn_thread() click to toggle source
# File lib/balotelli/thread/pool.rb, line 367
# Spawn one worker thread, register it in @workers and return it.
#
# Both callers visible in this file (initialize and process) invoke this
# while holding @mutex.
def spawn_thread
  @spawned += 1

  thread = Thread.new {
    loop do
      # Fetch the next task under the lock. A nil result (empty queue after a
      # trim request or shutdown, or a `break` out of the synchronize block)
      # triggers the trailing `or break` and ends the worker loop.
      task = @mutex.synchronize {
        if @todo.empty?
          while @todo.empty?
            # Consume one pending trim request and stop waiting; with the
            # queue still empty, @todo.shift below yields nil and the worker exits.
            if @trim_requests > 0
              @trim_requests -= 1

              break
            end

            break if shutdown?

            @waiting += 1

            # Let threads blocked on @done (see #wait) re-check the pool state.
            done!

            if @idle_trim and @spawned > @min
              # Timed wait: if the full idle period has passed and the pool is
              # still above @min, file a trim request (consumed at the top of
              # this while loop).
              check_time = Time.now + @idle_trim
              @cond.wait @mutex, @idle_trim
              @trim_requests += 1 if Time.now >= check_time && @spawned - @trim_requests > @min
            else
              @cond.wait @mutex
            end

            @waiting -= 1
          end

          # Leaves the synchronize block with nil when shutting down with
          # nothing left to run.
          break if @todo.empty? && shutdown?
        end

        @todo.shift
      } or break

      task.execute

      # An immediate shutdown stops the worker even if tasks remain queued.
      break if @shutdown == :now

      trim if auto_trim? && @spawned > @min
    end

    # Worker is exiting: update the count and unregister under the lock.
    @mutex.synchronize {
      @spawned -= 1
      @workers.delete thread
    }
  }

  @workers << thread

  thread
end
spawn_timeout_thread() click to toggle source
# File lib/balotelli/thread/pool.rb, line 422
# Start the background thread that enforces per-task timeouts.
#
# Creates a pipe (stored in @pipes) and a thread (stored in @timeout). The
# thread sleeps in IO.select until the pipe becomes readable or the nearest
# task deadline arrives, then calls timeout! on every started, still-running
# task whose deadline has passed. It exits once @shutdown is :now.
#
# Returns the spawned Thread.
def spawn_timeout_thread
  @pipes   = IO.pipe
  @timeout = Thread.new {
    loop do
      now = Time.now

      # Seconds left until the earliest deadline among started tasks that are
      # still running. nil (block until woken through the pipe) when there is
      # no such task.
      remaining = @timeouts.map { |task, _|
        next if !task.started_at || task.terminated? || task.finished?

        task.started_at + task.timeout - now
      }.compact.min

      # A deadline that already passed must be handled right away; IO.select
      # rejects negative timeouts.
      remaining = 0 if remaining && remaining < 0

      readable, = IO.select([@pipes.first], nil, nil, remaining)

      break if @shutdown == :now

      # Drain the wake-up bytes so the pipe does not stay readable.
      if readable && !readable.empty?
        readable.first.read_nonblock 1024
      end

      now = Time.now
      @timeouts.each { |task, _|
        next if !task.started_at || task.terminated? || task.finished?

        # NOTE(review): assumes timeout! leaves the task terminated? or
        # finished?, so it is dropped below and not fired again.
        task.timeout! if now >= task.started_at + task.timeout
      }

      @timeouts.reject! { |task, _| task.terminated? || task.finished? }

      break if @shutdown == :now
    end
  }
end
timeout_for(task, timeout) click to toggle source
# File lib/balotelli/thread/pool.rb, line 349
# Register +timeout+ for +task+ and wake the timeout thread so it notices
# the new entry. The timeout thread is started first if none exists yet.
#
# Returns the result of wake_up_timeout.
def timeout_for(task, timeout)
  spawn_timeout_thread unless @timeout

  @mutex.synchronize do
    @timeouts[task] = timeout

    wake_up_timeout
  end
end
wake_up_timeout() click to toggle source
# File lib/balotelli/thread/pool.rb, line 361
# Nudge the timeout thread by writing one byte to the write end of @pipes.
# Does nothing when no pipe has been created. Write errors are deliberately
# swallowed (best effort) and yield nil.
def wake_up_timeout
  return unless defined? @pipes

  begin
    @pipes.last.write_nonblock 'x'
  rescue StandardError
    nil
  end
end