class Bio::MAF::KyotoIndex

Constants

CHUNK_THRESHOLD_BLOCKS
CHUNK_THRESHOLD_BYTES
COMPRESSION_KEY
FILE_KEY
FORMAT_VERSION
FORMAT_VERSION_KEY
MAX_SPECIES
REF_SEQ_KEY

Attributes

db [R]
index_sequences [RW]
maf_file [R]
path [R]
ref_only [R]
ref_seq [RW]
species [R]
species_max_id [R]

Public Class Methods

build(parser, path, ref_only=true)

Build a new index from the MAF file being parsed by `parser`, and store it in `path`.

@param [Parser] parser MAF parser for file to index
@param [String] path path to index file to create
@return [KyotoIndex]

# File lib/bio/maf/index.rb, line 398
def self.build(parser, path, ref_only=true)
  idx = self.new(path)
  idx.build(parser, ref_only)
  return idx
end
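A minimal usage sketch; the MAF and index file names here are hypothetical:

  require 'bio-maf'

  parser = Bio::MAF::Parser.new("chr22.maf")
  index = Bio::MAF::KyotoIndex.build(parser, "chr22.kct")
  index.close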
new(path, db_arg=nil)
KyotoIndex Internals

@api private

# File lib/bio/maf/index.rb, line 469
def initialize(path, db_arg=nil)
  @species = {}
  @species_max_id = -1
  @index_sequences = {}
  @max_sid = -1
  if db_arg || ((path.size > 1) and File.exist?(path))
    mode = KyotoCabinet::DB::OREADER
  else
    mode = KyotoCabinet::DB::OWRITER | KyotoCabinet::DB::OCREATE
  end
  @db = db_arg || KyotoCabinet::DB.new
  @path = path
  path_str = "#{path.to_s}#opts=ls#dfunit=100000"
  unless db_arg || db.open(path_str, mode)
    raise "Could not open DB file!"
  end
  if mode == KyotoCabinet::DB::OREADER
    version = db[FORMAT_VERSION_KEY].to_i
    if version != FORMAT_VERSION
      raise "Index #{path} is version #{version}, expecting version #{FORMAT_VERSION}!"
    end
    @maf_file = db[FILE_KEY]
    self.ref_seq = db[REF_SEQ_KEY]
    load_index_sequences
    load_species
  end
  @mutex = Mutex.new
end
open(path)

Open an existing index for reading.

@param [String] path path to existing Kyoto Cabinet index
@return [KyotoIndex]

# File lib/bio/maf/index.rb, line 389
def self.open(path)
  return KyotoIndex.new(path)
end
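For example, assuming an index built as in the sketch above (path hypothetical):

  index = Bio::MAF::KyotoIndex.open("chr22.kct")
  index.ref_seq  # e.g. "mm8.chr7", as recorded at build time
  index.close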

Public Instance Methods

build(parser, ref_only=true)
# File lib/bio/maf/index.rb, line 742
def build(parser, ref_only=true)
  prep(parser.file_spec,
       parser.compression,
       ref_only)

  n = 0
  acc = []
  acc_bytes = 0
  parser.each_block do |block|
    acc << block
    acc_bytes += block.size
    if acc_bytes > CHUNK_THRESHOLD_BYTES \
      || acc.size > CHUNK_THRESHOLD_BLOCKS
      index_blocks(acc)
      acc = []
      acc_bytes = 0
    end
    n += 1
  end
  index_blocks(acc)
  LOG.debug { "Created index for #{n} blocks and #{@index_sequences.size} sequences." }
  db.synchronize(true)
end
build_block_value(block)
# File lib/bio/maf/index.rb, line 840
def build_block_value(block)
  bits = block.sequences.collect {|s| 1 << species_id_for_seq(s.source) }
  vec = bits.reduce(0, :|)
  return [block.offset,
          block.size,
          block.text_size,
          block.sequences.size,
          vec].pack(VAL_FMT)
end
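The species vector is a bitmask with one bit per species ID, so presence tests reduce to a bitwise AND. A worked illustration with hypothetical ID assignments:

  # hg19 => id 0, panTro2 => id 2 (hypothetical)
  bits = [1 << 0, 1 << 2]     # => [1, 4]
  vec = bits.reduce(0, :|)    # => 5 (binary 101)
  (vec & (1 << 2)) != 0       # => true: panTro2 is present in the block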
close()

Close the underlying Kyoto Cabinet database handle.

# File lib/bio/maf/index.rb, line 451
def close
  db.close
end
dump(stream=$stdout)
# File lib/bio/maf/index.rb, line 507
def dump(stream=$stdout)
  bgzf = (db[COMPRESSION_KEY] == 'bgzf')
  stream.puts "KyotoIndex dump: #{@path}"
  stream.puts
  if db.count == 0
    stream.puts "Empty database!"
    return
  end
  db.cursor_process do |cur|
    stream.puts "== Metadata =="
    cur.jump('')
    while true
      k, v = cur.get(false)
      raise "unexpected end of records!" unless k
      break if k[0] == "\xff"
      stream.puts "#{k}: #{v}"
      unless cur.step
        raise "could not advance cursor!"
      end
    end
    stream.puts "== Index records =="
    while pair = cur.get(true)
      _, chr, bin, s_start, s_end = pair[0].unpack(KEY_FMT)
      offset, len, text_size, n_seq, species_vec = pair[1].unpack(VAL_FMT)
      stream.puts "#{chr} [bin #{bin}] #{s_start}:#{s_end}"
      stream.puts "  offset #{offset}, length #{len}"
      if bgzf
        block = Bio::BGZF.vo_block_offset(offset)
        data = Bio::BGZF.vo_data_offset(offset)
        stream.puts "  BGZF block offset #{block}, data offset #{data}"
      end
      stream.puts "  text size: #{text_size}"
      stream.puts "  sequences in block: #{n_seq}"
      stream.printf("  species vector: %016x\n", species_vec)
    end
  end
end
entries_for(block)
# File lib/bio/maf/index.rb, line 850
def entries_for(block)
  begin
    unless block.ref_seq.source == @ref_seq
      raise "Inconsistent reference sequence: expected #{@ref_seq}, got #{block.ref_seq.source}"
    end
    h = {}
    val = build_block_value(block)
    to_index = ref_only ? [block.sequences.first] : block.sequences
    to_index.each do |seq|
      seq_id = seq_id_for(seq.source)
      # size 0 occurs in e.g. upstream1000.maf.gz
      next if seq.size == 0
      seq_end = seq.start + seq.size
      bin = Bio::Ucsc::UcscBin.bin_from_range(seq.start, seq_end)
      key = [255, seq_id, bin, seq.start, seq_end].pack(KEY_FMT)
      h[key] = val
    end
    return h
  rescue Exception => e
    LOG.error "Failed to index block at offset #{block.offset}:\n#{block}"
    raise e
  end
end
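Each index record key begins with a 0xFF byte, which sorts after all metadata keys; this is what lets #dump and the bin scans separate the two kinds of record with a single cursor. A sketch of one key's components (values hypothetical):

  # sequence id 0, UCSC bin 592, half-open interval [80082334, 80082468)
  key = [255, 0, 592, 80_082_334, 80_082_468].pack(KEY_FMT)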
fetch_list(intervals, filter_spec={})

Build a fetch list of alignment blocks to read, given an array of Bio::GenomicInterval objects.

# File lib/bio/maf/index.rb, line 567
def fetch_list(intervals, filter_spec={})
  filter_spec ||= {}
  filters = Filters.build(filter_spec, self)
  chrom = intervals.first.chrom
  chrom_id = index_sequences[chrom]
  unless chrom_id
    raise "chromosome #{chrom} not indexed!"
  end
  if intervals.find { |i| i.chrom != chrom }
    raise "all intervals must be for the same chromosome!"
  end
  # for each bin, build a list of the intervals to look for there
  bin_intervals = Hash.new { |h, k| h[k] = [] }
  intervals.each do |i|
    i.bin_all.each do |bin|
      bin_intervals[bin] << (i.zero_start...i.zero_end)
    end
  end
  bin_intervals.values.each do |intervals|
    intervals.sort_by! {|i| i.begin}
  end
  matches = if RUBY_PLATFORM == 'java' && bin_intervals.size > 4
              scan_bins_parallel(chrom_id, bin_intervals, filters)
            else
              scan_bins(chrom_id, bin_intervals, filters)
            end
  matches.sort_by! { |e| e[0] } # sort by offset in file
end
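A usage sketch, assuming the index and parser from the earlier examples (sequence name, coordinates, and species hypothetical); each returned entry locates one alignment block in the MAF file:

  intervals = [
    Bio::GenomicInterval.zero_based('mm8.chr7', 80_082_334, 80_082_468),
    Bio::GenomicInterval.zero_based('mm8.chr7', 80_083_000, 80_083_200)
  ]
  fl = index.fetch_list(intervals, :with_all_species => %w(panTro2 loxAfr1))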
find(intervals, parser, filter={}, &blk)

Find all alignment blocks in the genomic regions in the list of Bio::GenomicInterval objects, and parse them with the given parser.

An optional Hash of filters may be passed in. The following keys are used:

* `:with_all_species => ["sp1", "sp2", ...]`

    Only match alignment blocks containing all given species.

* `:at_least_n_sequences => n`

    Only match alignment blocks with at least N sequences.

* `:min_size => n`

    Only match alignment blocks with text size at least N.

* `:max_size => n`

    Only match alignment blocks with text size at most N.

@param [Enumerable<Bio::GenomicInterval>] intervals genomic intervals to parse.
@param [Parser] parser MAF parser for file to fetch blocks from.
@param [Hash] filter Block filter expression.
@yield [block] each {Block} matched, in turn
@return [Enumerable<Block>] each matching {Block}, if no block given
@api public

# File lib/bio/maf/index.rb, line 435
def find(intervals, parser, filter={}, &blk)
  start = Time.now
  fl = fetch_list(intervals, filter)
  LOG.debug { sprintf("Built fetch list of %d items in %.3fs.",
                      fl.size,
                      Time.now - start) }
  if ! fl.empty?
    parser.fetch_blocks(fl, &blk)
  else
    if ! block_given?
      []
    end
  end
end
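Typical use, following the filter keys documented above and assuming the index and parser from the earlier sketches (interval and species names hypothetical):

  interval = Bio::GenomicInterval.zero_based('mm8.chr7', 80_082_334, 80_082_468)
  index.find([interval], parser, :with_all_species => %w(panTro2 loxAfr1)) do |block|
    puts "block at offset #{block.offset}: #{block.sequences.size} sequences"
  end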
index_blocks(blocks)
# File lib/bio/maf/index.rb, line 766
def index_blocks(blocks)
  h = @mutex.synchronize do
    if ! @seen_first
      # set the reference sequence from the first block
      first_block = blocks.first
      self.ref_seq = first_block.sequences.first.source
      db[REF_SEQ_KEY] = ref_seq
      @seen_first = true
    end
    blocks.map { |b| entries_for(b) }.reduce(:merge!)
  end
  db.set_bulk(h, false)
end
load_index_sequences()
# File lib/bio/maf/index.rb, line 780
def load_index_sequences
  h = {}
  db.match_prefix("sequence:").each do |key|
    _, name = key.split(':', 2)
    id = db[key].to_i
    h[name] = id
  end
  @index_sequences = h
  @max_sid = @index_sequences.values.max
end
load_species()
# File lib/bio/maf/index.rb, line 805
def load_species
  db.match_prefix("species:").each do |key|
    _, name = key.split(':', 2)
    id = db[key].to_i
    @species[name] = id
  end
  @species_max_id = @species.values.sort.last || -1
end
make_scan_worker(jobs, completed) { |cur, req| ... }
# File lib/bio/maf/index.rb, line 660
def make_scan_worker(jobs, completed)
  Thread.new do
    with_profiling do
      db.cursor_process do |cur|
        while true
          req = jobs.poll
          break unless req
          begin
            result = yield(cur, req)
            completed.put(result)
          rescue Exception => e
            completed.put(e)
            LOG.error "Worker failing: #{e.class}: #{e}"
            LOG.error e
            raise e
          end
        end
      end
    end
  end
end
overlaps?(gi, i_start, i_end)
# File lib/bio/maf/index.rb, line 721
def overlaps?(gi, i_start, i_end)
  g_start = gi.begin

  (i_start <= g_start && g_start < i_end) \
   || gi.include?(i_start)
end
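Here `gi` is one of the half-open Ranges built in #fetch_list from `zero_start...zero_end`. A quick illustration with hypothetical coordinates:

  gi = (150...250)                 # query interval as a half-open Range
  index.overlaps?(gi, 100, 200)    # => true: 100 <= 150 && 150 < 200
  index.overlaps?(gi, 50, 150)     # => false: the record ends where the query begins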
prep(file_spec, compression, ref_only)
# File lib/bio/maf/index.rb, line 731
def prep(file_spec, compression, ref_only)
  db[FORMAT_VERSION_KEY] = FORMAT_VERSION
  db[FILE_KEY] = File.basename(file_spec)
  @maf_file = db[FILE_KEY]
  if compression
    db[COMPRESSION_KEY] = compression.to_s
  end
  @ref_only = ref_only
  @seen_first = false
end
reopen()

Reopen the same DB handle read-only. Only useful for unit tests.

# File lib/bio/maf/index.rb, line 503
def reopen
  KyotoIndex.new(@path, @db)
end
scan_bin(cur, chrom_id, bin, bin_intervals, filters)
# File lib/bio/maf/index.rb, line 682
def scan_bin(cur, chrom_id, bin, bin_intervals, filters)
  # bin_intervals is sorted by zero_start
  # compute the start and end of all intervals of interest
  spanning_start = bin_intervals.first.begin
  spanning_end = bin_intervals.map {|i| i.end}.max
  # scan from the start of the bin
  cur.jump(bin_start_prefix(chrom_id, bin))
  matches = []
  while pair = cur.get(true)
    c_chr, c_bin, c_start, c_end = pair[0].unpack(KEY_SCAN_FMT)
    if (c_chr != chrom_id) \
      || (c_bin != bin) \
      || c_start >= spanning_end
      # we've hit the next bin, or chromosome, or gone past
      # the spanning interval, so we're done with this bin
      break
    end
    if c_end >= spanning_start # possible overlap
      # any intervals that end before the start of the current
      # block are no longer relevant
      while bin_intervals.first.end < c_start
        bin_intervals.shift
      end
      bin_intervals.each do |i|
        i_start = i.begin
        break if i_start > c_end
        if ((c_start <= i_start && i_start < c_end) \
            || i.include?(c_start)) \
            && filters.match(pair)
          # match
          matches << extract_index_offset(pair)
          break
        end
      end
    end
  end
  matches
end
scan_bins(chrom_id, bin_intervals, filters)

Scan the index for blocks matching the given bins and intervals.

# File lib/bio/maf/index.rb, line 597
def scan_bins(chrom_id, bin_intervals, filters)
  to_fetch = []
  db.cursor_process do |cur|
    bin_intervals.each do |bin, bin_intervals_raw|
      matches = scan_bin(cur, chrom_id, bin, bin_intervals_raw, filters)
      to_fetch.concat(matches)
    end 
  end
  to_fetch
end
scan_bins_parallel(chrom_id, bin_intervals, filters)
# File lib/bio/maf/index.rb, line 622
def scan_bins_parallel(chrom_id, bin_intervals, filters)
  LOG.debug {
    sprintf("Beginning scan of %d bin intervals %s filters.",
            bin_intervals.size,
            filters.empty? ? "without" : "with")
  }
  start = Time.now
  n_threads = ENV['profile'] ? 1 : java.lang.Runtime.runtime.availableProcessors
  jobs = java.util.concurrent.ConcurrentLinkedQueue.new(bin_intervals.to_a)
  completed = java.util.concurrent.LinkedBlockingQueue.new(128)
  threads = []
  n_threads.times do
    threads << make_scan_worker(jobs, completed) do |cur, req|
      bin, intervals = req
      scan_bin(cur, chrom_id, bin, intervals, filters)
    end
  end
  n_completed = 0
  to_fetch = []
  while (n_completed < bin_intervals.size)
    c = completed.poll(5, java.util.concurrent.TimeUnit::SECONDS)
    if c.nil?
      if threads.find { |t| t.alive? }
        next
      else
        raise "No threads alive, completed #{n_completed}/#{bin_intervals.size} jobs!"
      end
    end
    raise "worker failed: #{c}" if c.is_a? Exception
    to_fetch.concat(c)
    n_completed += 1
  end
  threads.each { |t| t.join }
  LOG.debug { sprintf("Matched %d index records with %d threads in %.3f seconds.",
                      to_fetch.size, n_threads, Time.now - start) }
  to_fetch
end
seq_id_for(name)
# File lib/bio/maf/index.rb, line 791
def seq_id_for(name)
  sid = index_sequences[name]
  if ! sid
    @max_sid += 1
    sid = @max_sid
    # "" << foo is hideous but apparently what it takes to get a
    # non-shared copy of a string on JRuby...
    name_copy = "" << name
    db.set("sequence:#{name_copy}", sid.to_s)
    index_sequences[name_copy] = sid
  end
  return sid
end
slice(interval, parser, filter={}) { |slice| ... }
# File lib/bio/maf/index.rb, line 455
def slice(interval, parser, filter={})
  if block_given?
    find([interval], parser, filter) do |block|
      yield block.slice(interval)
    end
  else
    LOG.debug { "accumulating results of #slice" }
    enum_for(:slice, interval, parser, filter)
  end
end
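Example use, assuming the index and parser from the earlier sketches (interval hypothetical); each yielded block is trimmed to the interval's bounds:

  interval = Bio::GenomicInterval.zero_based('mm8.chr7', 80_082_350, 80_082_400)
  index.slice(interval, parser) do |block|
    puts block.ref_seq.text
  end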
species_id_for_seq(seq)
# File lib/bio/maf/index.rb, line 814
def species_id_for_seq(seq)
  # NB can have multiple dots
  # example: otoGar1.scaffold_104707.1-93001
  parts = seq.split('.', 2)
  if parts.size == 2
    # "" << foo is hideous but apparently what it takes to get a
    # non-shared copy of a string on JRuby...
    species_name = "" << parts[0]
  else
    # not in species.sequence format, apparently
    species_name = "" << seq
  end
  if species.has_key? species_name
    return species[species_name]
  else
    species_id = @species_max_id + 1
    if species_id >= MAX_SPECIES
      raise "cannot index MAF file with more than #{MAX_SPECIES} species"
    end
    species[species_name] = species_id
    db["species:#{species_name}"] = species_id
    @species_max_id = species_id
    return species_id
  end
end
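The species component is everything before the first dot; a source with no dot is treated as a bare species name. For example (source names hypothetical):

  'otoGar1.scaffold_104707.1-93001'.split('.', 2).first  # => "otoGar1"
  'mm8.chr7'.split('.', 2).first                         # => "mm8"
  'petMar1'.split('.', 2)                                # => ["petMar1"]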
to_s()
# File lib/bio/maf/index.rb, line 498
def to_s
  "#<KyotoIndex path=#{path}>"
end
with_profiling() { || ... }
# File lib/bio/maf/index.rb, line 608
def with_profiling
  if RUBY_PLATFORM == 'java' && ENV['profile']
    rv = nil
    pdata = JRuby::Profiler.profile do
      rv = yield
    end
    printer = JRuby::Profiler::FlatProfilePrinter.new(pdata)
    printer.printProfile(STDERR)
    return rv
  else
    yield
  end
end