class ThreadJob::ThreadPool

Public Class Methods

new(max_size=5, logger=Logger.new(STDOUT)) click to toggle source
# File lib/thread_job/thread_pool.rb, line 6
def initialize(max_size=5, logger=Logger.new(STDOUT))
  @queue = Queue.new
  @logger = logger
  @avail_pool = max_size.times.map do
    Thread.new do
      @logger.debug("[ThreadPool] started thread #{Thread.current}")
      while true
        monitor_queue
      end
    end
  end
  @use_pool = []
  @mutex = Mutex.new
end

Public Instance Methods

add_workers(num_workers) click to toggle source
# File lib/thread_job/thread_pool.rb, line 28
def add_workers(num_workers)
  num_workers.times do
    thread = Thread.new do
      @logger.debug("[ThreadPool] started thread #{Thread.current}")
      while true
        monitor_queue
      end
    end
    @avail_pool.push(thread)
  end
end
has_available_thread?() click to toggle source
# File lib/thread_job/thread_pool.rb, line 21
def has_available_thread?
  @mutex.synchronize {
    @logger.debug("[ThreadPool] #{@avail_pool.length} threads available, #{@use_pool.length} threads in use")
    return @avail_pool.length > 0
  }
end
kill() click to toggle source
# File lib/thread_job/thread_pool.rb, line 40
def kill
  @avail_pool.each do |avail_thread|
    avail_thread.kill
  end

  @use_pool.each do |used_thread|
    used_thread.kill
  end
end
monitor_queue() click to toggle source
# File lib/thread_job/thread_pool.rb, line 50
def monitor_queue
  work = @queue.pop
  if work
    @mutex.synchronize {
      @use_pool.push(Thread.current)
      @avail_pool.delete(Thread.current)
    }

    @logger.debug("[ThreadPool] Running job '#{work[:job_name]}' on thread #{Thread.current}")
    begin
      work[:job].run
    rescue => e
      @logger.error("[ThreadPool] Worker thread #{Thread.current} encountered an error #{e} while processing job '#{work[:job_name]}'")
      work[:job_store].fail_job(work[:queue_name], work[:id])
      @mutex.synchronize {
        @avail_pool.push(Thread.current)
        @use_pool.delete(Thread.current)
      }
      return
    end
    work[:job_store].complete_job(work[:queue_name], work[:id])

    @mutex.synchronize {
      @avail_pool.push(Thread.current)
      @use_pool.delete(Thread.current)
    }
  end
end
run(job_hash) click to toggle source
# File lib/thread_job/thread_pool.rb, line 79
def run(job_hash)
  @queue.push(job_hash)
end