class Isomorfeus::Speednode::AttachPipe

Constants

BUFFER_SIZE
CONNECTING_STATE
INSTANCES
PIPE_TIMEOUT
READING_STATE
WRITING_STATE

Public Class Methods

new(pipe_name, block) click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 64
def initialize(pipe_name, block)
  @run_block = block
  @full_pipe_name = "\\\\.\\pipe\\#{pipe_name}"
  @instances = 1
  @events = []
  @events_pointer = FFI::MemoryPointer.new(:uintptr_t, @instances + 1)
  @pipe = {}
end

Public Instance Methods

run() click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 73
def run
  @running = true
  create_instance
  while_loop
end
stop() click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 79
def stop
  @running = false
  STDERR.puts("DisconnectNamedPipe failed with #{GetLastError()}") if !DisconnectNamedPipe(@pipe[:instance])
end

Private Instance Methods

connect_to_new_client() click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 203
def connect_to_new_client
  pending_io = false
  @pipe[:request].clear
  @pipe[:reply].clear
  connected = ConnectNamedPipe(@pipe[:instance], @pipe[:overlap].to_ptr)
  last_error = GetLastError()
  raise "ConnectNamedPipe failed with #{last_error} - #{connected}" if connected != 0
  
  case last_error
  when ERROR_IO_PENDING
    pending_io = true
  when ERROR_PIPE_CONNECTED
    SetEvent(@pipe[:overlap][:hEvent])
  when ERROR_SUCCESS
    pending_io = true
  else
    raise "ConnectNamedPipe failed with error #{last_error}"
  end

  pending_io
end
create_instance() click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 86
def create_instance
  @events[0] = CreateEvent(nil, 1, 1, nil)
  raise "CreateEvent failed with #{GetLastError()}" unless @events[0]

  overlap = Overlapped.new
  overlap[:hEvent] = @events[0]

  @pipe = { overlap: overlap, instance: nil, request: FFI::Buffer.new(1, BUFFER_SIZE), bytes_read: 0, reply: FFI::Buffer.new(1, BUFFER_SIZE), bytes_to_write: 0, state: nil, pending_io: false }
  @pipe[:instance] = CreateNamedPipe(@full_pipe_name, 
                                    PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
                                    PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
                                    4,
                                    BUFFER_SIZE,
                                    BUFFER_SIZE,
                                    PIPE_TIMEOUT,
                                    nil)
  raise "CreateNamedPipe failed with #{GetLastError()}" if @pipe[:instance] == INVALID_HANDLE_VALUE
  @pipe[:pending_io] = connect_to_new_client
  @pipe[:state] = @pipe[:pending_io] ? CONNECTING_STATE : READING_STATE
  
  @events_pointer.write_array_of_ulong_long(@events)
  nil
end
disconnect_and_reconnect() click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 194
def disconnect_and_reconnect
  FlushFileBuffers(@pipe[:instance])
  STDERR.puts("DisconnectNamedPipe failed with #{GetLastError()}") if !DisconnectNamedPipe(@pipe[:instance])
    
  @pipe[:pending_io] = connect_to_new_client
  
  @pipe[:state] = @pipe[:pending_io] ? CONNECTING_STATE : READING_STATE
end
while_loop() click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 110
def while_loop
  while @running
    # this sleep gives other ruby threads a chance to run
    # ~10ms is a ruby thread time slice, so we choose something a bit larger
    # that ruby or the os is free to switch threads
    sleep 0.010 if @pipe[:state] != WRITING_STATE && @pipe[:state] != READING_STATE

    i = MsgWaitForMultipleObjects(@instances, @events_pointer, 0, 1, QS_ALLINPUT) if @pipe[:state] != WRITING_STATE

    if i > 0
      next
    end

    if @pipe[:pending_io]
      bytes_transferred = FFI::MemoryPointer.new(:ulong)
      success = GetOverlappedResult(@pipe[:instance], @pipe[:overlap], bytes_transferred, false)

      case @pipe[:state]
      when CONNECTING_STATE
        raise "Error #{GetLastError()}" unless success
        @pipe[:state] = READING_STATE
      when READING_STATE
        if !success || bytes_transferred.read_ulong == 0
          disconnect_and_reconnect(i)
          next
        else
          @pipe[:bytes_read] = bytes_transferred.read_ulong
          @pipe[:state] = WRITING_STATE
        end
      when WRITING_STATE
        if !success || bytes_transferred.read_ulong != @pipe[:bytes_to_write]
          disconnect_and_reconnect(i)
          next
        else
          @pipe[:state] = READING_STATE
        end
      else
        raise "Invalid pipe state."
      end
    end

    case @pipe[:state]
    when READING_STATE
      bytes_read = FFI::MemoryPointer.new(:ulong)
      success = ReadFile(@pipe[:instance], @pipe[:request], BUFFER_SIZE, bytes_read, @pipe[:overlap].to_ptr)
      if success && bytes_read.read_ulong != 0
        @pipe[:pending_io] = false
        @pipe[:state] = WRITING_STATE
        next
      end

      err = GetLastError()
      if !success && err == ERROR_IO_PENDING
        @pipe[:pending_io] = true
        next
      end

      disconnect_and_reconnect
    when WRITING_STATE
      @pipe[:reply] = @run_block.call(@pipe[:request].get_string(0))
      @pipe[:bytes_to_write] = @pipe[:reply].bytesize
      bytes_written = FFI::MemoryPointer.new(:ulong)
      success = WriteFile(@pipe[:instance], @pipe[:reply], @pipe[:bytes_to_write], bytes_written, @pipe[:overlap].to_ptr)

      if success && bytes_written.read_ulong == @pipe[:bytes_to_write]
        @pipe[:pending_io] = false
        @pipe[:state] = READING_STATE
        next
      end

      err = GetLastError()

      if !success && err == ERROR_IO_PENDING
        @pipe[:pending_io] = true
        next
      end

      disconnect_and_reconnect
    else
      raise "Invalid pipe state."
    end
  end
end