class OfflineSort::Sorter
Constants
- DEFAULT_CHUNK_IO_CLASS
- DEFAULT_CHUNK_SIZE
Attributes
chunk_input_output_class[R]
chunk_size[R]
enumerable[R]
sort_by[R]
Public Class Methods
new(enumerable, chunk_input_output_class: DEFAULT_CHUNK_IO_CLASS, chunk_size: DEFAULT_CHUNK_SIZE, &sort_by)
click to toggle source
# File lib/offline_sort/offline_sort.rb, line 17
# Builds a sorter; no work is done until #sort is called.
#
# @param enumerable [#each] source of the entries to sort
# @param chunk_input_output_class [Class] class instantiated with a temp file
#   to write and later re-read one sorted chunk
# @param chunk_size [Integer] number of entries collected before a chunk is
#   sorted and written out
# @yieldparam entry [Object] one entry of +enumerable+
# @yieldreturn [Object] the key the entry is ordered by
def initialize(enumerable, chunk_input_output_class: DEFAULT_CHUNK_IO_CLASS, chunk_size: DEFAULT_CHUNK_SIZE, &sort_by)
  # What to sort, and the key to sort it by.
  @sort_by = sort_by
  @enumerable = enumerable

  # How the intermediate chunks are sized and stored.
  @chunk_size = chunk_size
  @chunk_input_output_class = chunk_input_output_class
end
Public Instance Methods
sort()
click to toggle source
# File lib/offline_sort/offline_sort.rb, line 27
# Sorts the enumerable in two phases: #split writes it out as sorted chunks,
# then #merge combines those chunks.
#
# @return [Enumerator] whatever #merge returns for the chunks produced by #split
def sort
  sorted_chunk_ios = split
  merge(sorted_chunk_ios)
end
Private Instance Methods
merge(sorted_chunk_ios)
click to toggle source
TODO: optimization for when there is less than a single full chunk of data
# File lib/offline_sort/offline_sort.rb, line 34
# Merges already-sorted chunks into a single stream of entries.
#
# The first entry of every chunk is read eagerly and placed in a
# FixedSizeMinHeap keyed by +sort_by+. The returned Enumerator then repeatedly
# pops an entry from the heap, yields it, and refills the heap from the chunk
# that entry came from.
#
# Each chunk IO is closed as soon as its enumerator is exhausted. Chunk IOs
# that are still open when enumeration stops early (the consumer breaks out,
# uses +first(n)+, or raises) are closed as well, so an abandoned merge does
# not leave them open.
#
# @param sorted_chunk_ios [Array<#each, #close>] chunk IOs, each one already
#   sorted and non-empty (an empty chunk raises StopIteration while seeding)
# @return [Enumerator] yields the entries of all chunks in heap-pop order
def merge(sorted_chunk_ios)
  chunk_enumerators = sorted_chunk_ios.map(&:each)

  # Seed the heap with the head entry of every chunk, tagged with the chunk
  # index so the heap can be refilled from the right chunk later.
  initial_entries = chunk_enumerators.each_with_index.map do |chunk, index|
    ChunkEntry.new(index, chunk.next)
  end

  pq = FixedSizeMinHeap.new(initial_entries) { |entry| sort_by.call(entry.data) }

  # Indexes of chunk IOs that have not been closed yet.
  open_chunk_numbers = (0...sorted_chunk_ios.size).to_a

  Enumerator.new do |yielder|
    begin
      while (item = pq.pop)
        yielder.yield(item.data)

        begin
          entry = chunk_enumerators[item.chunk_number].next
          pq.push(ChunkEntry.new(item.chunk_number, entry))
        rescue StopIteration
          # This chunk is fully consumed; release it right away.
          open_chunk_numbers.delete(item.chunk_number)
          sorted_chunk_ios[item.chunk_number].close
        end
      end
    ensure
      # Reached on normal completion (nothing left to close) and when the
      # consumer stops early; close whatever is still open exactly once.
      remaining = open_chunk_numbers.dup
      open_chunk_numbers.clear
      remaining.each { |chunk_number| sorted_chunk_ios[chunk_number].close }
    end
  end
end
split()
click to toggle source
# File lib/offline_sort/offline_sort.rb, line 60
# Reads the enumerable once, buffering entries and handing the buffer to
# #write_sorted_chunk every time it reaches +chunk_size+ entries. Any entries
# left over at the end are written as one final, smaller chunk.
#
# @return [Array] the chunk IOs returned by #write_sorted_chunk, in the order
#   they were written; empty when the enumerable yields nothing
def split
  chunk_ios = []
  buffer = []

  enumerable.each do |entry|
    buffer.push(entry)
    next unless buffer.size == chunk_size

    chunk_ios.push(write_sorted_chunk(buffer))
    # The same array is reused for the next chunk.
    buffer.clear
  end

  return chunk_ios if buffer.empty?

  # Flush the trailing partial chunk.
  chunk_ios << write_sorted_chunk(buffer)
end
write_sorted_chunk(entries)
click to toggle source
# File lib/offline_sort/offline_sort.rb, line 78
# Sorts +entries+ by the +sort_by+ block and writes them, in sorted order, to a
# new binary-mode temp file wrapped in a +chunk_input_output_class+ instance.
#
# If anything fails after the temp file has been created (building the chunk
# IO, sorting, writing, or rewinding), the temp file is closed and unlinked
# before the error is re-raised, instead of staying open until it is garbage
# collected.
#
# @param entries [Array] the entries of one chunk; not modified
# @return [Object] the chunk IO, rewound so it can be read from the start
# @raise [StandardError] re-raises whatever sorting or writing raised
def write_sorted_chunk(entries)
  file = Tempfile.open('sort-chunk-')

  begin
    file.binmode

    chunk_io = chunk_input_output_class.new(file)
    entries.sort_by(&sort_by).each { |entry| chunk_io.write_entry(entry) }

    chunk_io.rewind
    chunk_io
  rescue StandardError
    # Nobody will ever receive this chunk, so release its file now.
    file.close!
    raise
  end
end