class MapReduce::Mapper

The MapReduce::Mapper class runs the mapping part of your map-reduce job.
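Your implementation only needs to respond to #map and #reduce. The sketch below is hypothetical. The class name WordCounter is made up, and the three-argument #reduce(key, value1, value2) signature is an assumption, since the reduce step's call site is not shown in this section. #map yields key-value pairs, which Mapper#map collects into its buffer.

```ruby
# Hypothetical word-count implementation illustrating the interface the
# mapper expects: an object that responds to #map and #reduce.
class WordCounter
  # Receives whatever arguments are passed to Mapper#map and yields
  # zero or more [key, value] pairs.
  def map(text)
    text.split.each { |word| yield(word, 1) }
  end

  # Assumed signature: combines two values that share the same key.
  def reduce(_word, count1, count2)
    count1 + count2
  end
end

counter = WordCounter.new
pairs = []
counter.map("the cat and the hat") { |key, value| pairs << [key, value] }
p pairs # → [["the", 1], ["cat", 1], ["and", 1], ["the", 1], ["hat", 1]]
p counter.reduce("the", 1, 1) # → 2
```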

Attributes

partitions[R]

Public Class Methods

new(implementation, partitioner: HashPartitioner.new(32), memory_limit: 100 * 1024 * 1024)

Initializes a new mapper.

@param implementation Your map-reduce implementation, i.e. an object that responds to #map and #reduce.

@param partitioner [#call] A partitioner, i.e. an object that responds to #call and calculates a partition for the passed key.

@param memory_limit [#to_i] The memory limit, i.e. the buffer size in bytes.

@example

MapReduce::Mapper.new(MyImplementation.new, partitioner: MapReduce::HashPartitioner.new(16), memory_limit: 100 * 1024 * 1024) # 100 MiB (100.megabytes with ActiveSupport)
Calls superclass method
# File lib/map_reduce/mapper.rb, line 23
def initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 100 * 1024 * 1024)
  super()

  @implementation = implementation
  @partitioner = partitioner
  @memory_limit = memory_limit.to_i

  @buffer_size = 0
  @buffer = []
  @chunks = []
end
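The partitioner is duck-typed: anything that responds to #call(key) and returns a partition works. The sketch below is a hypothetical stand-in (the name Crc32Partitioner and the CRC32 scheme are assumptions; HashPartitioner's actual hashing is not shown here). It hashes the key's JSON encoding so that every worker process assigns a given key to the same partition.

```ruby
require "json"
require "zlib"

# Hypothetical partitioner: NOT the gem's HashPartitioner.
class Crc32Partitioner
  def initialize(num_partitions)
    @num_partitions = num_partitions
  end

  # Serialize the key to JSON so any JSON-encodable key works, then use
  # CRC32 rather than String#hash: String#hash is seeded per process, so
  # it could send the same key to different partitions on different workers.
  def call(key)
    Zlib.crc32(JSON.generate(key)) % @num_partitions
  end
end

partitioner = Crc32Partitioner.new(16)
p partitioner.call("some_key") # some value in 0...16, stable across processes
p partitioner.call(["user", 42]) == partitioner.call(["user", 42]) # → true
```

Because only #call is required, a lambda such as `->(key) { Zlib.crc32(JSON.generate(key)) % 16 }` would satisfy the same contract.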

Public Instance Methods

map(*args, **kwargs)

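Passes the received arguments to your map-reduce implementation's #map and adds each yielded key-value pair to a buffer, counting the buffer's size as the byte length of each pair's JSON encoding. When that size reaches the memory limit, the buffer is sorted, reduced, and written to a tempfile as a chunk. Buffer updates happen inside #synchronize.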

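@param args Positional arguments passed unchanged to your implementation's #map, e.g. the key to map.

@param kwargs Keyword arguments passed unchanged to your implementation's #map.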

@example

mapper.map("some_key")
mapper.map("other_key")
# File lib/map_reduce/mapper.rb, line 45
def map(*args, **kwargs)
  @implementation.map(*args, **kwargs) do |new_key, new_value|
    synchronize do
      @buffer.push([new_key, new_value])

      @buffer_size += JSON.generate([new_key, new_value]).bytesize

      write_chunk if @buffer_size >= @memory_limit
    end
  end
end
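The buffering rule in map can be modeled in a few lines. This is a simplified, single-threaded sketch (the class BufferModel is hypothetical): it keeps the same byte accounting and flush condition but stores flushed chunks in memory instead of sorting, reducing, and writing them to tempfiles.

```ruby
require "json"

# Hypothetical model of Mapper#map's buffering, without locking or tempfiles.
class BufferModel
  attr_reader :chunks

  def initialize(memory_limit)
    @memory_limit = memory_limit
    @buffer = []
    @buffer_size = 0
    @chunks = []
  end

  # Append the pair, add its JSON byte length, flush once the limit is hit.
  def add(key, value)
    @buffer.push([key, value])
    @buffer_size += JSON.generate([key, value]).bytesize
    flush if @buffer_size >= @memory_limit
  end

  def flush
    @chunks.push(@buffer)
    @buffer = []
    @buffer_size = 0
  end
end

model = BufferModel.new(20)
%w[a b c d].each { |word| model.add(word, 1) }
p model.chunks # → [[["a", 1], ["b", 1], ["c", 1]]]
```

Each pair such as `["a",1]` encodes to 7 bytes, so the third pair pushes the count to 21 and triggers a flush; the `["d", 1]` pair stays buffered, which is why shuffle first writes any remaining buffer. The limit therefore bounds the serialized size of the buffered pairs, which can differ from the Ruby objects' actual memory footprint.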

shuffle(&block)

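Performs a k-way merge of the sorted chunks written to tempfiles, reducing the merged stream with your map-reduce implementation as it goes and splitting the result into partitions. It then yields each partition together with the tempfile containing that partition's data, one JSON-encoded key-value pair per line. The tempfiles are closed and deleted after all partitions have been yielded, so store their contents within the block. Without a block, shuffle returns an enumerator.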

@example

mapper.shuffle do |partition, tempfile|
  # store data e.g. on s3
end
# File lib/map_reduce/mapper.rb, line 67
def shuffle(&block)
  return enum_for(:shuffle) unless block_given?

  write_chunk if @buffer_size > 0

  partitions = {}

  reduce_chunk(k_way_merge(@chunks), @implementation).each do |pair|
    partition = @partitioner.call(pair[0])

    (partitions[partition] ||= Tempfile.new).puts(JSON.generate(pair))
  end

  @chunks.each { |tempfile| tempfile.close(true) }
  @chunks = []

  partitions.each_value(&:rewind)

  partitions.each do |partition, tempfile|
    block.call(partition, tempfile)
  end

  partitions.each_value { |tempfile| tempfile.close(true) }

  nil
end
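A simplified, in-memory sketch of the steps shuffle describes: a k-way merge of sorted chunks that reduces equal keys as they stream out, followed by partitioning. The method name merge_and_reduce, the addition-based reduce block, and the CRC32 partitioning into 2 partitions are hypothetical; the actual k_way_merge and reduce_chunk helpers are not shown in this section. A linear scan over the chunk heads replaces a heap, which is adequate for a small number of chunks.

```ruby
require "json"
require "zlib"

# Merge chunks that are each sorted by JSON-encoded key, folding equal keys.
def merge_and_reduce(chunks)
  cursors = Array.new(chunks.size, 0)
  result = []

  loop do
    # Pick the chunk whose current head has the smallest JSON-encoded key.
    best = nil
    chunks.each_with_index do |chunk, i|
      next if cursors[i] >= chunk.size

      if best.nil? || JSON.generate(chunk[cursors[i]][0]) < JSON.generate(chunks[best][cursors[best]][0])
        best = i
      end
    end
    break if best.nil?

    key, value = chunks[best][cursors[best]]
    cursors[best] += 1

    # The merged stream is sorted, so equal keys are adjacent: fold them.
    if !result.empty? && result.last[0] == key
      result.last[1] = yield(key, result.last[1], value)
    else
      result.push([key, value])
    end
  end

  result
end

chunk1 = [["and", 1], ["the", 2]]
chunk2 = [["cat", 1], ["the", 1]]
merged = merge_and_reduce([chunk1, chunk2]) { |_key, a, b| a + b }
p merged # → [["and", 1], ["cat", 1], ["the", 3]]

# Split into partitions; group_by keeps each partition in key order.
partitions = merged.group_by { |key, _value| Zlib.crc32(JSON.generate(key)) % 2 }
p partitions
```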

Private Instance Methods

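write_chunk()

Sorts the buffered key-value pairs by the JSON encoding of their keys, reduces them using your implementation, writes the result to a new tempfile (one JSON-encoded pair per line), and resets the buffer.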
# File lib/map_reduce/mapper.rb, line 96
def write_chunk
  tempfile = Tempfile.new

  @buffer.sort_by! { |item| JSON.generate(item.first) }

  reduce_chunk(@buffer, @implementation).each do |pair|
    tempfile.puts JSON.generate(pair)
  end

  tempfile.rewind

  @chunks.push(tempfile)

  @buffer_size = 0
  @buffer = []
end
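Sorting by JSON.generate(key) rather than by the raw key lets a chunk hold keys of mixed types: Integer and String keys cannot be compared with <=>, but their JSON encodings are plain Strings. The sketch below shows this, then folds adjacent equal keys in one pass; the chunk_while-based folding is a hypothetical stand-in for reduce_chunk, whose code is not shown here.

```ruby
require "json"

buffer = [["b", 1], [2, 1], ["a", 1], [2, 1]]

# Sorting on the raw keys fails: "b" <=> 2 is nil.
begin
  buffer.sort_by(&:first)
rescue ArgumentError => e
  puts "raw sort failed: #{e.class}"
end

# Sorting on the JSON encodings always works.
sorted = buffer.sort_by { |item| JSON.generate(item.first) }
p sorted # → [["a", 1], ["b", 1], [2, 1], [2, 1]]

# Hypothetical reduce step: equal keys are now adjacent, so fold each run.
reduced = sorted.chunk_while { |a, b| a[0] == b[0] }.map do |group|
  [group.first[0], group.map(&:last).reduce { |sum, v| sum + v }]
end
p reduced # → [["a", 1], ["b", 1], [2, 2]]
```

The resulting order is lexicographic over the encodings (for example, the integer key 10 sorts before 9). That is sufficient for the merge in shuffle, which only needs every chunk to use the same total order.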