class Flatware::Sink::Server
Attributes
checkpoints[R]
completed_jobs[R]
formatter[R]
jobs[R]
queue[R]
sink[R]
workers[R]
Public Class Methods
new(jobs:, formatter:, sink:, worker_count: 0, **)
click to toggle source
# File lib/flatware/sink.rb, line 24 def initialize(jobs:, formatter:, sink:, worker_count: 0, **) @checkpoints = [] @completed_jobs = [] @formatter = formatter @interrupted = false @jobs = group_jobs(jobs, worker_count).freeze @queue = @jobs.dup @sink = sink @workers = Set.new(worker_count.times.to_a) end
Public Instance Methods
checkpoint(checkpoint)
click to toggle source
# File lib/flatware/sink.rb, line 55 def checkpoint(checkpoint) checkpoints << checkpoint end
finished(job)
click to toggle source
# File lib/flatware/sink.rb, line 59 def finished(job) completed_jobs << job formatter.finished(job) check_finished! end
method_missing(name, *args)
click to toggle source
Calls superclass method
# File lib/flatware/sink.rb, line 65 def method_missing(name, *args) super unless formatter.respond_to?(name) Flatware.log(name, *args) formatter.send(name, *args) end
ready(worker)
click to toggle source
# File lib/flatware/sink.rb, line 43 def ready(worker) job = queue.shift if job && !(remaining_work.empty? || interrupted?) workers << worker job else workers.delete worker check_finished! Job.sentinel end end
respond_to_missing?(name, include_all)
click to toggle source
# File lib/flatware/sink.rb, line 71 def respond_to_missing?(name, include_all) formatter.respond_to?(name, include_all) end
start()
click to toggle source
# File lib/flatware/sink.rb, line 35 def start Signal.listen(formatter, &method(:on_interrupt)) formatter.jobs jobs DRb.start_service(sink, self, verbose: Flatware.verbose?) DRb.thread.join !(failures? || interrupted?) end
Private Instance Methods
check_finished!()
click to toggle source
# File lib/flatware/sink.rb, line 86 def check_finished! return unless [workers, remaining_work].all?(&:empty?) DRb.stop_service summarize end
failures?()
click to toggle source
# File lib/flatware/sink.rb, line 93 def failures? checkpoints.any?(&:failures?) || completed_jobs.any?(&:failed?) end
group_jobs(jobs, worker_count)
click to toggle source
# File lib/flatware/sink.rb, line 112 def group_jobs(jobs, worker_count) return jobs unless worker_count > 1 jobs .group_by .with_index { |_, i| i % worker_count } .values .map do |job_group| Job.new(job_group.map(&:id).flatten, jobs.first.args) end end
interrupted?()
click to toggle source
# File lib/flatware/sink.rb, line 82 def interrupted? @interrupted end
on_interrupt()
click to toggle source
# File lib/flatware/sink.rb, line 77 def on_interrupt @interrupted = true summarize_remaining end
remaining_work()
click to toggle source
# File lib/flatware/sink.rb, line 104 def remaining_work jobs - completed_jobs end
summarize()
click to toggle source
# File lib/flatware/sink.rb, line 108 def summarize formatter.summarize(checkpoints) end
summarize_remaining()
click to toggle source
# File lib/flatware/sink.rb, line 97 def summarize_remaining summarize return if remaining_work.empty? formatter.summarize_remaining remaining_work end