class Goru::Reactor

public

Attributes

status[R]
public

Public Class Methods

new(queue:, scheduler:) click to toggle source
# File lib/goru/reactor.rb, line 14
# Builds an idle reactor.
#
# queue     - source of routines; `run` pops from it.
# scheduler - notified through `signal` when `run` finds nothing left to do.
#
def initialize(queue:, scheduler:)
  # Collaborators supplied by the caller.
  @queue, @scheduler = queue, scheduler

  # Routines called directly on every pass of `run`, and the ones waiting to be pruned from that list.
  @routines, @finished = [], []

  # Timer group backing `sleep`, and the selector that io routines are registered with.
  @timers = Timers::Group.new
  @selector = NIO::Selector.new

  # `status` stays nil until `run` is entered; `stop` flips `@stopped` to end the loop.
  @status, @stopped = nil, false
end

Public Instance Methods

run() click to toggle source
public
# File lib/goru/reactor.rb, line 31
# Runs the reactor loop until `@stopped` becomes true (see `stop`).
#
# Each pass: call every routine in `@routines`, prune the ones collected in `@finished`, then try to adopt one
# new routine from the queue without blocking. When that pop raises `ThreadError`, the reactor takes the idle
# path below, which decides how (and whether) to wait based on pending timers, local routines, and whether
# anything is registered with the selector.
#
# Whatever way the loop exits (including an exception), the selector is closed and `@status` becomes `:finished`.
#
def run
  @status = :running

  until @stopped
    # Give every locally held routine a turn; `call_routine` decides what to do based on its status.
    @routines.each do |routine|
      call_routine(routine)
    end

    # TODO: Remove the monitor from the selector.
    #
    cleanup_finished_routines

    begin
      # Non-blocking attempt to pick up one new routine. The `rescue ThreadError` below implies the queue is
      # expected to raise `ThreadError` when nothing is available (as Ruby's `Queue#pop(true)` does) -- confirm
      # against the queue implementation.
      if (routine = @queue.pop(true))
        adopt_routine(routine)
      end
    rescue ThreadError
      # Nothing was adopted on this pass. `interval` drives the waiting strategy; presumably it is the time
      # until the next timer is due and nil when no timers are pending -- confirm against the timers gem.
      interval = @timers.wait_interval

      if interval.nil?
        if @routines.empty?
          if @selector.empty?
            # No timers, no local routines, nothing registered with the selector: advertise that this reactor
            # is looking for work, tell the scheduler, then pop from the queue (presumably blocking).
            #
            # NOTE(review): `@status` is not set back to `:running` after a routine is adopted here; confirm
            # that the scheduler does not rely on `status` being accurate at that point.
            @status = :looking
            @scheduler.signal(self)
            if (routine = @queue.pop)
              adopt_routine(routine)
            end
          else
            # TODO: The issue doing this is that this reactor won't grab new routines. Will calling `@selector.wakeup`
            # from the scheduler when a routine is added to the queue resolve this?
            #
            # Only selector registrations remain: select with no timeout and run the handler stored on each
            # yielded monitor (see `adopt_routine`).
            @selector.select do |monitor|
              monitor.value.call
            end
          end
        else
          # Local routines exist, so select with a zero timeout (presumably a non-blocking poll) before
          # looping back to them.
          @selector.select(0) do |monitor|
            monitor.value.call
          end
        end
      elsif interval > 0
        # NOTE(review): this branch is taken even when `@routines` is not empty; confirm that waiting up to
        # `interval` here cannot delay routines that are ready to run.
        if @selector.empty?
          # Nothing registered with the selector: spend the interval waiting on the queue instead, and stop
          # waiting as soon as a routine arrives.
          Timers::Wait.for(interval) do |remaining|
            if (routine = @queue.pop_with_timeout(remaining))
              adopt_routine(routine)
              break
            end
          rescue ThreadError
            # nothing to do
          end
        else
          # Something is registered with the selector: select for at most `interval`.
          @selector.select(interval) do |monitor|
            monitor.value.call
          end
        end
      end

      # Runs at the end of every idle pass, regardless of which branch was taken above.
      @timers.fire
    end
  end
ensure
  @selector.close
  @status = :finished
end
sleep(routine, seconds) click to toggle source
public
# File lib/goru/reactor.rb, line 108
# Schedules `routine.wake` on this reactor's timer group, `seconds` from now.
#
# Returns whatever `@timers.after` returns.
#
def sleep(routine, seconds)
  wake_routine = proc { routine.wake }

  @timers.after(seconds, &wake_routine)
end
stop() click to toggle source
public
# File lib/goru/reactor.rb, line 98
# Asks the reactor to stop: sets the flag that `run` checks on every pass, then wakes the selector (unless it
# has already been closed), presumably so that a pending select returns.
#
def stop
  @stopped = true

  @selector.wakeup unless @selector.closed?
end

Private Instance Methods

adopt_routine(routine) click to toggle source
# File lib/goru/reactor.rb, line 114
        # Takes ownership of `routine`: points it at this reactor, then either registers it with the selector
        # (io routines) or appends it to the list of routines that `run` calls directly.
        #
        def adopt_routine(routine)
          routine.reactor = self

          # Anything that is not an io routine is simply driven by the run loop.
          return @routines << routine unless Routines::IO === routine

          io_monitor = @selector.register(routine.io, routine.intent)

          # TODO: Try to combine this with `call_routine` below.
          #
          # Readiness handler, invoked by `run` through `monitor.value.call`: keep calling the routine while it
          # is selecting; any other status marks it as finished.
          io_monitor.value = proc {
            if routine.status == :selecting
              routine.call
            else
              @finished << routine
            end
          }
        end
call_routine(routine) click to toggle source
# File lib/goru/reactor.rb, line 136
        # Gives `routine` one turn based on its current status: running routines are called, sleeping and
        # selecting routines are skipped, and a routine in any other status is queued for removal.
        #
        def call_routine(routine)
          current = routine.status

          return routine.call if current == :running

          # Nothing to do for these until they are woken or selected.
          return if current == :sleeping || current == :selecting

          @finished << routine
        end
cleanup_finished_routines() click to toggle source
# File lib/goru/reactor.rb, line 147
        # Drains `@finished`, releasing everything the reactor still holds for each finished routine.
        #
        # Routines driven directly by the run loop are removed from `@routines`. Io routines are never added to
        # `@routines` (`adopt_routine` registers them with the selector instead), so for those the io is also
        # deregistered from the selector. Without this a finished io routine stays registered indefinitely:
        # its handler keeps firing, and the selector never becomes empty.
        #
        def cleanup_finished_routines
          until @finished.empty?
            routine = @finished.pop

            @routines.delete(routine)

            next unless Routines::IO === routine

            # Guarded so that a routine collected more than once is only deregistered once.
            @selector.deregister(routine.io) if @selector.registered?(routine.io)
          end
        end