class CapistranoMulticonfigParallel::CelluloidWorker
worker that will spawn a child process in order to execute a capistrano job and monitor that process
@!attribute job
@return [Hash] options used for executing capistrano task @option options [String] :id The id of the job ( will ge automatically generated by CapistranoMulticonfigParallel::CelluloidManager when delegating job) @option options [String] :app The application name that will be deployed @option options [String] :env The stage used for that application @option options [String] :action The action that this action will be doing (deploy, or other task) @option options [Hash] :env_options options that are available in the environment variable ENV when this task is going to be executed @option options [Array] :task_arguments arguments to the task
@!attribute manager
@return [CapistranoMulticonfigParallel::CelluloidManager] the instance of the manager that delegated the job to this worker
Constants
- ATTRIBUTE_LIST
Public Class Methods
new(*args)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 34 def initialize(*args) end
Public Instance Methods
check_child_proces()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 113 def check_child_proces @child_process = CapistranoMulticonfigParallel::ProcessRunner.new Actor.current.link @child_process @child_process end
check_gitflow()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 123 def check_gitflow return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK) mark_for_dispatching_new_job @manager.dispatch_new_job(@job, stage: 'production') end
execute_after_succesfull_subscription()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 92 def execute_after_succesfull_subscription async.execute_deploy @manager.async.wait_task_confirmations_worker(@job) end
execute_deploy()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 105 def execute_deploy log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") check_child_proces command = job.fetch_deploy_command log_to_file("worker #{@job_id} executes: #{command}") @child_process.async.work(@job, command, actor: Actor.current, silent: true, process_sync: :async, runner_status_klass: CapistranoMulticonfigParallel::ChildProcessStatus) end
executed_task?(task)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 153 def executed_task?(task) rake_tasks.present? && rake_tasks.index(task.to_s).present? end
finish_worker(exit_status)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 189 def finish_worker(exit_status) log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}") @manager.mark_completed_remaining_tasks(@job) if Actor.current.alive? update_machine_state('FINISHED') if exit_status == 0 @manager.workers_terminated.signal('completed') if !@job.marked_for_dispatching_new_job? && @manager.present? && @manager.alive? && @manager.all_workers_finished? end
handle_subscription(message)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 129 def handle_subscription(message) if message_is_about_a_task?(message) check_gitflow save_tasks_to_be_executed(message) async.update_machine_state(message['task']) # if message['action'] == 'invoke' log_to_file("worker #{@job_id} state is #{@machine.state}") task_approval(message) elsif message_is_for_stdout?(message) result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default']) publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin')) elsif message_from_bundler?(message) #gem_messsage = job.gem_specs.find{|spec| message['task'].include?(spec.name) } # if gem_messsage.present? # async.update_machine_state("insta") # else async.update_machine_state(message['task']) #end else log_to_file(message, job_id: @job_id) end end
invocation_chain()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 101 def invocation_chain @invocation_chain ||= [] end
notify_finished(exit_status, _runner_status)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 197 def notify_finished(exit_status, _runner_status) @job.mark_for_dispatching_new_job if exit_status != 0 @job.exit_status = exit_status finish_worker(exit_status) return if exit_status == 0 error_message = "worker #{@job_id} task failed with exit status #{exit_status.inspect} " raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message) end
on_close(code, reason)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 119 def on_close(code, reason) log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") end
on_message(message)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 82 def on_message(message) log_to_file("worker #{@job_id} received: #{message.inspect}") if @socket_connection.succesfull_subscription?(message) @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription(message) end end
publish_rake_event(data)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 77 def publish_rake_event(data) log_to_file("worker #{@job_id} rties to publish into channel #{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id} data #{data.inspect}") @socket_connection.publish("#{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id}", data) end
rake_tasks()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 97 def rake_tasks @rake_tasks ||= [] end
save_tasks_to_be_executed(message)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 170 def save_tasks_to_be_executed(message) log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}") rake_tasks << message['task'] if rake_tasks.last != message['task'] invocation_chain << message['task'] if invocation_chain.last != message['task'] end
send_msg(channel, message = nil)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 183 def send_msg(channel, message = nil) message = message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, message: message } log_to_file("worker #{@job_id} triest to send to #{channel} #{message}") publish channel, message end
start_task()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 69 def start_task log_to_file("exec worker #{@job_id} starts task and subscribes to #{@subscription_channel}") if @old_job.present? && @old_job.is_a?(CapistranoMulticonfigParallel::Job) @old_job.new_jobs_dispatched << @job.id end @socket_connection = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil)) end
state_colour()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 61 def state_colour if job.status.to_s.downcase != 'dead' && Actor.current.alive? return :green else return :red end end
task_approval(message)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 157 def task_approval(message) job_conditions = @manager.job_to_condition[@job_id] log_to_file("worker #{@job_id} checks if task : #{message['task'].inspect} is included in #{configuration.task_confirmations.inspect}") if job_conditions.present? && configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke' log_to_file("worker #{@job_id} signals approval for task : #{message['task'].inspect}") task_confirmation = job_conditions[message['task']] task_confirmation[:status] = 'confirmed' task_confirmation[:condition].signal(message['task']) else publish_rake_event(message.merge('approved' => 'yes')) end end
update_machine_state(name, options = {})
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 176 def update_machine_state(name, options = {}) log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") unless options[:bundler] @machine.go_to_transition(name.to_s, options) error_message = "worker #{@job_id} task #{name} failed " raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback end
work(job, manager, old_job)
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 37 def work(job, manager, old_job) @job = job @old_job = old_job @job_id = job.id @worker_state = job.status @manager = manager @job_confirmation_conditions = [] log_to_file("worker #{@job_id} received #{job.inspect} and #{old_job.inspect}") @subscription_channel = "#{CapistranoSentinel::RequestHooks::PUBLISHER_PREFIX}#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current) @manager.setup_worker_conditions(@job) manager.register_worker_for_job(job, Actor.current) end
worker_state()
click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 52 def worker_state if job.status.to_s.downcase != 'dead' && Actor.current.alive? @machine.state.to_s else job.status = 'dead' job.status.upcase end end