class ScbiMapreduce::WorkerLauncher
Attributes
server_ip[RW]
server_port[RW]
Public Class Methods
new(server_ip,server_port, server_ip_list,workers, worker_file, log_file=nil, init_env_file=nil)
click to toggle source
# File lib/scbi_mapreduce/worker_launcher.rb, line 15 def initialize(server_ip,server_port, server_ip_list,workers, worker_file, log_file=nil, init_env_file=nil) @server_ip = server_ip @server_port = server_port @worker_file = worker_file @workers=workers @init_env_file=init_env_file @server_ip_list=server_ip_list if log_file.nil? log_file = "logs/launcher_global_log.txt" end FileUtils.mkdir_p(File.dirname(log_file)) if ((log_file!=STDOUT) && (!File.exists?(File.dirname(log_file)))) $LAUNCHER_LOG = Logger.new(log_file) $LAUNCHER_LOG.datetime_format = "%Y-%m-%d %H:%M:%S" end
Public Instance Methods
find_common_ip(machine_ip,ip_list)
click to toggle source
# File lib/scbi_mapreduce/worker_launcher.rb, line 97 def find_common_ip(machine_ip,ip_list) def left_largest_common_substr(s1,s2) res='' s2.scan(/./).each_with_index do |l1,i| if s1[i]==l1 res << l1 else break end end res end def remove_final_dot(s) res=s # remove final dot if res[res.length-1]=='.' res=res[0,res.length-1] end return res end res='' common_ip='' ip_list.each do |ip| res=left_largest_common_substr(ip,machine_ip) res=remove_final_dot(res) if res.length>common_ip.length common_ip=res end end return common_ip end
launch_external_workers(workers)
click to toggle source
# File lib/scbi_mapreduce/worker_launcher.rb, line 141 def launch_external_workers(workers) #skip if slurm detected if system("which srun > /dev/null 2>&1") return end puts "Launching #{workers.count} external workers: #{workers}" puts "INIT_ENV_FILE: #{@init_env_file}" # This sleep is necessary to leave time to lustre fylesystems to sync the log folder between all nodes. If not, external workers will not be launched. if !workers.empty? puts "SLEEP 10 for logs folder sync in lustre fs" sleep 10 end init='' if @init_env_file init_path = File.expand_path(@init_env_file) # path = File.join($ROOT_PATH) # puts "init_env file: #{path}" if File.exists?(init_path) puts "File #{init_path} exists, using it" init=". #{init_path}; " end end init_dir=Dir.pwd cd ='' if File.exists?(init_dir) cd = "cd #{init_dir}; " end workers.each do |machine| log_file=File.join(init_dir,'logs',"launcher_#{@@worker_id}") log_dir=File.join(init_dir,'logs') # if server_ip is not in valid ips if !@server_ip_list.include?(@server_ip) # find matching ip between server and worker machine_ip = Resolv.getaddress(machine) matching_ip=find_common_ip(machine_ip,@server_ip_list) found_ip=@server_ip_list.select{|one_ip| one_ip.index(matching_ip)==0}.first else found_ip=@server_ip end if !found_ip.nil? # cmd = "ssh #{machine} \"#{init} #{cd} #{INTERPRETER} #{File.join(File.dirname(__FILE__),'main_worker.rb')} #{worker_id.to_s} #{@server_ip} #{@server_port} #{@worker_file}\"" # cmd = "ssh #{machine} \"nohup #{File.join(File.dirname(__FILE__),'launcher.sh')} #{worker_id.to_s} #{@server_ip} #{@server_port} #{@worker_file} #{init_dir} #{init_path} </dev/null >> #{log_file} 2>> #{log_file} & \"" cmd = "ssh #{machine} \"nohup #{File.join(File.dirname(__FILE__),'launcher.sh')} #{@@worker_id.to_s} #{found_ip} #{@server_port} #{@worker_file} #{init_dir} #{init_path} </dev/null >> #{log_file} 2>> #{log_file} & \"" $LAUNCHER_LOG.info cmd pid=fork{ exec(cmd) } @@worker_id+=1 else $LAUNCHER_LOG.error("Couldn't find a matching ip between worker (#{machine_ip}) and server #{ip_list.to_json}") end end end
launch_worker(worker_id, server_ip, server_port)
click to toggle source
override this
# File lib/scbi_mapreduce/worker_launcher.rb, line 90 def launch_worker(worker_id, server_ip, server_port) cmd = "#{INTERPRETER} #{File.join(File.dirname(__FILE__),'main_worker.rb')} #{worker_id.to_s} #{server_ip} #{server_port} #{@worker_file}" puts cmd exec(cmd) end
launch_workers()
click to toggle source
# File lib/scbi_mapreduce/worker_launcher.rb, line 42 def launch_workers if system("which srun > /dev/null 2>&1") && (!ENV['SLURM_PROCID'].to_s.empty?) $LAUNCHER_LOG.info "SLURM DETECTED" $LAUNCHER_LOG.info "Launching #{@workers} workers via srun" launch_workers_srun else $LAUNCHER_LOG.info "Launching #{@workers} workers via SSH" launch_workers_ssh end end
launch_workers_and_wait()
click to toggle source
# File lib/scbi_mapreduce/worker_launcher.rb, line 37 def launch_workers_and_wait launch_workers Process.waitall end
launch_workers_srun()
click to toggle source
# File lib/scbi_mapreduce/worker_launcher.rb, line 55 def launch_workers_srun # TODO - si aqui falla algo, no peta, se bloquea $LAUNCHER_LOG.info "Launching #{@workers} srun workers" pid=fork{ $LAUNCHER_LOG.info "Connecting #{@workers} srun workers to #{@server_ip}:#{@server_port}" cmd = "srun #{File.join(File.dirname(__FILE__),'main_worker.rb')} auto #{server_ip} #{server_port} #{@worker_file}" $LAUNCHER_LOG.info cmd exec(cmd) } $LAUNCHER_LOG.info "All workers launched" end
launch_workers_ssh()
click to toggle source
# File lib/scbi_mapreduce/worker_launcher.rb, line 70 def launch_workers_ssh # TODO - si aqui falla algo, no peta, se bloquea $LAUNCHER_LOG.info "Launching #{@workers} local workers" if @workers > 0 $LAUNCHER_LOG.info "Connecting #{@workers} local workers to #{@server_ip}:#{@server_port}" threads = [] @workers.times do |i| pid=fork{ launch_worker(@@worker_id,@server_ip,@server_port) $LAUNCHER_LOG.info "Worker #{i} launched [#{@server_ip}:#{@server_port}]" } @@worker_id+=1 #threads.each { |aThread| aThread.join } end #Process.waitall $LAUNCHER_LOG.info "All workers launched" end end
left_largest_common_substr(s1,s2)
click to toggle source
# File lib/scbi_mapreduce/worker_launcher.rb, line 99 def left_largest_common_substr(s1,s2) res='' s2.scan(/./).each_with_index do |l1,i| if s1[i]==l1 res << l1 else break end end res end
remove_final_dot(s)
click to toggle source
# File lib/scbi_mapreduce/worker_launcher.rb, line 113 def remove_final_dot(s) res=s # remove final dot if res[res.length-1]=='.' res=res[0,res.length-1] end return res end