class ScbiMapreduce::WorkManager

Public Instance Methods

send_next_work() click to toggle source
# File lib/full_lengther_next/my_worker_manager_fln.rb, line 13
def send_next_work
          sleep = false
      # if we need to exit, send quit to workers
      if @@want_to_exit
        send_object(:quit)
        
      elsif !send_stuck_work
        
      #send stuck work
        objs=[]
        
        t=Time.now_us
        
        begin
        # prepare new data
        @@chunk_size.times do
          obj=next_work
          if obj.nil?
            break
          elsif obj == :sleep
              #send_object(:sleep)
              sleep = true
              break      
          else
            # add to obj array
            objs << obj
          end
        end
        rescue Exception => e
          $SERVER_LOG.error("Exception creating next_work. Worker, quit!")
          $SERVER_LOG.error("#{e.message}\n#{e.backtrace.join("\n")}")
          send_object(:sleep)
          self.class.global_error_received(e)

          #raise e
        end
        
        @@total_read_time+=(Time.now_us - t)
        
        # if new was data collected, send it
        if objs.count>0
          @@count += objs.count
          @@chunk_count += 1

          work_data=WorkManagerData.new(objs)
          send_object(work_data)
          @@sent_chunks+=1

          # to keep order or retry failed job, we need job status
          if @@keep_order || @@retry_stuck_jobs
            # do not remove data to be able to sent it again
            # work_data.data=nil
            @@running_jobs.push work_data
            # print_running_jobs
          end
        else
          # otherwise,
          if @@running_jobs.count >0 || sleep
            $SERVER_LOG.info("Worker, go to sleep")
            send_object(:sleep)
            
          else
            # send a quit value indicating no more data available
            send_object(:quit)
          end
        end
      end
end