class Dissever::Processor

Processor object for running named tasks in parallel, each task in its own forked child process.

Public Class Methods

new(params = {}, &block) click to toggle source
# File lib/dissever/processor.rb, line 7
# Builds a processor from an options hash and a block that yields the targets.
#
# params - Hash of options. :size is the number of targets handed to each
#          pool of forked children (defaults to 10); :quiet, when truthy,
#          silences progress output written by #log.
# block  - Called once, immediately; its return value is stored as the
#          targets. Each target is later destructured as a (name, callable)
#          pair when it is forked.
def initialize(params = {}, &block)
  # Work on a copy so the default :size is not written back into the
  # caller's hash.
  @options = params.dup
  @options[:size] ||= 10
  @targets = block.call
end

Public Instance Methods

run!() click to toggle source
# File lib/dissever/processor.rb, line 13
# Runs every target and returns the collected results.
#
# Logs one '*' per target up front, forks the targets, logs a bare newline,
# then hands the resulting readers to #parse_readers and returns its value.
def run!
  placeholder_bar = '*' * @targets.size
  log(placeholder_bar)
  pipes = fork_master
  log
  parse_readers(pipes)
end

Private Instance Methods

check_files(files) click to toggle source
# File lib/dissever/processor.rb, line 63
# Splits (name, path) pairs by whether the path exists on disk.
#
# files - Enumerable of [name, path] pairs.
#
# Reports every pair whose path is missing through #error and returns only
# the pairs whose file exists.
def check_files(files)
  present, missing = files.partition { |_, file| File.exist?(file) }
  # partition always returns two arrays (never nil), so iterate directly;
  # an empty array simply reports nothing.
  missing.each { |name, _| error "Failed to load results for #{name}" }
  present
end
create_file(results) click to toggle source
# File lib/dissever/processor.rb, line 47
# Serializes results as JSON into a new temp file and returns the file's path.
#
# The Tempfile's finalizer is removed before writing, presumably so the file
# is not deleted automatically and can still be read back by path later.
def create_file(results)
  scratch = Tempfile.new('dissever')
  ObjectSpace.undefine_finalizer(scratch)
  scratch.write(JSON.dump(results))
  scratch.close
  scratch.path
end
error(msg) click to toggle source
# File lib/dissever/processor.rb, line 75
# Writes msg to standard error as a line. Unlike #log, this is not affected
# by the :quiet option.
def error(msg)
  STDERR.puts(msg)
end
fork_master() click to toggle source
# File lib/dissever/processor.rb, line 22
# Runs the targets in consecutive batches of @options[:size], one pool per
# batch, and returns the pools' results concatenated into a single array.
def fork_master
  batches = @targets.each_slice(@options[:size])
  batches.flat_map { |batch| fork_pool(batch) }
end
fork_pool(slice) click to toggle source
# File lib/dissever/processor.rb, line 26
# Forks one child per (name, block) pair in slice and waits for all of them.
#
# Each child gets its own pipe and runs #thread_run with both ends; the
# parent closes its copy of the write end straight after forking.
#
# Returns an array of [name, reader] pairs, in slice order, once every child
# process has exited.
def fork_pool(slice)
  slice.map do |name, block|
    read_end, write_end = IO.pipe
    fork { thread_run(read_end, write_end, name, &block) }
    write_end.close
    [name, read_end]
  end.tap { Process.waitall }
end
log(msg = nil, newline = true) click to toggle source
# File lib/dissever/processor.rb, line 71
# Prints progress output to standard error unless the :quiet option is set.
#
# msg     - Text to print (nil prints nothing but the optional newline).
# newline - When truthy, a trailing "\n" is appended.
def log(msg = nil, newline = true)
  return if @options[:quiet]

  line_end = newline ? "\n" : nil
  STDERR.print("#{msg}#{line_end}")
end
parse_readers(readers) click to toggle source
# File lib/dissever/processor.rb, line 55
# Collects the results that the child processes left behind.
#
# readers - Enumerable of [name, reader] pairs; each reader is read to EOF
#           and its content is treated as the path of a JSON result file.
#
# Every reader is closed after it has been read so the pipe descriptors are
# not leaked. Pairs whose file does not exist are dropped by #check_files.
# The remaining files are parsed as JSON and then deleted, even when parsing
# raises.
#
# Returns a Hash mapping name to the parsed JSON value.
def parse_readers(readers)
  files = readers.map do |name, reader|
    begin
      [name, reader.read]
    ensure
      reader.close
    end
  end
  files = check_files(files)
  begin
    files.map { |name, file| [name, JSON.parse(File.read(file))] }.to_h
  ensure
    # Clean up the result files whether or not parsing succeeded.
    files.each { |_, file| File.unlink file }
  end
end
thread_run(reader, writer, name, &block) click to toggle source
# File lib/dissever/processor.rb, line 37
# Body of one worker. Despite the name, #fork_pool calls this inside a
# forked child process rather than a Ruby thread.
#
# reader - Read end of the worker's pipe; unused here, so closed at once.
# writer - Write end of the pipe; receives the path of the result file.
# name   - Label for the task, used in the process title and error messages.
# block  - The task itself; its return value is written out by #create_file.
#
# Any StandardError raised along the way is reported through #error,
# prefixed with the task name, instead of propagating.
def thread_run(reader, writer, name, &block)
  $PROGRAM_NAME = "#{$PROGRAM_NAME} thread #{name}"
  reader.close
  writer << create_file(block.call)
  writer.close
  log '*', false
rescue StandardError => e
  # Use the task's name here: there is no `target` in scope, and referencing
  # it raised NameError from inside the handler.
  error "#{name}: #{e.message}"
ensure
  # Make sure the write end is released on the failure path too.
  writer.close unless writer.closed?
end