class Rasteira::EmbedWorker::Manager

Manager class that manages the thread pool and executes jobs.

Constants

MIN_THREAD_SIZE

TODO: to be specifiable

Attributes

job_pool[R]

Public Class Methods

new() click to toggle source
# File lib/rasteira/embed_worker/manager.rb, line 9
def initialize
  @job_pool = []
  @thread_pool = []
  @mutex = Mutex.new
end
run() click to toggle source

Create manager instance and run @return [Rasteira::EmbedWorker::Manager] an instance of a running manager

# File lib/rasteira/embed_worker/manager.rb, line 17
def self.run
  this = new
  this.run
  this
end

Public Instance Methods

enqueue_job!(worker_name, options = {}) click to toggle source

Enqueue new job @param [String] worker_name the name of worker class @param [Hash] options @option options [String] :worker_file_path the file path of worker script @option options [String] :current_directory the directory where you want to handle as a current directory @option options [Object] :args the arguments for the perform method in the worker class @return [Rasteira::Core::Job] job created job

# File lib/rasteira/embed_worker/manager.rb, line 55
def enqueue_job!(worker_name, options = {})
  @mutex.synchronize do
    @job_pool << ::Rasteira::Core::Job.new(worker_name, options)
  end
end
run() click to toggle source

Create thread pool and start all threads @return [nil]

# File lib/rasteira/embed_worker/manager.rb, line 28
def run
  @thread_pool = Array.new(MIN_THREAD_SIZE).map do
    Thread.start { worker_thread_process }
  end

  @thread_manager = Thread.start do
    loop do
      @mutex.synchronize do
        @thread_pool.select!(&:alive?)

        (@thread_pool.size...MIN_THREAD_SIZE).each do
          @thread_pool << Thread.start { worker_thread_process }
        end
      end

      sleep(1)
    end
  end
end
shutdown_workers!() click to toggle source

Kill all worker threads @return nil

# File lib/rasteira/embed_worker/manager.rb, line 63
def shutdown_workers!
  @thread_pool.each(&:kill)
  @thread_manager.kill
end
worker_statuses() click to toggle source

Return current worker threads id and status hashes @return [Array<Hash>]

# File lib/rasteira/embed_worker/manager.rb, line 70
def worker_statuses
  @thread_pool.map do |thread|
    { id: thread.object_id, status: thread.status }
  end
end

Private Instance Methods

worker_thread_process() click to toggle source
# File lib/rasteira/embed_worker/manager.rb, line 78
def worker_thread_process
  loop do
    begin
      job = nil
      @mutex.synchronize do
        job = @job_pool.pop
      end
      if !job.nil?
        job.start!
      else
        sleep(3)
      end
    rescue => e
      # TODO: logging
      puts e.inspect
    end
  end
end