class Abricot::Worker
Attributes
redis[RW]
redis_sub[RW]
runner_threads[RW]
Public Class Methods
new(options={})
click to toggle source
# File lib/abricot/worker.rb, line 8
# Sets up a worker: two Redis clients created from the same URL and an
# empty registry of runner threads.
#
# options - Hash; only the :redis entry is read here. Its value is passed
#           to Redis.new as the :url option for both clients.
def initialize(options={})
  redis_url = options[:redis]

  # Two separate clients are built: `listen` subscribes on redis_sub while
  # regular commands (incr/publish) go through redis.
  @redis          = Redis.new(url: redis_url)
  @redis_sub      = Redis.new(url: redis_url)

  # Maps a job id to the Thread currently running that job.
  @runner_threads = {}
end
Public Instance Methods
exec_and_capture(job_id, *args)
click to toggle source
# File lib/abricot/worker.rb, line 85
# Runs a command in a forked child process and captures everything it writes
# to stdout and stderr, killing the child as soon as the job is no longer
# present in @runner_threads.
#
# job_id - key looked up in @runner_threads on every poll; a missing or
#          falsy entry means "abort this job".
# args   - command followed by its arguments; each element is converted with
#          to_s and handed to Kernel#exec.
#
# Returns [output, exitstatus]. exitstatus is nil when the child did not
# exit normally (for example because it was killed by this method).
def exec_and_capture(job_id, *args)
  args = args.map(&:to_s)

  IO.popen('-') do |io|
    unless io
      # Forked child: IO.popen('-') yields nil here and the child's stdout is
      # the pipe the parent reads from.
      trap("SIGINT", "IGNORE")
      trap("SIGTERM", "IGNORE")
      $stderr.reopen($stdout)
      begin
        exec(*args)
      rescue Exception => e
        # Deliberately broad: whatever goes wrong, the child must never fall
        # back into the parent's code path; it reports and hard-exits below.
        STDERR.puts "#{e} while running #{args}"
      end
      exit! 1
    end

    # Parent: poll the pipe so the kill check runs at least every 0.1s.
    encoding = io.external_encoding || Encoding.default_external
    chunks = []

    loop do
      unless @runner_threads[job_id]
        STDERR.puts "WARNING: Killing Running Job!"
        Process.kill('KILL', io.pid)
        break
      end

      next unless IO.select([io], [], [], 0.1)

      begin
        # readpartial returns whatever is available right now. A plain
        # io.read would block until EOF and starve the kill check above for
        # as long as the child keeps running.
        chunks << io.readpartial(4096)
      rescue EOFError
        break
      end
    end

    _, status = Process.waitpid2(io.pid)

    # Chunks may split multi-byte characters, so join first and only then
    # tag the result with the pipe's encoding (what io.read would have used).
    [chunks.join.force_encoding(encoding), status.exitstatus]
  end
end
kill_all_jobs()
click to toggle source
# File lib/abricot/worker.rb, line 34
# Calls kill_job for every job id currently present in runner_threads.
#
# The ids are snapshotted with `keys` first, so kill_job is free to remove
# entries from the registry while this walks the list.
#
# Returns the Array of ids that were visited.
def kill_all_jobs
  job_ids = runner_threads.keys
  job_ids.each do |job_id|
    kill_job(job_id)
  end
end
kill_job(id)
click to toggle source
# File lib/abricot/worker.rb, line 38
# Removes a job from @runner_threads and waits for its thread to finish.
#
# id - job identifier; converted with to_s before the lookup.
#
# The thread is not joined when it is the calling thread (a thread cannot
# wait for itself); in that case only the registry entry is removed.
#
# Returns the joined Thread, or nil when no thread was registered under the
# id or the registered thread is the current one.
def kill_job(id)
  runner = @runner_threads.delete(id.to_s)
  return unless runner
  return if runner == Thread.current

  runner.join
end
listen()
click to toggle source
# File lib/abricot/worker.rb, line 14
# Subscribes to the 'abricot:slave_control' channel on redis_sub and
# dispatches every JSON message received there:
#
#   type 'killall' - kill_all_jobs
#   type 'kill'    - kill_job for the message's id
#   anything else  - treated as a job request. The worker increments
#                    "abricot:job:<id>:num_workers" and only takes the job
#                    when the incremented value is still within the message's
#                    'num_workers'; it then replaces any job already
#                    registered under that id and runs the new one in its
#                    own thread.
#
# Also installs an INT handler that prints a newline and exits.
def listen
  trap(:INT) { puts; exit }

  redis_sub.subscribe('abricot:slave_control') do |on|
    on.message do |_channel, message|
      msg = JSON.parse(message)

      # kill_job and run both look jobs up by the string form of the id, so
      # the registry key must be a String too. Without this, a numeric id
      # would be registered under a key nobody ever looks up.
      id = msg['id'].to_s

      case msg['type']
      when 'killall' then kill_all_jobs
      when 'kill'    then kill_job(id)
      else
        if redis.incr("abricot:job:#{id}:num_workers") <= msg['num_workers']
          kill_job(id)
          @runner_threads[id] = Thread.new { run(msg) }
        end
      end
    end
  end
end
run(options)
click to toggle source
# File lib/abricot/worker.rb, line 44
# Executes one job and reports its progress on
# "abricot:job:<id>:progress".
#
# options - Hash parsed from the control message. Keys read here:
#           'id'     - job id (converted with to_s)
#           'type'   - 'exec' or 'script'
#           'args'   - command and arguments, for type 'exec'
#           'script' - script text, for type 'script'; it is written to an
#                      executable temp file that is removed afterwards
#
# A 'start' message is published first. When the command finishes with an
# exit status, a 'done' message carrying that status is published; the
# captured output is attached only for a non-zero status. Nothing more is
# published when exec_and_capture reports no status.
#
# Errors (including an unknown 'type') are printed to STDERR and not
# re-raised. The job is removed from the registry on every path, so a
# failed job does not leave a stale entry behind.
#
# Returns nil.
def run(options)
  id = options['id'].to_s
  channel = "abricot:job:#{id}:progress"

  STDERR.puts "-" * 80
  STDERR.puts "Running job: #{options}"
  STDERR.puts "-" * 80

  redis.publish(channel, {'type' => 'start'}.to_json)

  output, status = case options['type']
                   when 'exec'
                     exec_and_capture(id, *options['args'])
                   when 'script'
                     file = Tempfile.new('abricot-')
                     begin
                       file.write(options['script'])
                       file.chmod(0755)
                       # Close before executing so the content is flushed
                       # and the file is no longer open for writing.
                       file.close
                       exec_and_capture(id, file.path)
                     ensure
                       file.delete
                     end
                   else
                     raise "Unknown type"
                   end

  # No exit status: nothing to report back.
  return unless status

  STDERR.puts output
  STDERR.puts "exited with #{status}"
  STDERR.puts "-" * 80
  STDERR.puts ""

  payload = {'type' => 'done'}
  payload['status'] = status
  payload['output'] = output if status != 0
  redis.publish(channel, payload.to_json)
  nil
rescue StandardError => e
  STDERR.puts e
ensure
  # Deregister on success, abort and failure alike.
  kill_job(id)
end