class Thimble::Thimble

Public Class Methods

async() { |e| ... } click to toggle source

Runs the given block asynchronously on a new thread. Requires a block. @return [Thread]

# File lib/Thimble.rb, line 60
# Runs the given block asynchronously on a new thread.
#
# @yieldparam e [nil] always nil: Thread.new is started without arguments,
#   so there is nothing to forward to the block
# @raise [ArgumentError] if no block is given
# @return [Thread] the thread that executes the block
def self.async
  # Fail fast in the caller. Without this guard a missing block would only
  # surface as a LocalJumpError inside the background thread.
  raise ArgumentError, "Thimble.async requires a block" unless block_given?

  Thread.new do |e|
    yield e
  end
end
new(array, manager = Manager.new, result = nil, name = "Main") click to toggle source
Calls superclass method
# File lib/Thimble.rb, line 11
# Builds a Thimble, loads every item of +array+ into it and then closes it
# to further input.
#
# @param array [#each, #size] items to load; its size is also used to size
#   this queue and the default result queue
# @param manager [Manager] the manager to use (subclasses are accepted)
# @param result [ThimbleQueue, nil] an open queue that will receive results;
#   when nil, a new ThimbleQueue named "Result" is created
# @param name [String] name handed to the superclass constructor and used in
#   the log messages
# @raise [ArgumentError] if +manager+ is not a Manager, +array+ does not
#   respond to +each+, or the result queue is not an open ThimbleQueue
def initialize(array, manager = Manager.new, result = nil, name = "Main")
  # is_a? rather than an exact class comparison so subclasses are accepted.
  raise ArgumentError, "You need to pass a manager to Thimble!" unless manager.is_a?(Manager)
  # NOTE(review): only #each is validated here, but #size is called below as well.
  raise ArgumentError, "There needs to be an iterable object passed to Thimble to start." unless array.respond_to?(:each)

  @result = result.nil? ? ThimbleQueue.new(array.size, "Result") : result
  raise ArgumentError, "result needs to be an open ThimbleQueue" unless @result.is_a?(ThimbleQueue) && !@result.closed?

  @manager = manager
  @running = true
  super(array.size, name)
  # @logger is not assigned in this method; presumably the superclass
  # constructor sets it up, which is why logging starts only after super.
  @logger.debug("loading thimble #{name}")
  array.each { |item| push(item) }
  @logger.debug("finished loading thimble #{name}")
  close
end

Public Instance Methods

map() click to toggle source

Uses the manager to transform the items of this thimble queue. Requires a block. @return [ThimbleQueue]

# File lib/Thimble.rb, line 32
# Transforms the queued items with the given block, blocking until the work
# is finished.
#
# Calls manage_workers repeatedly until @running is cleared, then closes the
# result queue.
#
# @yield the block that is handed on to manage_workers
# @raise [ArgumentError] if no block is given
# @return the result queue (@result), after it has been closed
def map(&block)
  # The block is captured explicitly: calling Proc.new without a block raises
  # ArgumentError on Ruby 3.0 and later.
  raise ArgumentError, "map requires a block" unless block

  @logger.debug("starting map in #{@name} with id #{Thread.current.object_id}")
  @running = true
  manage_workers(&block) while @running
  @result.close
  @logger.debug("finishing map in #{@name} with id #{Thread.current.object_id}")
  @result
end
map_async() click to toggle source

Uses the manager to transform the thimble queue asynchronously. Returns the result queue immediately, so it can be used for next-stage processing. Requires a block. @return [ThimbleQueue]

# File lib/Thimble.rb, line 47
# Starts map on a background thread (via Thimble.async) and returns the
# result queue right away, without waiting for the mapping to finish.
#
# @yield the block that is handed on to map
# @raise [ArgumentError] if no block is given
# @return the result queue (@result); it may still be receiving items
def map_async(&block)
  # The block is captured here, in the caller's thread. Calling Proc.new
  # without a block raises ArgumentError on Ruby 3.0 and later, and that
  # error would otherwise be raised inside the background thread.
  raise ArgumentError, "map_async requires a block" unless block

  @logger.debug("starting async map in #{@name} with id #{Thread.current.object_id}")
  @logger.debug("queue: #{@queue}")
  Thimble.async { map(&block) }
  # Logged as soon as the thread has been spawned, not when map completes.
  @logger.debug("finished async map in #{@name} with id #{Thread.current.object_id}")
  @result
end

Private Instance Methods

get_batch() click to toggle source
# File lib/Thimble.rb, line 67
# Collects up to @manager.batch_size items from this queue into one batch.
#
# Items are pulled with self.next until the batch is full or self.next
# returns nil.
#
# @return [QueueItem, nil] a QueueItem named "Batch" wrapping the collected
#   items, or nil when self.next returned nil before anything was collected
def get_batch
  items = []
  exhausted = false

  until items.size >= @manager.batch_size
    entry = self.next
    if entry.nil?
      exhausted = true
      break
    end
    items << entry
  end

  # Nothing left to hand out: signal that with nil instead of an empty batch.
  return nil if exhausted && items.empty?

  QueueItem.new(items, "Batch")
end
get_result(tuple) click to toggle source
# File lib/Thimble.rb, line 91
# Collects the finished result of one worker, pushes it to the result queue
# and unregisters the worker from the manager. Does nothing if the worker
# has not finished yet.
#
# For :fork workers the result is read from tuple.reader and unmarshalled;
# if any loaded element is an Exception it is re-raised here. For every
# other worker type the result is taken from tuple.result once tuple.done
# is true.
#
# @param tuple the worker record; must respond to reader/pid (fork workers)
#   or done/result (other workers)
# @raise [Exception] the first exception object found in a fork worker's result
def get_result(tuple)
  if @manager.worker_type == :fork
    return unless tuple.reader.ready?

    begin
      # NOTE(review): Marshal.load is only safe on trusted data. The payload
      # presumably comes from our own forked worker's pipe -- confirm.
      loaded = Marshal.load(tuple.reader.read)
      loaded.each { |r| raise r if r.is_a?(Exception) }
      push_result(loaded)
    ensure
      # Always signal the child and unregister the worker, even when the
      # worker reported an exception. Otherwise the raise above would skip
      # the cleanup and leave the child process and its registration behind.
      Process.kill("HUP", tuple.pid)
      @manager.rem_worker(tuple)
    end
  elsif tuple.done == true
    push_result(tuple.result)
    @manager.rem_worker(tuple)
  end
end
manage_workers() click to toggle source
# File lib/Thimble.rb, line 81
# Runs one scheduling pass: collects results from this queue's current
# workers, then hands out new batches while the manager has a worker
# available, and finally clears @running once nothing is left to do.
#
# @yield the block passed on to @manager.get_worker for every new batch
# @raise [ArgumentError] if no block is given
def manage_workers(&block)
  # The block is captured explicitly: calling Proc.new without a block raises
  # ArgumentError on Ruby 3.0 and later.
  raise ArgumentError, "manage_workers requires a block" unless block

  @manager.current_workers(@id).each do |_pid, pair|
    get_result(pair.worker)
  end

  # batch stays nil when no worker was available or get_batch returned nil.
  while @manager.worker_available? && (batch = get_batch)
    @manager.sub_worker(@manager.get_worker(batch, &block), @id)
  end

  # Stop only when no worker is busy and no batch was pending on the last check.
  @running = false if !@manager.working? && !batch
end
push_result(result) click to toggle source
# File lib/Thimble.rb, line 109
# Pushes a worker result onto the result queue (@result).
#
# A result that responds to +each+ is pushed element by element; anything
# else is pushed as a single item.
#
# @param result the value, or collection of values, to enqueue
def push_result(result)
  return @result.push(result) unless result.respond_to?(:each)

  result.each { |entry| @result.push(entry) }
end