class OfflineSort::Sorter

Constants

DEFAULT_CHUNK_IO_CLASS
DEFAULT_CHUNK_SIZE

Attributes

chunk_input_output_class[R]
chunk_size[R]
enumerable[R]
sort_by[R]

Public Class Methods

new(enumerable, chunk_input_output_class: DEFAULT_CHUNK_IO_CLASS, chunk_size: DEFAULT_CHUNK_SIZE, &sort_by)
# File lib/offline_sort/offline_sort.rb, line 17
# Builds a sorter over +enumerable+.
#
# Entries are buffered in chunks of at most +chunk_size+. Each chunk is sorted
# and written to a tempfile through +chunk_input_output_class+, and the sorted
# chunks are merged when #sort is called.
#
# @param enumerable [Enumerable] source of the entries to sort
# @param chunk_input_output_class [Class] wraps a tempfile; its instances are used
#   via #write_entry, #rewind, #each and #close elsewhere in this class
# @param chunk_size [Integer] number of entries buffered per chunk. A non-positive
#   value never triggers a flush, so the whole input becomes a single chunk.
# @yield [entry] returns the sort key for an entry. When no block is given,
#   entries are ordered by their natural (<=>) ordering.
def initialize(enumerable,
               chunk_input_output_class: DEFAULT_CHUNK_IO_CLASS,
               chunk_size: DEFAULT_CHUNK_SIZE,
               &sort_by)
  @enumerable = enumerable
  @chunk_input_output_class = chunk_input_output_class
  @chunk_size = chunk_size
  # Without a block, sort_by would be nil, which breaks both chunk sorting and
  # merging. Fall back to the identity key, i.e. natural ordering.
  @sort_by = sort_by || ->(entry) { entry }
end

Public Instance Methods

sort()
# File lib/offline_sort/offline_sort.rb, line 27
# Sorts the whole input.
#
# The input is first spilled into sorted on-disk chunks, and those chunks are
# then merged into one ordered stream.
#
# @return the result of #merge, an Enumerator of entries in key order
def sort
  chunk_ios = split
  merge(chunk_ios)
end

Private Instance Methods

merge(sorted_chunk_ios)

TODO: optimize the case where the input holds less than one full chunk of data.

# File lib/offline_sort/offline_sort.rb, line 34
# Lazily k-way merges already-sorted chunk IOs into one sorted stream.
#
# Each chunk IO must respond to #each (returning an external enumerator when
# called without a block) and to #close. The first entry of every chunk seeds a
# FixedSizeMinHeap keyed on +sort_by+ applied to the entry's data. Each time
# the smallest entry is emitted, the next entry from the same chunk takes its
# place in the heap.
#
# A chunk is closed as soon as it is exhausted, including chunks that are
# empty from the start.
# NOTE(review): chunks are closed only when fully consumed. If the caller stops
# iterating early, any unexhausted chunk IOs stay open.
#
# @param sorted_chunk_ios [Array] chunk IOs whose entries are each sorted by +sort_by+
# @return [Enumerator] yields entries in +sort_by+ order
def merge(sorted_chunk_ios)
  chunk_enumerators = sorted_chunk_ios.map(&:each)

  seed_entries = []
  chunk_enumerators.each_with_index do |chunk, index|
    begin
      seed_entries << ChunkEntry.new(index, chunk.next)
    rescue StopIteration
      # An empty chunk contributes nothing, so release it immediately instead
      # of letting StopIteration escape this method.
      sorted_chunk_ios[index].close
    end
  end

  # Nothing to merge (no chunks, or only empty ones), so skip building a heap.
  return Enumerator.new { |_yielder| } if seed_entries.empty?

  pq = FixedSizeMinHeap.new(seed_entries) { |entry| sort_by.call(entry.data) }

  Enumerator.new do |yielder|
    # Relies on FixedSizeMinHeap#pop returning nil once the heap is drained.
    while (item = pq.pop)
      yielder.yield(item.data)

      begin
        entry = chunk_enumerators[item.chunk_number].next
      rescue StopIteration
        sorted_chunk_ios[item.chunk_number].close
        next
      end

      # Refill the heap from the chunk the emitted entry came from.
      pq.push(ChunkEntry.new(item.chunk_number, entry))
    end
  end
end
split()
# File lib/offline_sort/offline_sort.rb, line 60
# Splits the input into sorted, spilled chunks.
#
# Entries are buffered until +chunk_size+ of them have accumulated. The buffer
# is then handed to #write_sorted_chunk and emptied. Any leftover partial
# buffer is written as a final chunk.
#
# @return [Array] the chunk IOs returned by #write_sorted_chunk, in input order
def split
  written = []
  buffer = []

  enumerable.each do |entry|
    buffer << entry
    next unless buffer.size == chunk_size

    written << write_sorted_chunk(buffer)
    buffer.clear
  end

  written.push(write_sorted_chunk(buffer)) unless buffer.empty?
  written
end
write_sorted_chunk(entries)
# File lib/offline_sort/offline_sort.rb, line 78
# Sorts +entries+ by +sort_by+ and writes them to a fresh binary-mode Tempfile
# wrapped in +chunk_input_output_class+.
#
# The returned chunk IO is rewound so it can be read from the start.
#
# If anything fails after the Tempfile is created (wrapping, sorting or
# writing), the Tempfile is closed and unlinked before the error is re-raised,
# so a failed chunk does not leak an open file.
#
# @param entries [Array] the entries of one chunk
# @return the rewound chunk IO
def write_sorted_chunk(entries)
  file = Tempfile.open('sort-chunk-')

  begin
    file.binmode

    chunk_io = chunk_input_output_class.new(file)
    entries.sort_by(&sort_by).each { |entry| chunk_io.write_entry(entry) }

    chunk_io.rewind
    chunk_io
  rescue StandardError
    file.close!
    raise
  end
end