class Reactomatic::Reactor

Constants

SELECTOR_TIMEOUT

Public Class Methods

new(opts = {}) click to toggle source
# File lib/reactomatic/reactor.rb, line 5
def initialize(opts = {})
  @selector = NIO::Selector.new
  @selector_timeout = opts[:selector_timeout] || SELECTOR_TIMEOUT
  @run_lock = Mutex.new
  @next_tick_queue = Queue.new
end

Public Instance Methods

deregister(io) click to toggle source
# File lib/reactomatic/reactor.rb, line 42
def deregister(io)
  @selector.deregister(io)

  nil
end
next_tick(callback = nil, &block) click to toggle source
# File lib/reactomatic/reactor.rb, line 52
def next_tick(callback = nil, &block)
  func = callback || block

  @next_tick_queue.push(func)

  nil
end
register(io, interest, target) click to toggle source
# File lib/reactomatic/reactor.rb, line 35
def register(io, interest, target)
  monitor = @selector.register(io, interest)
  monitor.value = target

  nil
end
registered?(io) click to toggle source
# File lib/reactomatic/reactor.rb, line 48
def registered?(io)
  @selector.registered?(io)
end
schedule(callback = nil, &block) click to toggle source
# File lib/reactomatic/reactor.rb, line 60
def schedule(callback = nil, &block)
  func = callback || block

  if Thread.current == @thread
    func.call
  else
    next_tick(func)
  end

  nil
end
start(opts = {}) click to toggle source
# File lib/reactomatic/reactor.rb, line 12
def start(opts = {})
  @run_lock.synchronize do
    raise AlreadyStarted if @thread
    @thread = Thread.new do
      begin
        while !@selector.closed?
          process_next_tick_queue
          monitor_io_objects
        end
      rescue Exception => e
        exception_handler(e)
      end
    end
  end
end
stop() click to toggle source
# File lib/reactomatic/reactor.rb, line 28
def stop
  @run_lock.synchronize do
    @selector.close
    @thread.join
  end
end

Private Instance Methods

exception_handler(e) click to toggle source
# File lib/reactomatic/reactor.rb, line 90
def exception_handler(e)
  puts "EXCEPTION #{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}"
end
monitor_io_objects() click to toggle source
# File lib/reactomatic/reactor.rb, line 84
def monitor_io_objects
  @selector.select(SELECTOR_TIMEOUT) do |monitor|
    monitor.value.call(monitor)
  end
end
process_next_tick_queue() click to toggle source
# File lib/reactomatic/reactor.rb, line 74
def process_next_tick_queue
  @next_tick_queue.length.times do
    begin
      @next_tick_queue.pop.call
    rescue Exception => e
      exception_handler(e)
    end
  end
end