class Rasteira::EmbedWorker::Manager
Manager
class that manages the thread pool and executes jobs.
Constants
- MIN_THREAD_SIZE
TODO: to be specifiable
Attributes
Public Class Methods
# File lib/rasteira/embed_worker/manager.rb, line 9 def initialize @job_pool = [] @thread_pool = [] @mutex = Mutex.new end
Create manager instance and run @return [Rasteira::EmbedWorker::Manager] an instance of a running manager
# File lib/rasteira/embed_worker/manager.rb, line 17 def self.run this = new this.run this end
Public Instance Methods
Enqueue new job @param [String] worker_name the name of worker class @param [Hash] options @option options [String] :worker_file_path the file path of worker script @option options [String] :current_directory the directory where you want to handle as a current directory @option options [Object] :args the arguments for the perform method in the worker class @return [Rasteira::Core::Job] job created job
# File lib/rasteira/embed_worker/manager.rb, line 55 def enqueue_job!(worker_name, options = {}) @mutex.synchronize do @job_pool << ::Rasteira::Core::Job.new(worker_name, options) end end
Create thread pool and start all threads @return [nil]
# File lib/rasteira/embed_worker/manager.rb, line 28 def run @thread_pool = Array.new(MIN_THREAD_SIZE).map do Thread.start { worker_thread_process } end @thread_manager = Thread.start do loop do @mutex.synchronize do @thread_pool.select!(&:alive?) (@thread_pool.size...MIN_THREAD_SIZE).each do @thread_pool << Thread.start { worker_thread_process } end end sleep(1) end end end
Kill all worker threads @return nil
# File lib/rasteira/embed_worker/manager.rb, line 63 def shutdown_workers! @thread_pool.each(&:kill) @thread_manager.kill end
Return current worker threads id and status hashes @return [Array<Hash>]
# File lib/rasteira/embed_worker/manager.rb, line 70 def worker_statuses @thread_pool.map do |thread| { id: thread.object_id, status: thread.status } end end
Private Instance Methods
# File lib/rasteira/embed_worker/manager.rb, line 78 def worker_thread_process loop do begin job = nil @mutex.synchronize do job = @job_pool.pop end if !job.nil? job.start! else sleep(3) end rescue => e # TODO: logging puts e.inspect end end end