class ScbiMapreduce::WorkManager
require 'json'
Public Class Methods
checkpoint()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 292
# Reader for the class-wide checkpoint value (@@checkpoint).
def self.checkpoint
  @@checkpoint
end
controlled_exit()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 504
# Requests an orderly shutdown: logs the request and raises the
# @@want_to_exit flag, which send_next_work checks before handing out work.
def self.controlled_exit
  $SERVER_LOG.info "Controlled exit. Workers will be noticed in next round"
  @@want_to_exit = true
end
end_work_manager()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 147 def self.end_work_manager end
get_checkpoint()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 322
# Reads the checkpoint number stored in CHECKPOINT_FILE.
#
# Returns the file content converted with String#to_i, or 0 when the file
# does not exist or cannot be read (best-effort: any StandardError yields 0).
#
# File.exist? is used instead of File.exists?: the latter was removed in
# Ruby 3.2, and the resulting NoMethodError would be swallowed by the rescue
# below, silently discarding every stored checkpoint.
def self.get_checkpoint
  return 0 unless File.exist?(CHECKPOINT_FILE)

  File.read(CHECKPOINT_FILE).chomp.to_i
rescue StandardError
  0
end
global_error_received(error_exception)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 155 def self.global_error_received(error_exception) end
init_work_manager()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 143 def self.init_work_manager end
init_work_manager_internals(checkpointing, keep_order, retry_stuck_jobs,exit_on_many_errors,chunk_size)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 214
# Resets all class-wide state (counters, flags, running job list and timing
# accumulators), stores the given options, logs them, and, when checkpointing
# is enabled, loads the stored checkpoint through get_checkpoint.
#
# checkpointing       - enables checkpoint load/save.
# keep_order          - enables tracking of sent jobs to preserve output order.
# retry_stuck_jobs    - enables re-sending of stuck jobs.
# exit_on_many_errors - forces a controlled exit when too many errors arrive.
# chunk_size          - number of objects sent to a worker per chunk.
def self.init_work_manager_internals(checkpointing, keep_order, retry_stuck_jobs,exit_on_many_errors,chunk_size)
  # counters and flags
  @@stats = {}
  @@count = @@retried_jobs = @@sent_chunks = @@received_objects = 0
  @@want_to_exit = false
  @@chunk_count = @@workers = @@max_workers = @@error_count = 0
  @@running_jobs = []
  # @@compress=true

  # options
  @@checkpointing = checkpointing
  @@keep_order = keep_order
  @@retry_stuck_jobs = retry_stuck_jobs
  @@exit_on_many_errors = exit_on_many_errors
  # TODO - Implement a dynamic chunk_size
  @@chunk_size = chunk_size

  $SERVER_LOG.info("Processing in chunks of #{@@chunk_size} objects")
  $SERVER_LOG.info("Checkpointing: #{@@checkpointing}")
  $SERVER_LOG.info("Keeping output order: #{@@keep_order}")
  $SERVER_LOG.info("Retrying stuck jobs: #{@@retry_stuck_jobs}")
  $SERVER_LOG.info("Exiting on too many errors: #{@@exit_on_many_errors}")

  @@checkpoint = 0
  if @@checkpointing
    @@checkpoint = get_checkpoint
    $SERVER_LOG.info("Detected checkpoint at #{@@checkpoint}")
  end

  # for statistics:
  @@total_seconds = 0
  @@total_manager_time = 0
  # mean_worker_time=0
  @@each_worker_time = {}
  @@each_transmission_time = {}
  @@total_read_time = 0
  @@total_write_time = 0
  # mean_transmission_time=0
end
new(*args)
click to toggle source
Calls superclass method
# File lib/scbi_mapreduce/work_manager.rb, line 638 def initialize(*args) super #puts "WORK MANAGER INITIALIZE NEWWWWWWWWWW, ONE per worker" end
save_stats(stats=nil, filename='scbi_mapreduce_stats.json')
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 202
# Writes statistics to a file as pretty-printed JSON.
#
# stats    - object to serialize; when nil, the class-wide @@stats is used.
# filename - destination path (default 'scbi_mapreduce_stats.json').
#
# The block form of File.open guarantees the file is closed even if JSON
# generation or the write raises.
def self.save_stats(stats=nil, filename='scbi_mapreduce_stats.json')
  payload = stats.nil? ? @@stats : stats

  File.open(filename, 'w') do |f|
    f.puts JSON.pretty_generate(payload)
  end
end
stats()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 198 def self.stats @@stats end
work_manager_finished()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 151 def self.work_manager_finished end
Public Instance Methods
checkpointable_job_received(obj)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 567
# Handles a result whose job is tracked in @@running_jobs.
#
# The matching sent job is updated with the received result. Then the leading
# run of jobs with status :received is counted; when it reaches
# PENDING_TO_SAVE, or covers the whole list, those jobs are handed to
# work_received in order, marked :saved and dropped from the list. The
# checkpoint file is removed before saving and rewritten afterwards (unless
# an exit was requested) when checkpointing is on.
#
# obj - received job; must respond to job_identifier.
def checkpointable_job_received(obj)
  # find the sent job this result belongs to
  sent_job = @@running_jobs.find { |candidate| candidate.job_identifier == obj.job_identifier }

  if sent_job
    sent_job.update_with_received!(obj)

    # received jobs at the head of the queue, up to the first one still running
    ready = @@running_jobs.take_while { |job| job.status == :received }.size

    # save when enough jobs are waiting, or when everything left is waiting
    if ready >= PENDING_TO_SAVE || ready == @@running_jobs.count
      remove_checkpoint if @@checkpointing

      saved = 0
      @@running_jobs.each do |job|
        break unless job.status == :received

        started = Time.now_us
        work_received(job.data)
        @@received_objects += job.data.count
        @@total_write_time += (Time.now_us - started)
        job.status = :saved
        saved += 1
      end

      # drop what was just saved and refresh the checkpoint
      if saved > 0
        @@running_jobs.shift(saved)
        save_checkpoint if @@checkpointing && !@@want_to_exit
      end
    end
  else
    $SERVER_LOG.warn "Job already processed #{obj.inspect}"
  end
end
each_transmission_time(worker,time)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 284
# Adds +time+ to the accumulated transmission time of +worker+, starting
# the accumulator at 0 the first time a worker is seen.
def each_transmission_time(worker,time)
  @@each_transmission_time[worker] = 0 if @@each_transmission_time[worker].nil?
  @@each_transmission_time[worker] += time
end
each_worker_time(worker,time)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 277
# Adds +time+ to the accumulated processing time of +worker+, starting the
# accumulator at 0 the first time a worker is seen.
def each_worker_time(worker,time)
  @@each_worker_time[worker] = 0 if @@each_worker_time[worker].nil?
  @@each_worker_time[worker] += time
end
error_received(worker_error, obj)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 171 def error_received(worker_error, obj) end
goto_checkpoint()
click to toggle source
loads a checkpoint
# File lib/scbi_mapreduce/work_manager.rb, line 448
# Restores processing to the stored checkpoint, if there is one (> 0).
#
# load_user_checkpoint decides how: -1 requests the automatic restore, which
# reads and discards (@@checkpoint - 1) chunks of @@chunk_size objects through
# next_work; a positive value is taken as the job id to resume at; 0 skips
# restoration. The stored checkpoint is cleared afterwards.
def goto_checkpoint
  return unless @@checkpoint > 0

  $SERVER_LOG.info("Skipping until checkpoint #{@@checkpoint}")
  restored = load_user_checkpoint(@@checkpoint)

  if restored == -1
    # automatic restore: consume and throw away already processed chunks
    1.upto(@@checkpoint - 1) do |chunk_number|
      $SERVER_LOG.info("Automatic trashing Chunk #{chunk_number}")
      @@chunk_size.times { next_work }
      # trash_checkpointed_work
    end
    $SERVER_LOG.info("Automatic checkpoint finished")
    WorkManagerData.job_id = @@checkpoint
  elsif restored > 0
    # the user restored the checkpoint on their own
    WorkManagerData.job_id = restored
  elsif restored == 0
    $SERVER_LOG.info("Automatic checkpoint not done")
  end

  @@checkpoint = 0
end
load_user_checkpoint(checkpoint)
click to toggle source
If this method returns -1, the checkpoint is restored automatically. Return 0 to skip checkpoint restoration. Return the restored checkpoint number (a value greater than 0) to resume from that point.
# File lib/scbi_mapreduce/work_manager.rb, line 186
# Default checkpoint loader: always answers -1, which goto_checkpoint
# interprets as "perform the automatic restore".
#
# checkpoint - the stored checkpoint number (unused here).
def load_user_checkpoint(checkpoint)
  -1
end
mean_time(h)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 262
# Mean of the values stored in hash +h+.
#
# Returns the sum divided by the number of entries (as a Float) when the sum
# is positive; otherwise the plain sum is returned (0 for an empty hash).
def mean_time(h)
  total = h.keys.inject(0) { |acc, key| acc + h[key] }
  return total unless total > 0

  total / h.size.to_f
end
next_work()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 159 def next_work end
post_init()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 485
# Runs when a worker connection is set up: updates the worker counters, does
# the one-time setup when the first worker arrives (start timestamp and
# optional checkpoint restore), then sends the initial configuration and the
# first chunk of work to the worker.
def post_init
  @@workers += 1
  @@max_workers += 1

  # when first worker is connected, do special config
  first_worker = (@@workers == 1)
  if first_worker
    @@total_seconds = Time.now_us
    $SERVER_LOG.info("First worker connected")
  end
  if first_worker && @@checkpointing
    $SERVER_LOG.info("Checking for checkpoint")
    goto_checkpoint
  end

  $SERVER_LOG.info("#{@@workers} workers connected")
  send_initial_config
  send_next_work
end
print_running_jobs()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 350
# Debug helper: logs the inspect output of every job in @@running_jobs,
# one per line.
def print_running_jobs
  listing = @@running_jobs.map(&:inspect).join("\n")
  $SERVER_LOG.debug("Running Jobs:\n#{listing}")
end
read_until_checkpoint(checkpoint)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 179 def read_until_checkpoint(checkpoint) end
receive_object(obj)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 510
# Handles an object received from a worker, then sends that worker its next
# piece of work.
#
# Three kinds of object are handled:
# * an Exception: logged and counted, passed to error_received; when more
#   than 100 objects were sent and at least 80% of that count are errors,
#   too_many_errors_received is consulted and a controlled exit may be requested.
# * :waking_up: only logged.
# * anything else: treated as a finished job; it is saved directly through
#   work_received, or routed through checkpointable_job_received when
#   checkpointing, ordering or stuck-job retry is enabled.
#
# obj - the received object.
def receive_object(obj)
  # check if response is an error
  if obj.is_a?(Exception)
    $SERVER_LOG.error("Error in worker #{obj.worker_id} while processing object #{obj.object.inspect}\n" + obj.original_exception.message + ":\n" + obj.original_exception.backtrace.join("\n"))
    @@error_count += 1
    error_received(obj, obj.object.data)

    # if there are too many errors
    if (@@count > 100) && (@@error_count >= @@count * 0.8)
      # notice programmer
      res = too_many_errors_received

      # force exit if too_many_errors_received returns true
      if @@exit_on_many_errors || res
        $SERVER_LOG.error("Want to exit due to too many errors")
        # controlled_exit is a class method; calling it on the instance
        # (self.controlled_exit) would raise NoMethodError.
        self.class.controlled_exit
      end
    end
  elsif obj == :waking_up
    $SERVER_LOG.info("Worker woke up")
  else
    obj.received!(obj.data)

    if @@checkpointing || @@keep_order || @@retry_stuck_jobs
      # print_running_jobs
      checkpointable_job_received(obj)
    else
      # no job tracking: hand the result over right away
      t = Time.now_us
      work_received(obj.data)
      @@received_objects += obj.data.count
      @@total_write_time += (Time.now_us - t)
    end

    each_worker_time(obj.worker_identifier, obj.worker_time)
    each_transmission_time(obj.worker_identifier, obj.transmission_time)
  end

  # free mem
  obj = nil
  send_next_work
end
remove_checkpoint()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 296
# Moves the current checkpoint file (CHECKPOINT_FILE) to OLD_CHECKPOINT_FILE.
# Does nothing when there is no checkpoint file.
#
# File.exist? replaces File.exists?, which was removed in Ruby 3.2.
def remove_checkpoint
  FileUtils.mv(CHECKPOINT_FILE, OLD_CHECKPOINT_FILE) if File.exist?(CHECKPOINT_FILE)
end
save_checkpoint()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 303
# Writes the current checkpoint to CHECKPOINT_FILE and then calls
# save_user_checkpoint.
#
# The checkpoint is the identifier of the oldest job still in @@running_jobs,
# or WorkManagerData.job_id when no job is pending.
#
# The value is computed before the file is opened, so a failure while
# computing it no longer leaves a truncated checkpoint file, and the block
# form of File.open closes the file even if the write raises.
def save_checkpoint
  checkpoint_value =
    if @@running_jobs.empty?
      WorkManagerData.job_id
    else
      @@running_jobs.first.job_identifier
    end

  $SERVER_LOG.info "Saving checkpoint: #{checkpoint_value}"

  File.open(CHECKPOINT_FILE, 'w') do |checkpoint_file|
    checkpoint_file.puts checkpoint_value
  end

  save_user_checkpoint
end
save_user_checkpoint()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 190 def save_user_checkpoint end
send_initial_config()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 338
# Sends the worker its initial configuration: a hash with the single key
# :initial_config wrapping the value of worker_initial_config, or the symbol
# :no_initial_config when that value is nil.
def send_initial_config
  settings = worker_initial_config
  message = settings.nil? ? :no_initial_config : {:initial_config => settings}
  send_object(message)
end
send_next_work()
click to toggle source
send next work to worker
# File lib/scbi_mapreduce/work_manager.rb, line 382 def send_next_work # if we need to exit, send quit to workers if @@want_to_exit send_object(:quit) elsif !send_stuck_work #send stuck work objs=[] t=Time.now_us begin # prepare new data @@chunk_size.times do obj=next_work if obj.nil? break else # add to obj array objs << obj end end rescue Exception => e $SERVER_LOG.error("Exception creating next_work. Worker, quit!") send_object(:sleep) self.class.global_error_received(e) #raise e end @@total_read_time+=(Time.now_us - t) # if new was data collected, send it if objs.count>0 @@count += objs.count @@chunk_count += 1 work_data=WorkManagerData.new(objs) send_object(work_data) @@sent_chunks+=1 # to keep order or retry failed job, we need job status if @@keep_order || @@retry_stuck_jobs # do not remove data to be able to sent it again # work_data.data=nil @@running_jobs.push work_data # print_running_jobs end else # otherwise, if @@running_jobs.count >0 $SERVER_LOG.info("Worker, go to sleep") send_object(:sleep) else # send a quit value indicating no more data available send_object(:quit) end end end end
send_stuck_work()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 355
# Re-sends the first stuck job, if stuck-job retry is enabled and any job in
# @@running_jobs reports stuck?.
#
# Returns true when a job was re-sent, false otherwise.
def send_stuck_work
  return false unless @@retry_stuck_jobs

  # $SERVER_LOG.debug("="*40)
  # print_running_jobs
  stuck = @@running_jobs.select(&:stuck?)
  return false if stuck.empty?

  listing = stuck.map(&:inspect).join("\n")
  $SERVER_LOG.info("Stuck Jobs:\n#{listing}")

  # only the first stuck job is retried on each call
  retried = stuck.first
  # retried.sent!
  send_object(retried)
  @@sent_chunks += 1
  @@retried_jobs += 1
  $SERVER_LOG.info("Sending stuck work #{retried.inspect}")

  true
end
stop_work_manager()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 660 def stop_work_manager EM.stop $SERVER_LOG.info "Exiting server" self.class.end_work_manager @@total_seconds = (Time.now_us-@@total_seconds) @@total_manager_time= @@total_manager_time @@total_read_time=@@total_read_time @@total_write_time=@@total_write_time mean_worker_time=mean_time(@@each_worker_time) mean_transmission_time=mean_time(@@each_transmission_time) idle_time=(@@total_seconds - @@total_read_time -@@total_write_time - mean_transmission_time) @@stats={} @@stats[:total_objects]=@@count @@stats[:total_seconds]=@@total_seconds @@stats[:sent_chunks]=@@sent_chunks @@stats[:received_objects]=@@received_objects @@stats[:processing_rate]=(@@count/@@total_seconds.to_f) @@stats[:total_read_time]=@@total_read_time @@stats[:total_write_time]=@@total_write_time @@stats[:mean_worker_time]=mean_worker_time @@stats[:mean_transmission_time]=mean_transmission_time @@stats[:total_manager_idle_time]=idle_time @@stats[:error_count]=@@error_count @@stats[:retried_jobs]=@@retried_jobs @@stats[:chunk_size]=@@chunk_size @@stats[:connected_workers]=@@max_workers @@stats[:each_transmission_time]=@@each_transmission_time @@stats[:each_worker_time]=@@each_worker_time $SERVER_LOG.info "Total processed: #{@@count} objects in #{@@total_seconds} seconds" $SERVER_LOG.info "Total sent chunks: #{@@sent_chunks} objects" $SERVER_LOG.info "Total sent objects: #{@@count} objects" $SERVER_LOG.info "Total received objects: #{@@received_objects} objects" $SERVER_LOG.info "Processing rate: #{"%.2f" % (@@count/@@total_seconds.to_f)} objects per second" $SERVER_LOG.info "Connection rate: #{"%.2f" % (@@chunk_count/@@total_seconds.to_f)} connections per second" $SERVER_LOG.info "Total read time #{@@total_read_time} seconds" $SERVER_LOG.info "Total write time #{@@total_write_time} seconds" # mean_worker_time=mean_worker_time/@@max_workers $SERVER_LOG.info "Total worker time #{mean_worker_time} seconds" $SERVER_LOG.info "Total transmission time 
#{mean_transmission_time} seconds" $SERVER_LOG.info "Total manager_idle time #{idle_time} seconds" # $SERVER_LOG.info "Total manager time #{@@total_read_time + @@total_write_time + mean_transmission_time} seconds" $SERVER_LOG.info "Number of errors: #{@@error_count}" $SERVER_LOG.info "Number of retried stuck jobs: #{@@retried_jobs}" $SERVER_LOG.info "Chunk size: #{@@chunk_size}" $SERVER_LOG.info "Total connected workers: #{@@max_workers}" self.class.work_manager_finished end
too_many_errors_received()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 175 def too_many_errors_received end
trash_checkpointed_work()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 193 def trash_checkpointed_work end
unbind()
click to toggle source
A worker has disconnected.
# File lib/scbi_mapreduce/work_manager.rb, line 644
# Runs when a worker connection is closed: decrements the connected worker
# count and, once no worker is left, shuts the server down through
# stop_work_manager.
def unbind
  @@workers -= 1
  #puts @@running_jobs.to_json
  $SERVER_LOG.info("Worker disconnected. #{@@workers} kept running")

  # no more workers left, shutdown EM and stop server
  return unless @@workers == 0

  $SERVER_LOG.info("All workers finished")
  stop_work_manager
end
work_received(obj)
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 163 def work_received(obj) end
worker_initial_config()
click to toggle source
# File lib/scbi_mapreduce/work_manager.rb, line 167 def worker_initial_config end