class MapReduce::Mapper

Public Class Methods

new(opts = {}) click to toggle source
# File lib/map_reduce/mapper.rb, line 3
# Builds a mapper from an options hash.
#
# Recognised keys (all optional):
#   :masters - list of master socket addresses; when absent or nil a
#              one-element list holding ::MapReduce::DEFAULT_SOCKET is used
#   :type    - connection flavour; falls back to :em when absent or nil
#   :task    - task name sent along with every request (may be nil)
def initialize(opts = {})
  configured_masters = opts[:masters]
  @masters = configured_masters || [::MapReduce::DEFAULT_SOCKET]

  requested_type = opts[:type]
  @connection_type = requested_type || :em

  @task_name = opts[:task]

  # Sockets that answered a request with a falsy reply are flagged here.
  @disconnected = {}
end

Public Instance Methods

emit(key, value, &blk) click to toggle source
# File lib/map_reduce/mapper.rb, line 10
# Sends a ["map", key, value, task_name] request to the master chosen for
# +key+ by pick_master.
#
# A truthy reply clears the socket's "disconnected" flag and is handed to
# +blk+ when one was given; without a block the reply is returned from
# emit itself. A falsy reply flags the socket as disconnected and the whole
# emit is attempted again.
#
# Raises MapReduce::Exceptions::BlankKey when +key+ is nil.
def emit(key, value, &blk)
  raise MapReduce::Exceptions::BlankKey, "Key can't be nil" if key.nil?

  master  = pick_master(key)
  request = ["map", key, value, @task_name]

  master.send_request(request) do |reply|
    unless reply
      # No answer from this master: remember that and try again.
      @disconnected[master] = true
      next emit(key, value, &blk)
    end

    @disconnected.delete(master) if @disconnected[master]
    next blk.call(reply) if blk

    # Non-local return out of emit; this only works while emit is still
    # executing, i.e. when the socket invokes the callback before
    # send_request returns. NOTE(review): confirm for the async socket type.
    return reply
  end
end
Also aliased as: map
map(key, value, &blk)
Alias for: emit
wait_for_all(&blk) click to toggle source
# File lib/map_reduce/mapper.rb, line 30
# Asks every master whether the map phase of the current task is finished
# by sending ["map_finished", task_name] to each socket.
#
# A round is judged only after every master has replied:
# * if all replies start with "ok", +blk+ is called (or, without a block,
#   the method returns);
# * otherwise the whole poll is repeated after one second.
#
# A nil/false reply counts as "not finished", mirroring how emit treats a
# falsy reply as an unreachable master.
def wait_for_all(&blk)
  finished = Hash[sockets.map { |s| [s, false] }]
  replies  = 0

  sockets.each do |sock|
    sock.send_request(["map_finished", @task_name]) do |message|
      finished[sock] = message ? message[0] == "ok" : false
      replies += 1

      # Judging the round on a partial set of replies would schedule a retry
      # on every early reply, so the completion block would fire repeatedly.
      next if replies < finished.size

      if finished.all? { |_sock, ok| ok }
        if blk
          blk.call
        else
          # Non-local return; only valid while wait_for_all is still running.
          return
        end
      else
        after(1) do
          wait_for_all(&blk)
        end
      end
    end
  end
end

Private Instance Methods

after(sec) { || ... } click to toggle source
# File lib/map_reduce/mapper.rb, line 52
# Schedules the given block to run after +sec+ seconds by calling
# add_timer on EM::Synchrony when the connection type is :sync and on EM
# otherwise. Returns whatever add_timer returns.
def after(sec)
  timer_source = @connection_type == :sync ? EM::Synchrony : EM
  timer_source.add_timer(sec) { yield }
end
pick_master(key) click to toggle source
# File lib/map_reduce/mapper.rb, line 64
# Chooses the master socket responsible for +key+.
#
# The primary master is sockets[MD5(key.to_s) % sockets.size]. While the
# primary is not flagged in @disconnected it is always returned.
#
# When the primary is flagged:
# * roughly one call in ten still returns it, so that a request is sent and
#   a recovered master can be noticed;
# * otherwise the same digest selects one of the sockets that are not
#   flagged, so a given key keeps going to the same fallback;
# * if every socket is flagged, the primary is returned.
#
# The fallback never re-hashes a permutation of the key: for empty,
# one-character or repeated-character keys every permutation is the key
# itself, which would pin the key to the dead master.
def pick_master(key)
  digest  = Digest::MD5.hexdigest(key.to_s).to_i(16)
  primary = sockets[digest % sockets.size]
  return primary unless @disconnected[primary]

  # Occasionally probe the flagged primary.
  return primary if rand(10).zero?

  alive = sockets.reject { |sock| @disconnected[sock] }
  alive.empty? ? primary : alive[digest % alive.size]
end
sockets() click to toggle source
# File lib/map_reduce/mapper.rb, line 75
# Returns the request sockets, one per address in @masters, creating and
# connecting them on first use and reusing the same array afterwards.
#
# The socket class is EM::Protocols::Zmq2::ReqFiber for the :sync
# connection type and EM::Protocols::Zmq2::ReqCb for anything else.
def sockets
  return @sockets if @sockets

  request_class =
    @connection_type == :sync ? EM::Protocols::Zmq2::ReqFiber : EM::Protocols::Zmq2::ReqCb

  @sockets = @masters.map do |address|
    request_class.new.tap { |req| req.connect(address) }
  end
end