class Isomorfeus::Speednode::AttachPipe
Constants
- BUFFER_SIZE
- CONNECTING_STATE
- INSTANCES
- PIPE_TIMEOUT
- READING_STATE
- WRITING_STATE
Public Class Methods
new(pipe_name, block)
click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 64
# Prepares the bookkeeping for a single named pipe instance; nothing is
# created on the OS side here.
#
# @param pipe_name [#to_s] short pipe name, expanded to "\\.\pipe\<pipe_name>"
# @param block [#call] stored as @run_block; while_loop calls it with each
#   request string and writes its return value back as the reply
def initialize(pipe_name, block)
  # Bookkeeping that does not depend on the arguments.
  @instances = 1
  @events = []
  @pipe = {}

  @run_block = block
  @full_pipe_name = "\\\\.\\pipe\\#{pipe_name}"

  # Native array handed to the wait call, sized one slot larger than the
  # number of instances.
  @events_pointer = FFI::MemoryPointer.new(:uintptr_t, @instances + 1)
end
Public Instance Methods
run()
click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 73
# Marks the server as running, creates the pipe instance and then enters the
# service loop. Returns whatever while_loop returns once the loop ends.
def run
  # The flag must be set first: while_loop only iterates while it is true.
  @running = true

  create_instance
  while_loop
end
stop()
click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 79
# Asks the service loop to finish and disconnects the pipe instance.
#
# Clearing @running makes while_loop exit on its next condition check.
# A failed disconnect is reported on STDERR and is not raised.
#
# @return [nil]
def stop
  @running = false

  # Before #run has created the instance, @pipe is still the empty hash set
  # up by #initialize, so there is no handle to disconnect yet.
  instance = @pipe[:instance]
  return if instance.nil?

  STDERR.puts("DisconnectNamedPipe failed with #{GetLastError()}") unless DisconnectNamedPipe(instance)
end
Private Instance Methods
connect_to_new_client()
click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 203
# Clears both message buffers and starts an overlapped connect on the pipe
# instance.
#
# @return [Boolean] true when the connect is still pending (the caller has to
#   wait for the event), false when a client is already connected
# @raise [RuntimeError] when ConnectNamedPipe returns a non-zero value or the
#   last error is not one of the expected codes
def connect_to_new_client
  @pipe[:request].clear
  @pipe[:reply].clear

  connected = ConnectNamedPipe(@pipe[:instance], @pipe[:overlap].to_ptr)
  last_error = GetLastError()
  raise "ConnectNamedPipe failed with #{last_error} - #{connected}" unless connected == 0

  case last_error
  when ERROR_IO_PENDING, ERROR_SUCCESS
    # The connect has not completed yet.
    true
  when ERROR_PIPE_CONNECTED
    # A client is already connected: signal the event by hand so the wait in
    # the service loop returns.
    SetEvent(@pipe[:overlap][:hEvent])
    false
  else
    raise "ConnectNamedPipe failed with error #{last_error}"
  end
end
create_instance()
click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 86
# Creates the event, the overlapped structure and the named pipe instance,
# then starts waiting for the first client.
#
# On return @pipe holds the handle, the request/reply buffers and the initial
# state (CONNECTING_STATE while the connect is pending, READING_STATE
# otherwise), and @events_pointer contains the event handle.
#
# @return [nil]
# @raise [RuntimeError] when CreateEvent or CreateNamedPipe fails
def create_instance
  @events[0] = CreateEvent(nil, 1, 1, nil)
  # The handle is an Integer (it is written with write_array_of_ulong_long
  # below) and 0 is truthy in Ruby, so a NULL handle has to be checked
  # explicitly; a plain truthiness test never detects the failure.
  raise "CreateEvent failed with #{GetLastError()}" if @events[0].nil? || @events[0] == 0

  overlap = Overlapped.new
  overlap[:hEvent] = @events[0]

  @pipe = {
    overlap: overlap,
    instance: nil,
    request: FFI::Buffer.new(1, BUFFER_SIZE),
    bytes_read: 0,
    reply: FFI::Buffer.new(1, BUFFER_SIZE),
    bytes_to_write: 0,
    state: nil,
    pending_io: false
  }

  @pipe[:instance] = CreateNamedPipe(@full_pipe_name,
                                     PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
                                     PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
                                     4, BUFFER_SIZE, BUFFER_SIZE, PIPE_TIMEOUT, nil)
  raise "CreateNamedPipe failed with #{GetLastError()}" if @pipe[:instance] == INVALID_HANDLE_VALUE

  @pipe[:pending_io] = connect_to_new_client
  @pipe[:state] = @pipe[:pending_io] ? CONNECTING_STATE : READING_STATE

  # Publish the event handle to the native array used by the wait call.
  @events_pointer.write_array_of_ulong_long(@events)
  nil
end
disconnect_and_reconnect()
click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 194
# Drops the current client and waits for the next one on the same instance.
#
# Flushes the pipe, disconnects it (a failure is only reported on STDERR),
# starts a new connect and resets the state accordingly.
def disconnect_and_reconnect
  handle = @pipe[:instance]

  FlushFileBuffers(handle)
  unless DisconnectNamedPipe(handle)
    STDERR.puts("DisconnectNamedPipe failed with #{GetLastError()}")
  end

  still_pending = connect_to_new_client
  @pipe[:pending_io] = still_pending
  @pipe[:state] = still_pending ? CONNECTING_STATE : READING_STATE
end
while_loop()
click to toggle source
# File lib/isomorfeus/speednode/attach_pipe.rb, line 110
# Service loop: runs the connect / read / write state machine for the pipe
# instance until @running becomes false.
#
# Each iteration
#   1. waits for the event unless a reply is about to be written,
#   2. completes a pending overlapped operation and advances the state,
#   3. starts the next read, or calls @run_block with the request and
#      writes its result back.
# A failed or empty transfer drops the client and waits for a new one.
#
# @return [nil]
# @raise [RuntimeError] when a pending connect fails or the state is unknown
def while_loop
  # Initialised so the comparison below never sees nil; the value of the
  # last wait is kept across iterations that skip the wait.
  wait_result = 0

  while @running
    # this sleep gives other ruby threads a chance to run
    # ~10ms is a ruby thread time slice, so we choose something a bit larger
    # that ruby or the os is free to switch threads
    sleep 0.010 if @pipe[:state] != WRITING_STATE && @pipe[:state] != READING_STATE

    if @pipe[:state] != WRITING_STATE
      wait_result = MsgWaitForMultipleObjects(@instances, @events_pointer, 0, 1, QS_ALLINPUT)
    end
    # Anything other than index 0 means our event was not the one signalled.
    next if wait_result > 0

    if @pipe[:pending_io]
      bytes_transferred = FFI::MemoryPointer.new(:ulong)
      success = GetOverlappedResult(@pipe[:instance], @pipe[:overlap], bytes_transferred, false)

      case @pipe[:state]
      when CONNECTING_STATE
        raise "Error #{GetLastError()}" unless success
        @pipe[:state] = READING_STATE
      when READING_STATE
        if !success || bytes_transferred.read_ulong == 0
          # disconnect_and_reconnect takes no arguments
          disconnect_and_reconnect
          next
        else
          @pipe[:bytes_read] = bytes_transferred.read_ulong
          @pipe[:state] = WRITING_STATE
        end
      when WRITING_STATE
        if !success || bytes_transferred.read_ulong != @pipe[:bytes_to_write]
          disconnect_and_reconnect
          next
        else
          @pipe[:state] = READING_STATE
        end
      else
        raise "Invalid pipe state."
      end
    end

    case @pipe[:state]
    when READING_STATE
      bytes_read = FFI::MemoryPointer.new(:ulong)
      success = ReadFile(@pipe[:instance], @pipe[:request], BUFFER_SIZE, bytes_read, @pipe[:overlap].to_ptr)
      if success && bytes_read.read_ulong != 0
        # The read completed immediately: answer it on the next iteration.
        @pipe[:pending_io] = false
        @pipe[:state] = WRITING_STATE
        next
      end

      err = GetLastError()
      if !success && err == ERROR_IO_PENDING
        @pipe[:pending_io] = true
        next
      end

      disconnect_and_reconnect
    when WRITING_STATE
      @pipe[:reply] = @run_block.call(@pipe[:request].get_string(0))
      @pipe[:bytes_to_write] = @pipe[:reply].bytesize
      bytes_written = FFI::MemoryPointer.new(:ulong)
      success = WriteFile(@pipe[:instance], @pipe[:reply], @pipe[:bytes_to_write], bytes_written, @pipe[:overlap].to_ptr)
      if success && bytes_written.read_ulong == @pipe[:bytes_to_write]
        # The write completed immediately: go back to reading.
        @pipe[:pending_io] = false
        @pipe[:state] = READING_STATE
        next
      end

      err = GetLastError()
      if !success && err == ERROR_IO_PENDING
        @pipe[:pending_io] = true
        next
      end

      disconnect_and_reconnect
    else
      raise "Invalid pipe state."
    end
  end
end