class ScbiMapreduce::WorkManager
Public Instance Methods
send_next_work()
click to toggle source
# File lib/full_lengther_next/my_worker_manager_fln.rb, line 13
#
# Answers a worker by sending exactly one of: a WorkManagerData chunk,
# :sleep, or :quit (all through send_object).
#
# Decision order:
# 1. @@want_to_exit is set          -> :quit
# 2. send_stuck_work returns truthy -> nothing more is sent from here
# 3. otherwise up to @@chunk_size items are pulled from next_work:
#    * items collected              -> sent wrapped in WorkManagerData
#    * none, but jobs are running or next_work returned :sleep -> :sleep
#    * none and nothing pending     -> :quit
def send_next_work
  # if we need to exit, send quit to workers
  return send_object(:quit) if @@want_to_exit

  # send_stuck_work goes first; a truthy result ends this call
  # (presumably it already re-sent a stuck job -- confirm in its definition)
  return if send_stuck_work

  # Named must_sleep rather than sleep so Kernel#sleep is not shadowed.
  must_sleep = false
  batch = []
  started_at = Time.now_us

  begin
    # prepare new data
    @@chunk_size.times do
      item = next_work

      # nil ends the batch
      break if item.nil?

      # :sleep ends the batch too and is remembered for the reply below
      if item == :sleep
        must_sleep = true
        break
      end

      batch << item
    end
  rescue Exception => e
    # NOTE(review): Exception is wider than StandardError; kept as-is
    # because narrowing it would change which failures are reported here.
    $SERVER_LOG.error("Exception creating next_work. Worker, quit!")
    $SERVER_LOG.error("#{e.message}\n#{e.backtrace.join("\n")}")
    # NOTE(review): the message above says "quit" but :sleep is what is sent.
    # Control also continues past this rescue, so the code below sends a
    # second object during this same call -- confirm the worker tolerates it.
    send_object(:sleep)
    self.class.global_error_received(e)
  end

  # time spent pulling work (Time.now_us units; presumably microseconds)
  @@total_read_time += (Time.now_us - started_at)

  if batch.empty?
    # nothing new to hand out
    if @@running_jobs.count > 0 || must_sleep
      $SERVER_LOG.info("Worker, go to sleep")
      send_object(:sleep)
    else
      # send a quit value indicating no more data available
      send_object(:quit)
    end
  else
    # new data was collected: account for it and send it
    @@count += batch.size
    @@chunk_count += 1

    payload = WorkManagerData.new(batch)
    send_object(payload)
    @@sent_chunks += 1

    # to keep order or retry failed jobs we need the job status, so the
    # chunk (data included) is kept to be able to send it again
    @@running_jobs.push(payload) if @@keep_order || @@retry_stuck_jobs
  end
end