class ScbiMapreduce::Worker
Public Class Methods
controlled_exit_worker()
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 145 def self.controlled_exit_worker @@want_to_exit_worker=true end
new(*args)
click to toggle source
Calls superclass method
# File lib/scbi_mapreduce/worker.rb, line 41 def initialize(*args) super end
start_worker(worker_id,ip,port,log_file=nil)
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 149 def self.start_worker(worker_id,ip,port,log_file=nil) #puts "NEW WORKER - INIIIIIIIIIIIIIIIIIIIIT #{self}" ip = ip port = port @@count = -1 @@worker_id=worker_id # Signal.trap("INT") { puts "TRAP INT in worker #{@@worker_id}"; controlled_exit_worker; EM.stop} # Signal.trap("TERM") { puts "TRAP TERM in worker #{@@worker_id}";controlled_exit_worker; EM.stop} if log_file.nil? log_file = 'logs/worker'+worker_id+'_'+`hostname`.chomp+'_log.txt' end FileUtils.mkdir_p(File.dirname(log_file)) if ((log_file!=STDOUT) && (!File.exists?(File.dirname(log_file)))) $WORKER_LOG = Logger.new(log_file) $WORKER_LOG.datetime_format = "%Y-%m-%d %H:%M:%S" $LOG = $WORKER_LOG total_seconds = Time.now_us EM.error_handler{ |e| $WORKER_LOG.error(e.message + ' => ' + e.backtrace.join("\n")) } Signal.trap("CONT") do $WORKER_LOG.info("SIGCONT: Worker: #{@@worker_id} with PID: #{Process.pid} sleeping 15 seconds before waking up") puts "SIGCONT: Worker: #{@@worker_id} with PID: #{Process.pid} sleeping 15 seconds before waking up" sleep 15 end EventMachine::run { EventMachine::connect ip, port, self $WORKER_LOG.info "Worker: #{@@worker_id} with PID: #{Process.pid} connected to #{ip}:#{port}" } total_seconds = Time.now_us-total_seconds $WORKER_LOG.info "Client #{@@worker_id} processed: #{@@count} objs" $WORKER_LOG.info "Client #{@@worker_id} proc rate: #{@@count/total_seconds.to_f} objects/seg" end
Public Instance Methods
closing_worker()
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 35 def closing_worker end
post_init()
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 45 def post_init $WORKER_LOG.info('WORKER CONNECTED') worker_connected rescue Exception => e $WORKER_LOG.error("Exiting worker #{@@worker_id} due to exception:\n" + e.message+"\n"+e.backtrace.join("\n")) #raise e end
process_object(obj)
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 21 def process_object(obj) end
receive_initial_config(obj)
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 15 def receive_initial_config(obj) end
receive_object(obj)
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 54 def receive_object(obj) if @@count < 0 @@count += 1 # receive initial config if obj != :no_initial_config then receive_initial_config(obj[:initial_config]) $WORKER_LOG.info('Initial config: received') else $WORKER_LOG.info('Initial config: empty config') end # At first iteration, start worker starting_worker else $WORKER_LOG.info("received:"+obj.to_s) if (obj == :quit) || @@want_to_exit_worker $WORKER_LOG.info('Quit received') stop_worker elsif @@want_to_exit_worker $WORKER_LOG.info('Want to exit worker') stop_worker elsif (obj== :sleep) $WORKER_LOG.info('Sleeping 10 secs') sleep 10 send_object(:waking_up) else @@count += 1 obj.worker_identifier=@@worker_id.to_i # OJO - HAY QUE PASAR EL MODIFIED OBJECT # operation = proc { # # calculations # obj=process_object(obj) # #puts '.' + obj.seq_name # #return obj # } # # callback = proc { |modified_obj| # send_object(modified_obj) # } # # EventMachine.defer(operation, callback) #send_object(obj) begin obj.start_worker_time! modified_data=process_object(obj.data) obj.data = modified_data obj.end_worker_time! # if obj.job_identifier==3 # sleep 15 # end send_object(obj) rescue Exception => e $WORKER_LOG.error("Error processing object\n" + e.message + ":\n" + e.backtrace.join("\n")) exception= WorkerError.new('Message',e,@@worker_id,obj) send_object(exception) end end end end
starting_worker()
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 26 def starting_worker end
stop_worker()
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 136 def stop_worker $WORKER_LOG.info "Closing connection with WORKER" $WORKER_LOG.info("Worker processed #{@@count} chunks") close_connection EventMachine::stop_event_loop closing_worker end
unbind()
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 131 def unbind $WORKER_LOG.info "EXITING WORKER" EventMachine::stop_event_loop end
worker_connected()
click to toggle source
# File lib/scbi_mapreduce/worker.rb, line 31 def worker_connected end