class CapistranoMulticonfigParallel::CelluloidManager
Manager class that supervises the workers and dispatches jobs to them.
Attributes
bundler_workers[R]
job_to_condition[RW]
job_to_worker[RW]
jobs[RW]
mutex[RW]
registration_complete[RW]
stderr_buffer[RW]
worker_supervisor[R]
worker_to_job[RW]
workers[R]
workers_terminated[RW]
Public Class Methods
new(job_manager)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 14
# Stores the job manager and, unless the `multi_secvential` setting is 'true',
# builds the supervision group, the worker pool, the terminal and web server
# actors, and the empty bookkeeping tables.
#
# @param job_manager [Object] stored as @job_manager and passed to the terminal actor
def initialize(job_manager)
  @job_manager = job_manager
  @registration_complete = false
  # In sequential mode nothing below is created (the tables stay unset).
  return if configuration.multi_secvential.to_s.downcase == 'true'

  # SupervisionGroup that owns every actor created here.
  # http://rubydoc.info/gems/celluloid/Celluloid/SupervisionGroup/Member
  @worker_supervisor = setup_supervision_group
  @mutex = Mutex.new

  # Pool of workers; see http://rubydoc.info/gems/celluloid/Celluloid/PoolManager
  @workers = setup_pool_of_actor(
    @worker_supervisor,
    actor_name: :workers,
    type: CapistranoMulticonfigParallel::CelluloidWorker,
    size: 10
  )
  Actor.current.link @workers

  setup_actor_supervision(
    @worker_supervisor,
    actor_name: :terminal_server,
    type: CapistranoMulticonfigParallel::TerminalTable,
    args: [Actor.current, @job_manager, configuration.fetch(:terminal, {})]
  )
  setup_actor_supervision(
    @worker_supervisor,
    actor_name: :web_server,
    type: CapistranoMulticonfigParallel::WebServer,
    args: websocket_config
  )

  @stderr_buffer = StringIO.new
  @conditions = []
  @jobs = {}
  @job_to_worker = {}
  @worker_to_job = {}
  @job_to_condition = {}
end
Public Instance Methods
all_workers_finished?()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 69
# Tells whether every job stored in @jobs answers true to `work_done?`.
#
# @return [Boolean] true when all jobs are done (also true when @jobs is empty)
def all_workers_finished?
  @jobs.each_value.all?(&:work_done?)
end
apply_confirmation_for_job(job)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 115
# Tells whether task confirmations apply to the given job: its stage must be
# listed in `configuration.apply_stage_confirmation` and `apply_confirmations?`
# must hold.
#
# @param job [#stage] the job to check
def apply_confirmation_for_job(job)
  confirmable_stages = configuration.apply_stage_confirmation
  stage_listed = confirmable_stages.include?(job.stage)
  stage_listed && apply_confirmations?
end
apply_confirmations?()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 106
# Tells whether `configuration.task_confirmations` is a non-blank Array.
def apply_confirmations?
  tasks = configuration.task_confirmations
  return false unless tasks.is_a?(Array)

  tasks.present?
end
can_tag_staging?()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 211
# Tells whether staging can be tagged: the job manager must allow it, the
# staging tag must exist, and no stored job may target the 'production' stage.
def can_tag_staging?
  manager_allows = @job_manager.can_tag_staging? && @job_manager.tag_staging_exists?
  manager_allows && @jobs.none? { |_job_id, job| job.stage == 'production' }
end
check_workers_done?()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 91
# Spawns a thread that polls until the current actor is alive and all workers
# have finished, then signals @workers_terminated with 'completed'.
#
# The thread pauses 0.1s between polls, matching the other wait loops in this
# class, instead of spinning continuously while jobs are still running.
#
# @return [Thread] the polling thread
def check_workers_done?
  Thread.new do
    loop do
      # NOTE(review): Actor.current is read from inside the spawned thread;
      # confirm it resolves to this manager in that context.
      if Actor.current.alive? && all_workers_finished?
        @workers_terminated.signal('completed')
        break
      end
      sleep(0.1) # yield the CPU before polling again
    end
  end
end
confirm_task_approval(result, task, processed_job = nil)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 183
# Asks for approval of `task` and, when the answer is confirmed, publishes an
# 'invoke' rake event for that task to the alive worker of every stored job.
#
# @param result [Object] outcome of waiting on the task condition; blank means nothing to do
# @param task [String] the task awaiting approval
# @param processed_job [Object, nil] job mentioned in the confirmation prompt, if any
def confirm_task_approval(result, task, processed_job = nil)
  return unless result.present?

  answer = print_confirm_task_approvall(result, task, processed_job)
  return unless action_confirmed?(answer)

  @jobs.pmap do |job_id, job|
    worker = get_worker_for_job(job_id)
    next unless worker.alive?

    approval_event = {
      'approved' => 'yes',
      'action' => 'invoke',
      'job_id' => job.id,
      'task' => task
    }
    worker.publish_rake_event(approval_event)
  end
end
delegate_job(job, old_job = "")
click to toggle source
Called to send a job to an actor.
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 50
# Records the job under its id and hands it to the worker pool together with
# the current actor and the job it replaces.
#
# @param job [#id] the job to run
# @param old_job [Object] the previous job, "" when there is none
def delegate_job(job, old_job = "")
  @jobs.store(job.id, job)
  manager = Actor.current
  @workers.work(job, manager, old_job)
end
dispatch_new_job(job, options = {})
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 216
# Builds a new job from an existing one and delegates it asynchronously.
#
# The new job copies the old job's options without 'id', 'status' and
# 'exit_status', merges in the env options, and then applies `options` on top.
# Neither the caller's `options` hash nor the hash returned by `job.options`
# is modified: both are copied rather than changed in place.
#
# @param job [CapistranoMulticonfigParallel::Job] the job to clone; anything else is ignored
# @param options [Hash, nil] overrides for the new job; a present 'skip_env_options'
#   skips the lookup of additional env options from the job manager
# @return [Object, nil] result of the async delegation, or nil when nothing is dispatched
def dispatch_new_job(job, options = {})
  return unless job.is_a?(CapistranoMulticonfigParallel::Job)

  # Copy instead of stringify_keys! so the caller's hash keeps its keys.
  options = (options || {}).stringify_keys
  env_opts = if options['skip_env_options'].present?
               {}
             else
               @job_manager.get_app_additional_env_options(job.app, job.stage)
             end
  # Non-destructive except: the old job keeps its id/status/exit_status.
  inherited_options = job.options.except('id', 'status', 'exit_status')
  new_job_options = inherited_options.merge('env_options' => job.env_options.merge(env_opts))
  new_job = CapistranoMulticonfigParallel::Job.new(@job_manager, new_job_options.merge(options))
  log_to_file("Trying to DiSPATCH new JOB #{new_job.inspect}")
  async.delegate_job(new_job, job) unless job.rolling_back?
end
get_job_status(job)
click to toggle source
lookup status of job by asking actor running it
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 227
# Looks up the status of a job by asking the worker registered for it.
#
# The lookup is shared with `get_worker_for_job`, so both methods resolve a
# job (or job id) to a worker the same way. When no worker is registered for
# the job the result is nil instead of a NoMethodError on nil.
#
# @param job [CapistranoMulticonfigParallel::Job, Object] a job or the key it is registered under
# @return [Object, nil] the worker's `job_status`, or nil for a blank or unregistered job
def get_job_status(job)
  worker = get_worker_for_job(job)
  worker.job_status if worker
end
get_worker_for_job(job)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 199
# Returns the worker registered for a job.
#
# @param job [CapistranoMulticonfigParallel::Job, Object] a job (looked up by its id)
#   or the key itself
# @return [Object, nil] the entry of @job_to_worker, nil when `job` is blank
def get_worker_for_job(job)
  return nil unless job.present?

  lookup_key = job.is_a?(CapistranoMulticonfigParallel::Job) ? job.id : job
  @job_to_worker[lookup_key]
end
mark_completed_remaining_tasks(job)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 128
# For a job that takes part in confirmations, marks every task that is not yet
# 'confirmed' as confirmed and signals its condition with a pass-through proc.
#
# @param job [Object] the job whose remaining task conditions are released
def mark_completed_remaining_tasks(job)
  return unless apply_confirmation_for_job(job)

  configuration.task_confirmations.each do |task|
    entry = @job_to_condition[job.id][task]
    next if entry[:status] == 'confirmed'

    log_to_file("worker #{job.id} with action #{job.action} status #{job.status} and exit status #{job.exit_status} tries to mark fake the task #{task} with status #{entry[:status]}")
    entry[:status] = 'confirmed'
    # A Proc is signalled as the "fake" result.
    entry[:condition].signal(proc { |sum| sum })
  end
end
print_confirm_task_approvall(result, task, job)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 167
# Asks the terminal server actor whether the deployment should continue with
# `task`, and returns the answer.
#
# @param result [Object] a Proc result means nothing is asked and nil is returned
# @param task [String] the task named (upper-cased) in the question
# @param job [Object, nil] when present, its id is added to the question
# @return [Object, nil] the terminal's answer, or 'y' when the terminal server
#   actor is absent or not alive
def print_confirm_task_approvall(result, task, job)
  return if result.is_a?(Proc)

  question = ["Do you want to continue the deployment and execute #{task.upcase}"]
  question << " for JOB #{job.id}" if job.present?
  question << '?'
  message = question.join

  terminal_ready = Celluloid::Actor[:terminal_server].present? && Celluloid::Actor[:terminal_server].alive?
  return 'y' unless terminal_ready

  answer = Celluloid::Actor[:terminal_server].show_confirmation(message, 'Y/N')
  # NOTE(review): the answer is never re-read inside this loop, so a blank
  # answer keeps the loop sleeping - confirm that is intended.
  sleep(0.1) until answer.present? # keep current thread alive
  answer
end
process_jobs()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 73
# Runs the registered jobs to completion: optionally starts all workers and
# waits for task confirmations, refreshes the terminal, then blocks on the
# @workers_terminated condition until the completion check signals it.
def process_jobs
  @workers_terminated = Celluloid::Condition.new

  if syncronized_confirmation?
    @job_to_worker.pmap { |_job_id, worker| worker.async.start_task }
    wait_task_confirmations
  end

  terminal_show
  async.check_workers_done?

  outcome = @workers_terminated.wait
  sleep(0.1) until outcome.present? # keep current thread alive
  log_to_file("all jobs have completed #{outcome}")
  terminal_show
end
register_worker_for_job(job, worker)
click to toggle source
Callback from an actor once it has received its job; the actor should do this as soon as possible.
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 59
# Registers a worker for a job in both lookup tables, links the worker to the
# current actor, and starts the task right away unless confirmations are
# synchronized (a rolling-back job always starts right away).
#
# @param job [Object] the job the worker received
# @param worker [Object] the worker actor running the job
def register_worker_for_job(job, worker)
  @job_to_worker[job.id] = worker
  @worker_to_job[worker.mailbox.address] = job
  log_to_file("worker #{worker.job_id} registed into manager")
  Actor.current.link worker

  must_wait = syncronized_confirmation? && !job.rolling_back?
  worker.async.start_task unless must_wait

  if syncronized_confirmation?
    # Registration is complete once as many jobs are stored here as the job manager holds.
    @registration_complete = true if @job_manager.jobs.size == @jobs.size
  end
end
setup_worker_conditions(job)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 119
# Creates one 'unconfirmed' condition entry per configured confirmation task
# and stores the table under the job's id. Does nothing when confirmations do
# not apply to the job.
#
# @param job [Object] the job to prepare conditions for
# @return [Hash, nil] the table stored in @job_to_condition, or nil when skipped
def setup_worker_conditions(job)
  return unless apply_confirmation_for_job(job)

  @job_to_condition[job.id] =
    configuration.task_confirmations.each_with_object({}) do |task, table|
      table[task] = { condition: Celluloid::Condition.new, status: 'unconfirmed' }
    end
end
start_bundler_supervision_if_needed()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 41
# When the `check_app_bundler_dependencies` setting is 'true', adds a pool of
# bundler workers and a bundler terminal actor to the supervision group.
def start_bundler_supervision_if_needed
  return unless configuration.check_app_bundler_dependencies.to_s.downcase == 'true'

  @bundler_workers = setup_pool_of_actor(
    @worker_supervisor,
    actor_name: :bundler_workers,
    type: CapistranoMulticonfigParallel::BundlerWorker,
    size: 10
  )
  Actor.current.link @bundler_workers
  setup_actor_supervision(
    @worker_supervisor,
    actor_name: :bundler_terminal_server,
    type: CapistranoMulticonfigParallel::BundlerTerminalTable,
    args: [Actor.current, @job_manager, configuration.fetch(:terminal, {})]
  )
end
syncronized_confirmation?()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 111
# Confirmations are synchronized exactly when staging cannot be tagged.
#
# @return [Boolean] the negation of `can_tag_staging?`
def syncronized_confirmation?
  return false if can_tag_staging?

  true
end
terminal_show()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 102
# Asks the terminal server actor, asynchronously, to refresh its output.
#
# Does nothing when no :terminal_server actor is registered or when it is not
# alive. The registry lookup is guarded against nil in the same way as in
# `print_confirm_task_approvall`; the actor is only registered when
# `initialize` does not return early.
#
# @return [Object, nil] result of the async notification, nil when skipped
def terminal_show
  terminal = Celluloid::Actor[:terminal_server]
  return unless terminal && terminal.alive?

  terminal.async.notify_time_change(CapistranoMulticonfigParallel::TerminalTable.topic, type: 'output')
end
wait_condition_for_task(job_id, task)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 148
# Waits on the condition stored for the given job id and task.
#
# @param job_id [Object] key into @job_to_condition
# @param task [Object] key into that job's condition table
# @return [Object] whatever the condition's `wait` returns
def wait_condition_for_task(job_id, task)
  task_entry = @job_to_condition[job_id][task]
  task_entry[:condition].wait
end
wait_task_confirmations()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 152
# For each configured confirmation task, waits for that task's condition on
# every stored job and then asks for approval once, provided a result came
# back for every job and the workers have not all finished.
#
# Skipped when the job manager's stage is not configured for confirmations or
# confirmations are not synchronized.
def wait_task_confirmations
  stage_apply = configuration.apply_stage_confirmation.include?(@job_manager.stage)
  return unless stage_apply && syncronized_confirmation?

  configuration.task_confirmations.each do |task|
    results = []
    @jobs.pmap do |job_id, _job|
      results << wait_condition_for_task(job_id, task)
    end
    next unless results.size == @jobs.size && !all_workers_finished?

    confirm_task_approval(results, task)
  end
end
wait_task_confirmations_worker(job)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 140
# Per-job variant of the confirmation wait: for each configured task, waits on
# the job's condition and then asks for approval for that job.
#
# Skipped when confirmations do not apply to the job or are not synchronized.
#
# @param job [Object] the job whose task conditions are awaited
def wait_task_confirmations_worker(job)
  return unless apply_confirmation_for_job(job) && syncronized_confirmation?

  configuration.task_confirmations.each do |task|
    confirm_task_approval(wait_condition_for_task(job.id, task), task, job)
  end
end
worker_died(worker, reason)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb, line 241
# Handles the death of a worker: logs it and, when the worker was running a
# 'deploy' job that is not already rolling back, forgets the worker and
# dispatches a 'deploy:rollback' job in its place.
#
# @param worker [Object] the dead worker; its mailbox address identifies the job
# @param reason [Object] why the worker died (only logged)
# @return [Object] true when nothing is restarted, otherwise the dispatch result
def worker_died(worker, reason)
  mailbox = worker.mailbox
  job = @worker_to_job[mailbox.address]
  details = "worker job #{job.inspect} with mailbox #{mailbox.inspect} and #{mailbox.address.inspect} died for reason: #{reason}"
  log_to_file("worker_died: #{details}")

  needs_rollback = job.present? && !job.rolling_back? && job.action == 'deploy'
  return true unless needs_rollback

  @worker_to_job.delete(mailbox.address)
  log_to_file("RESTARTING: #{details}")
  dispatch_new_job(job, skip_env_options: true, action: 'deploy:rollback')
end