class ScbiMapreduce::Manager

Attributes

checkpointing[RW]
chunk_size[RW]
exit_on_many_errors[RW]
keep_order[RW]
retry_stuck_jobs[RW]

Public Class Methods

new(server_ip, port, workers, work_manager_class,custom_worker_file,log_file=nil, init_env_file=nil) click to toggle source

Initializes the Manager.

# File lib/scbi_mapreduce/manager.rb, line 23
# Builds a Manager: sets up the global server logger ($SERVER_LOG), resolves
# the IP address to bind to, stores the default option values and creates the
# WorkerLauncher used later by start_server.
#
# server_ip::          prefix matched against the local IPv4 addresses (plus
#                      127.0.0.1); the first address starting with it is used.
#                      When none matches, '0.0.0.0' is used instead.
# port::               port stored in @port for the server.
# workers::            either an Integer (number of local workers), the path of
#                      an existing file with one host name per line, or a list
#                      of host names. Entries equal to the local host name are
#                      counted as local workers; the rest are kept as remote
#                      worker names.
# work_manager_class:: stored in @work_manager_class.
# custom_worker_file:: forwarded to WorkerLauncher.
# log_file::           log destination handed to Logger.new; defaults to
#                      logs/server_log.txt. An IO (e.g. STDOUT) may be given.
# init_env_file::      forwarded to WorkerLauncher.
def initialize(server_ip, port, workers, work_manager_class, custom_worker_file, log_file = nil, init_env_file = nil)
  @port = port

  log_file = File.join('logs', 'server_log.txt') if log_file.nil?

  # Only path-like destinations need their parent folder; IO objects such as
  # STDOUT/STDERR have no directory to create.
  created_log_dir = false
  unless log_file.is_a?(IO)
    log_dir = File.dirname(log_file)
    unless File.exist?(log_dir)
      FileUtils.mkdir_p(log_dir)
      created_log_dir = true
    end
  end

  # The logger must exist before anything is logged, and the folder must
  # exist before the logger opens its file.
  $SERVER_LOG = Logger.new(log_file)
  $SERVER_LOG.datetime_format = "%Y-%m-%d %H:%M:%S"
  $SERVER_LOG.info("Creating logs folder") if created_log_dir

  ip_list = Socket.ip_address_list.select { |e| e.ipv4? }.map { |e| e.ip_address }
  # The alternative below does not find the Myrinet interface:
  # ip_list = Socket::getaddrinfo(Socket.gethostname, "echo", Socket::AF_INET).map{|x| x[3]}.uniq
  ip_list << '127.0.0.1'

  $SERVER_LOG.info("Available IPs: #{ip_list}")

  ip = ip_list.find { |one_ip| one_ip.start_with?(server_ip) }

  unless ip
    $SERVER_LOG.info("Ip #{server_ip} not found in available IPs: #{ip_list}")
    ip = '0.0.0.0'
  end

  @ip = ip

  @checkpointing = false
  @keep_order = false
  @retry_stuck_jobs = false
  @exit_on_many_errors = true

  @chunk_size = 1

  @worker_names = []
  if workers.is_a?(Integer)
    @workers = workers
  else # workers is a file with names, or an array
    if workers.is_a?(String) && File.exist?(workers)
      $SERVER_LOG.info("Loading workers file: #{workers}")
      workers = File.read(workers).split("\n").map(&:chomp)
    end

    host_name = `hostname`.chomp
    @workers = workers.count(host_name)

    # Work on a copy so the caller's list is not modified.
    @worker_names = workers.dup
    @worker_names.delete(host_name)
  end

  @work_manager_class = work_manager_class

  # The launcher is created with port 0; start_server assigns the real port
  # through server_port= once the server socket is bound.
  @worker_launcher = WorkerLauncher.new(@ip, 0, ip_list, @workers, custom_worker_file, log_file, init_env_file)

  $SERVER_LOG.info("Local workers: #{@workers}")
  $SERVER_LOG.info("Remote workers: #{@worker_names}")
end

Public Instance Methods

save_stats(stats=nil, filename='scbi_mapreduce_stats.json') click to toggle source
# File lib/scbi_mapreduce/manager.rb, line 143
# Delegates to the work manager class' save_stats, passing +stats+ and
# +filename+ through unchanged, and returns whatever it returns.
def save_stats(stats=nil, filename='scbi_mapreduce_stats.json')
  work_manager = @work_manager_class
  work_manager.save_stats(stats, filename)
end
start_server() click to toggle source

Starts an EventMachine loop that acts as a server for incoming worker connections.

# File lib/scbi_mapreduce/manager.rb, line 103
# Runs the EventMachine loop: initializes the work manager internals, starts
# the server on @ip/@port, records the address actually bound, and launches
# the local and external workers.
#
# Any exception escaping the loop is logged to $SERVER_LOG and followed by a
# call to end_work_manager on the work manager class. Note that
# `rescue Exception` also covers non-StandardError exceptions (e.g. interrupts
# and exit requests).
def start_server
  # Install a custom error handler; otherwise errors raised inside a callback
  # are silently ignored.
  EM.error_handler do |error|
    $SERVER_LOG.error("#{error.message} => #{error.backtrace.join("\n")}")
    @work_manager_class.global_error_received(error)
  end

  # $SERVER_LOG.info("Installing INT and TERM traps in #{@work_manager_class}")
  # Signal.trap("INT")  { puts "TRAP INT";@work_manager_class.controlled_exit; EM.stop}
  # Signal.trap("TERM") { puts "TRAP TERM";@work_manager_class.controlled_exit; EM.stop}

  EventMachine::run do
    @work_manager_class.init_work_manager_internals(
      @checkpointing, @keep_order, @retry_stuck_jobs, @exit_on_many_errors, @chunk_size
    )

    signature = EventMachine::start_server(@ip, @port, @work_manager_class)
    bound_port, bound_ip = Socket.unpack_sockaddr_in(EM.get_sockname(signature))

    # Keep the address the socket was really bound to.
    @port = bound_port.to_i
    @ip = bound_ip.to_s

    $SERVER_LOG.info("Server running at : [#{@ip}:#{@port}]")

    # Workers need the final port before they are launched.
    @worker_launcher.server_port = @port
    @worker_launcher.launch_workers
    @worker_launcher.launch_external_workers(@worker_names)
  end
rescue Exception => failure
  trace = failure.backtrace.join("\n")
  $SERVER_LOG.error("Exiting server due to exception:\n#{failure.message}\n#{trace}")
  @work_manager_class.end_work_manager
end
stats() click to toggle source
# File lib/scbi_mapreduce/manager.rb, line 139
# Returns the result of calling +stats+ on the work manager class.
def stats
  work_manager = @work_manager_class
  work_manager.stats
end