class Thimble::Manager

Attributes

batch_size[R]
max_workers[R]
queue_size[R]
worker_type[R]

Public Class Methods

deterministic() click to toggle source
# File lib/Manager.rb, line 92
# Builds a manager limited to a single worker, with a batch size of 1 and a
# queue size of 1. The worker type is left at the constructor's default.
#
# @return [Thimble::Manager]
def self.deterministic
  new(max_workers: 1, batch_size: 1, queue_size: 1)
end
new(max_workers: 6,batch_size: 1000, queue_size: 1000, worker_type: :fork) click to toggle source
# File lib/Manager.rb, line 6
# Validates the settings and sets up an empty worker registry guarded by a mutex.
#
# @param max_workers [Integer] limit compared against the number of tracked workers; must be >= 1
# @param batch_size [Integer] must be >= 1
# @param queue_size [Integer] stored as given (not validated here)
# @param worker_type [Symbol] either :fork or :thread
# @raise [ArgumentError] if worker_type is neither :fork nor :thread, if a
#   non-thread type is requested where Process does not respond to fork, or
#   if max_workers / batch_size is below 1
def initialize(max_workers: 6, batch_size: 1000, queue_size: 1000, worker_type: :fork)
  if worker_type != :thread && worker_type != :fork
    raise ArgumentError, "worker type must be either :fork or :thread"
  end
  if worker_type != :thread && !Process.respond_to?(:fork)
    raise ArgumentError, "Your system does not respond to fork please use threads."
  end
  raise ArgumentError, "max_workers must be greater than 0" if max_workers < 1
  raise ArgumentError, "batch size must be greater than 0" if batch_size < 1

  @worker_type = worker_type
  @max_workers = max_workers
  @batch_size = batch_size
  @queue_size = queue_size
  # Registry of active workers (filled by sub_worker) and the lock protecting it.
  @mutex = Mutex.new
  @current_workers = {}
end
small() click to toggle source
# File lib/Manager.rb, line 96
# Builds a manager limited to a single worker, with a batch size of 3 and a
# queue size of 3. The worker type is left at the constructor's default.
#
# @return [Thimble::Manager]
def self.small
  new(max_workers: 1, batch_size: 3, queue_size: 3)
end

Public Instance Methods

current_workers(id) click to toggle source
# File lib/Manager.rb, line 43
# Looks up the registered workers that were recorded with the given id.
# The lookup runs while holding the registry mutex.
#
# @param id [Object] value compared (with ==) against each entry's #id
# @return [Hash] the matching registry entries, keyed as in the registry
def current_workers(id)
  @mutex.synchronize do
    matching = @current_workers.select { |_key, entry| entry.id == id }
    matching
  end
end
get_fork_worker(batch) { |item| ... } click to toggle source
# File lib/Manager.rb, line 59
# Forks a child process that applies the given block to every element of
# batch.item (passing each element's own #item), marshals the array of
# results and writes it to a pipe shared with the parent.
#
# If the block raises for an element, the raised exception object takes that
# element's place in the results instead of aborting the child.
#
# @param batch [#item] object whose #item returns a collection of objects responding to #item
# @yield [item] called in the child process once per element
# @return [OpenStruct] handle with #pid (the child's pid) and #reader (read end of the pipe)
def get_fork_worker(batch)
  reader, writer = IO.pipe
  child_pid = fork do
    Signal.trap("HUP") { exit }
    # The child only writes; drop its copy of the read end.
    reader.close
    results = batch.item.map do |entry|
      begin
        yield(entry.item)
      rescue Exception => error
        # Deliberately broad: whatever the block raised is sent back as this element's result.
        error
      end
    end
    writer.write(Marshal.dump(results))
    writer.close
  end
  # The parent only reads; close its copy of the write end.
  writer.close
  OpenStruct.new(pid: child_pid, reader: reader)
end
get_thread_worker(batch) { |item| ... } click to toggle source
# File lib/Manager.rb, line 81
# Starts a thread that applies the given block to every element of
# batch.item (passing each element's own #item). When the whole batch has
# been mapped, the thread stores the results on the handle and then sets its
# done flag.
#
# NOTE(review): if the block raises, neither #result nor #done is set by this
# method; confirm how callers detect a failed thread.
#
# @param batch [#item] object whose #item returns a collection of objects responding to #item
# @yield [item] called inside the new thread once per element
# @return [OpenStruct] handle with #pid (the Thread), and later #result and #done
def get_thread_worker(batch)
  handle = OpenStruct.new
  handle.pid = Thread.new do
    handle.result = batch.item.map { |entry| yield entry.item }
    handle.done = true
  end
  handle
end
get_worker(batch) click to toggle source
# File lib/Manager.rb, line 49
# Creates a worker for the batch using the configured worker type, while
# holding the manager's mutex. The caller's block is forwarded to
# get_fork_worker when the worker type is :fork, and to get_thread_worker
# otherwise.
#
# The block is captured explicitly with &block. The previous form,
# `&Proc.new` with no literal block, relied on implicit block capture, which
# raises ArgumentError on Ruby 3.0 and later.
#
# @param batch [Object] the batch handed on to the worker factory
# @yield [item] the work to perform on each item; forwarded unchanged
# @return [Object] whatever the selected worker factory returns
# @raise [ArgumentError] if no block is given
def get_worker(batch, &block)
  raise ArgumentError, "get_worker requires a block" unless block

  @mutex.synchronize do
    if @worker_type == :fork
      get_fork_worker(batch, &block)
    else
      get_thread_worker(batch, &block)
    end
  end
end
rem_worker(worker) click to toggle source
# File lib/Manager.rb, line 37
# Removes the registry entry stored under worker.pid, holding the registry
# mutex while doing so.
#
# @param worker [#pid] worker whose pid is the registry key
# @return [Object, nil] the removed entry, or nil when no entry had that key
def rem_worker(worker)
  @mutex.synchronize { @current_workers.delete(worker.pid) }
end
sub_worker(worker, id) click to toggle source
# File lib/Manager.rb, line 27
# Registers a worker under its pid, remembering the id it was submitted with.
# The registry is updated while holding the mutex.
#
# @param worker [#pid] worker handle; its pid becomes the registry key
# @param id [Object] identifier stored alongside the worker
# @return [OpenStruct] the stored entry, exposing #worker and #id
# @raise [RuntimeError] if worker.pid is nil
def sub_worker(worker, id)
  raise "Worker must contain a pid!" if worker.pid.nil?
  entry = OpenStruct.new(worker: worker, id: id)
  @mutex.synchronize { @current_workers[worker.pid] = entry }
end
worker_available?() click to toggle source
# File lib/Manager.rb, line 19
# Reports whether fewer workers are registered than max_workers allows.
# Reads the registry size without taking the mutex.
#
# @return [Boolean]
def worker_available?
  @current_workers.length < @max_workers
end
working?() click to toggle source
# File lib/Manager.rb, line 23
# Reports whether at least one worker is currently registered.
# Reads the registry without taking the mutex.
#
# @return [Boolean]
def working?
  !@current_workers.empty?
end