class PCOWS
Constants
- RUNNINGEXT
Public Class Methods
new(num_threads,chunk_size,name=File.basename(__FILE__),timeout=180,quiet=false,debug=false)
click to toggle source
# File lib/bio-vcf/pcows.rb, line 9 def initialize(num_threads,chunk_size,name=File.basename(__FILE__),timeout=180,quiet=false,debug=false) num_threads = cpu_count() if not num_threads # FIXME: set to cpu_num by default # $stderr.print "Using ",num_threads,"threads \n" @num_threads = num_threads @chunk_size = chunk_size @pid_list = [] @name = name @timeout = timeout @quiet = quiet @debug = debug if @debug $stderr.print "PCOWS running in DEBUG MODE\n" end if multi_threaded @tmpdir = Dir::mktmpdir(@name+'_') end @last_output = 0 # counter @output_locked = false end
Public Instance Methods
cleanup()
click to toggle source
# File lib/bio-vcf/pcows.rb, line 217 def cleanup() @pid_list.each do |info| (pid,count,fn) = info if pid_running?(pid) $stderr.print "Killing child ",[info],"\n" begin Process.kill 9, pid Process.wait pid rescue Errno::ENOENT $stdout.puts "INFO: #{pidfile} did not exist: Errno::ENOENT" if not @quiet rescue Errno::ESRCH $stdout.puts "INFO: The process #{opid} did not exist: Errno::ESRCH" if not @quiet end end File.unlink(fn) if File.exist?(fn) cleanup_keep_file(fn,wait: false) tempfn = fn+'.'+RUNNINGEXT File.unlink(tempfn) if File.exist?(tempfn) end cleanup_tmpdir() end
process_output(func=nil,type=:by_line, blocking=false)
click to toggle source
—- In this section the output gets collected and passed on to a
printer thread. This function makes sure the printing is ordered and that no printers are running at the same time. The printer thread should be doing as little processing as possible. In this implementation type==:by_line will call func for each line. Otherwise it is called once with the filename.
# File lib/bio-vcf/pcows.rb, line 105 def process_output(func=nil,type=:by_line, blocking=false) return if single_threaded output = lambda { |fn| if type == :by_line File.new(fn).each_line { |buf| print buf } else func.call(fn) end } if @output_locked # ---- is the other thread still running? We wait until it # is finished to start the next one (pid,count,fn) = @output_locked $stderr.print "Checking for output_lock on existing #{fn}\n" if not @quiet return if File.exist?(fn) # continue because thread still processing # Now we should remove the .keep file cleanup_keep_file(fn) @last_output += 1 # get next one in line @output_locked = false end # ---- process the next output chunk. After completion it # gets renamed to chunk.keep. This to avoid missing # output (if we unlink the file prematurely) if info = @pid_list[@last_output] (pid,count,fn) = info $stderr.print "Testing (#{@last_output}) for output file ",[info],"\n" if @debug if File.exist?(fn) # Yes! We have the next output, create outputter @output_locked = info $stderr.print "Set lock on ",[info],"\n" if not @quiet if not blocking $stderr.print "Processing output file #{fn} (non-blocking)\n" if not @quiet pid = fork do output.call(fn) # after finishing output move it to .keep FileUtils::mv(fn,fn+'.keep') exit(0) end Process.detach(pid) else $stderr.print "Processing output file #{fn} (blocking)\n" if not @quiet output.call(fn) FileUtils::mv(fn,fn+'.keep') end else sleep 0.2 end end end
process_remaining_output()
click to toggle source
# File lib/bio-vcf/pcows.rb, line 195 def process_remaining_output() return if single_threaded $stderr.print "Processing remaining output...\n" if not @quiet while @output_locked sleep 0.2 process_output() # keep trying end @pid_list.each do |info| (pid,count,fn) = info while pid_or_file_running?(pid,fn) or File.exist?(fn) $stderr.print "Trying: ",[info],"\n" if not @quiet process_output(nil,:by_line,true) sleep 0.2 end end while @output_locked sleep 0.1 process_output(nil,:by_line,true) end cleanup_tmpdir() end
submit_final_worker(func,state)
click to toggle source
# File lib/bio-vcf/pcows.rb, line 64 def submit_final_worker(func,state) @final_worker = true submit_worker(func,state) end
submit_worker(func,state)
click to toggle source
Feed the worker 'func and state' to COWS. Note that func is a lambda closure so it can pick up surrounding scope at invocation in addition to the data captured in 'state'.
# File lib/bio-vcf/pcows.rb, line 33 def submit_worker(func,state) pid = nil if multi_threaded count = @pid_list.size+1 fn = mktmpfilename(count) pid = fork do # ---- This is running a new copy-on-write process tempfn = fn+'.'+RUNNINGEXT STDOUT.reopen(File.open(tempfn, 'w+')) func.call(state).each { | line | print line } STDOUT.flush STDOUT.close # sleep 0.1 # f.flush # f.close # sleep 0.2 # interval to make sure we are done writing, # otherwise there may be misses at the end of a # block (maybe the f.close fixed it) FileUtils::mv(tempfn,fn) exit(0) end Process.detach(pid) else # ---- Single threaded: call in main process and output immediately func.call(state).each { | line | print line } end @pid_list << [ pid,count,fn ] return true end
wait_for_worker(info)
click to toggle source
Wait for a worker slot to appear. When working the pid is writing a file with extension .part(ial). After completion the file is renamed without .part and a slot is free.
# File lib/bio-vcf/pcows.rb, line 160 def wait_for_worker(info) (pid,count,fn) = info if pid_or_file_running?(pid,fn) $stderr.print "Waiting up to #{@timeout} seconds for pid=#{pid} to complete #{fn}\n" if not @quiet begin Timeout.timeout(@timeout) do while not File.exist?(fn) # wait for the result to appear sleep 0.2 return if not pid_or_file_running?(pid,fn) # worker is gone end end # Partial file should have been renamed: raise "FATAL: child process #{pid} appears to have crashed #{fn}" if not File.exist?(fn) $stderr.print "OK pid=#{pid}, processing starts of #{fn}\n" if not @quiet rescue Timeout::Error # Kill it to speed up exit Process.kill 9, pid Process.wait pid $stderr.print "FATAL: child process killed because it stopped responding, pid = #{pid}, fn = #{fn}, count = #{count}\n" $stderr.print "Bailing out" raise end end end
wait_for_worker_slot()
click to toggle source
Make sure no more than num_threads are running at the same time - this is achieved by checking the PID table and the running files in the tmpdir
# File lib/bio-vcf/pcows.rb, line 73 def wait_for_worker_slot() return if single_threaded Timeout.timeout(@timeout) do printed_timeout_message = false while true # ---- count running pids running = @pid_list.reduce(0) do | sum, info | (pid,count,fn) = info if pid_or_file_running?(pid,fn) sum+1 else sum end end return if running < @num_threads if not printed_timeout_message $stderr.print "Waiting for slot (timeout=#{@timeout})\n" if not @quiet printed_timeout_message = true end sleep 0.1 end end end
wait_for_workers()
click to toggle source
This is the final cleanup after the reader thread is done. All workers need to complete.
# File lib/bio-vcf/pcows.rb, line 188 def wait_for_workers() return if single_threaded @pid_list.each do |info| wait_for_worker(info) end end
Private Instance Methods
cleanup_keep_file(fn, opts = { wait: true })
click to toggle source
# File lib/bio-vcf/pcows.rb, line 280 def cleanup_keep_file(fn, opts = { wait: true }) if not @debug keep = fn+'.keep' return if not opts[:wait] and !File.exist?(keep) $stderr.print "Trying to remove #{keep}\n" if not @quiet while true if File.exist?(keep) $stderr.print "Removing #{keep}\n" if not @quiet File.unlink(keep) break # forever loop end sleep 0.1 end #forever end end
cleanup_tmpdir()
click to toggle source
# File lib/bio-vcf/pcows.rb, line 296 def cleanup_tmpdir if not @debug $stderr.print "Removing dir #{@tmpdir}\n" if not @quiet Dir.unlink(@tmpdir) if @tmpdir end end
cpu_count()
click to toggle source
# File lib/bio-vcf/pcows.rb, line 267 def cpu_count begin return File.read('/proc/cpuinfo').scan(/^processor\s*:/).size if File.exist? '/proc/cpuinfo' # Actually, the JVM does not allow fork... return Java::Java.lang.Runtime.getRuntime.availableProcessors if defined? Java::Java rescue LoadError # Count on MAC return Integer `sysctl -n hw.ncpu 2>/dev/null` end $stderr.print "Could not determine number of CPUs" if not @quiet 1 end
mktmpfilename(num,ext=nil)
click to toggle source
# File lib/bio-vcf/pcows.rb, line 241 def mktmpfilename(num,ext=nil) @tmpdir+sprintf("/%0.6d-",num)+@name+(ext ? '.'+ext : '') end
multi_threaded()
click to toggle source
# File lib/bio-vcf/pcows.rb, line 263 def multi_threaded @num_threads > 1 end
pid_or_file_running?(pid,fn)
click to toggle source
# File lib/bio-vcf/pcows.rb, line 245 def pid_or_file_running?(pid,fn) (pid && pid_running?(pid)) or File.exist?(fn+'.'+RUNNINGEXT) end
pid_running?(pid)
click to toggle source
# File lib/bio-vcf/pcows.rb, line 249 def pid_running?(pid) begin fpid,status=Process.waitpid2(pid,Process::WNOHANG) rescue Errno::ECHILD, Errno::ESRCH return false end return true if nil == fpid && nil == status return ! (status.exited? || status.signaled?) end
single_threaded()
click to toggle source
# File lib/bio-vcf/pcows.rb, line 259 def single_threaded @num_threads == 1 end