class CapistranoMulticonfigParallel::CelluloidWorker

worker that will spawn a child process in order to execute a capistrano job and monitor that process

@!attribute job

@return [Hash] options used for executing capistrano task
@option options [String] :id The id of the job ( will ge automatically generated by CapistranoMulticonfigParallel::CelluloidManager when delegating job)
@option options [String] :app The application name that will be deployed
@option options [String] :env The stage used for that application
@option options [String] :action The action that this action will be doing (deploy, or other task)
@option options [Hash] :env_options  options that are available  in the environment variable ENV when this task is going to be executed
@option options [Array] :task_arguments arguments to the task

@!attribute manager

@return [CapistranoMulticonfigParallel::CelluloidManager] the instance of the manager that delegated the job to this worker

Constants

ATTRIBUTE_LIST

Public Class Methods

new(*args) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 34
def initialize(*args)
end

Public Instance Methods

check_child_proces() click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 113
def check_child_proces
  @child_process = CapistranoMulticonfigParallel::ProcessRunner.new
  Actor.current.link @child_process
  @child_process
end
check_gitflow() click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 123
def check_gitflow
  return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK)
  mark_for_dispatching_new_job
  @manager.dispatch_new_job(@job, stage: 'production')
end
execute_after_succesfull_subscription() click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 92
def execute_after_succesfull_subscription
  async.execute_deploy
  @manager.async.wait_task_confirmations_worker(@job)
end
execute_deploy() click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 105
def execute_deploy
  log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}")
  check_child_proces
  command = job.fetch_deploy_command
  log_to_file("worker #{@job_id} executes: #{command}")
  @child_process.async.work(@job, command, actor: Actor.current, silent: true, process_sync: :async, runner_status_klass: CapistranoMulticonfigParallel::ChildProcessStatus)
end
executed_task?(task) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 153
def executed_task?(task)
  rake_tasks.present? && rake_tasks.index(task.to_s).present?
end
finish_worker(exit_status) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 189
def finish_worker(exit_status)
  log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}")
  @manager.mark_completed_remaining_tasks(@job) if Actor.current.alive?
  update_machine_state('FINISHED') if exit_status == 0
  @manager.workers_terminated.signal('completed') if !@job.marked_for_dispatching_new_job? && @manager.present? && @manager.alive? && @manager.all_workers_finished?
end
handle_subscription(message) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 129
def handle_subscription(message)
  if message_is_about_a_task?(message)
    check_gitflow
    save_tasks_to_be_executed(message)
    async.update_machine_state(message['task']) # if message['action'] == 'invoke'
    log_to_file("worker #{@job_id} state is #{@machine.state}")
    task_approval(message)
  elsif message_is_for_stdout?(message)
    result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default'])
    publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin'))
  elsif message_from_bundler?(message)

    #gem_messsage = job.gem_specs.find{|spec| message['task'].include?(spec.name) }
    # if gem_messsage.present?
    #     async.update_machine_state("insta")
    # else
    async.update_machine_state(message['task'])
    #end
  else
    log_to_file(message, job_id: @job_id)
  end
end
invocation_chain() click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 101
def invocation_chain
  @invocation_chain ||= []
end
notify_finished(exit_status, _runner_status) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 197
def notify_finished(exit_status, _runner_status)
  @job.mark_for_dispatching_new_job if exit_status != 0
  @job.exit_status = exit_status
  finish_worker(exit_status)
  return if exit_status == 0
  error_message = "worker #{@job_id} task  failed with exit status #{exit_status.inspect}  "
  raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message)
end
on_close(code, reason) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 119
def on_close(code, reason)
  log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
end
on_message(message) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 82
def on_message(message)
  log_to_file("worker #{@job_id} received:  #{message.inspect}")
  if @socket_connection.succesfull_subscription?(message)
    @successfull_subscription = true
    execute_after_succesfull_subscription
  else
    handle_subscription(message)
  end
end
publish_rake_event(data) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 77
def publish_rake_event(data)
  log_to_file("worker #{@job_id} rties to publish into channel #{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id} data #{data.inspect}")
  @socket_connection.publish("#{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id}", data)
end
rake_tasks() click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 97
def rake_tasks
  @rake_tasks ||= []
end
save_tasks_to_be_executed(message) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 170
def save_tasks_to_be_executed(message)
  log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}")
  rake_tasks << message['task'] if rake_tasks.last != message['task']
  invocation_chain << message['task'] if invocation_chain.last != message['task']
end
send_msg(channel, message = nil) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 183
def send_msg(channel, message = nil)
  message = message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, message: message }
  log_to_file("worker #{@job_id} triest to send to #{channel} #{message}")
  publish channel, message
end
start_task() click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 69
def start_task
  log_to_file("exec worker #{@job_id} starts task and subscribes to #{@subscription_channel}")
  if @old_job.present? && @old_job.is_a?(CapistranoMulticonfigParallel::Job)
    @old_job.new_jobs_dispatched << @job.id
  end
  @socket_connection = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
end
state_colour() click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 61
def state_colour
  if job.status.to_s.downcase != 'dead' && Actor.current.alive?
    return :green
  else
    return :red
  end
end
task_approval(message) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 157
def task_approval(message)
  job_conditions = @manager.job_to_condition[@job_id]
  log_to_file("worker #{@job_id} checks if task : #{message['task'].inspect} is included in #{configuration.task_confirmations.inspect}")
  if job_conditions.present? && configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'
    log_to_file("worker #{@job_id} signals approval for task : #{message['task'].inspect}")
    task_confirmation = job_conditions[message['task']]
    task_confirmation[:status] = 'confirmed'
    task_confirmation[:condition].signal(message['task'])
  else
    publish_rake_event(message.merge('approved' => 'yes'))
  end
end
update_machine_state(name, options = {}) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 176
def update_machine_state(name, options = {})
  log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to  #{name}") unless options[:bundler]
  @machine.go_to_transition(name.to_s, options)
  error_message = "worker #{@job_id} task #{name} failed "
  raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback
end
work(job, manager, old_job) click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 37
def work(job, manager, old_job)
  @job = job
  @old_job = old_job
  @job_id = job.id
  @worker_state = job.status
  @manager = manager
  @job_confirmation_conditions = []
  log_to_file("worker #{@job_id} received #{job.inspect} and #{old_job.inspect}")
  @subscription_channel = "#{CapistranoSentinel::RequestHooks::PUBLISHER_PREFIX}#{@job_id}"
  @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current)
  @manager.setup_worker_conditions(@job)
  manager.register_worker_for_job(job, Actor.current)
end
worker_state() click to toggle source
# File lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb, line 52
def worker_state
  if job.status.to_s.downcase != 'dead' && Actor.current.alive?
    @machine.state.to_s
  else
    job.status = 'dead'
    job.status.upcase
  end
end