class Bio::MAF::Parser
MAF parser, used for sequential and random-access parsing.
Options:
* `:parse_extended`: whether to parse 'i' and 'q' lines
* `:parse_empty`: whether to parse 'e' lines
* `:remove_gaps`: remove gaps left after filtering sequences
* `:join_blocks`: join blocks where possible
* `:upcase`: fold sequence data to upper case
* `:chunk_size`: read MAF file in chunks of this many bytes
* `:random_chunk_size`: as above, but for random access ({#fetch_blocks})
* `:merge_max`: merge up to this many bytes of blocks for random access
* `:threads`: number of threads to use for parallel parsing. Only useful under JRuby.
* `:strict`: abort on un-parseable lines instead of continuing with a warning.
@api public
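For orientation, a minimal usage sketch (the file name is hypothetical, and `require 'bio-maf'` is assumed as the gem entry point; options as listed above):

    require 'bio-maf'

    parser = Bio::MAF::Parser.new("chr22.maf",
                                  :parse_extended => true,
                                  :remove_gaps => true)
    parser.each_block do |block|
      # each parsed alignment block in turn
    end
    parser.close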
Constants
- DEFAULT_OPTS
- MERGE_MAX
- RANDOM_CHUNK_SIZE
- SEQ_CHUNK_SIZE
- WRAP_OPTS
Attributes
- at_end: @return [Boolean] whether EOF has been reached.
- base_reader: @return [Class] ChunkReader class to use for random access. @see ParseContext
- chunk_start: @return [Integer] starting offset of the current chunk.
- compression: @return [Symbol] compression method used for this file, or nil.
- cr: @return [ChunkReader] ChunkReader.
- f: @return [IO] file handle for MAF file.
- file_spec: @return [String] path of MAF file being parsed.
- header: @return [Header] header of the MAF file being parsed.
- last_block_pos: @return [Integer] offset of the last block start in this chunk.
- opts: @return [Hash] parser options. @api private
- phys_f: @return [IO] file handle for physical MAF file. May be gzip-compressed. @api private
- s: @return [StringScanner] scanner for parsing.
Public Class Methods
Create a new parser instance.
@param [String] file_spec path of file to parse.
@param [Hash] parse_opts parser options.
@api public
# File lib/bio/maf/parser.rb, line 587
def initialize(file_spec, parse_opts={})
  opts = DEFAULT_OPTS.merge(parse_opts)
  @opts = opts
  @random_access_chunk_size = opts[:random_chunk_size]
  @merge_max = opts[:merge_max]
  @parse_extended = opts[:parse_extended]
  @parse_empty = opts[:parse_empty]
  @chunk_start = 0
  if file_spec.respond_to? :flush
    # an IO object
    # guess what, Pathnames respond to :read...
    @f = file_spec
    @file_spec = @f.path if @f.respond_to?(:path)
    # TODO: test for gzip?
  else
    # a pathname (or Pathname)
    @file_spec = file_spec
    @phys_f = File.open(file_spec)
    if file_spec.to_s.end_with?(".maf.gz")
      @f = Zlib::GzipReader.new(@phys_f)
      @compression = :gzip
    else
      @f = @phys_f
    end
  end
  if @file_spec.to_s =~ /\.bgzf?$/
    @base_reader = BGZFChunkReader
    @compression = :bgzf
  else
    @base_reader = ChunkReader
  end
  @cr = base_reader.new(@f, opts[:chunk_size])
  if JRUBY_P && opts[:readahead_thread]
    LOG.debug "Using ThreadedChunkReaderWrapper."
    @cr = ThreadedChunkReaderWrapper.new(@cr)
  end
  @s = StringScanner.new(cr.read_chunk())
  set_last_block_pos!
  @at_end = false
  _parse_header()
end
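Sketches of the construction paths handled above (paths hypothetical): a plain path, a `.maf.gz` path that is wrapped in a `Zlib::GzipReader`, a BGZF path, and an already-open IO object.

    plain = Bio::MAF::Parser.new("chr22.maf")
    gz    = Bio::MAF::Parser.new("chr22.maf.gz")         # gunzipped transparently
    bgzf  = Bio::MAF::Parser.new("chr22.maf.bgz")        # uses BGZFChunkReader
    io    = Bio::MAF::Parser.new(File.open("chr22.maf")) # any object responding to #flush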
Public Instance Methods
Build a merged fetch list in a BGZF-aware way. This will group together all MAF blocks from a single BGZF block. These MAF blocks may not be consecutive.
# File lib/bio/maf/parser.rb, line 820
def _merge_bgzf_fetch_list(orig_fl)
  block_e = orig_fl.chunk { |entry|
    Bio::BGZF::vo_block_offset(entry[0])
  }
  block_e.collect do |bgzf_block, fl|
    # text size to read from disk, from the start of the first
    # block to the end of the last block
    text_size = fl.last[0] + fl.last[1] - fl.first[0]
    offsets = fl.collect { |e| e[0] }
    [fl.first[0], text_size, offsets]
  end
end
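For illustration, assuming the usual BGZF virtual-offset packing (`(block_offset << 16) | offset_within_block`): entries whose upper bits match live in the same BGZF block and group together.

    vo1 = (0 << 16) | 10      # block at physical offset 0, in-block offset 10
    vo2 = (0 << 16) | 200     # same BGZF block
    vo3 = (4096 << 16) | 0    # a later BGZF block
    _merge_bgzf_fetch_list([[vo1, 50], [vo2, 60], [vo3, 40]])
    # => [[vo1, 250, [vo1, vo2]], [vo3, 40, [vo3]]]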
# File lib/bio/maf/parser.rb, line 797
def _merge_fetch_list(orig_fl)
  fl = orig_fl.dup
  r = []
  until fl.empty? do
    cur = fl.shift
    if r.last \
      && (r.last[0] + r.last[1]) == cur[0] \
      && (r.last[1] + cur[1]) <= @merge_max
      # contiguous with the previous one
      # add to length and increment count
      r.last[1] += cur[1]
      r.last[2] << cur[0]
    else
      cur << [cur[0]]
      r << cur
    end
  end
  return r
end
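A worked example, assuming `@merge_max` is large enough: the first two entries are contiguous (0 + 100 == 100) and merge into a single read; the third starts a new run.

    _merge_fetch_list([[0, 100], [100, 50], [500, 80]])
    # => [[0, 150, [0, 100]], [500, 80, [500]]]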
Parse the header of the MAF file.
# File lib/bio/maf/parser.rb, line 834
def _parse_header
  parse_error("not a MAF file") unless s.scan(/##maf\s*/)
  vars = parse_maf_vars()
  align_params = nil
  while s.scan(/^#\s*(.+?)\n/)
    if align_params == nil
      align_params = s[1]
    else
      align_params << ' ' << s[1]
    end
  end
  @header = Header.new(vars, align_params)
  if ! s.skip_until(BLOCK_START)
    @at_end = true
  end
end
`options` should be ordered `[:outer, …, :inner]`.
# File lib/bio/maf/parser.rb, line 898
def _wrap(options, fun, &blk)
  first = options.shift
  case first
  when nil
    fun.call(&blk)
  when :sequence_filter
    conv_map(options, fun,
             lambda { |b| b if b.sequences.size > 1 },
             &blk)
  when :join_blocks
    block_joiner(options, fun, &blk)
  when :as_bio_alignment
    conv_send(options, fun, :to_bio_alignment, &blk)
  when :upcase
    conv_send(options, fun, :upcase!, true, &blk)
  when :remove_gaps
    conv_map(options, fun,
             lambda { |b| b.remove_gaps! if b.filtered?; b },
             &blk)
  else
    raise "unhandled wrapper mode: #{first}"
  end
end
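To make the ordering concrete, a hypothetical call (this is a private helper; normal code goes through {#each_block}): with `[:join_blocks, :remove_gaps]`, gap removal wraps the raw parse first, and block joining is layered outside it, so the consumer sees joined, gap-free blocks.

    _wrap([:join_blocks, :remove_gaps], method(:each_block_seq)) do |block|
      # block has had gaps removed (inner), then been joined where possible (outer)
    end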
# File lib/bio/maf/parser.rb, line 937
def block_joiner(options, fun)
  prev = nil
  _wrap(options, fun) do |cur|
    if prev && (prev.filtered? || cur.filtered?) \
      && prev.joinable_with?(cur)
      prev = prev.join(cur)
    else
      yield prev if prev
      prev = cur
    end
  end
  yield prev if prev
end
# File lib/bio/maf/parser.rb, line 629
def close
  f.close
end
Create a {ParseContext} for random access, using the given chunk size.
@return [ParseContext]
@api private
# File lib/bio/maf/parser.rb, line 638
def context(chunk_size)
  # IO#dup calls dup(2) internally, but seems broken on JRuby...
  if file_spec
    fd = File.open(file_spec)
  else
    fd = f.dup
  end
  ParseContext.new(fd, chunk_size, self)
end
# File lib/bio/maf/parser.rb, line 951
def conv_map(options, search, fun)
  _wrap(options, search) do |block|
    v = fun.call(block)
    yield v if v
  end
end
# File lib/bio/maf/parser.rb, line 958
def conv_send(options, search, sym, always_yield_block=false)
  _wrap(options, search) do |block|
    v = block.send(sym)
    if always_yield_block
      yield block
    else
      yield v if v
    end
  end
end
Parse all alignment blocks until EOF.
Delegates to {#parse_blocks_parallel} if `:seq_parse_thread` is set under JRuby.

@return [Enumerator<Block>] enumerator of {Block}s if no block given.
@yield [block] Passes each {Block} in turn to a block
@api public
# File lib/bio/maf/parser.rb, line 859
def each_block(&blk)
  if block_given?
    if JRUBY_P && opts[:seq_parse_thread]
      fun = method(:parse_blocks_parallel)
    else
      fun = method(:each_block_seq)
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:each_block)
  end
end
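With no block given, an Enumerator is returned, so the usual Enumerable idioms apply (a sketch; `Sequence#source` is assumed from the rest of bio-maf):

    parser.each_block.take(3).each do |block|
      puts block.sequences.map { |seq| seq.source }.inspect
    end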
# File lib/bio/maf/parser.rb, line 873
def each_block_seq
  until at_end
    block = _parse_block()
    yield block if block
  end
end
Fetch and parse blocks given by `fetch_list`.

`fetch_list` should be an array of `[offset, length]` tuples.

@param [Array] fetch_list the fetch list
@yield [block] each block matched, in turn
@return [Enumerable<Block>] each matching {Block}, if no block given
# File lib/bio/maf/parser.rb, line 683
def fetch_blocks(fetch_list, &blk)
  if blk
    merged = merge_fetch_list(fetch_list)
    if JRUBY_P && @opts.fetch(:threads, 1) > 1
      fun = lambda { |&b2| fetch_blocks_merged_parallel(merged, &b2) }
    else
      fun = lambda { |&b2| fetch_blocks_merged(merged, &b2) }
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:fetch_blocks, fetch_list)
  end
end
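A sketch of random access (the offsets here are hypothetical; in practice they come from an index over the MAF file):

    fetch_list = [[16, 1087], [1103, 1908]]   # [offset, length] tuples
    parser.fetch_blocks(fetch_list) do |block|
      # handle each requested block
    end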
Fetch and parse the blocks given by the merged fetch list.
@param [Array] fetch_list merged fetch list from {#merge_fetch_list}.
@return [Array<Block>] the requested alignment blocks
# File lib/bio/maf/parser.rb, line 701
def fetch_blocks_merged(fetch_list, &blk)
  start = Time.now
  total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
  count = 0
  with_context(@random_access_chunk_size) do |ctx|
    fetch_list.each do |e|
      ctx.fetch_blocks(*e, &blk)
      count += 1
    end
  end
  elapsed = Time.now - start
  rate = (total_size / 1048576.0) / elapsed
  LOG.debug { sprintf("Fetched %d blocks in %.3fs, %.1f MB/s.",
                      count, elapsed, rate) }
end
Fetch and parse the blocks given by the merged fetch list, in parallel. Uses the number of threads specified by the `:threads` parser option.

@param [Array] fetch_list merged fetch list from {#merge_fetch_list}.
@return [Array<Block>] the requested alignment blocks
# File lib/bio/maf/parser.rb, line 723
def fetch_blocks_merged_parallel(fetch_list)
  total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
  start = Time.now
  n_threads = @opts.fetch(:threads, 1)
  # TODO: break entries up into longer runs for more
  # sequential I/O
  jobs = java.util.concurrent.ConcurrentLinkedQueue.new(fetch_list)
  ct = CompletionTracker.new(fetch_list)
  completed = ct.queue
  threads = []
  n_threads.times { threads << make_worker(jobs, ct) }

  n_res = 0
  while n_res < fetch_list.size
    c = completed.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    unless c
      raise "Worker failed!" if threads.find { |t| t.status.nil? }
      next
    end
    c.each do |block|
      yield block
    end
    n_res += 1
  end
  threads.each { |t| t.join }
  elapsed = Time.now - start
  LOG.debug { sprintf("Fetched blocks from %d threads in %.1fs.",
                      n_threads, elapsed) }
  mb = total_size / 1048576.0
  LOG.debug { sprintf("%.3f MB processed (%.1f MB/s).",
                      mb, mb / elapsed) }
end
# File lib/bio/maf/parser.rb, line 931
def filter_seq_count(fun)
  fun.call() do |block|
    yield block if block.filtered? && block.sequences.size > 1
  end
end
Create a worker thread for parallel parsing.
@see fetch_blocks_merged_parallel
# File lib/bio/maf/parser.rb, line 761
def make_worker(jobs, ct)
  Thread.new do
    begin
      with_context(@random_access_chunk_size) do |ctx|
        while true
          req = jobs.poll
          break unless req
          n_blocks = req[2].size
          blocks = ctx.fetch_blocks(*req).to_a
          if blocks.size != n_blocks
            raise "expected #{n_blocks}, got #{blocks.size}: #{req.inspect}"
          end
          ct << blocks
        end
      end
    rescue Exception => e
      LOG.error "Worker failing: #{e.class}: #{e}"
      LOG.error e
      raise e
    end
  end
end
Merge contiguous blocks in the given fetch list, up to `:merge_max` bytes.

Returns `[offset, size, [offset1, offset2, …]]` tuples.
# File lib/bio/maf/parser.rb, line 788
def merge_fetch_list(orig_fl)
  case compression
  when nil
    _merge_fetch_list(orig_fl)
  when :bgzf
    _merge_bgzf_fetch_list(orig_fl)
  end
end
# File lib/bio/maf/parser.rb, line 880
def parse_block
  b = nil
  wrap_block_seq(lambda { |&blk| blk.call(_parse_block()) }) do |block|
    b = block
  end
  b
end
Parse alignment blocks with a worker thread.
@yield [block] block handler
@api private
# File lib/bio/maf/parser.rb, line 973
def parse_blocks_parallel
  queue = java.util.concurrent.LinkedBlockingQueue.new(128)
  worker = Thread.new do
    begin
      LOG.debug "Starting parse worker."
      until at_end
        block = _parse_block()
        queue.put(block) if block
      end
      queue.put(:eof)
      LOG.debug { "Parse worker reached EOF." }
    rescue Exception
      LOG.error $!
      Thread.current[:exception] = $!
      raise
    end
  end
  saw_eof = false
  n_final_poll = 0
  while true
    block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    if block == :eof
      saw_eof = true
      break
    elsif block
      yield block
    else
      # timed out
      unless worker.alive?
        LOG.debug "Worker has exited."
        n_final_poll += 1
      end
    end
    break if n_final_poll > 1
  end
  unless saw_eof
    raise "worker exited unexpectedly from #{worker[:exception]}!"
  end
end
Sequence filter to apply.

@api public
@return [Hash]
# File lib/bio/maf/parser.rb, line 665
def sequence_filter
  @sequence_filter ||= {}
end
Set the sequence filter.

@api public
@param [Hash] filter the new filter
# File lib/bio/maf/parser.rb, line 672
def sequence_filter=(filter)
  @sequence_filter = filter
end
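For example, restricting parsing to a subset of species (a sketch; the `:only_species` key follows bio-maf's filter usage elsewhere, so treat it as an assumption here):

    parser.sequence_filter = { :only_species => %w(hg19 panTro2 mm8) }
    parser.each_block do |block|
      # only blocks with at least two remaining sequences are yielded
    end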
Execute the given block with a {ParseContext} using the given `chunk_size` as an argument.
@see context
@api private
# File lib/bio/maf/parser.rb, line 653
def with_context(chunk_size)
  ctx = context(chunk_size)
  begin
    yield ctx
  ensure
    ctx.f.close
  end
end
# File lib/bio/maf/parser.rb, line 890
def wrap_block_seq(fun, &blk)
  opts = WRAP_OPTS.find_all { |o| @opts[o] }
  opts << :sequence_filter if sequence_filter && (! sequence_filter.empty?)
  LOG.debug { "wrapping #{fun} with #{opts.inspect}" }
  _wrap(opts, fun, &blk)
end