class Beanpicker::Worker::Child
Child
is the class that handles the job. Every job can have one or more childs (Child instances).
Attributes
Should fork every job?
The pid of running job
Should fork the child process?
The pid of forked child process
The name of job
The number of the job (generated by Child::process)
The default options merged with argument options
The Worker
(the parent of this child), or nil
Public Class Methods
Create a new child for the job, watch the job's tube and start the work loop (inside a forked master process when fork_master is enabled)
# File lib/beanpicker/job_server.rb, line 157
#
# Builds a child for +job+, subscribes it to its tube (start_watch) and
# starts its loop (start_loop).
#
# job    - job name, stored in @job_name
# opts   - options merged over the Beanpicker defaults for :childs,
#          :fork_every and :fork_master; :fork and :log_file are also read
# number - stored in @number
# worker - stored in @worker (may be nil)
# blk    - block stored in @blk
def initialize(job, opts={}, number=0, worker=nil, &blk)
  @job_name = job

  defaults = {
    :childs      => Beanpicker::default_childs_number,
    :fork_every  => Beanpicker::default_fork_every,
    :fork_master => Beanpicker::default_fork_master
  }
  @opts      = defaults.merge(opts)
  @number    = number
  @blk       = blk
  @loop      = nil
  @beanstalk = Beanpicker::new_beanstalk
  @run       = true
  @job       = nil
  @worker    = worker

  # :fork ('every' / 'master') takes precedence over the separate
  # :fork_every / :fork_master flags.
  fork_mode = @opts[:fork]
  if fork_mode
    wants_every  = fork_mode.to_s == 'every'
    wants_master = fork_mode.to_s == 'master'
  else
    wants_every  = !!@opts[:fork_every]
    wants_master = !!@opts[:fork_master]
  end

  # The explicit receiver is required: a bare `log_handler = ...` would
  # only assign a local variable instead of calling the setter.
  self.log_handler = @opts[:log_file] unless @opts[:log_file].nil?

  # A non-nil Beanpicker::fork_every / Beanpicker::fork_master overrides
  # whatever the options asked for.
  @fork_every =
    if Beanpicker::fork_every.nil?
      wants_every
    else
      Beanpicker::fork_every
    end
  @fork_master =
    if Beanpicker::fork_master.nil?
      wants_master
    else
      Beanpicker::fork_master
    end

  @fork_master_pid = nil
  @fork_every_pid  = nil

  start_watch
  start_loop
end
Process
a job. Uses opts[:childs] or Beanpicker::default_childs_number to determine how many childs should be created
# File lib/beanpicker/job_server.rb, line 132
#
# Creates the childs for +job+ and returns them in an Array.
#
# The amount is opts[:childs] when given, otherwise
# Beanpicker::default_childs_number. Each Child receives its index
# (starting at 0) as its number, plus the same opts, worker and block.
def self.process(job, opts={}, worker=nil, &blk)
  amount = opts[:childs] || Beanpicker::default_childs_number
  childs = []
  amount.times do |index|
    childs << Child.new(job, opts, index, worker, &blk)
  end
  childs
end
Public Instance Methods
Create an at_exit hook to bury the job (if any) and exit
Used by fork
# File lib/beanpicker/job_server.rb, line 341
#
# Registers an at_exit hook that buries BEANPICKER_FORK[:job], if one is
# set, when the process exits.
def at_exit_to_every_child_fork
  at_exit do
    # Background thread that hard-exits the process one second after the
    # hook starts.
    Thread.new do
      sleep 1
      Kernel.exit!
    end

    pending_job = BEANPICKER_FORK[:job]
    if pending_job
      begin
        pending_job.bury
      rescue
        # Best effort: errors while burying are deliberately ignored.
        nil
      end
    end
  end
end
Create an at_exit hook to kill the child (if any)
Used by fork_master_child_and_monitor
# File lib/beanpicker/job_server.rb, line 321
#
# Registers an at_exit hook that stops the process recorded in
# BEANPICKER_FORK[:child_pid] (if any) and then calls Kernel.exit!.
#
# The child first gets TERM; if it is still reported as running after
# 0.1s, the hook waits 2 more seconds and sends KILL if it is still there.
def at_exit_to_master_child_fork
  at_exit do
    child_pid = BEANPICKER_FORK[:child_pid]

    if child_pid && child_pid > 0 && Process.running?(child_pid)
      Process.kill "TERM", child_pid
      sleep 0.1
      if Process.running?(child_pid)
        sleep 2
        Process.kill "KILL", child_pid if Process.running?(child_pid)
      end
    end

    Kernel.exit!
  end
end
Return the beanstalk connection (Child
doesn't use the global connection)
# File lib/beanpicker/job_server.rb, line 189
#
# Returns the connection stored in @beanstalk.
def beanstalk
  @beanstalk
end
Stop running, kill the loop thread and kill the forked process (the master child when fork_master is set, otherwise the current fork_every child)
# File lib/beanpicker/job_server.rb, line 283
#
# Stops the child: clears @run, kills the loop thread if it is alive and
# sends TERM to the forked process, if there is one still running.
#
# The pid is @fork_master_pid when @fork_master is set, otherwise
# @fork_every_pid when @fork_every is set.
def die!
  @run = false
  @loop.kill if @loop && @loop.alive?

  target_pid =
    if @fork_master
      @fork_master_pid
    elsif @fork_every
      @fork_every_pid
    end

  # nil (or any non-Integer) means there is nothing to signal.
  return unless target_pid.is_a?(Integer) && Process.running?(target_pid)

  debug "Killing child with pid #{target_pid}"
  Process.kill "TERM", target_pid
end
Fork the process and wait for it if @fork_every is set; otherwise just call the block
# File lib/beanpicker/job_server.rb, line 255
#
# Runs +blk+ in a forked process and waits for it when @fork_every is
# set; otherwise calls +blk+ in the current process and returns its value.
#
# While the forked process runs, its pid is kept in @fork_every_pid and,
# if BEANPICKER_FORK[:child_master] is set, also in
# BEANPICKER_FORK[:child_pid]. Both are reset to nil after the wait.
def fork(&blk)
  return blk.call unless @fork_every

  job_pid = Kernel.fork do
    BEANPICKER_FORK[:child_every] = true
    Process.die_with_parent
    at_exit_to_every_child_fork
    $0 = "Beanpicker job child #{@job_name}##{@number} of #{Process.ppid}"
    blk.call
  end
  @fork_every_pid = job_pid

  inside_master = BEANPICKER_FORK[:child_master]
  BEANPICKER_FORK[:child_pid] = job_pid if inside_master
  Process.waitpid job_pid
  BEANPICKER_FORK[:child_pid] = nil if inside_master

  @fork_every_pid = nil
end
Create a new fork, change the process name and create a thread to restart the fork if it dies
Called by start_loop
when fork_master is set
# File lib/beanpicker/job_server.rb, line 304
#
# Forks a process that runs work_loop, stores its pid in
# @fork_master_pid, and starts a monitor thread (stored in @loop) that
# waits for that process and forks again if this child is still running?.
def fork_master_child_and_monitor
  master_pid = Kernel.fork do
    at_exit_to_master_child_fork
    Process.die_with_parent
    BEANPICKER_FORK[:child_master] = true
    $0 = "Beanpicker master child #{@job_name}##{@number}"
    work_loop(self)
  end
  @fork_master_pid = master_pid

  @loop = Thread.new(self) { |owner|
    Process.waitpid @fork_master_pid
    # Only restart when the child was not asked to stop in the meantime.
    if owner.running?
      owner.fork_master_child_and_monitor
    end
  }
end
Return its own log_handler,
or the Worker's
log_handler,
or the global log_handler
# File lib/beanpicker/job_server.rb, line 352
#
# Returns the log handler for this child: its own @log_handler when that
# variable has been defined, otherwise the worker's handler, or
# Beanpicker::log_handler when there is no worker.
def log_handler
  # `defined?` rather than `@log_handler ||`: once the variable exists
  # its value is returned even if it is nil or false.
  return @log_handler if defined?(@log_handler)
  return Beanpicker::log_handler if @worker.nil?

  @worker.log_handler
end
Is the child still working?
# File lib/beanpicker/job_server.rb, line 278
#
# Returns the value of @run (set to true by initialize and to false by
# die!).
def running?
  @run
end
Start the loop, fork if needed
# File lib/beanpicker/job_server.rb, line 202
#
# Starts the loop unless one is already alive.
#
# Returns false when @loop is already alive. With @fork_master set the
# work is delegated to fork_master_child_and_monitor; otherwise a thread
# running work_loop is created and stored in @loop.
def start_loop
  return false if @loop && @loop.alive?
  return fork_master_child_and_monitor if @fork_master

  @loop = Thread.new(self) { |child| work_loop(child) }
end
Watch the tube with job name and ignore all others
# File lib/beanpicker/job_server.rb, line 194
#
# Watches the tube named @job_name on this child's connection and
# ignores every other tube reported by list_tubes_watched.
def start_watch
  beanstalk.watch(@job_name)

  # The first block argument is not used; only the tube lists matter.
  beanstalk.list_tubes_watched.each do |_server, tubes|
    tubes.each do |tube|
      next if tube == @job_name
      beanstalk.ignore(tube)
    end
  end
end
Here is all the magic :)
Call fork, reserve and run the job, and delete it when it finishes; bury the job if an error is raised
# File lib/beanpicker/job_server.rb, line 221
#
# Reserves one job from +child+'s connection, runs @blk with its
# arguments and deletes the job; on error the failure is logged and the
# job is buried. Everything runs inside this class's fork method.
#
# child - object whose beanstalk connection is used to reserve the job
#         (defaults to self)
def start_work(child=self)
  fork do
    begin
      @job = child.beanstalk.reserve
      BEANPICKER_FORK[:job] = @job if BEANPICKER_FORK[:child_every]

      # A body that is not a Hash carrying both :args and :next_jobs is
      # treated as the bare arguments of a job without follow-ups.
      payload = @job.ybody
      unless payload.is_a?(Hash) && ([:args, :next_jobs] - payload.keys).empty?
        payload = { :args => payload, :next_jobs => [] }
      end

      started_at = Time.now
      debug "Running #{@job_name}##{@number} with args #{payload[:args]}; next jobs #{payload[:next_jobs]}"
      # The block receives a clone of the arguments.
      result = @blk.call(payload[:args].clone)
      debug "Job #{@job_name}##{@number} finished in #{Time.now-started_at} seconds with return #{result}"

      # A Hash result is merged into Hash arguments before they are
      # handed to the next jobs.
      payload[:args].merge!(result) if result.is_a?(Hash) && payload[:args].is_a?(Hash)
      @job.delete
      Beanpicker.enqueue(payload[:next_jobs], payload[:args]) if result && !payload[:next_jobs].empty?
    rescue => e
      fatal Beanpicker::exception_message(e, "in loop of #{@job_name}##{@number} with pid #{Process.pid}")
      # In a per-job fork just exit; otherwise bury the job from a
      # separate thread, ignoring errors raised while burying.
      exit if BEANPICKER_FORK[:child_every]
      Thread.new(@job) do |failed_job|
        begin
          failed_job.bury
        rescue
          nil
        end
      end
    ensure
      @job = nil
    end
  end
end
Call start_work,
passing itself (child), while @run is true
# File lib/beanpicker/job_server.rb, line 214
#
# Calls start_work(child) repeatedly for as long as @run is truthy.
def work_loop(child)
  while @run
    start_work(child)
  end
end