class RedditGet::Scheduler

Attributes

readable[RW]

Public Class Methods

new() click to toggle source
# File lib/scheduler.rb, line 18
# Builds an open scheduler with nothing registered yet.
def initialize
  # Cross-thread wake-up: #unblock appends to @ready under @lock and writes a
  # byte to the @urgent pipe so the IO.select call in #run returns.
  @lock = Mutex.new
  @ready = []
  @urgent = IO.pipe

  # Fibers parked by #io_wait, keyed by the IO object they wait on:
  @readable = {}
  @writable = {}

  # Fibers parked by #block with a timeout (fiber => monotonic deadline):
  @waiting = {}

  # Count of fibers parked by #block without a timeout:
  @blocking = 0

  # Flipped to true by #close:
  @closed = false
end

Public Instance Methods

block(_blocker, timeout = nil) click to toggle source

Used when blocking on synchronization (Mutex#lock, Queue#pop, SizedQueue#push, …)

# File lib/scheduler.rb, line 144
# Parks the calling fiber until the scheduler resumes it.
#
# _blocker:: the object being waited on (not used here).
# timeout::  seconds to wait at most; when nil/false the fiber waits until it
#            is resumed through #unblock.
#
# Returns whatever value the fiber is resumed with.
def block(_blocker, timeout = nil)
  parked = Fiber.current
  deadline = timeout && (current_time + timeout)

  if deadline
    # Timed wait: #run resumes the fiber once the deadline has passed.
    @waiting[parked] = deadline
    begin
      Fiber.yield
    ensure
      # Drop the entry in case #unblock woke us before the deadline:
      @waiting.delete(parked)
    end
  else
    # Untimed wait: the counter keeps the #run loop alive while we are parked.
    @blocking += 1
    begin
      Fiber.yield
    ensure
      @blocking -= 1
    end
  end
end
close() click to toggle source
# File lib/scheduler.rb, line 98
# Runs the event loop until no fiber is left waiting, then releases the
# wake-up pipe, marks the scheduler closed and freezes it.
#
# Returns the result of #run.
# Raises RuntimeError ('Scheduler already closed!') when called on a
# scheduler that has already been closed.
def close
  # This guard must sit outside the begin/ensure below: on an already closed
  # scheduler @urgent is nil and the object is frozen, so running the cleanup
  # again would replace this error with a NoMethodError/FrozenError.
  raise 'Scheduler already closed!' if @closed

  begin
    run
  ensure
    @urgent.each(&:close)
    @urgent = nil

    @closed = true

    # We freeze to detect any unintended modifications after the scheduler is closed:
    freeze
  end
end
closed?() click to toggle source
# File lib/scheduler.rb, line 112
# Reports the @closed flag: false after #initialize, true once #close has run.
def closed? = @closed
current_time() click to toggle source
# File lib/scheduler.rb, line 116
# Reads the monotonic clock (Process::CLOCK_MONOTONIC); used for computing
# and comparing deadlines.
def current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
fiber(&block) click to toggle source
# File lib/scheduler.rb, line 178
# Creates a non-blocking fiber for the given block, starts it right away and
# returns the Fiber object.
def fiber(&block)
  Fiber.new(blocking: false, &block).tap(&:resume)
end
io_wait(io, events, _duration) click to toggle source
# File lib/scheduler.rb, line 127
# Parks the calling fiber until #run resumes it for +io+.
#
# io::        the IO object to wait on.
# events::    bit mask built from IO::READABLE and/or IO::WRITABLE.
# _duration:: accepted for interface compatibility; it is not used, so the
#             wait has no timeout.
#
# Returns +events+ unchanged.
def io_wait(io, events, _duration)
  waiter = Fiber.current
  wants_read = (events & IO::READABLE).nonzero?
  wants_write = (events & IO::WRITABLE).nonzero?

  @readable[io] = waiter if wants_read
  @writable[io] = waiter if wants_write

  begin
    Fiber.yield
  ensure
    # #run only removes the entry for the event that fired. Drop whatever is
    # still registered for this fiber so it is not resumed a second time later
    # (possibly after it has finished) by the other event.
    @readable.delete(io) if wants_read && @readable[io].equal?(waiter)
    @writable.delete(io) if wants_write && @writable[io].equal?(waiter)
  end

  events
end
kernel_sleep(duration = nil) click to toggle source

Used for Kernel#sleep and Mutex#sleep

# File lib/scheduler.rb, line 137
# Scheduler hook behind Kernel#sleep and Mutex#sleep.
#
# duration:: seconds to sleep; nil is passed through to #block as "no timeout".
#
# Always returns true once the fiber has been resumed.
def kernel_sleep(duration = nil)
  # Sleeping is just a timed block on the :sleep pseudo-blocker.
  block(:sleep, duration)
  true
end
next_timeout() click to toggle source
# File lib/scheduler.rb, line 34
# Computes how long the event loop may sleep before the earliest deadline in
# @waiting is due.
#
# Returns nil when no fiber is waiting with a deadline, 0 when the earliest
# deadline has already passed, otherwise the remaining time.
def next_timeout
  earliest = @waiting.values.min
  return unless earliest

  remaining = earliest - current_time
  remaining.negative? ? 0 : remaining
end
process_wait(pid, flags) click to toggle source
# File lib/scheduler.rb, line 120
# Waits for a child process on a helper thread and returns what
# Process::Status.wait(pid, flags) produced.
#
# pid::   process id handed to Process::Status.wait.
# flags:: wait flags handed to Process::Status.wait.
def process_wait(pid, flags)
  # A very simple non-blocking wait: the blocking call lives on its own
  # thread and we only join that thread for its result.
  waiter = Thread.new { Process::Status.wait(pid, flags) }
  waiter.value
end
run() click to toggle source
# File lib/scheduler.rb, line 48
# Event loop: keeps going while any fiber is registered for IO, waiting on a
# deadline, or blocked without a timeout.
#
# Each pass selects on the registered IOs (plus the read end of the @urgent
# pipe) for at most #next_timeout, then resumes, in this order: fibers whose
# IO became readable, fibers whose IO became writable, fibers whose deadline
# has passed, and fibers queued by #unblock.
#
# A fiber can be reachable through more than one of these tables (e.g. it
# timed out and was unblocked at the same moment) and may already have finished
# by the time a later table is processed. Resuming a finished fiber raises
# FiberError and would abort the loop, so finished fibers are skipped.
def run
  while @readable.any? || @writable.any? || @waiting.any? || @blocking.positive?
    # Can only handle file descriptors up to 1024...
    readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [],
                                   next_timeout)

    readable&.each do |io|
      if (fiber = @readable.delete(io))
        fiber.resume if fiber.alive?
      elsif io == @urgent.first
        # Drain wake-up bytes written by #unblock.
        @urgent.first.read_nonblock(1024)
      end
    end

    writable&.each do |io|
      fiber = @writable.delete(io)
      fiber.resume if fiber&.alive?
    end

    if @waiting.any?
      time = current_time
      # Swap in a fresh table first: resumed fibers may register new deadlines
      # while we are still iterating over the old one.
      waiting = @waiting
      @waiting = {}

      waiting.each do |fiber, timeout|
        next unless fiber.alive?

        if timeout <= time
          fiber.resume
        else
          @waiting[fiber] = timeout
        end
      end
    end

    next unless @ready.any?

    ready = nil

    # Take the whole queue under the lock; #unblock may append from other threads.
    @lock.synchronize do
      ready = @ready
      @ready = []
    end

    ready.each { |fiber| fiber.resume if fiber.alive? }
  end
end
unblock(_blocker, fiber) click to toggle source

Used when synchronization wakes up a previously-blocked fiber (Mutex#unlock, Queue#push, …). This might be called from another thread.

# File lib/scheduler.rb, line 167
# Queues +fiber+ to be resumed by #run and wakes the event loop. Used when
# synchronization wakes up a previously blocked fiber; this might be called
# from another thread, hence the lock around @ready.
#
# _blocker:: the object the fiber was blocked on (not used here).
# fiber::    the fiber to resume.
def unblock(_blocker, fiber)
  @lock.synchronize { @ready << fiber }

  # Writing a byte makes the read end readable so IO.select in #run returns.
  # If the pipe is full the write would block; a full pipe is already
  # readable, so the wake-up is guaranteed and the byte can be dropped instead
  # of raising IO::WaitWritable into the caller.
  @urgent.last.write_nonblock('.', exception: false)
end