class TengineJobAgent::Watchdog
Public Class Methods
new(logger, args, config = {})
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 14
#
# Builds a watchdog for a single job process.
#
# @param logger receives the watchdog's own log messages
# @param args [Array] [pid_path, program, *program_args]: the file that the
#   spawned PID (or a startup error) is appended to, the program to run, and
#   the program's arguments
# @param config [Hash] agent configuration with string keys ('pid_output',
#   'log_dir', 'heartbeat', ...); it is also used to build the event sender
def initialize(logger, args, config = {})
  # one key shared by every event this watchdog fires
  @uuid = UUID.new.generate
  @logger = logger
  @config = config
  # NOTE(review): @pid_output is not read by any of the methods visible here
  @pid_output = @config['pid_output'] || STDOUT
  @pid_path, @program, *@args = *args
end
Public Instance Methods
fire_finished(pid, process_status)
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 104
#
# Reports the end of the watched job process, then stops the event sender.
#
# Copies the captured STDOUT/STDERR files into the log directory (see
# #output_filepath) and fires a "finished.process.job.tengine" event. The
# event level is :info for exit status 0 and :error otherwise.
#
# On success, a capture file is copied only when it is non-empty. On failure,
# each capture file is copied whenever it exists, and a human-readable
# :message pointing at the copies is added to the properties.
#
# @param pid [Integer] PID of the finished child process
# @param process_status [Process::Status] status of the child, as returned by
#   Process.waitpid2
def fire_finished(pid, process_status)
  # exitstatus may be nil when the process was killed; nil counts as failure.
  exit_status = process_status.exitstatus
  succeeded = (exit_status == 0)
  level_key = succeeded ? :info : :error
  @logger.info("#{self.class.name}#fire_finished starting #{pid} #{level_key}(#{exit_status})")

  command_line = [@program, @args].flatten.join(" ")
  event_properties = {
    "execution_id"     => ENV['MM_SCHEDULE_ID'],
    "root_jobnet_id"   => ENV['MM_ROOT_JOBNET_ID'],
    "target_jobnet_id" => ENV['MM_TARGET_JOBNET_ID'],
    "target_job_id"    => ENV['MM_ACTUAL_JOB_ID'],
    "pid"              => pid,
    "exit_status"      => exit_status,
    "command"          => command_line,
  }

  stdout_dest = output_filepath("stdout", pid)
  stderr_dest = output_filepath("stderr", pid)
  # On success, empty capture files are not worth keeping.
  non_empty_only = succeeded ? lambda { |path| File.size(path) > 0 } : nil
  event_properties[:stdout_log] = copy_file(@stdout.path, stdout_dest, &non_empty_only)
  event_properties[:stderr_log] = copy_file(@stderr.path, stderr_dest, &non_empty_only)

  unless succeeded
    event_properties[:message] = [
      "Job process failed. STDOUT and STDERR were redirected to files.\n",
      "You can see them at '#{stdout_dest}' and '#{stderr_dest}'\n",
      "on the server '#{ENV['MM_SERVER_NAME']}'",
    ].join
  end

  event_sender = sender
  event_sender.fire("finished.process.job.tengine", {
    :key         => @uuid,
    :level_key   => level_key,
    :source_name => source_name(pid),
    :sender_name => sender_name,
    :properties  => event_properties,
  })
  @logger.info("#{self.class.name}#fire_finished complete")
  event_sender.stop
end
fire_heartbeat(pid, &block)
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 143
#
# Fires a debug-level "job.heartbeat.tengine" event for the watched process.
#
# The event asks the sender to keep its connection (:keep_connection => true)
# and not to retry (:retry_count => 0). The given block is forwarded untouched
# to the sender's #fire.
#
# @param pid [Integer] PID of the watched child process
def fire_heartbeat(pid, &block)
  heartbeat_properties = {
    "root_jobnet_id"   => ENV['MM_ROOT_JOBNET_ID'],
    "target_jobnet_id" => ENV['MM_TARGET_JOBNET_ID'],
    "target_job_id"    => ENV['MM_ACTUAL_JOB_ID'],
    "pid"              => pid,
    "command"          => [@program, @args].flatten.join(" "),
  }
  fire_options = {
    :key             => @uuid,
    :level_key       => :debug,
    :sender_name     => sender_name,
    :source_name     => source_name(pid),
    :occurred_at     => Time.now,
    :properties      => heartbeat_properties,
    :keep_connection => true,
    :retry_count     => 0,
  }
  sender.fire("job.heartbeat.tengine", fire_options, &block)
  @logger.debug("#{self.class.name}#fire_heartbeat #{pid}")
end
process()
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 22
#
# Main entry point: runs the job program under the watchdog.
#
# Inside the temporary STDOUT/STDERR capture files set up by #with_tmp_outs,
# this starts an EventMachine reactor. From the block given to the sender's MQ
# suite `ensures :connection` call, it:
# 1. spawns the job program (#spawn_process),
# 2. appends the spawned PID to the file at @pid_path, and
# 3. starts waiting for the child (#start_wait_process).
#
# If any of these steps raises, the error is logged and appended to the same
# PID file as "[ExceptionClass] message", and the reactor is stopped.
def process
  @logger.info("process start")
  @logger.debug("#{__FILE__}##{__LINE__} before with_tmp_outs")
  # with_tmp_outs yields no block arguments. The capture files are exposed
  # through @stdout/@stderr, which #spawn_process reads.
  with_tmp_outs do
    @logger.debug("#{__FILE__}##{__LINE__} before EM.run")
    EM.run do
      @logger.debug("#{__FILE__}##{__LINE__} before sender.mq_suite.send :ensures, :connection")
      # NOTE(review): the block presumably runs once the MQ connection is
      # established; confirm against the mq_suite's #ensures.
      sender.mq_suite.send :ensures, :connection do
        @logger.debug("before spawn_process")
        begin
          @logger.debug("#{__FILE__}##{__LINE__} before spawn_process")
          pid = spawn_process
          @logger.debug("#{__FILE__}##{__LINE__} before output pid")
          # Hand the spawned PID back to the caller through the PID file.
          File.open(@pid_path, "a") { |f| f.puts(pid) }
          @logger.debug("#{__FILE__}##{__LINE__} before start_wait_process")
          start_wait_process(pid)
          @logger.debug("#{__FILE__}##{__LINE__} after start_wait_process")
        rescue Exception => e
          # Deliberately broader than StandardError: whatever went wrong is
          # reported to the caller through the same PID file, and the reactor
          # is stopped rather than left running without a watched child.
          @logger.error("[#{e.class.name}] #{e.message}")
          File.open(@pid_path, "a") { |f| f.puts("[#{e.class.name}] #{e.message}") }
          EM.stop
        end
        @logger.debug("#{__FILE__}##{__LINE__}")
      end
      @logger.debug("#{__FILE__}##{__LINE__} after sender.mq_suite.send :ensures, :connection")
    end
    @logger.debug("#{__FILE__}##{__LINE__} after EM.run")
  end
  @logger.debug("#{__FILE__}##{__LINE__} after with_tmp_outs")
end
sender()
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 163
#
# Returns the Tengine::Event::Sender used for every event, creating it on
# first use.
#
# The sender configuration starts as { logger: @logger }. @config, with its
# keys deep-symbolized, is merged over it, so a :logger entry in @config wins.
# Finally [:sender][:keep_connection] is forced to true.
def sender
  return @sender if @sender

  sender_config = { logger: @logger }
  sender_config.update((@config || {}).deep_symbolize_keys)
  sender_options = (sender_config[:sender] ||= {})
  # Always keep the connection, whatever @config says.
  sender_options[:keep_connection] = true
  @logger.info("config for sender: #{sender_config.inspect}")

  @sender = Tengine::Event::Sender.new(sender_config)
  @logger.info("#{self.class.name}@sender.default_keep_connection # => #{@sender.default_keep_connection.inspect}")
  @sender
end
spawn_process()
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 54
#
# Spawns the job program (@program with @args) without waiting for it.
#
# The child's STDOUT/STDERR are redirected to the paths of the capture files
# in @stdout/@stderr, and the child is started in a new process group
# (:pgroup => true).
#
# @return [Integer] PID of the spawned process
# @raise re-raises whatever was raised (e.g. Errno::ENOENT for a missing
#   program) after logging it together with its backtrace
def spawn_process
  spawn_options = { :out => @stdout.path, :err => @stderr.path, :pgroup => true }
  command = [@program] + (@args + [spawn_options])
  @logger.info("Process.spawn(*#{command.inspect})")
  child_pid = Process.spawn(*command)
  @logger.info("spawned process PID: #{child_pid}")
  child_pid
rescue Exception => err
  @logger.error("[#{err.class.name}] #{err.message}\n #{err.backtrace.join("\n ")}")
  raise
end
start_wait_process(pid)
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 69
#
# Starts watching child process +pid+ from inside the EventMachine reactor.
#
# It fires an initial heartbeat. Inside the block handed to that
# #fire_heartbeat call (presumably invoked by the sender once the event has
# been published; confirm against Tengine::Event::Sender#fire) it:
# * schedules a heartbeat every +interval+ seconds with EM.add_periodic_timer,
#   when config["heartbeat"]["job"]["interval"] is set and positive;
# * waits for the child with Process.waitpid2 via EM.defer, then cancels that
#   timer and calls #fire_finished(pid, status).
#
# A config lacking the "heartbeat" or "job" section is treated like one
# lacking "interval": no periodic heartbeat is scheduled, but the child is
# still waited for.
#
# @param pid [Integer] PID returned by #spawn_process
def start_wait_process(pid)
  @logger.info("#{self.class.name}#start_wait_process(#{pid}) begin")
  fire_heartbeat pid do
    @logger.info("\e[31mbegin block for fire_heartbeat(#{pid})")
    timer = nil
    @logger.info("#{__FILE__}##{__LINE__}")
    # Look the interval up defensively. Raising here would skip the EM.defer
    # below, so the child would never be waited for and no finished event
    # would ever be fired.
    heartbeat_config = (@config || {})["heartbeat"] || {}
    interval = (heartbeat_config["job"] || {})["interval"]
    @logger.info("#{__FILE__}##{__LINE__}")
    if interval && interval > 0
      @logger.info("before EM.add_periodic_timer(#{interval.inspect})")
      timer = EM.add_periodic_timer(interval) do
        fire_heartbeat(pid) {} # no-op block; it only exists to keep rspec quiet
      end
    end
    @logger.info("\e[31mbefore EM.defer ...")
    EM.defer(
      # runs via EM.defer; its result is handed to the callback below
      lambda {
        @logger.info("before Process.waitpid2 #{pid} ...")
        res = Process.waitpid2(pid)
        @logger.info("$?: " << $?.inspect)
        res
      },
      lambda { |pid_and_status|
        @logger.info("process finished: " << pid_and_status[1].exitstatus.inspect)
        EM.cancel_timer(timer) if timer
        fire_finished(*pid_and_status)
      }
    )
    @logger.info("after EM.defer ...")
  end
  @logger.info("#{self.class.name}#start_wait_process(#{pid}) end")
end
Private Instance Methods
copy_file(src, dest) { |src| ... }
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 206
#
# Copies +src+ to +dest+ with FileUtils.cp. Nothing is copied when +src+ does
# not exist, or when the optional block, called with +src+, returns a falsy
# value.
#
# @param src [String] path of the file to copy
# @param dest [String] destination path
# @yieldparam src [String] the source path, so the caller can veto the copy
# @return [String, nil] +dest+ when the copy happened, otherwise nil
def copy_file(src, dest)
  copy_wanted = File.exist?(src) && (!block_given? || yield(src))
  if copy_wanted
    FileUtils.cp(src, dest)
    dest
  end
end
output_filepath(prefix, pid)
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 184
#
# Absolute path of the "<prefix>-<pid>.log" file inside @config['log_dir'].
# When 'log_dir' is unset, the path is resolved against the current working
# directory.
#
# @param prefix [String] file name prefix, e.g. "stdout" or "stderr"
# @param pid [Integer] PID of the job process
# @return [String]
def output_filepath(prefix, pid)
  log_file_name = "#{prefix}-#{pid}.log"
  File.expand_path(log_file_name, @config['log_dir'])
end
sender_name()
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 176
#
# Sender name used for every event, in the form
# "agent:<host>/<watchdog pid>/tengine_job_agent". It is computed once and
# then memoized.
def sender_name
  return @sender_name if @sender_name
  host = Tengine::Event.host_name
  @sender_name = format("agent:%s/%d/tengine_job_agent", host, Process.pid)
end
source_name(pid)
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 180
#
# Event source name for job process +pid+, in the form
# "job:<MM_SERVER_NAME>/<pid>/<MM_ROOT_JOBNET_ID>/<MM_ACTUAL_JOB_ID>".
# Unset environment variables render as empty strings.
#
# @param pid [Integer] PID of the job process
# @return [String]
def source_name(pid)
  server_name = ENV['MM_SERVER_NAME']
  root_jobnet_id = ENV['MM_ROOT_JOBNET_ID']
  actual_job_id = ENV['MM_ACTUAL_JOB_ID']
  format("job:%s/%d/%s/%s", server_name, pid, root_jobnet_id, actual_job_id)
end
with_tmp_outs() { || ... }
click to toggle source
# File lib/tengine_job_agent/watchdog.rb, line 188
#
# Runs the given block (which receives no arguments) with @stdout and @stderr
# bound to Tempfile objects whose names begin with "stdout-<pid>.log" and
# "stderr-<pid>.log". Returns the block's value.
#
# Both instance variables are reset to nil afterwards, even when the block or
# the creation of the second Tempfile raises.
def with_tmp_outs
  own_pid = Process.pid
  Tempfile.open("stdout-#{own_pid}.log") do |out_tmp|
    begin
      @stdout = out_tmp
      Tempfile.open("stderr-#{own_pid}.log") do |err_tmp|
        begin
          @stderr = err_tmp
          yield
        ensure
          @stderr = nil
        end
      end
    ensure
      @stdout = nil
    end
  end
end