class Beanpicker::Worker::Child

Child is the class that handle the job. Every job can be one or more childs

Attributes

fork_every[R]

Should fork every job?

fork_every_pid[R]

The pid of running job

fork_master[R]

Should fork the child process?

fork_master_pid[R]

The pid of forked child process

job_name[R]

The name of job

number[R]

The number of job(generated by Child::process)

opts[R]

The default options merged with argument options

worker[R]

The Worker (father?) or nil

Public Class Methods

new(job, opts={}, number=0, worker=nil, &blk) click to toggle source

Create a new job, start a fork if @opts and start the work

# File lib/beanpicker/job_server.rb, line 157
def initialize(job, opts={}, number=0, worker=nil, &blk)
  @job_name    = job
  @opts        = {
    :childs      => Beanpicker::default_childs_number,
    :fork_every  => Beanpicker::default_fork_every,
    :fork_master => Beanpicker::default_fork_master
  }.merge(opts)
  @number      = number
  @blk         = blk
  @loop        = nil
  @beanstalk   = Beanpicker::new_beanstalk
  @run         = true
  @job         = nil
  @worker      = worker
  if @opts[:fork]
    _fork_every  = @opts[:fork].to_s == 'every'
    _fork_master = @opts[:fork].to_s == 'master'
  else
    _fork_every  = !!@opts[:fork_every]
    _fork_master = !!@opts[:fork_master]
  end
  #really need self
  self.log_handler = @opts[:log_file] unless @opts[:log_file].nil?
  @fork_every  = Beanpicker::fork_every.nil?  ? _fork_every  : Beanpicker::fork_every
  @fork_master = Beanpicker::fork_master.nil? ? _fork_master : Beanpicker::fork_master
  @fork_master_pid = nil
  @fork_every_pid  = nil
  start_watch
  start_loop
end
process(job, opts={}, worker=nil, &blk) click to toggle source

Process job. Use opts or Beanpicker::default_childs_number to determine how many childs should be created

# File lib/beanpicker/job_server.rb, line 132
def self.process(job, opts={}, worker=nil, &blk)
  (opts[:childs] || Beanpicker::default_childs_number).times.map do |i|
    Child.new(job, opts, i, worker, &blk)
  end
end

Public Instance Methods

at_exit_to_every_child_fork() click to toggle source

Create a at_exit to bury the job(if any) and exit

Used by fork

# File lib/beanpicker/job_server.rb, line 341
def at_exit_to_every_child_fork
  at_exit do
    Thread.new do
      sleep 1
      Kernel.exit!
    end
    BEANPICKER_FORK[:job].bury rescue nil if BEANPICKER_FORK[:job]
  end
end
at_exit_to_master_child_fork() click to toggle source

Create a at_exit to kill the child(if any)

Used by fork_master_child_and_monitor

# File lib/beanpicker/job_server.rb, line 321
def at_exit_to_master_child_fork
  at_exit do
    pid = BEANPICKER_FORK[:child_pid]
    if pid and pid > 0
      if Process.running?(pid)
        Process.kill "TERM", pid
        sleep 0.1
        if Process.running?(pid)
          sleep 2
          Process.kill "KILL", pid if Process.running?(pid)
        end
      end
    end
    Kernel.exit!
  end
end
beanstalk() click to toggle source

Return the beanstalk connection(Child don’t use the global connection)

# File lib/beanpicker/job_server.rb, line 189
def beanstalk
  @beanstalk
end
die!() click to toggle source

Stop running, kill the thread and kill master os every process

# File lib/beanpicker/job_server.rb, line 283
def die!
  @run = false
  @loop.kill if @loop and @loop.alive?

  kill_pid = nil
  if @fork_master
    kill_pid = @fork_master_pid
  elsif @fork_every
    kill_pid = @fork_every_pid
  end

  if kill_pid and kill_pid.is_a?(Integer) and Process.running?(kill_pid)
    debug "Killing child with pid #{kill_pid}"
    Process.kill "TERM", kill_pid
  end

end
fork(&blk) click to toggle source

Fork the process if @fork_every and wait or only call the block

# File lib/beanpicker/job_server.rb, line 255
def fork(&blk)
  if @fork_every
    @fork_every_pid = pid = Kernel.fork do
      BEANPICKER_FORK[:child_every] = true
      Process.die_with_parent
      at_exit_to_every_child_fork
      $0 = "Beanpicker job child #{@job_name}##{@number} of #{Process.ppid}"
      blk.call
    end
    if BEANPICKER_FORK[:child_master]
      BEANPICKER_FORK[:child_pid] = pid
      Process.waitpid pid
      BEANPICKER_FORK[:child_pid] = nil
    else
      Process.waitpid pid
    end
    @fork_every_pid = nil
  else
    blk.call
  end
end
fork_master_child_and_monitor() click to toggle source

Crete a new fork, change the name and create a thread to restart the fork if it die

Called by start_loop when fork_master

# File lib/beanpicker/job_server.rb, line 304
def fork_master_child_and_monitor
  @fork_master_pid = Kernel.fork do
    at_exit_to_master_child_fork
    Process.die_with_parent
    BEANPICKER_FORK[:child_master] = true
    $0 = "Beanpicker master child #{@job_name}##{@number}"
    work_loop(self)
  end
  @loop = Thread.new(self) do |child|
    Process.waitpid @fork_master_pid
    child.fork_master_child_and_monitor if child.running?
  end
end
log_handler() click to toggle source

Return the own log_handler, or the Worker log_handler or the global log_handler

# File lib/beanpicker/job_server.rb, line 352
def log_handler
  #'@log_handler || ' go to worker/global log_handler even if @log_handler is defined
  defined?(@log_handler) ? @log_handler : @worker.nil? ? Beanpicker::log_handler : @worker.log_handler
end
running?() click to toggle source

The child still working?

# File lib/beanpicker/job_server.rb, line 278
def running?
  @run
end
start_loop() click to toggle source

Start the loop, fork if needed

# File lib/beanpicker/job_server.rb, line 202
def start_loop
  return false if @loop and @loop.alive?
  if @fork_master
    fork_master_child_and_monitor
  else
    @loop = Thread.new(self) do |child|
      work_loop(child)
    end
  end
end
start_watch() click to toggle source

Watch the tube with job name and ignore all others

# File lib/beanpicker/job_server.rb, line 194
def start_watch
  beanstalk.watch(@job_name)
  beanstalk.list_tubes_watched.each do |server, tubes|
    tubes.each { |tube| beanstalk.ignore(tube) unless tube == @job_name }
  end
end
start_work(child=self) click to toggle source

Here is all the magic :)

Call fork and start the job and delete on end, bury the job if a error is raised

# File lib/beanpicker/job_server.rb, line 221
def start_work(child=self)
  fork do
    begin
      @job = child.beanstalk.reserve
      BEANPICKER_FORK[:job] = @job if BEANPICKER_FORK[:child_every]
      data  = @job.ybody

      if not data.is_a?(Hash) or [:args, :next_jobs] - data.keys != []
        data = { :args => data, :next_jobs => [] }
      end

      t=Time.now
      debug "Running #{@job_name}##{@number} with args #{data[:args]}; next jobs #{data[:next_jobs]}"
      r = @blk.call(data[:args].clone)
      debug "Job #{@job_name}##{@number} finished in #{Time.now-t} seconds with return #{r}"
      data[:args].merge!(r) if r.is_a?(Hash) and data[:args].is_a?(Hash)

      @job.delete

      Beanpicker.enqueue(data[:next_jobs], data[:args]) if r and not data[:next_jobs].empty?
    rescue => e
      fatal Beanpicker::exception_message(e, "in loop of #{@job_name}##{@number} with pid #{Process.pid}")
      if BEANPICKER_FORK[:child_every]
        exit
      else
        Thread.new(@job) { |j| j.bury rescue nil }
      end
    ensure
      @job = nil
    end
  end
end
work_loop(child) click to toggle source

call start_work passing itself(child) while @run

# File lib/beanpicker/job_server.rb, line 214
def work_loop(child)
  start_work(child) while @run
end