class Async::HTTP::Body::Pipe

Public Class Methods

new(input, output = Writable.new, task: Task.current) click to toggle source

If the input stream is closed first, it's likely the output stream will also be closed.

# File lib/async/http/body/pipe.rb, line 33
def initialize(input, output = Writable.new, task: Task.current)
        @input = input
        @output = output
        
        head, tail = IO::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)
        
        @head = IO::Stream.new(head)
        @tail = tail
        
        @reader = nil
        @writer = nil
        
        task.async(transient: true, &self.method(:reader))
        task.async(transient: true, &self.method(:writer))
end

Public Instance Methods

close() click to toggle source
# File lib/async/http/body/pipe.rb, line 53
def close
        @reader&.stop
        @writer&.stop
        
        @tail.close
end
to_io() click to toggle source
# File lib/async/http/body/pipe.rb, line 49
def to_io
        @tail
end

Private Instance Methods

close_head() click to toggle source
# File lib/async/http/body/pipe.rb, line 96
def close_head
        @head.close
        
        # Both tasks are done, don't keep references:
        @reader = nil
        @writer = nil
end
reader(task) click to toggle source

Read from the @input stream and write to the head of the pipe.

# File lib/async/http/body/pipe.rb, line 63
def reader(task)
        @reader = task
        
        task.annotate "#{self.class} reader."
        
        while chunk = @input.read
                @head.write(chunk)
                @head.flush
        end
        
        @head.close_write
ensure
        @input.close($!)
        
        close_head if @writer&.finished?
end
writer(task) click to toggle source

Read from the head of the pipe and write to the @output stream. If the @tail is closed, this will cause chunk to be nil, which in turn will call `@output.close` and `@head.close`

# File lib/async/http/body/pipe.rb, line 82
def writer(task)
        @writer = task
        
        task.annotate "#{self.class} writer."
        
        while chunk = @head.read_partial
                @output.write(chunk)
        end
ensure
        @output.close($!)
        
        close_head if @reader&.finished?
end