class ThreadJob::ThreadPool
Public Class Methods
new(max_size=5, logger=Logger.new(STDOUT))
click to toggle source
# File lib/thread_job/thread_pool.rb, line 6
#
# Builds the pool and starts its worker threads.
#
# All shared state (job queue, logger, mutex and both pool lists) is created
# before the first worker is spawned, so a worker can never run against
# instance variables that have not been assigned yet.
#
# @param max_size [Integer] number of worker threads to start
# @param logger [#debug, #error] receives the pool's log messages
def initialize(max_size=5, logger=Logger.new(STDOUT))
  @queue = Queue.new
  @logger = logger
  @mutex = Mutex.new
  @use_pool = []
  @avail_pool = []

  max_size.times do
    worker = Thread.new do
      @logger.debug("[ThreadPool] started thread #{Thread.current}")
      # Serve jobs for the lifetime of the thread; each pass handles one
      # queue entry.
      loop { monitor_queue }
    end
    @mutex.synchronize { @avail_pool.push(worker) }
  end
end
Public Instance Methods
add_workers(num_workers)
click to toggle source
# File lib/thread_job/thread_pool.rb, line 28
#
# Grows the pool by starting additional worker threads.
#
# Each thread is created and appended to @avail_pool inside a single
# @mutex critical section. The list is shared with running workers, and
# holding the lock while the thread is created guarantees the new worker is
# registered before it can reach any code that takes the same lock.
#
# @param num_workers [Integer] how many threads to add
# @return [Integer] num_workers (the result of Integer#times)
def add_workers(num_workers)
  num_workers.times do
    @mutex.synchronize do
      worker = Thread.new do
        @logger.debug("[ThreadPool] started thread #{Thread.current}")
        # Serve jobs for the lifetime of the thread.
        loop { monitor_queue }
      end
      @avail_pool.push(worker)
    end
  end
end
has_available_thread?()
click to toggle source
# File lib/thread_job/thread_pool.rb, line 21
#
# Reports whether at least one worker thread is currently idle.
#
# Both pool sizes are read in one critical section so the two numbers are
# consistent with each other. The debug line is written after the lock has
# been released, so logger I/O never delays code waiting on the same mutex.
#
# @return [Boolean] true when @avail_pool holds at least one thread
def has_available_thread?
  available, in_use = @mutex.synchronize { [@avail_pool.length, @use_pool.length] }
  @logger.debug("[ThreadPool] #{available} threads available, #{in_use} threads in use")
  available > 0
end
kill()
click to toggle source
# File lib/thread_job/thread_pool.rb, line 40
#
# Kills every worker thread known to the pool, idle or busy.
#
# The two pool lists are copied under @mutex first and the kill requests are
# issued from that copy. Iterating the live lists is unsafe: a worker that
# leaves a list while it is being walked shifts the remaining entries and the
# next worker is skipped.
#
# The pool lists themselves are left as they are.
#
# @return [Array] the workers that were asked to stop
def kill
  workers = @mutex.synchronize { @avail_pool + @use_pool }
  workers.each(&:kill)
end
monitor_queue()
click to toggle source
# File lib/thread_job/thread_pool.rb, line 50
#
# Takes one entry off the job queue (blocking until one is available) and
# processes it on the calling thread.
#
# The entry is read as a hash with the keys :job (must respond to #run),
# :job_name, :job_store (must respond to #complete_job and #fail_job),
# :queue_name and :id. A nil/false entry is ignored.
#
# While the job runs, the calling thread is listed in @use_pool instead of
# @avail_pool. A job that raises a StandardError is logged and reported
# through job_store.fail_job; otherwise job_store.complete_job is called.
#
# An error raised by the job store itself is logged rather than propagated.
# Letting it escape would terminate the worker thread while it is still
# listed as in use, permanently shrinking the pool.
#
# The return value is not meaningful.
def monitor_queue
  work = @queue.pop
  return unless work

  worker = Thread.current
  @mutex.synchronize do
    @use_pool.push(worker)
    @avail_pool.delete(worker)
  end
  @logger.debug("[ThreadPool] Running job '#{work[:job_name]}' on thread #{worker}")

  begin
    begin
      work[:job].run
    rescue => e
      @logger.error("[ThreadPool] Worker thread #{worker} encountered an error #{e} while processing job '#{work[:job_name]}'")
      work[:job_store].fail_job(work[:queue_name], work[:id])
    else
      # Only reached when the job finished without raising.
      work[:job_store].complete_job(work[:queue_name], work[:id])
    end
  rescue => e
    # The job store could not record the result; keep the worker alive.
    @logger.error("[ThreadPool] Worker thread #{worker} could not record the outcome of job '#{work[:job_name]}': #{e}")
  end

  # Single hand-back point for every path above.
  @mutex.synchronize do
    @avail_pool.push(worker)
    @use_pool.delete(worker)
  end
end
run(job_hash)
click to toggle source
# File lib/thread_job/thread_pool.rb, line 79
#
# Submits a job to the pool by appending it to the internal job queue.
# The call does not wait for the job to be executed.
#
# @param job_hash [Hash] job description; it is enqueued as given
# @return [Queue] the internal job queue
def run(job_hash)
  @queue << job_hash
end