class Flatware::Sink::Server

Attributes

checkpoints[R]
completed_jobs[R]
formatter[R]
jobs[R]
queue[R]
sink[R]
workers[R]

Public Class Methods

new(jobs:, formatter:, sink:, worker_count: 0, **) click to toggle source
# File lib/flatware/sink.rb, line 24
# Sets up the state for one run of the sink server.
#
# jobs::         collection of jobs; it is passed through group_jobs and the
#                result is frozen
# formatter::    object exposed through the +formatter+ reader
# sink::         value exposed through the +sink+ reader
# worker_count:: number of workers (defaults to 0)
#
# Any other keyword arguments are accepted and ignored.
def initialize(jobs:, formatter:, sink:, worker_count: 0, **)
  @formatter = formatter
  @sink = sink
  @interrupted = false
  @checkpoints = []
  @completed_jobs = []
  # group_jobs hands back its input untouched unless worker_count > 1, so in
  # that case the caller's own collection is what gets frozen here.
  @jobs = group_jobs(jobs, worker_count).freeze
  # Unfrozen working copy that jobs are shifted off as they are handed out.
  @queue = @jobs.dup
  # Seeded with the indices 0...worker_count.
  @workers = Set.new(worker_count.times)
end

Public Instance Methods

checkpoint(checkpoint) click to toggle source
# File lib/flatware/sink.rb, line 55
# Records a checkpoint so it can be passed to the formatter when the run is
# summarized. Returns the collection of recorded checkpoints.
def checkpoint(checkpoint)
  checkpoints.push(checkpoint)
end
finished(job) click to toggle source
# File lib/flatware/sink.rb, line 59
# Marks +job+ as completed, notifies the formatter, and then checks whether
# the whole run is done. Returns whatever check_finished! returns.
def finished(job)
  completed_jobs.push(job)
  formatter.finished(job)
  check_finished!
end
method_missing(name, *args) click to toggle source
Calls superclass method
# File lib/flatware/sink.rb, line 65
# Forwards any message the formatter understands to the formatter, logging
# the call through Flatware.log first. Messages the formatter does not
# respond to fall through to the superclass implementation.
#
# A block given by the caller is forwarded as well, so formatter methods
# that yield behave the same whether they are called directly or through
# this object.
def method_missing(name, *args, &block)
  return super unless formatter.respond_to?(name)

  Flatware.log(name, *args)
  formatter.send(name, *args, &block)
end
ready(worker) click to toggle source
# File lib/flatware/sink.rb, line 43
# Called when +worker+ is ready for work. Takes the next job off the queue
# and returns it, registering the worker, as long as a job was available,
# there is still remaining work, and the run has not been interrupted.
#
# Otherwise the worker is removed from the worker set, completion is checked,
# and Job.sentinel is returned. Note that the job is shifted off the queue
# before the other conditions are examined, so it is consumed either way.
def ready(worker)
  next_job = queue.shift
  dispatchable = next_job && !remaining_work.empty? && !interrupted?

  unless dispatchable
    workers.delete(worker)
    check_finished!
    return Job.sentinel
  end

  workers << worker
  next_job
end
respond_to_missing?(name, include_all) click to toggle source
# File lib/flatware/sink.rb, line 71
# Companion to method_missing: reports true for any message the formatter
# responds to (honouring +include_all+), and otherwise defers to the
# superclass answer.
#
# +include_all+ defaults to false so the method can also be called directly
# with just a name.
def respond_to_missing?(name, include_all = false)
  formatter.respond_to?(name, include_all) || super
end
start() click to toggle source
# File lib/flatware/sink.rb, line 35
# Runs the server until the DRb service thread finishes.
#
# Registers on_interrupt through Signal.listen, gives the formatter the job
# list, starts a DRb service with +sink+ and this object as arguments, and
# then blocks on the DRb thread.
#
# Returns true only when no failures were recorded and the run was not
# interrupted.
def start
  Signal.listen(formatter, &method(:on_interrupt))
  formatter.jobs(jobs)
  DRb.start_service(sink, self, verbose: Flatware.verbose?)
  DRb.thread.join
  return false if failures?

  !interrupted?
end

Private Instance Methods

check_finished!() click to toggle source
# File lib/flatware/sink.rb, line 86
# Stops the DRb service and summarizes the run once there are no registered
# workers and no remaining work. Does nothing (returns nil) otherwise.
def check_finished!
  active_workers = workers
  outstanding = remaining_work
  return unless active_workers.empty? && outstanding.empty?

  DRb.stop_service
  summarize
end
failures?() click to toggle source
# File lib/flatware/sink.rb, line 93
# True when any recorded checkpoint reports failures or any completed job
# reports that it failed.
def failures?
  return true if checkpoints.any?(&:failures?)

  completed_jobs.any?(&:failed?)
end
group_jobs(jobs, worker_count) click to toggle source
# File lib/flatware/sink.rb, line 112
# Combines +jobs+ into at most +worker_count+ jobs by dealing them out
# round-robin (position modulo worker_count).
#
# Returns +jobs+ itself, unchanged, unless worker_count is greater than 1.
# Each combined job is built from the flattened ids of its members and the
# args of the first job in the original list.
def group_jobs(jobs, worker_count)
  return jobs unless worker_count > 1

  indexed_groups = jobs.each_with_index.group_by { |_job, index| index % worker_count }
  indexed_groups.values.map do |members|
    combined_ids = members.map { |job, _index| job.id }.flatten
    Job.new(combined_ids, jobs.first.args)
  end
end
interrupted?() click to toggle source
# File lib/flatware/sink.rb, line 82
# Whether the interrupt flag has been set. It starts out false and is set to
# true by on_interrupt.
def interrupted?
  @interrupted
end
on_interrupt() click to toggle source
# File lib/flatware/sink.rb, line 77
# Interrupt callback (passed to Signal.listen in start). Flags the run as
# interrupted and then reports on what has and has not been completed.
def on_interrupt
  # Set the flag first so interrupted? is already true while summarizing.
  @interrupted = true
  summarize_remaining
end
remaining_work() click to toggle source
# File lib/flatware/sink.rb, line 104
# The jobs that have not yet been recorded as completed: the full job list
# minus completed_jobs.
def remaining_work
  jobs - completed_jobs
end
summarize() click to toggle source
# File lib/flatware/sink.rb, line 108
# Asks the formatter to summarize the checkpoints recorded so far and
# returns the formatter's result.
def summarize
  formatter.summarize(checkpoints)
end
summarize_remaining() click to toggle source
# File lib/flatware/sink.rb, line 97
# Summarizes the run so far and, when some work is still outstanding, also
# asks the formatter to report the remaining jobs. Returns nil when nothing
# remains, otherwise the formatter's result.
def summarize_remaining
  summarize
  formatter.summarize_remaining(remaining_work) unless remaining_work.empty?
end