module ActiveWorker::Behavior::ExecuteConcurrently

Public Instance Methods

after_fork(param) click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 118
def after_fork(param)
  self.role = FORKED
  cleanup_after_children
  set_process_name(param)
  reset_mongoid
  reset_resque
end
cleanup_after_children() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 75
def cleanup_after_children
  @pids = []
  @threads = []
end
execute_concurrently(params) click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 53
def execute_concurrently(params)
  new_threads = params.map do |param|
    case local_worker_mode
      when THREADED_MODE
        execute_thread param
      when FORKING_MODE
        execute_fork param
    end
  end

  threads.concat new_threads
  new_threads
end
execute_fork(param) click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 96
def execute_fork(param)
  pid = fork do
    in_fork(param)
  end
  pids << pid
  Process.detach(pid)
end
execute_thread(param) click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 90
def execute_thread(param)
  Thread.new do
    in_thread(param)
  end
end
forked?() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 41
def forked?
  role == FORKED
end
forking?() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 25
def forking?
  local_worker_mode == FORKING_MODE
end
in_fork(param) click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 108
def in_fork(param)
  after_fork(param)
  execute(param)
rescue SignalException
  self.handle_termination([param.to_param])
  exit
rescue Exception => e
  self.handle_error(e, :in_fork, [param.to_param])
end
in_thread(param) click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 104
def in_thread(param)
  execute(param)
end
kill_children() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 80
def kill_children
  pids.each do |pid|
    begin
      Process.kill("TERM", pid) if pid
    rescue Errno::ESRCH
      puts "PID: #{pid} did not exist when we went to kill it"
    end
  end
end
local_worker_mode() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 17
def local_worker_mode
  @@local_worker_mode ||= FORKING_MODE
end
local_worker_mode=(mode) click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 13
def local_worker_mode=(mode)
  @@local_worker_mode = mode
end
parent?() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 37
def parent?
  role == PARENT
end
pids() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 45
def pids
  @pids ||= []
end
reset_mongoid() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 130
def reset_mongoid
  Mongoid::Sessions.clear
end
reset_resque() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 134
def reset_resque
  Resque.redis.client.reconnect
  trap("TERM", "DEFAULT")
end
role() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 29
def role
  @@role ||= PARENT
end
role=(role) click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 33
def role=(role)
  @@role = role
end
set_process_name(param) click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 126
def set_process_name(param)
  $0 = "ActiveWorker Forked from #{Process.ppid} for #{param}"
end
threaded?() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 21
def threaded?
  local_worker_mode == THREADED_MODE
end
threads() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 49
def threads
  @threads ||= []
end
wait_for_children() click to toggle source
# File lib/active_worker/behavior/execute_concurrently.rb, line 67
def wait_for_children
  threads.each do |thread|
    thread.join if thread
  end

  cleanup_after_children
end