class Mmtrix::Agent::PipeChannelManager::Pipe

Expected initial sequence of events for Pipe usage:

  1. Pipe is created in parent process (read and write ends open)

  2. Parent process forks

  3. An after_fork hook is invoked in the child

  4. From the after_fork hook, the child closes the read end of the pipe and writes a ready marker on the pipe (after_fork_in_child).

  5. The parent receives the ready marker, and closes the write end of the pipe in response (after_fork_in_parent).

After this sequence of steps, an exit of the child (whether clean or not) will cause the pipe to be marked readable again, and reading it will return an EOF marker (nil). Note that closing the unused ends of the pipe in both the parent and child processes is essential for the EOF to be triggered correctly. The ready marker mechanism is used because there’s no easy hook for after_fork in the parent process.
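The sequence above can be sketched with a bare IO.pipe and fork, independent of this class. This is a minimal illustration, not the agent's implementation: the marker value `DEMO_READY` and the method name `run_handshake` are hypothetical, and the sketch assumes a platform where `fork` is available.

```ruby
# Illustrative only: plain IO.pipe and fork, not the Pipe class itself.
DEMO_READY = "READY" # hypothetical marker value, chosen for this sketch

def run_handshake
  reader, writer = IO.pipe   # step 1: both ends are open in the parent
  observed = []

  pid = fork do              # step 2: parent forks
    reader.close             # step 4: child closes the read end...
    writer.write(DEMO_READY) # ...and writes the ready marker
    writer.flush
    exit!(0)                 # child exits; the OS closes its write end
  end

  # step 5: parent receives the marker, then closes its own write end
  observed << reader.read(DEMO_READY.bytesize)
  writer.close

  Process.wait(pid)
  # No process holds the write end any more, so a read reports EOF as nil.
  observed << reader.read(1)
  reader.close
  observed
end

p run_handshake
```

The method returns the marker followed by nil. If the parent skipped `writer.close`, the final read would block instead of returning nil, because the parent itself would still hold an open write end.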

This class provides message framing (separation of individual messages), but not serialization. Serialization / deserialization is the responsibility of clients.
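Since the pipe only moves byte strings, a client has to convert its objects before writing and convert them back after reading. A minimal sketch of that split follows. Marshal is an assumption made for illustration (the text above does not say which serializer clients use), and the helper names `encode_payload` and `decode_payload` are hypothetical.

```ruby
# Hypothetical client-side helpers; Marshal is an assumption for illustration.
def encode_payload(object)
  Marshal.dump(object) # returns a String that could be handed to Pipe#write
end

def decode_payload(bytes)
  Marshal.load(bytes)  # only appropriate for data from a trusted process
end

payload = { "metric" => "requests", "count" => 3 } # made-up data
bytes = encode_payload(payload)
# In real use the child would call pipe.write(bytes), and the parent would
# obtain the same bytes back from pipe.read before decoding them.
restored = decode_payload(bytes)
```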

Message framing works like this:

Each message sent across the pipe is preceded by a length tag that specifies the length, in bytes, of the message that immediately follows. The length tags are serialized as unsigned 32-bit big-endian integers (4 bytes each). This means that the maximum theoretical message size is just under 4 GiB, much larger than we’d ever need or want for this application.
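The framing scheme can be tried out in isolation. The sketch below uses a StringIO in place of a real pipe, and the names `FRAME_LENGTH_BYTES`, `write_frame` and `read_frame` are hypothetical stand-ins rather than part of this class. It relies only on the "L>" pack directive that the methods documented below also use.

```ruby
require "stringio"

FRAME_LENGTH_BYTES = 4 # hypothetical constant: size of one length tag

# Write a 4-byte big-endian length tag, then the message bytes.
def write_frame(io, data)
  io << [data.bytesize].pack("L>") << data
end

# Read one length tag, then exactly that many message bytes.
# Returns nil when no complete length tag is available (end of stream).
def read_frame(io)
  header = io.read(FRAME_LENGTH_BYTES)
  return nil if header.nil? || header.bytesize < FRAME_LENGTH_BYTES
  length = header.unpack("L>").first
  io.read(length)
end

buffer = StringIO.new("".b) # binary in-memory stand-in for the pipe
write_frame(buffer, "hello")
write_frame(buffer, "x" * 300)
buffer.rewind

first  = read_frame(buffer) # the 5-byte message
second = read_frame(buffer) # the 300-byte message
third  = read_frame(buffer) # nil: nothing left to read
```

Because the reader always knows how many bytes to consume, two messages written back to back come out as two separate messages, even though the stream itself has no delimiters.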

Constants

NUM_LENGTH_BYTES
READY_MARKER

Attributes

in[RW]
last_read[R]
out[RW]
parent_pid[R]

Public Class Methods

new()
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 65
def initialize
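  # IO.pipe returns [read_end, write_end], so @out is the end this class
  # reads from and @in is the end it writes to.
  @out, @in = IO.pipe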
  if defined?(::Encoding::ASCII_8BIT)
    @in.set_encoding(::Encoding::ASCII_8BIT)
  end
  @last_read = Time.now
  @parent_pid = $$
end

Public Instance Methods

after_fork_in_child()
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 116
def after_fork_in_child
  @out.close unless @out.closed?
  write(READY_MARKER)
end
after_fork_in_parent()
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 121
def after_fork_in_parent
  @in.close unless @in.closed?
end
close()
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 74
def close
  @out.close unless @out.closed?
  @in.close unless @in.closed?
end
closed?()
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 125
def closed?
  @out.closed? && @in.closed?
end
deserialize_message_length(data)
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 83
def deserialize_message_length(data)
  data.unpack("L>").first
end
eof?()
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 112
def eof?
  !@out.closed? && @out.eof?
end
read()
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 93
def read
  @in.close unless @in.closed?
  @last_read = Time.now
  length_bytes = @out.read(NUM_LENGTH_BYTES)
  if length_bytes
    message_length = deserialize_message_length(length_bytes)
    if message_length
      @out.read(message_length)
    else
      length_hex = length_bytes.bytes.map { |b| b.to_s(16) }.join(' ')
      Mmtrix::Agent.logger.error("Failed to deserialize message length from pipe. Bytes: [#{length_hex}]")
      nil
    end
  else
    Mmtrix::Agent.logger.error("Failed to read bytes for length from pipe.")
    nil
  end
end
serialize_message_length(data)
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 79
def serialize_message_length(data)
  [data.bytesize].pack("L>")
end
write(data)
# File lib/mmtrix/agent/pipe_channel_manager.rb, line 87
def write(data)
  @out.close unless @out.closed?
  @in << serialize_message_length(data)
  @in << data
end