class Bio::MAF::Parser

MAF parser, used for sequential and random-access parsing.

Options:

* `:parse_extended`: whether to parse 'i' and 'q' lines
* `:parse_empty`: whether to parse 'e' lines
* `:remove_gaps`: remove gaps left after filtering sequences
* `:join_blocks`: join blocks where possible
* `:upcase`: fold sequence data to upper case
* `:chunk_size`: read MAF file in chunks of this many bytes
* `:random_chunk_size`: as above, but for random access ({#fetch_blocks})
* `:merge_max`: merge up to this many bytes of blocks for
  random access
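* `:threads`: number of threads to use for parallel parsing of
  random-access fetches ({#fetch_blocks}). Only useful under JRuby.
* `:readahead_thread`: under JRuby, wrap the chunk reader in a
  ThreadedChunkReaderWrapper so reading happens on a separate thread.
* `:seq_parse_thread`: under JRuby, parse blocks on a worker thread
  during sequential parsing ({#each_block}).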
* `:strict`: abort on un-parseable lines instead of continuing with
  a warning.

@api public

Constants

DEFAULT_OPTS
MERGE_MAX
RANDOM_CHUNK_SIZE
SEQ_CHUNK_SIZE
WRAP_OPTS

Attributes

at_end[R]

@return [Boolean] whether EOF has been reached.

base_reader[R]

@return [Class] ChunkReader class to use for random access @see ParseContext

chunk_start[R]

@return [Integer] starting offset of the current chunk.

compression[R]

@return [Symbol] compression method used for this file, or nil

cr[R]

@return [ChunkReader] ChunkReader.

f[R]

@return [IO] file handle for MAF file.

file_spec[R]

@return [String] path of MAF file being parsed.

header[R]

@return [Header] header of the MAF file being parsed.

last_block_pos[R]

@return [Integer] offset of the last block start in this chunk.

opts[R]

@return [Hash] parser options.

parse_empty[RW]
parse_extended[RW]

@api private

phys_f[R]

May be gzip-compressed. @return [IO] file handle for physical MAF file. @api private

s[R]

@return [StringScanner] scanner for parsing.

Public Class Methods

new(file_spec, parse_opts={}) click to toggle source

Create a new parser instance.

@param [String] file_spec path of file to parse. @param [Hash] parse_opts parser options. @api public

# File lib/bio/maf/parser.rb, line 587
# Create a new parser instance and parse the MAF header.
#
# @param [String, Pathname, IO] file_spec path of the file to parse,
#   or an already-open IO (anything responding to :flush).
# @param [Hash] parse_opts parser options, merged over DEFAULT_OPTS.
# @api public
#
# If setup fails after this constructor has opened the file itself
# (bad gzip data, "not a MAF file", ...), that handle is closed before
# the exception propagates. A caller-supplied IO is never closed here.
def initialize(file_spec, parse_opts={})
  opts = DEFAULT_OPTS.merge(parse_opts)
  @opts = opts
  @random_access_chunk_size = opts[:random_chunk_size]
  @merge_max = opts[:merge_max]
  @parse_extended = opts[:parse_extended]
  @parse_empty = opts[:parse_empty]
  @chunk_start = 0
  setup_ok = false
  begin
    if file_spec.respond_to? :flush
      # an IO object
      # guess what, Pathnames respond to :read...
      @f = file_spec
      @file_spec = @f.path if @f.respond_to?(:path)
      # TODO: test for gzip?
    else
      # a pathname (or Pathname)
      @file_spec = file_spec
      @phys_f = File.open(file_spec)
      if file_spec.to_s.end_with?(".maf.gz")
        @f = Zlib::GzipReader.new(@phys_f)
        @compression = :gzip
      else
        @f = @phys_f
      end
    end
    if @file_spec.to_s =~ /\.bgzf?$/
      @base_reader = BGZFChunkReader
      @compression = :bgzf
    else
      @base_reader = ChunkReader
    end
    @cr = base_reader.new(@f, opts[:chunk_size])
    if JRUBY_P && opts[:readahead_thread]
      LOG.debug "Using ThreadedChunkReaderWrapper."
      @cr = ThreadedChunkReaderWrapper.new(@cr)
    end
    @s = StringScanner.new(cr.read_chunk())
    set_last_block_pos!
    @at_end = false
    _parse_header()
    setup_ok = true
  ensure
    # Only release a handle we opened ourselves (@phys_f is unset for IO input).
    @phys_f.close if !setup_ok && @phys_f && !@phys_f.closed?
  end
end

Public Instance Methods

_merge_bgzf_fetch_list(orig_fl) click to toggle source

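Build a merged fetch list in a BGZF-aware way. Consecutive fetch-list entries that start in the same BGZF block are grouped into one fetch, so the list is presumably expected to be sorted by offset. The MAF blocks in a group need not be contiguous: the fetch spans from the start of the first block to the end of the last.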

# File lib/bio/maf/parser.rb, line 820
# Merge a fetch list BGZF-aware: runs of consecutive entries whose
# virtual offsets fall in the same BGZF block become one fetch.
#
# @param [Array] orig_fl `[virtual_offset, length]` entries
# @return [Array] `[first_offset, text_size, [offsets...]]` per group,
#   where text_size spans from the first entry's start to the last
#   entry's end (entries inside a group need not be contiguous).
def _merge_bgzf_fetch_list(orig_fl)
  groups = orig_fl.chunk { |entry| Bio::BGZF::vo_block_offset(entry[0]) }
  groups.map do |_bgzf_block, entries|
    first_off = entries.first[0]
    last_off, last_len = entries.last
    span = last_off + last_len - first_off
    [first_off, span, entries.map(&:first)]
  end
end
_merge_fetch_list(orig_fl) click to toggle source
# File lib/bio/maf/parser.rb, line 797
# Merge contiguous `[offset, length]` entries into
# `[offset, total_length, [offset1, offset2, ...]]` groups, never
# letting a group's total length exceed @merge_max.
#
# Builds fresh arrays for the result so the caller's fetch list is
# left untouched and can safely be merged again (the previous version
# mutated the caller's tuples in place).
#
# @param [Array] orig_fl `[offset, length]` tuples, in file order
# @return [Array] merged fetch list
def _merge_fetch_list(orig_fl)
  orig_fl.each_with_object([]) do |(offset, size), merged|
    prev = merged.last
    if prev && prev[0] + prev[1] == offset && prev[1] + size <= @merge_max
      # contiguous with the previous group and still within merge_max
      prev[1] += size
      prev[2] << offset
    else
      merged << [offset, size, [offset]]
    end
  end
end
_parse_header() click to toggle source

Parse the header of the MAF file.

# File lib/bio/maf/parser.rb, line 834
# Parse the MAF header: the `##maf` line with its variables, then any
# `#` comment lines, which are joined with single spaces to form the
# alignment parameters (nil when there are none). Afterwards, skip
# ahead to the first alignment block, setting @at_end if none exists.
def _parse_header
  parse_error("not a MAF file") unless s.scan(/##maf\s*/)
  vars = parse_maf_vars()
  param_lines = []
  param_lines << s[1] while s.scan(/^#\s*(.+?)\n/)
  align_params = param_lines.empty? ? nil : param_lines.join(' ')
  @header = Header.new(vars, align_params)
  @at_end = true unless s.skip_until(BLOCK_START)
end
_wrap(options, fun, &blk) click to toggle source

`options` should be ordered [:outer, …, :inner]. The array is consumed: each wrapper level shifts its own mode off the front.

# File lib/bio/maf/parser.rb, line 898
# Apply the wrapper modes in `options` (ordered [:outer, ..., :inner])
# around the block source `fun`, delivering results to `blk`.
#
# `options` is consumed destructively: each level shifts its own mode
# off the front and hands the remainder to the next level.
#
# @raise [RuntimeError] for an unrecognised mode
def _wrap(options, fun, &blk)
  mode = options.shift
  case mode
  when nil
    # innermost level: run the underlying block source directly
    fun.call(&blk)
  when :sequence_filter
    keep_multi = lambda { |b| b if b.sequences.size > 1 }
    conv_map(options, fun, keep_multi, &blk)
  when :join_blocks
    block_joiner(options, fun, &blk)
  when :as_bio_alignment
    conv_send(options, fun, :to_bio_alignment, &blk)
  when :upcase
    # yield the block itself rather than upcase!'s return value
    conv_send(options, fun, :upcase!, true, &blk)
  when :remove_gaps
    strip_gaps = lambda { |b| b.remove_gaps! if b.filtered?; b }
    conv_map(options, fun, strip_gaps, &blk)
  else
    raise "unhandled wrapper mode: #{mode}"
  end
end
block_joiner(options, fun) { |prev| ... } click to toggle source
# File lib/bio/maf/parser.rb, line 937
# Join adjacent blocks where possible. Two neighbours are merged only
# when at least one of them has been filtered and the earlier one
# reports it is joinable with the later one; otherwise the pending
# block is yielded and the newer one becomes pending.
def block_joiner(options, fun)
  pending = nil
  _wrap(options, fun) do |cur|
    mergeable = pending &&
                (pending.filtered? || cur.filtered?) &&
                pending.joinable_with?(cur)
    if mergeable
      pending = pending.join(cur)
    else
      yield pending if pending
      pending = cur
    end
  end
  # flush whatever is still held back
  yield pending if pending
end
close() click to toggle source
# File lib/bio/maf/parser.rb, line 629
# Close the MAF file handle `f` (the GzipReader for `.maf.gz` input,
# otherwise the plain IO).
def close
  handle = f
  handle.close
end
context(chunk_size) click to toggle source

Create a {ParseContext} for random access, using the given chunk size.

@return [ParseContext] @api private

# File lib/bio/maf/parser.rb, line 638
# Build a {ParseContext} for random access on a fresh file handle:
# re-open the file by path when its path is known, otherwise dup `f`.
#
# @param [Integer] chunk_size chunk size for the context's reader
# @return [ParseContext]
# @api private
def context(chunk_size)
  # IO#dup calls dup(2) internally, but seems broken on JRuby...
  handle = file_spec ? File.open(file_spec) : f.dup
  ParseContext.new(handle, chunk_size, self)
end
conv_map(options, search, fun) { |v| ... } click to toggle source
# File lib/bio/maf/parser.rb, line 951
# Map each wrapped block through `fun`, yielding only truthy results
# (so `fun` doubles as a filter by returning nil or false).
def conv_map(options, search, fun)
  _wrap(options, search) do |block|
    converted = fun.call(block)
    next unless converted
    yield converted
  end
end
conv_send(options, search, sym, always_yield_block=false) { |block| ... } click to toggle source
# File lib/bio/maf/parser.rb, line 958
# Send `sym` to each wrapped block. With `always_yield_block`, yield
# the block itself (useful for in-place methods); otherwise yield the
# method's result, skipping nil/false results.
def conv_send(options, search, sym, always_yield_block=false)
  _wrap(options, search) do |block|
    result = block.send(sym)
    if always_yield_block
      yield block
    elsif result
      yield result
    end
  end
end
each_block(&blk) click to toggle source

Parse all alignment blocks until EOF.

Delegates to {#parse_blocks_parallel} if `:seq_parse_thread` is set under JRuby.

@return [Enumerator<Block>] enumerator of {Block}s if no block given. @yield [block] Passes each {Block} in turn to a block @api public

# File lib/bio/maf/parser.rb, line 859
# Parse all alignment blocks until EOF, applying the configured
# wrappers. Under JRuby with `:seq_parse_thread` set, parsing runs on
# a worker thread via {#parse_blocks_parallel}; otherwise
# {#each_block_seq} is used.
#
# @yield [block] each {Block} in turn
# @return [Enumerator<Block>] when no block is given
# @api public
def each_block(&blk)
  return enum_for(:each_block) unless block_given?
  threaded = JRUBY_P && opts[:seq_parse_thread]
  fun = method(threaded ? :parse_blocks_parallel : :each_block_seq)
  wrap_block_seq(fun, &blk)
end
Also aliased as: parse_blocks
each_block_seq() { |block| ... } click to toggle source
# File lib/bio/maf/parser.rb, line 873
# Sequentially parse blocks on the calling thread until at_end,
# yielding each truthy result of _parse_block.
def each_block_seq
  until at_end
    next unless (parsed = _parse_block())
    yield parsed
  end
end
fetch_blocks(fetch_list, &blk) click to toggle source

Fetch and parse blocks given by `fetch_list`.

`fetch_list` should be an array of `[offset, length]` tuples.

@param [Array] fetch_list the fetch list @yield [block] each block matched, in turn @return [Enumerable<Block>] each matching {Block}, if no block given

# File lib/bio/maf/parser.rb, line 683
# Fetch and parse the blocks named by `fetch_list` (`[offset, length]`
# tuples), after merging it with {#merge_fetch_list}. Under JRuby with
# more than one `:threads`, fetching is done in parallel.
#
# @yield [block] each matching {Block}
# @return [Enumerator] when no block is given
def fetch_blocks(fetch_list, &blk)
  return enum_for(:fetch_blocks, fetch_list) unless blk
  merged = merge_fetch_list(fetch_list)
  threaded = JRUBY_P && @opts.fetch(:threads, 1) > 1
  fun = if threaded
          lambda { |&b2| fetch_blocks_merged_parallel(merged, &b2) }
        else
          lambda { |&b2| fetch_blocks_merged(merged, &b2) }
        end
  wrap_block_seq(fun, &blk)
end
fetch_blocks_merged(fetch_list, &blk) click to toggle source

Fetch and parse the blocks given by the merged fetch list.

@param [Array] fetch_list merged fetch list from {#merge_fetch_list}. @yield [block] each requested alignment block

# File lib/bio/maf/parser.rb, line 701
# Fetch and parse the blocks given by a merged fetch list (from
# {#merge_fetch_list}) on a single {ParseContext}.
#
# An empty fetch list is a no-op; previously it crashed because the
# summed size was nil. The throughput figure is only computed when the
# debug message is actually built.
#
# @param [Array] fetch_list merged fetch list from {#merge_fetch_list}
# @yield [block] each block produced by the parse context
def fetch_blocks_merged(fetch_list, &blk)
  return if fetch_list.empty?
  start = Time.now
  total_size = fetch_list.inject(0) { |sum, e| sum + e[1] }
  count = 0 # number of merged fetch-list entries processed
  with_context(@random_access_chunk_size) do |ctx|
    fetch_list.each do |e|
      ctx.fetch_blocks(*e, &blk)
      count += 1
    end
  end
  elapsed = Time.now - start
  LOG.debug do
    rate = (total_size / 1048576.0) / elapsed
    sprintf("Fetched %d blocks in %.3fs, %.1f MB/s.",
            count, elapsed, rate)
  end
end
fetch_blocks_merged_parallel(fetch_list) { |block| ... } click to toggle source

Fetch and parse the blocks given by the merged fetch list, in parallel. Uses the number of threads specified by the `:threads` parser option.

@param [Array] fetch_list merged fetch list from {#merge_fetch_list}. @yield [block] each requested alignment block

# File lib/bio/maf/parser.rb, line 723
# Fetch and parse the blocks given by a merged fetch list in parallel,
# using the number of worker threads given by the `:threads` option
# (JRuby only: relies on java.util.concurrent).
#
# An empty fetch list returns immediately instead of spinning up
# workers; previously it crashed because the summed size was nil.
#
# @param [Array] fetch_list merged fetch list from {#merge_fetch_list}
# @yield [block] each fetched block
# @raise [RuntimeError] if a worker thread dies with an exception
def fetch_blocks_merged_parallel(fetch_list)
  return if fetch_list.empty?
  total_size = fetch_list.inject(0) { |sum, e| sum + e[1] }
  start = Time.now
  n_threads = @opts.fetch(:threads, 1)
  # TODO: break entries up into longer runs for more
  # sequential I/O
  jobs = java.util.concurrent.ConcurrentLinkedQueue.new(fetch_list)
  ct = CompletionTracker.new(fetch_list)
  completed = ct.queue
  threads = Array.new(n_threads) { make_worker(jobs, ct) }

  n_res = 0
  while n_res < fetch_list.size
    c = completed.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    unless c
      # Thread#status is nil when the thread terminated with an exception
      raise "Worker failed!" if threads.any? { |t| t.status.nil? }
      next
    end
    c.each { |block| yield block }
    n_res += 1
  end
  threads.each(&:join)
  elapsed = Time.now - start
  LOG.debug { sprintf("Fetched blocks from %d threads in %.1fs.",
                      n_threads,
                      elapsed) }
  LOG.debug do
    mb = total_size / 1048576.0
    sprintf("%.3f MB processed (%.1f MB/s).", mb, mb / elapsed)
  end
end
filter_seq_count(fun) { |block| ... } click to toggle source
# File lib/bio/maf/parser.rb, line 931
# Yield only blocks that have been filtered and still contain more
# than one sequence.
def filter_seq_count(fun)
  fun.call() do |block|
    next unless block.filtered?
    next unless block.sequences.size > 1
    yield block
  end
end
make_worker(jobs, ct) click to toggle source

Create a worker thread for parallel parsing.

@see fetch_blocks_merged_parallel

# File lib/bio/maf/parser.rb, line 761
# Create a worker thread for parallel random-access fetching. The
# worker polls `jobs` for merged fetch-list entries until the queue is
# empty, fetches each entry's blocks on its own {ParseContext}, and
# pushes the resulting array onto `ct`.
#
# @param jobs queue of merged fetch-list entries (responds to #poll)
# @param ct completion tracker receiving block arrays via #<<
# @return [Thread]
# @see fetch_blocks_merged_parallel
def make_worker(jobs, ct)
  Thread.new do
    begin
      with_context(@random_access_chunk_size) do |ctx|
        while (req = jobs.poll)
          n_blocks = req[2].size
          blocks = ctx.fetch_blocks(*req).to_a
          if blocks.size != n_blocks
            # report the offending request (`e` was undefined here before)
            raise "expected #{n_blocks}, got #{blocks.size}: #{req.inspect}"
          end
          ct << blocks
        end
      end
    rescue Exception => e
      # log every failure (including non-StandardError ones) before the
      # thread dies; the exception is re-raised, not swallowed
      LOG.error "Worker failing: #{e.class}: #{e}"
      LOG.error e
      raise
    end
  end
end
merge_fetch_list(orig_fl) click to toggle source

Merge contiguous blocks in the given fetch list, up to `:merge_max` bytes.

Returns `[offset, size, [offset1, offset2, …]]` tuples.

# File lib/bio/maf/parser.rb, line 788
# Merge a fetch list in the way appropriate to the file's compression:
# plain files merge contiguous blocks up to `:merge_max` bytes; BGZF
# files group entries by BGZF block.
#
# Returns `[offset, size, [offset1, offset2, ...]]` tuples.
#
# @raise [RuntimeError] for other compression methods (e.g. :gzip),
#   instead of silently returning nil and failing later
def merge_fetch_list(orig_fl)
  case compression
  when nil
    _merge_fetch_list(orig_fl)
  when :bgzf
    _merge_bgzf_fetch_list(orig_fl)
  else
    raise "random access is not supported for #{compression} compression"
  end
end
parse_block() click to toggle source
# File lib/bio/maf/parser.rb, line 880
# Parse the next alignment block, passing it through the configured
# wrappers. Returns the last value the wrapper chain yields (nil if the
# wrappers drop the block).
def parse_block
  result = nil
  source = lambda { |&sink| sink.call(_parse_block()) }
  wrap_block_seq(source) { |block| result = block }
  result
end
parse_blocks(&blk)
Alias for: each_block
parse_blocks_parallel() { |block| ... } click to toggle source

Parse alignment blocks with a worker thread.

@yield [block] each parsed block @api private

# File lib/bio/maf/parser.rb, line 973
# Parse alignment blocks on a background worker thread and yield them
# on the calling thread. JRuby only: uses java.util.concurrent.
#
# The worker pushes each parsed block onto a bounded queue (capacity
# 128) and finishes with an :eof sentinel. The caller polls with a
# one-second timeout; once the worker is seen to be dead, the loop
# stops on the second timed-out poll.
#
# @yield [block] each parsed {Block}
# @raise [RuntimeError] if the worker exits without delivering :eof
# @api private
#
# NOTE(review): if the caller's block raises or breaks early, the
#   worker may stay blocked in queue.put once the bounded queue fills
#   (per LinkedBlockingQueue#put semantics); consider stopping it in an
#   ensure.
def parse_blocks_parallel
  queue = java.util.concurrent.LinkedBlockingQueue.new(128)
  worker = Thread.new do
    begin
      LOG.debug "Starting parse worker."
      until at_end
        block = _parse_block()
        queue.put(block) if block
      end
      # sentinel telling the consumer that parsing finished normally
      queue.put(:eof)
      LOG.debug { "Parse worker reached EOF." }
    rescue Exception
      LOG.error $!
      # stashed so the consumer can include it in its error message
      Thread.current[:exception] = $!
      raise
    end
  end
  saw_eof = false
  n_final_poll = 0
  while true
    block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    if block == :eof
      saw_eof = true
      break
    elsif block
      yield block
    else
      # timed out
      unless worker.alive?
        LOG.debug "Worker has exited."
        n_final_poll += 1
      end
    end
    # give a dead worker's last items one more poll before giving up
    break if n_final_poll > 1
  end
  unless saw_eof
    raise "worker exited unexpectedly from #{worker[:exception]}!"
  end
end
sequence_filter() click to toggle source

Sequence filter to apply. @api public @return [Hash]

# File lib/bio/maf/parser.rb, line 665
# Sequence filter to apply, lazily initialised to an empty Hash.
# @api public
# @return [Hash]
def sequence_filter
  @sequence_filter = {} unless @sequence_filter
  @sequence_filter
end
sequence_filter=(filter) click to toggle source

Set the sequence filter. @api public @param [Hash] filter the new filter

# File lib/bio/maf/parser.rb, line 672
# Replace the sequence filter wholesale (no merging with the old one).
# @api public
# @param [Hash] filter the new filter
def sequence_filter=(filter)
  @sequence_filter = filter
end
with_context(chunk_size) { |ctx| ... } click to toggle source

Execute the given block with a {ParseContext}, created using the given `chunk_size`, as its argument.

@see context @api private

# File lib/bio/maf/parser.rb, line 653
# Yield a fresh {ParseContext} built with `chunk_size`, closing its
# file handle afterwards even if the block raises. Returns the block's
# value.
#
# @see context
# @api private
def with_context(chunk_size)
  ctx = context(chunk_size)
  yield ctx
ensure
  # ctx is nil only if context() itself raised; nothing to close then
  ctx.f.close if ctx
end
wrap_block_seq(fun, &blk) click to toggle source
# File lib/bio/maf/parser.rb, line 890
# Run the block source `fun` through every wrapper enabled in the
# parser options (WRAP_OPTS order), plus :sequence_filter when a
# non-empty sequence filter is set.
def wrap_block_seq(fun, &blk)
  modes = WRAP_OPTS.select { |mode| @opts[mode] }
  filter = sequence_filter
  modes << :sequence_filter if filter && !filter.empty?
  LOG.debug { "wrapping #{fun} with #{modes.inspect}" }
  _wrap(modes, fun, &blk)
end