module Rescheduler::Worker
Constants
- CMD_OPTS
- DEVNULL
¶ ↑
Runner
¶ ↑
- NONCMD_OPTS
Each worker is a process of it own, handled by this singleton module
- WORKERCMD_RESTART
- WORKERCMD_RESUME
- WORKERCMD_STOP
- WORKERCMD_SUSPEND
¶ ↑
Controller
¶ ↑
Attributes
launch_options[RW]
Public Instance Methods
clean_dead_workers()
click to toggle source
# File lib/rescheduler/worker.rb, line 65 def clean_dead_workers hostname = Socket.gethostname # Only can cleanup PIDs on own machine workers = redis.keys(rk_worker('*')) workers.each do |w| opts = redis.hgetall(w) next unless opts['machine'] == hostname next if pid_exists?(opts['pid']) print "Cleaning dead worker: #{w.split(':',3).last}\n" redis.del(w) # Remove the record (Main part of the cleanup) end nil end
cmd_to_pattern(pattern, cmd)
click to toggle source
# File lib/rescheduler/worker.rb, line 32 def cmd_to_pattern(pattern, cmd) ws = [] workers_from_pattern(pattern).each do |wname| redis.lpush(rk_queue(wname), cmd) ws << wname end return ws end
exists?(name)
click to toggle source
# File lib/rescheduler/worker.rb, line 191 def exists?(name) redis.hexists(rk_workers, name) end
handle_command(command)
click to toggle source
# File lib/rescheduler/worker.rb, line 85 def handle_command(command) case command when WORKERCMD_SUSPEND Rescheduler.log_debug "[#{worker_name}] Suspending" @in_suspend = true suspend_loop when WORKERCMD_RESUME if @in_suspend Rescheduler.log_debug "[#{worker_name}] Resuming" @in_suspend = false # This will resume the normal job loop else Rescheduler.log_debug "[#{worker_name}] Resume command received when not suspended" end when WORKERCMD_RESTART restart_self when WORKERCMD_STOP Rescheduler.log_debug "[#{worker_name}] Stopping" Rescheduler.end_job_loop @in_suspend = false else Rescheduler.log_debug "[#{worker_name}] Unknown command: #{command}" end end
inc_job_count()
click to toggle source
# File lib/rescheduler/worker.rb, line 219 def inc_job_count # Only do this if we are launched as a worker if named? jc = redis.hincrby(rk_worker, 'job_count', 1) # Check for respawn_jobs respawn_jobs = @launch_options['respawn_jobs'].to_i restart_self if respawn_jobs > 0 && jc >= respawn_jobs # Check for respawn_time respawn_time = @launch_options['respawn_time'].to_i if respawn_time > 0 created = redis.hget(rk_worker, 'created') restart_self if created && created.to_i + respawn_time < Time.now.to_i end end end
is_respawning?()
click to toggle source
# File lib/rescheduler/worker.rb, line 146 def is_respawning?; @launch_options && @launch_options['respawn']; end
kill(worker_name, force=false)
click to toggle source
Do not run this unless everything else fails, it kills unconditionally
# File lib/rescheduler/worker.rb, line 48 def kill(worker_name, force=false) key = rk_worker(worker_name) opts = redis.hgetall(key) if force || opts['machine'] == Socket.gethostname stop(worker_name) sleep 1 # Give the process a second to quick peacefully redis.del(key) # Actually kill the process begin pid = opts['pid'] # "HUP" does not get recognized in windows Process.kill(9, pid.to_i) if pid && opts['machine'] == Socket.gethostname rescue Errno::ESRCH, Errno::EPERM # No such process, Not permitted, these will be ginored end end end
kill_all(pattern, force=false)
click to toggle source
# File lib/rescheduler/worker.rb, line 41 def kill_all(pattern, force=false) workers_from_pattern(pattern).each do |wname| kill(wname, force) end end
load_env(env)
click to toggle source
# File lib/rescheduler/worker.rb, line 254 def load_env(env) env.is_a?(String) ? Hash[env.split(';').map{|e| a,b=e.split('=', 2); b ||= true; [a,b]}] : env end
named?()
click to toggle source
# File lib/rescheduler/worker.rb, line 215 def named? @launch_options && @launch_options.include?('worker_name') end
opt_to_str(opts)
click to toggle source
# File lib/rescheduler/worker.rb, line 263 def opt_to_str(opts) name = opts['worker_name'] file = opts['worker_file'] args = opts.map do |k,v| next if NONCMD_OPTS.include?(k) if k == 'env' "--env=#{pack_env(v)}" elsif v==true || v == 'true' "--#{k}" else "--#{k}=#{v}" end end args << name args << file return args.join(' ') end
pack_env(env)
click to toggle source
# File lib/rescheduler/worker.rb, line 258 def pack_env(env) return env if env.is_a?(String) env.map {|k,v| "#{k}=#{v}"}.join(';') end
pid_exists?(pid)
click to toggle source
# File lib/rescheduler/worker.rb, line 286 def pid_exists?(pid) Process.kill(0, pid.to_i) return true rescue Errno::ESRCH return false end
redirect_logging(opts)
click to toggle source
# File lib/rescheduler/worker.rb, line 124 def redirect_logging(opts) return if windows_env? # Do not redirect in windows environment, let it run with cmd console logfile = opts['log'] logfile ||= File.join(opts['rails'], "log/#{worker_name}.log") if opts['rails'] logfile ||= DEVNULL # Redirect io unless logfile == DEVNULL # Code inspired by Daemonize::redirect_io begin STDOUT.reopen logfile, "a" File.chmod(0644, logfile) STDOUT.sync = true rescue ::Exception begin; STDOUT.reopen DEVNULL; rescue ::Exception; end end begin; STDERR.reopen STDOUT; rescue ::Exception; end STDERR.sync = true end end
redis()
click to toggle source
# File lib/rescheduler/worker.rb, line 281 def redis Rescheduler.send :redis # Call private method in Rescheduler module end
register(opt)
click to toggle source
# File lib/rescheduler/worker.rb, line 148 def register(opt) @launch_options = opt # Save for respawning if @launch_options['respawn'] wname = @launch_options['worker_name'] # Reset created for respawn purposes. redis.hset(rk_worker(wname), 'created', Time.now.to_i) else name_pattern = @launch_options['worker_name'] widx = 0 # We do not preserve worker_index upon respawn wname = nil last_wname = nil loop do wname = name_pattern.gsub('%', widx.to_s) if wname == last_wname # For the name without %, we add one to the end if clashes wname += widx.to_s name_pattern += '%' end break if redis.hsetnx(rk_worker(wname), 'created', Time.now.to_i) widx += 1 last_wname = wname end @launch_options['worker_name'] = wname # Save this in launch options @launch_options['worker_index'] = widx # Sequence of the same worker end @launch_options['pid'] = Process.pid @launch_options['machine'] = Socket.gethostname # Setup the worker stats redis.multi do redis.hincrby(rk_worker, 'spawn_count', 1) redis.hset(rk_worker, 'job_count', 0) # Reset the job count since respawn redis.mapped_hmset(rk_worker, @launch_options) # Save launch options redis.del(rk_queue) # Clear old control commands end nil end
respawn_if_requested()
click to toggle source
# File lib/rescheduler/worker.rb, line 235 def respawn_if_requested return false unless @respawn @launch_options['respawn'] = true spawn(@launch_options) return true end
restart_self()
click to toggle source
# File lib/rescheduler/worker.rb, line 78 def restart_self Rescheduler.log_debug "[#{worker_name}] Restarting" @respawn = true Rescheduler.end_job_loop @in_suspend = false end
rk_queue(name = nil)
click to toggle source
# File lib/rescheduler/worker.rb, line 200 def rk_queue(name = nil) name ||= worker_name Rescheduler.prefix + 'TMWORKERQUEUE:' + name end
rk_worker(name = nil)
click to toggle source
# File lib/rescheduler/worker.rb, line 195 def rk_worker(name = nil) name ||= worker_name Rescheduler.prefix + 'TMWORKER:' + name end
spawn(opts)
click to toggle source
# File lib/rescheduler/worker.rb, line 242 def spawn(opts) system_options = {} env = load_env(opts['env']) if opts.include?('env') env ||= {} #system_options['chdir'] = opts['chdir'] if opts.include?('chdir') cmd = "rescheduler_launch #{opt_to_str(opts)}" print "EXEC: #{cmd}\n" pid = Kernel.spawn(env, cmd, system_options) Process.detach(pid) end
stats()
click to toggle source
# File lib/rescheduler/worker.rb, line 205 def stats stats = {} workers = redis.keys(rk_worker('*')) kl = rk_worker('').length workers.each do |w| stats[w[kl..-1]] = redis.hgetall(w) end return stats end
suspend_loop()
click to toggle source
# File lib/rescheduler/worker.rb, line 109 def suspend_loop while @in_suspend do result = redis.brpop(rk_queue) if result command = result[1] handle_command(command) end end end
unregister()
click to toggle source
# File lib/rescheduler/worker.rb, line 187 def unregister redis.del(rk_worker) end
windows_env?()
click to toggle source
# File lib/rescheduler/worker.rb, line 285 def windows_env?; RUBY_PLATFORM.end_with?('mingw32'); end
worker_index()
click to toggle source
# File lib/rescheduler/worker.rb, line 12 def worker_index; @launch_options && @launch_options['worker_index'] || -1; end
worker_name()
click to toggle source
# File lib/rescheduler/worker.rb, line 11 def worker_name; @launch_options && @launch_options['worker_name']; end
workers_from_pattern(pattern)
click to toggle source
# File lib/rescheduler/worker.rb, line 25 def workers_from_pattern(pattern) pattern = pattern.gsub('%', '*') workers = redis.keys(rk_worker(pattern)) kl = rk_worker('').length workers.map {|w| w[kl..-1]} end