class RedditGet::Scheduler
Attributes
readable[RW]
Public Class Methods
new()
click to toggle source
# File lib/scheduler.rb, line 18 def initialize @readable = {} @writable = {} @waiting = {} @closed = false @lock = Mutex.new @blocking = 0 @ready = [] @urgent = IO.pipe end
Public Instance Methods
block(_blocker, timeout = nil)
click to toggle source
Used when blocking on synchronization (Mutex#lock, Queue#pop, SizedQueue#push, …)
# File lib/scheduler.rb, line 144 def block(_blocker, timeout = nil) # $stderr.puts [__method__, blocker, timeout].inspect if timeout @waiting[Fiber.current] = current_time + timeout begin Fiber.yield ensure # Remove from @waiting in the case #unblock was called before the timeout expired: @waiting.delete(Fiber.current) end else @blocking += 1 begin Fiber.yield ensure @blocking -= 1 end end end
close()
click to toggle source
# File lib/scheduler.rb, line 98 def close raise 'Scheduler already closed!' if @closed run ensure @urgent.each(&:close) @urgent = nil @closed = true # We freeze to detect any unintended modifications after the scheduler is closed: freeze end
closed?()
click to toggle source
# File lib/scheduler.rb, line 112 def closed? @closed end
current_time()
click to toggle source
# File lib/scheduler.rb, line 116 def current_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end
fiber(&block)
click to toggle source
# File lib/scheduler.rb, line 178 def fiber(&block) fiber = Fiber.new(blocking: false, &block) fiber.resume fiber end
io_wait(io, events, _duration)
click to toggle source
# File lib/scheduler.rb, line 127 def io_wait(io, events, _duration) @readable[io] = Fiber.current unless (events & IO::READABLE).zero? @writable[io] = Fiber.current unless (events & IO::WRITABLE).zero? Fiber.yield events end
kernel_sleep(duration = nil)
click to toggle source
Used for Kernel#sleep and Mutex#sleep
# File lib/scheduler.rb, line 137 def kernel_sleep(duration = nil) block(:sleep, duration) true end
next_timeout()
click to toggle source
# File lib/scheduler.rb, line 34 def next_timeout _fiber, timeout = @waiting.min_by { |_key, value| value } if timeout offset = timeout - current_time if offset.negative? 0 else offset end end end
process_wait(pid, flags)
click to toggle source
# File lib/scheduler.rb, line 120 def process_wait(pid, flags) # This is a very simple way to implement a non-blocking wait: Thread.new do Process::Status.wait(pid, flags) end.value end
run()
click to toggle source
# File lib/scheduler.rb, line 48 def run while @readable.any? || @writable.any? || @waiting.any? || @blocking.positive? # Can only handle file descriptors up to 1024... readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) # puts "readable: #{readable}" if readable&.any? # puts "writable: #{writable}" if writable&.any? readable&.each do |io| if fiber = @readable.delete(io) fiber.resume elsif io == @urgent.first @urgent.first.read_nonblock(1024) end end writable&.each do |io| if fiber = @writable.delete(io) fiber.resume end end if @waiting.any? time = current_time waiting = @waiting @waiting = {} waiting.each do |fiber, timeout| if timeout <= time fiber.resume else @waiting[fiber] = timeout end end end next unless @ready.any? ready = nil @lock.synchronize do ready = @ready @ready = [] end ready.each(&:resume) end end
unblock(_blocker, fiber)
click to toggle source
Used when synchronization wakes up a previously-blocked fiber (Mutex#unlock, Queue#push, …). This might be called from another thread.
# File lib/scheduler.rb, line 167 def unblock(_blocker, fiber) # $stderr.puts [__method__, blocker, fiber].inspect @lock.synchronize do @ready << fiber end io = @urgent.last io.write_nonblock('.') end