class Mutest::Parallel::Master

Master parallel worker

Constants

MAP

Public Class Methods

call(config) click to toggle source

Run master

@param [Config] config

@return [Actor::Sender]

# File lib/mutest/parallel/master.rb, line 14
def self.call(config)
  config.env.spawn do |mailbox|
    new(config, mailbox).__send__(:run)
  end
end
new(*) click to toggle source

Initialize object

@return [undefined]

Calls superclass method
# File lib/mutest/parallel/master.rb, line 23
def initialize(*)
  super

  @stop        = false
  @workers     = 0
  @active_jobs = Set.new
  @index       = 0
end

Private Instance Methods

handle(message) click to toggle source

Handle messages

@param [Actor::Message] message

@return [undefined]

# File lib/mutest/parallel/master.rb, line 66
def handle(message)
  type    = message.type
  payload = message.payload

  method =
    MAP.fetch(type) do
      raise Actor::ProtocolError, "Unexpected message: #{type.inspect}"
    end
  __send__(method, payload)
end
handle_ready(sender) click to toggle source

Handle ready worker

@param [Actor::Sender] sender

@return [undefined]

# File lib/mutest/parallel/master.rb, line 124
def handle_ready(sender)
  if stop_work?
    stop_worker(sender)
    return
  end

  sender.call(Actor::Message.new(:job, next_job))
end
handle_result(job_result) click to toggle source

Handle result

@param [JobResult] job_result

@return [undefined]

# File lib/mutest/parallel/master.rb, line 103
def handle_result(job_result)
  @active_jobs.delete(job_result.job)
  sink.result(job_result.payload)
end
handle_status(sender) click to toggle source

Handle status

@param [Actor::Sender] sender

@return [undefined]

# File lib/mutest/parallel/master.rb, line 89
def handle_status(sender)
  status = Status.new(
    payload:     sink.status,
    done:        sink.stop? || @workers.zero?,
    active_jobs: @active_jobs.dup.freeze
  )
  sender.call(Actor::Message.new(:status, status))
end
handle_stop(sender) click to toggle source

Handle stop

@param [Actor::Sender] sender

@return [undefined]

# File lib/mutest/parallel/master.rb, line 113
def handle_stop(sender)
  @stop = true
  receive_loop
  sender.call(Actor::Message.new(:stop))
end
next_job() click to toggle source

Next job if any

@return [Job]

if next job is available

@return [nil]

# File lib/mutest/parallel/master.rb, line 139
def next_job
  Job.new(
    index:   @index,
    payload: source.next
  ).tap do |job|
    @index += 1
    @active_jobs << job
  end
end
receive_loop() click to toggle source

Run receive loop

@return [undefined]

# File lib/mutest/parallel/master.rb, line 80
def receive_loop
  handle(mailbox.receiver.call) until @workers.zero? && @stop
end
run() click to toggle source

Run work loop

rubocop:disable MethodLength

@return [undefined]

# File lib/mutest/parallel/master.rb, line 39
def run
  config.jobs.times do
    @workers += 1
    config.env.spawn do |worker_mailbox|
      Worker.run(
        mailbox:   worker_mailbox,
        processor: config.processor,
        parent:    mailbox.sender
      )
    end
  end

  receive_loop
end
sink() click to toggle source

Job result sink

@return [Sink]

# File lib/mutest/parallel/master.rb, line 176
def sink
  config.sink
end
source() click to toggle source

Job source

@return [Source]

# File lib/mutest/parallel/master.rb, line 169
def source
  config.source
end
stop_work?() click to toggle source

Test if scheduling stopped

@return [Boolean]

# File lib/mutest/parallel/master.rb, line 162
def stop_work?
  @stop || !source.next? || sink.stop?
end
stop_worker(sender) click to toggle source

Stop worker

@param [Actor::Sender] sender

@return [undefined]

# File lib/mutest/parallel/master.rb, line 154
def stop_worker(sender)
  @workers -= 1
  sender.call(Actor::Message.new(:stop))
end