class EventCore::MainLoop

Core data structure for handling and polling Sources.

Public Class Methods

new()
# File lib/event_core.rb, line 341
def initialize
  # We use a monitor, not a mutex, because Ruby mutexes are not reentrant,
  # and we need reentrancy to be able to add sources from within trigger callbacks
  @monitor = Monitor.new

  @monitor.synchronize {
    @sources = []
    @quit_handlers = []

    # Only ever set @do_quit through the quit() method!
    # Otherwise the state of the loop will be undefined
    @do_quit = false
    @control_source = PipeSource.new
    @control_source.trigger { |event|
      # We can get multiple control messages in one event,
      # so generally it is a "string of control chars", hence the include? and not ==
      # If event is nil, it means the pipe has been closed
      @do_quit = true if event.nil? || event.include?('q')
    }
    @sources << @control_source

    @sigchld_source = nil
    @children = []

    @thread = nil
  }
end
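
The reentrancy remark in the constructor can be checked with the standard library alone. The following is a minimal sketch and not part of EventCore; `nested_lock_result` is a hypothetical helper name introduced only for this illustration.

```ruby
require 'monitor'

# Hypothetical helper: tries to acquire the same lock twice from one thread.
# Returns :ok if the nested acquisition succeeds, :deadlock if Ruby refuses it.
def nested_lock_result(lock)
  lock.synchronize do
    lock.synchronize { :ok }
  end
rescue ThreadError
  # Mutex raises ThreadError on recursive locking by the owning thread
  :deadlock
end

puts nested_lock_result(Monitor.new) # a Monitor may be re-entered by its owner
puts nested_lock_result(Mutex.new)   # a Mutex may not
```

This is why a trigger callback, which may already run under the lock, can still call add_source() safely.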

Public Instance Methods

add_fiber(&block)

Schedule a block of code to be run inside a Ruby Fiber. If the block calls loop.yield without any argument, the fiber is simply resumed in subsequent iterations of the loop until it terminates. If loop.yield is called with a block, that block is executed as an async task and the result of the task is delivered as the return value of loop.yield. The block passed to loop.yield must take a single argument, which is a FiberTask instance. When the task is complete you must call task.done to return to the yielded fiber. The optional argument you supply to task.done(result) is passed back to the yielded fiber.

Example:

loop.add_fiber {
  puts 'Waiting for slow result...'
  slow_result = loop.yield { |task|
                  Thread.new { sleep 10; task.done('This took 10s') }
                }
  puts slow_result
}

Note: You can call this method from any thread. Since Ruby Fibers must be created on the same thread that runs them, EventCore ensures that the fiber is created on the thread the main loop is running on.

# File lib/event_core.rb, line 501
def add_fiber(&block)
  # Fibers must be created on the same thread that resumes them.
  # So if we're not on the main loop thread we get on it before
  # creating the fiber
  if Thread.current == @thread
    source = FiberSource.new(self, block)
    add_source(source)
  else
    source = FiberSource.new(self)
    add_once {
      source.create_fiber(block)
      add_source(source)
    }
    source
  end
end
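
The thread restriction mentioned in the note above can be observed directly in plain Ruby. This is a small sketch, independent of EventCore; both helper names are hypothetical.

```ruby
# Hypothetical helper: creates a fiber on the calling thread and tries to
# resume it from a second thread, which Ruby rejects with FiberError.
def resume_across_threads
  fiber = Fiber.new { :ran }
  worker = Thread.new do
    Thread.current.report_on_exception = false # keep stderr quiet
    fiber.resume
  end
  worker.value # joins the worker and re-raises its exception here
rescue FiberError
  :fiber_error
end

# Hypothetical helper: the same fiber body, created and resumed on one thread.
def resume_on_same_thread
  Fiber.new { :ran }.resume
end

puts resume_on_same_thread
puts resume_across_threads
```

Deferring the fiber creation into an add_once callback, as the else branch above does, moves that step onto the loop thread.
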
add_idle(&block)

Add an idle callback to the loop. Like any other callback, it is removed if it returns with ‘next false’. For one-off dispatches into the main loop, e.g. for callbacks from another thread, add_once() is even more convenient. Returns the source, so you can close!() it when no longer needed.

# File lib/event_core.rb, line 387
def add_idle(&block)
  source = IdleSource.new
  source.trigger { next false if block.call == false }
  add_source(source)
end
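
The removal convention is worth spelling out: the trigger above compares the block's result with `== false`, so only an explicit false removes the callback, while nil keeps it. A hedged sketch of that rule, using a hypothetical `dispatch_round` helper rather than EventCore's own sources:

```ruby
# Hypothetical mini-dispatcher: calls every callback once and keeps all of
# them except those that returned exactly false.
def dispatch_round(callbacks)
  callbacks.reject { |callback| callback.call == false }
end

count = 0
callbacks = [
  proc { count += 1; count < 3 }, # returns false on its third call
  proc { nil }                    # nil is not false, so this one stays
]
3.times { callbacks = dispatch_round(callbacks) }
puts callbacks.length # prints 1
```
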
add_once(delay_secs=nil, &block)

Add an idle callback that is removed after its first invocation, no matter how it returns. If delay_secs is given, a timeout source with that delay is used instead of an idle source. Returns the source for API consistency, although this is rarely useful because the source is auto-closed on the next mainloop iteration.

# File lib/event_core.rb, line 397
def add_once(delay_secs=nil, &block)
  source = delay_secs.nil? ? IdleSource.new : TimeoutSource.new(delay_secs)
  source.trigger { block.call; next false  }
  add_source(source)
end
add_quit(&block)

Add a callback to invoke when the loop is quitting, before it becomes invalid. Sources added during the callback will not be invoked, but will be cleaned up.

# File lib/event_core.rb, line 554
def add_quit(&block)
  @monitor.synchronize {
    @quit_handlers << block
  }
end
add_read(io, &block)

Asynchronously read from an IO, calling the block each time data is ready. The block receives two arguments: the read buffer and an exception. The read buffer is nil when EOF has been reached, in which case the IO will be closed and the source removed from the loop. If reading fails, the buffer is nil and the exception is passed as the second argument. Returns the source so you can cancel the read with source.close!

# File lib/event_core.rb, line 454
def add_read(io, &block)
  source = IOSource.new(io, :read)
  source.trigger {
    begin
      loop do
        buf = io.read_nonblock(4096*4) # 4 pages
        block.call(buf, nil)
      end
    rescue IO::WaitReadable
      # All good, wait until we're readable again
      next true
    rescue EOFError
      block.call(nil, nil)
      next false
    rescue => e
      block.call(nil, e)
      next false
    end
  }
  add_source(source)
end
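
The read loop above relies on read_nonblock raising IO::WaitReadable when no data is available and EOFError when the writer has closed. The sketch below reproduces just that control flow on a pipe; `drain_nonblocking` is a hypothetical helper and does not use EventCore.

```ruby
# Hypothetical helper: reads everything currently available without blocking.
# Returns [chunks, :again] if more data may arrive later, [chunks, :eof] at EOF.
def drain_nonblocking(io, chunk_size = 4096 * 4)
  chunks = []
  loop do
    chunks << io.read_nonblock(chunk_size)
  end
rescue IO::WaitReadable
  [chunks, :again]
rescue EOFError
  [chunks, :eof]
end

reader, writer = IO.pipe
writer.write('hello')
p drain_nonblocking(reader) # data, then "try again later"
writer.close
p drain_nonblocking(reader) # nothing left, end of file
reader.close
```
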
add_source(source)

Add an event source to check in the loop. You can do this from any thread, or from trigger callbacks, or whenever you please. Returns the source, so you can close!() it when no longer needed.

# File lib/event_core.rb, line 372
def add_source(source)
  @monitor.synchronize {
    wakeup_needed = !@thread.nil? && @thread != Thread.current
    raise "Unable to add source - loop terminated" if @sources.nil?
    @sources << source
    send_wakeup if wakeup_needed
  }
  source
end
add_timeout(secs, &block)

Add a timeout function to be called periodically until it returns with ‘next false’. The timeout is in seconds, and the first call fires after it has elapsed. Returns the source, so you can close!() it when no longer needed.

# File lib/event_core.rb, line 406
def add_timeout(secs, &block)
  source = TimeoutSource.new(secs)
  source.trigger { next false if block.call == false }
  add_source(source)
end
add_unix_signal(*signals, &block)

Add a Unix signal handler that is dispatched in the main loop. The handler receives an array of the signal numbers that were triggered since the last step of the loop. You can provide one or more signals to listen for, given as integers or names. Returns the source, so you can close!() it when no longer needed.

# File lib/event_core.rb, line 417
def add_unix_signal(*signals, &block)
  source = UnixSignalSource.new(*signals)
  source.trigger { |signals|  next false if block.call(signals) == false }
  add_source(source)
end
add_write(io, buf, &block)

Asynchronously write buf to io. Invokes block when complete, giving any encountered exception as argument, nil on success. Returns the source so you can close! it to cancel.

# File lib/event_core.rb, line 426
def add_write(io, buf, &block)
  source = IOSource.new(io, :write)
  source.trigger {
    begin
      # Note: write_nonblock returns a byte count, which can exceed buf.length
      # (a character count) for multibyte strings, hence bytesize below
      len = io.write_nonblock(buf)
      if len == buf.bytesize
        block.call(nil) unless block.nil?
        next false
      end
      buf = buf.byteslice(len..-1)
      next true
    rescue IO::WaitWritable
      # All good, wait until we're writable again
      next true
    rescue => e
      block.call(e) unless block.nil?
      next false
    end
  }
  add_source(source)
end
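
Two details of the write path are easy to get wrong: partial writes, and the difference between character and byte counts. The following is a sketch under the assumption of a blocking caller; unlike add_write it waits in IO.select itself instead of returning to a loop. `write_all_nonblocking` is a hypothetical name.

```ruby
# Hypothetical helper: writes all of buf, slicing off the bytes that each
# write_nonblock call reports as written. Returns the total byte count.
def write_all_nonblocking(io, buf)
  total = 0
  until buf.empty?
    begin
      written = io.write_nonblock(buf)
      total += written
      buf = buf.byteslice(written..-1) # remaining bytes, not characters
    rescue IO::WaitWritable
      IO.select(nil, [io]) # block until the descriptor accepts more data
    end
  end
  total
end

text = "h\u00e9llo"
puts text.length   # characters
puts text.bytesize # bytes, larger because of the two-byte character

reader, writer = IO.pipe
puts write_all_nonblocking(writer, text)
writer.close
reader.close
```
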
quit()

Safe and clean shutdown of the loop. Note that the loop will only shut down on next iteration, not immediately.

# File lib/event_core.rb, line 597
def quit
  # Does not require locking. If any data comes through in what ever form,
  # we quit the loop
  send_control('q')
end
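
The constructor's control trigger tests the event with include?('q') because several control characters can arrive in a single read. A small sketch on a plain pipe illustrates this; `quit_requested?` is a hypothetical helper that mirrors the predicate in initialize, and it assumes the event is the raw string read from the pipe.

```ruby
# Hypothetical helper: same predicate as the control trigger in initialize.
# A nil event stands for a closed control pipe.
def quit_requested?(event)
  event.nil? || event.include?('q')
end

reader, writer = IO.pipe
writer.write('.') # a wakeup
writer.write('q') # a quit request
event = reader.read_nonblock(16) # both characters come back in one read
puts event.inspect
puts quit_requested?(event)
reader.close
writer.close
```
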
run()

Start the loop, and do not return until someone calls quit(). When the loop exits (via quit) it calls close! on all sources.

# File lib/event_core.rb, line 605
def run
  @thread = Thread.current

  loop do
    step
    break if @do_quit
  end

  @monitor.synchronize {
    @quit_handlers.each { |block| block.call }

    @children.each { |child| Process.detach(child[:pid]) }
    @children = nil

    @sources.each { |source| source.close! }
    @sources = nil

    @control_source.close!

    @thread = nil
  }
end
running?()

Returns true iff a thread is currently iterating the loop with the run() method.

# File lib/event_core.rb, line 591
def running?
  !@thread.nil?
end
send_wakeup()

Expert: wake up the main loop, forcing it to check all sources. Useful if you’re twiddling the readiness of sources “out of band”.

# File lib/event_core.rb, line 698
def send_wakeup
  send_control('.')
end
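
The wakeup works because the control pipe is one of the descriptors the loop selects on, so writing a byte to it ends a blocking IO.select early. The sketch below shows the mechanism with a bare pipe; `select_with_wakeup` is a hypothetical helper, not an EventCore method.

```ruby
# Hypothetical helper: blocks in IO.select on a pipe while a second thread
# writes a wakeup character. Returns the character read, or nil on timeout.
def select_with_wakeup(timeout_secs)
  reader, writer = IO.pipe
  waker = Thread.new { sleep 0.05; writer.write('.') }
  ready, = IO.select([reader], [], [], timeout_secs) # nil on timeout
  waker.join
  ready ? reader.read_nonblock(16) : nil
ensure
  reader&.close
  writer&.close
end

puts select_with_wakeup(5).inspect
```
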
spawn(*args, &block)

Like Process.spawn(), but invokes the given block in the main loop when the child process exits. The block is called with the Process::Status object of the child.

WARNING: The main loop installs a SIGCHLD handler to automatically wait() on processes started this way, so this function will not work correctly if you tamper with SIGCHLD yourself.

When you quit the loop, any children that have not yet been waited for are detached with Process.detach() to prevent zombies.

Returns the PID of the child (that you should /not/ wait() on).

# File lib/event_core.rb, line 572
def spawn(*args, &block)
  if @sigchld_source.nil?
    @sigchld_source = add_unix_signal("CHLD") {
      reap_children
    }
  end

  pid = Process.spawn(*args)
  @children << {:pid => pid, :block => block}
  pid
end
step()

Expert: Run a single iteration of the main loop.

# File lib/event_core.rb, line 629
def step
  # Collect sources
  ready_sources = []
  select_sources_by_ios = {}
  read_ios = []
  write_ios = []
  timeout = nil

  @monitor.synchronize {
    @sources.delete_if do |source|
      if source.closed?
        true
      else
        source_ready = source.ready?
        ready_sources << source if source_ready

        io = source.select_io
        unless io.nil? || io.closed?
          case source.select_type
            when :read
              read_ios << io
            when :write
              write_ios << io
            else
              raise "Invalid source select_type: #{source.select_type}"
          end

          select_sources_by_ios[io] = source
        end

        dt = source_ready ? 0 : source.timeout
        timeout = timeout.nil? ?
            dt : (dt.nil? ? timeout : (timeout < dt ? timeout : dt))

        false
      end
    end
  }

  # Release lock while we're selecting so users can add sources. add_source() will see
  # that we are stuck in a select() and do send_wakeup().
  # Note: Only select() without locking, everything else must be locked!
  read_ios, write_ios, exception_ios = IO.select(read_ios, write_ios, [], timeout)

  @monitor.synchronize {
    # On timeout read_ios will be nil
    unless read_ios.nil?
      read_ios.each { |io|
        ready_sources << select_sources_by_ios[io]
      }
    end

    unless write_ios.nil?
      write_ios.each { |io|
        ready_sources << select_sources_by_ios[io]
      }
    end
  }

  # Dispatch all sources marked ready
  ready_sources.each { |source|
    source.notify_trigger
  }

  @do_quit = true if @control_source.closed?
end
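
The nested ternary that computes the select timeout can be read as "the smallest non-nil timeout, or nil if there is none". A sketch of that rule in isolation follows; both helper names are hypothetical.

```ruby
# Hypothetical helper: combines the timeout so far with one source's timeout.
# nil means "no deadline"; otherwise the smaller value wins.
def merge_timeout(current, candidate)
  return candidate if current.nil?
  return current if candidate.nil?
  [current, candidate].min
end

# Hypothetical helper: folds merge_timeout over all sources' timeouts.
def select_timeout(timeouts)
  timeouts.reduce(nil) { |acc, dt| merge_timeout(acc, dt) }
end

p select_timeout([nil, 2.5, 10]) # the nearest deadline
p select_timeout([nil, 2.5, 0])  # a ready source contributes 0
p select_timeout([nil, nil])     # no deadline at all
```

A nil result makes IO.select block until a descriptor becomes ready, while 0 makes it return immediately, which is how sources that are already ready get dispatched without delay.
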
thread()

The Thread instance currently iterating the run() method, or nil if the loop is not running.

# File lib/event_core.rb, line 586
def thread
  @thread
end
yield(&block)

Must only be called from inside a fiber added with loop.add_fiber. Without arguments simply passes control back to the mainloop and resumes execution in next mainloop iteration. If passed a block, the block must take exactly one argument, which is a FiberTask. The block will be executed and the fiber scheduled for resumption when task.done is called. If an argument is passed to task.done then this will become the return value of the yield.

# File lib/event_core.rb, line 525
def yield(&block)
  raise "Blocks passed to loop.yield must have arity 1" unless block.nil? or block.arity == 1
  Fiber.yield block
end
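
Since the method simply hands the block to Fiber.yield, whoever resumes the fiber decides what a yielded value means. The sketch below is a simplified, synchronous stand-in for that driver and is not EventCore's FiberSource; `SketchTask`, `run_fiber_sketch` and `FIBER_FINISHED` are hypothetical names.

```ruby
FIBER_FINISHED = Object.new.freeze

# Hypothetical stand-in for FiberTask: records the value passed to done.
class SketchTask
  attr_reader :result

  def done(result = nil)
    @result = result
  end
end

# Hypothetical driver: resumes the fiber until its body returns. A nil yield
# is a plain "resume me later"; a yielded block is run with a task whose
# result becomes the return value of the yield.
def run_fiber_sketch(&body)
  fiber = Fiber.new { body.call; FIBER_FINISHED }
  steps = []
  resume_value = nil
  loop do
    request = fiber.resume(resume_value)
    resume_value = nil
    break if request.equal?(FIBER_FINISHED)
    if request.nil?
      steps << :plain_yield
    else
      task = SketchTask.new
      request.call(task) # runs synchronously in this sketch
      resume_value = task.result
      steps << :task_yield
    end
  end
  steps
end

answer = nil
steps = run_fiber_sketch do
  Fiber.yield                                    # like loop.yield without a block
  answer = Fiber.yield(proc { |task| task.done(42) }) # like loop.yield { |task| ... }
end
p steps
p answer
```

In the real loop the task would typically complete later, for example from another thread, and the fiber would stay suspended until task.done is called.
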
yield_from_thread(&block)

Must only be called from inside a fiber added with loop.add_fiber. Convenience function on top of loop.yield, returning the result of a block run in a new thread. Unlike loop.yield, the block must not take any arguments; the raw result of the block is sent back to the yielding fiber.

# File lib/event_core.rb, line 534
def yield_from_thread(&block)
  raise 'A block must be provided' if block.nil?
  raise "Block must take exactly 0 arguments: #{block.arity}" unless block.arity == 0

  self.yield do |task|
    thread = Thread.new {
      begin
        result = block.call
      ensure
        add_once {
          task.done(result)
          thread.join
        }
      end
    }
  end
end

Private Instance Methods

reap_children()
# File lib/event_core.rb, line 709
def reap_children
  # Waiting on pid -1, to reap any child would be tempting, but that could conflict
  # with other parts of code, not using EventCore, trying to wait() on those pids.
  # Instead we have to check each child explicitly spawned via loop.spawn(). This
  # is O(N) in the number of children, naturally, but I haven't found a better way
  # that is robust.
  @children.delete_if {|child|
    if Process.wait(child[:pid], Process::WNOHANG)
      status = $?
      child[:block].call(status) unless child[:block].nil?
      true
    else
      false
    end
  }
end
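
The per-child, non-blocking wait can be tried outside the loop. This sketch polls with sleep purely for demonstration, whereas the method above is triggered by SIGCHLD; `reap_finished` and `spawn_and_wait_sketch` are hypothetical helpers.

```ruby
require 'rbconfig'

# Hypothetical helper: checks each pid without blocking, removes the ones
# that have exited and returns their Process::Status objects keyed by pid.
def reap_finished(pids)
  finished = {}
  pids.delete_if do |pid|
    if Process.wait(pid, Process::WNOHANG) # nil while the child still runs
      finished[pid] = $?
      true
    else
      false
    end
  end
  finished
end

# Hypothetical helper: spawns a child Ruby process that exits with the given
# code, polls until it has been reaped and returns its exit status.
def spawn_and_wait_sketch(exit_code)
  pids = [Process.spawn(RbConfig.ruby, '-e', "exit #{exit_code}")]
  statuses = {}
  until pids.empty?
    statuses.merge!(reap_finished(pids))
    sleep 0.05 unless pids.empty?
  end
  statuses.values.first.exitstatus
end

puts spawn_and_wait_sketch(3)
```
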
send_control(char)
# File lib/event_core.rb, line 703
def send_control(char)
  raise "Illegal control character '#{char}'" unless ['.', 'q'].include?(char)
  @control_source.write(char)
end