class Dissever::Processor
Processor
object, for running multiprocessed tasks
Public Class Methods
new(params = {}, &block)
click to toggle source
# File lib/dissever/processor.rb, line 7 def initialize(params = {}, &block) @options = params @options[:size] ||= 10 @targets = block.call end
Public Instance Methods
run!()
click to toggle source
# File lib/dissever/processor.rb, line 13 def run! log '*' * @targets.size readers = fork_master log parse_readers(readers) end
Private Instance Methods
check_files(files)
click to toggle source
# File lib/dissever/processor.rb, line 63 def check_files(files) files, bad_apples = files.partition { |_, file| File.exist?(file) } if bad_apples.nil? bad_apples.each { |name, _| error "Failed to load results for #{name}" } end files end
create_file(results)
click to toggle source
# File lib/dissever/processor.rb, line 47 def create_file(results) tempfile = Tempfile.new('dissever') ObjectSpace.undefine_finalizer(tempfile) tempfile << JSON.dump(results) tempfile.close tempfile.path end
error(msg)
click to toggle source
# File lib/dissever/processor.rb, line 75 def error(msg) STDERR.puts msg end
fork_master()
click to toggle source
# File lib/dissever/processor.rb, line 22 def fork_master @targets.each_slice(@options[:size]).flat_map { |slice| fork_pool(slice) } end
fork_pool(slice)
click to toggle source
# File lib/dissever/processor.rb, line 26 def fork_pool(slice) res = slice.map do |name, block| reader, writer = IO.pipe fork { thread_run(reader, writer, name, &block) } writer.close [name, reader] end Process.waitall res end
log(msg = nil, newline = true)
click to toggle source
# File lib/dissever/processor.rb, line 71 def log(msg = nil, newline = true) STDERR.print("#{msg}#{"\n" if newline}") unless @options[:quiet] end
parse_readers(readers)
click to toggle source
# File lib/dissever/processor.rb, line 55 def parse_readers(readers) files = readers.map { |name, reader| [name, reader.read] } files = check_files(files) results = files.map { |name, file| [name, JSON.parse(File.read(file))] } files.each { |_, file| File.unlink file } Hash[results] end
thread_run(reader, writer, name, &block)
click to toggle source
# File lib/dissever/processor.rb, line 37 def thread_run(reader, writer, name, &block) $PROGRAM_NAME = "#{$PROGRAM_NAME} thread #{name}" reader.close writer << create_file(block.call) writer.close log '*', false rescue StandardError => e error "#{target}: #{e.message}" end