class Bio::MAF::KyotoIndex
Constants
- CHUNK_THRESHOLD_BLOCKS
- CHUNK_THRESHOLD_BYTES
- COMPRESSION_KEY
- FILE_KEY
- FORMAT_VERSION
- FORMAT_VERSION_KEY
- MAX_SPECIES
- REF_SEQ_KEY
Attributes
Public Class Methods
Build a new index from the MAF
file being parsed by ‘parser`, and store it in `path`. @param [Parser] parser MAF
parser for file to index @param [String] path path to index file to create @return [KyotoIndex]
# File lib/bio/maf/index.rb, line 398 def self.build(parser, path, ref_only=true) idx = self.new(path) idx.build(parser, ref_only) return idx end
KyotoIndex Internals
@api private
# File lib/bio/maf/index.rb, line 469 def initialize(path, db_arg=nil) @species = {} @species_max_id = -1 @index_sequences = {} @max_sid = -1 if db_arg || ((path.size > 1) and File.exist?(path)) mode = KyotoCabinet::DB::OREADER else mode = KyotoCabinet::DB::OWRITER | KyotoCabinet::DB::OCREATE end @db = db_arg || KyotoCabinet::DB.new @path = path path_str = "#{path.to_s}#opts=ls#dfunit=100000" unless db_arg || db.open(path_str, mode) raise "Could not open DB file!" end if mode == KyotoCabinet::DB::OREADER version = db[FORMAT_VERSION_KEY].to_i if version != FORMAT_VERSION raise "Index #{path} is version #{version}, expecting version #{FORMAT_VERSION}!" end @maf_file = db[FILE_KEY] self.ref_seq = db[REF_SEQ_KEY] load_index_sequences load_species end @mutex = Mutex.new end
Open an existing index for reading. @param [String] path path to existing Kyoto Cabinet index @return [KyotoIndex]
# File lib/bio/maf/index.rb, line 389 def self.open(path) return KyotoIndex.new(path) end
Public Instance Methods
# File lib/bio/maf/index.rb, line 742 def build(parser, ref_only=true) prep(parser.file_spec, parser.compression, ref_only) n = 0 acc = [] acc_bytes = 0 parser.each_block do |block| acc << block acc_bytes += block.size if acc_bytes > CHUNK_THRESHOLD_BYTES \ || acc.size > CHUNK_THRESHOLD_BLOCKS index_blocks(acc) acc = [] acc_bytes = 0 end n += 1 end index_blocks(acc) LOG.debug { "Created index for #{n} blocks and #{@index_sequences.size} sequences." } db.synchronize(true) end
# File lib/bio/maf/index.rb, line 840 def build_block_value(block) bits = block.sequences.collect {|s| 1 << species_id_for_seq(s.source) } vec = bits.reduce(0, :|) return [block.offset, block.size, block.text_size, block.sequences.size, vec].pack(VAL_FMT) end
Close the underlying Kyoto Cabinet database handle.
# File lib/bio/maf/index.rb, line 451 def close db.close end
# File lib/bio/maf/index.rb, line 507 def dump(stream=$stdout) bgzf = (db[COMPRESSION_KEY] == 'bgzf') stream.puts "KyotoIndex dump: #{@path}" stream.puts if db.count == 0 stream.puts "Empty database!" return end db.cursor_process do |cur| stream.puts "== Metadata ==" cur.jump('') while true k, v = cur.get(false) raise "unexpected end of records!" unless k break if k[0] == "\xff" stream.puts "#{k}: #{v}" unless cur.step raise "could not advance cursor!" end end stream.puts "== Index records ==" while pair = cur.get(true) _, chr, bin, s_start, s_end = pair[0].unpack(KEY_FMT) offset, len, text_size, n_seq, species_vec = pair[1].unpack(VAL_FMT) stream.puts "#{chr} [bin #{bin}] #{s_start}:#{s_end}" stream.puts " offset #{offset}, length #{len}" if bgzf block = Bio::BGZF.vo_block_offset(offset) data = Bio::BGZF.vo_data_offset(offset) stream.puts " BGZF block offset #{block}, data offset #{data}" end stream.puts " text size: #{text_size}" stream.puts " sequences in block: #{n_seq}" stream.printf(" species vector: %016x\n", species_vec) end end end
# File lib/bio/maf/index.rb, line 850 def entries_for(block) begin unless block.ref_seq.source == @ref_seq raise "Inconsistent reference sequence: expected #{@ref_seq}, got #{block.ref_seq.source}" end h = {} val = build_block_value(block) to_index = ref_only ? [block.sequences.first] : block.sequences to_index.each do |seq| seq_id = seq_id_for(seq.source) # size 0 occurs in e.g. upstream1000.maf.gz next if seq.size == 0 seq_end = seq.start + seq.size bin = Bio::Ucsc::UcscBin.bin_from_range(seq.start, seq_end) key = [255, seq_id, bin, seq.start, seq_end].pack(KEY_FMT) h[key] = val end return h rescue Exception => e LOG.error "Failed to index block at offset #{block.offset}:\n#{block}" raise e end end
Build a fetch list of alignment blocks to read, given an array of Bio::GenomicInterval
objects
# File lib/bio/maf/index.rb, line 567 def fetch_list(intervals, filter_spec={}) filter_spec ||= {} filters = Filters.build(filter_spec, self) chrom = intervals.first.chrom chrom_id = index_sequences[chrom] unless chrom_id raise "chromosome #{chrom} not indexed!" end if intervals.find { |i| i.chrom != chrom } raise "all intervals must be for the same chromosome!" end # for each bin, build a list of the intervals to look for there bin_intervals = Hash.new { |h, k| h[k] = [] } intervals.each do |i| i.bin_all.each do |bin| bin_intervals[bin] << (i.zero_start...i.zero_end) end end bin_intervals.values.each do |intervals| intervals.sort_by! {|i| i.begin} end matches = if RUBY_PLATFORM == 'java' && bin_intervals.size > 4 scan_bins_parallel(chrom_id, bin_intervals, filters) else scan_bins(chrom_id, bin_intervals, filters) end matches.sort_by! { |e| e[0] } # sort by offset in file end
Find all alignment blocks in the genomic regions in the list of Bio::GenomicInterval
objects, and parse them with the given parser.
An optional Hash of filters may be passed in. The following keys are used:
* `:with_all_species => ["sp1", "sp2", ...]` Only match alignment blocks containing all given species. * `:at_least_n_sequences => n` Only match alignment blocks with at least N sequences. * `:min_size => n` Only match alignment blocks with text size at least N. * `:max_size => n` Only match alignment blocks with text size at most N.
@param [Enumerable<Bio::GenomicInterval>] intervals genomic
intervals to parse.
@param [Parser] parser MAF
parser for file to fetch blocks
from.
@param [Hash] filter Block
filter expression. @yield [block] each {Block} matched, in turn @return [Enumerable<Block>] each matching {Block}, if no block given @api public
# File lib/bio/maf/index.rb, line 435 def find(intervals, parser, filter={}, &blk) start = Time.now fl = fetch_list(intervals, filter) LOG.debug { sprintf("Built fetch list of %d items in %.3fs.", fl.size, Time.now - start) } if ! fl.empty? parser.fetch_blocks(fl, &blk) else if ! block_given? [] end end end
# File lib/bio/maf/index.rb, line 766 def index_blocks(blocks) h = @mutex.synchronize do if ! @seen_first # set the reference sequence from the first block first_block = blocks.first self.ref_seq = first_block.sequences.first.source db[REF_SEQ_KEY] = ref_seq @seen_first = true end blocks.map { |b| entries_for(b) }.reduce(:merge!) end db.set_bulk(h, false) end
# File lib/bio/maf/index.rb, line 780 def load_index_sequences h = {} db.match_prefix("sequence:").each do |key| _, name = key.split(':', 2) id = db[key].to_i h[name] = id end @index_sequences = h @max_sid = @index_sequences.values.max end
# File lib/bio/maf/index.rb, line 805 def load_species db.match_prefix("species:").each do |key| _, name = key.split(':', 2) id = db[key].to_i @species[name] = id end @species_max_id = @species.values.sort.last || -1 end
# File lib/bio/maf/index.rb, line 660 def make_scan_worker(jobs, completed) Thread.new do with_profiling do db.cursor_process do |cur| while true req = jobs.poll break unless req begin result = yield(cur, req) completed.put(result) rescue Exception => e completed.put(e) LOG.error "Worker failing: #{e.class}: #{e}" LOG.error e raise e end end end end end end
# File lib/bio/maf/index.rb, line 721 def overlaps?(gi, i_start, i_end) g_start = gi.begin (i_start <= g_start && g_start < i_end) \ || gi.include?(i_start) end
# File lib/bio/maf/index.rb, line 731 def prep(file_spec, compression, ref_only) db[FORMAT_VERSION_KEY] = FORMAT_VERSION db[FILE_KEY] = File.basename(file_spec) @maf_file = db[FILE_KEY] if compression db[COMPRESSION_KEY] = compression.to_s end @ref_only = ref_only @seen_first = false end
Reopen the same DB handle read-only. Only useful for unit tests.
# File lib/bio/maf/index.rb, line 503 def reopen KyotoIndex.new(@path, @db) end
# File lib/bio/maf/index.rb, line 682 def scan_bin(cur, chrom_id, bin, bin_intervals, filters) # bin_intervals is sorted by zero_start # compute the start and end of all intervals of interest spanning_start = bin_intervals.first.begin spanning_end = bin_intervals.map {|i| i.end}.max # scan from the start of the bin cur.jump(bin_start_prefix(chrom_id, bin)) matches = [] while pair = cur.get(true) c_chr, c_bin, c_start, c_end = pair[0].unpack(KEY_SCAN_FMT) if (c_chr != chrom_id) \ || (c_bin != bin) \ || c_start >= spanning_end # we've hit the next bin, or chromosome, or gone past # the spanning interval, so we're done with this bin break end if c_end >= spanning_start # possible overlap # any intervals that end before the start of the current # block are no longer relevant while bin_intervals.first.end < c_start bin_intervals.shift end bin_intervals.each do |i| i_start = i.begin break if i_start > c_end if ((c_start <= i_start && i_start < c_end) \ || i.include?(c_start)) \ && filters.match(pair) # match matches << extract_index_offset(pair) break end end end end matches end
Scan the index for blocks matching the given bins and intervals.
# File lib/bio/maf/index.rb, line 597 def scan_bins(chrom_id, bin_intervals, filters) to_fetch = [] db.cursor_process do |cur| bin_intervals.each do |bin, bin_intervals_raw| matches = scan_bin(cur, chrom_id, bin, bin_intervals_raw, filters) to_fetch.concat(matches) end end to_fetch end
# File lib/bio/maf/index.rb, line 622 def scan_bins_parallel(chrom_id, bin_intervals, filters) LOG.debug { sprintf("Beginning scan of %d bin intervals %s filters.", bin_intervals.size, filters.empty? ? "without" : "with") } start = Time.now n_threads = ENV['profile'] ? 1 : java.lang.Runtime.runtime.availableProcessors jobs = java.util.concurrent.ConcurrentLinkedQueue.new(bin_intervals.to_a) completed = java.util.concurrent.LinkedBlockingQueue.new(128) threads = [] n_threads.times do threads << make_scan_worker(jobs, completed) do |cur, req| bin, intervals = req scan_bin(cur, chrom_id, bin, intervals, filters) end end n_completed = 0 to_fetch = [] while (n_completed < bin_intervals.size) c = completed.poll(5, java.util.concurrent.TimeUnit::SECONDS) if c.nil? if threads.find { |t| t.alive? } next else raise "No threads alive, completed #{n_completed}/#{bin_intervals.size} jobs!" end end raise "worker failed: #{c}" if c.is_a? Exception to_fetch.concat(c) n_completed += 1 end threads.each { |t| t.join } LOG.debug { sprintf("Matched %d index records with %d threads in %.3f seconds.", to_fetch.size, n_threads, Time.now - start) } to_fetch end
# File lib/bio/maf/index.rb, line 791 def seq_id_for(name) sid = index_sequences[name] if ! sid @max_sid += 1 sid = @max_sid # "" << foo is hideous but apparently what it takes to get a # non-shared copy of a string on JRuby... name_copy = "" << name db.set("sequence:#{name_copy}", sid.to_s) index_sequences[name_copy] = sid end return sid end
# File lib/bio/maf/index.rb, line 455 def slice(interval, parser, filter={}) if block_given? find([interval], parser, filter) do |block| yield block.slice(interval) end else LOG.debug { "accumulating results of #slice" } enum_for(:slice, interval, parser, filter) end end
# File lib/bio/maf/index.rb, line 814 def species_id_for_seq(seq) # NB can have multiple dots # example: otoGar1.scaffold_104707.1-93001 parts = seq.split('.', 2) if parts.size == 2 # "" << foo is hideous but apparently what it takes to get a # non-shared copy of a string on JRuby... species_name = "" << parts[0] else # not in species.sequence format, apparently species_name = "" << seq end if species.has_key? species_name return species[species_name] else species_id = @species_max_id + 1 if species_id >= MAX_SPECIES raise "cannot index MAF file with more than #{MAX_SPECIES} species" end species[species_name] = species_id db["species:#{species_name}"] = species_id @species_max_id = species_id return species_id end end
# File lib/bio/maf/index.rb, line 498 def to_s "#<KyotoIndex path=#{path}>" end
# File lib/bio/maf/index.rb, line 608 def with_profiling if RUBY_PLATFORM == 'java' && ENV['profile'] rv = nil pdata = JRuby::Profiler.profile do rv = yield end printer = JRuby::Profiler::FlatProfilePrinter.new(pdata) printer.printProfile(STDERR) return rv else yield end end