class Async::HTTP::Protocol::HTTP2::Output

Attributes

trailer[R]

Public Class Methods

new(stream, body, trailer = nil) click to toggle source
# File lib/async/http/protocol/http2/output.rb, line 30
# Sets up the output state for writing a body to a stream.
#
# @param stream the stream that data will be written to.
# @param body the body that will be read or streamed.
# @param trailer optional trailer, exposed through the `trailer` reader.
def initialize(stream, body, trailer = nil)
        @stream, @body, @trailer = stream, body, trailer
        
        # No writer task exists until `start` assigns one.
        @task = nil
        
        # Signalled by `window_updated`; `write` waits on it when no frame space is available.
        @window_updated = Async::Condition.new
end

Public Instance Methods

close(error = nil) click to toggle source

This method should only be called from within the context of the output task.

# File lib/async/http/protocol/http2/output.rb, line 71
# Finishes the output side of the stream, at most once.
#
# The stream is detached *before* `finish_output` is invoked, so that a re-entrant call to
# `close` (triggered from within `finish_output`) or an exception raised by `finish_output`
# cannot cause the same stream to be finished a second time.
#
# This method should only be called from within the context of the output task.
#
# @param error [Exception, nil] the failure that ended the output, if any; passed to `finish_output`.
# @return [nil]
def close(error = nil)
        stream = @stream
        return unless stream
        
        @stream = nil
        stream.finish_output(error)
        
        nil
end
start(parent: Task.current) click to toggle source
# File lib/async/http/protocol/http2/output.rb, line 42
# Starts the task that writes the body to the stream.
#
# If the body reports `stream?`, the `stream` method is run; otherwise `passthrough` is run.
#
# @param parent the task used to spawn the writer via `async`; defaults to `Task.current`.
# @return the value returned by `parent.async`, which is also stored as the running task.
# @raise [RuntimeError] if a task has already been started.
def start(parent: Task.current)
        raise "Task already started!" if @task
        
        writer = @body.stream? ? :stream : :passthrough
        
        @task = parent.async(&self.method(writer))
end
stop(error) click to toggle source

This method should only be called from within the context of the HTTP/2 stream.

# File lib/async/http/protocol/http2/output.rb, line 79
# Stops the writer task, if one is running, and forgets it.
#
# This method should only be called from within the context of the HTTP/2 stream.
#
# @param error accepted for interface compatibility; not used by this method.
# @return [nil]
def stop(error)
        running = @task
        running.stop unless running.nil?
        
        @task = nil
end
window_updated(size) click to toggle source
# File lib/async/http/protocol/http2/output.rb, line 52
# Signals the window-update condition, which `write` waits on when no frame space is available.
#
# @param size accepted for interface compatibility; not used by this method.
# @return whatever the condition's `signal` returns.
def window_updated(size)
        condition = @window_updated
        condition.signal
end
write(chunk) click to toggle source
# File lib/async/http/protocol/http2/output.rb, line 56
# Writes the whole chunk to the stream, waiting for a window update whenever no frame space is available.
#
# The chunk is sent piece by piece: each pass asks the stream how much may be sent, sends up to
# that much via `send_data`, and continues with whatever `send_data` reports as unsent.
#
# @param chunk [String] the data to write.
# @return [nil]
def write(chunk)
        pending = chunk
        
        until pending.empty?
                # Wait until the stream reports a positive available frame size:
                @window_updated.wait while (allowance = @stream.available_frame_size) <= 0
                
                pending = send_data(pending, allowance)
                
                # `send_data` returns nothing once everything has been written.
                break unless pending
        end
end

Private Instance Methods

passthrough(task) click to toggle source

Reads chunks from the given body and writes them to the stream as fast as possible.

# File lib/async/http/protocol/http2/output.rb, line 97
# Reads chunks from the body and writes them to the stream as fast as possible, then closes the output.
#
# The body is always closed and released when this method exits. It is closed with the exception
# that terminated *this* call, or with `nil` on success. The exception is captured locally rather
# than read from `$!`, because `$!` may still hold an unrelated exception that a caller is
# currently handling, which would wrongly report a clean run as failed.
#
# @param task the task passed in by `async`; only used here for annotation.
# @raise re-raises any exception raised while reading or writing.
def passthrough(task)
        task.annotate("Writing #{@body} to #{@stream}.")
        
        while chunk = @body&.read
                self.write(chunk)
                # TODO this reduces memory usage?
                # chunk.clear unless chunk.frozen?
                # GC.start
        end
        
        self.close
rescue Exception => error
        # Captured only so that `ensure` knows what ended this call; always propagated unchanged.
        raise
ensure
        # Detach the body before closing it, so a failing `close` cannot leave it attached:
        if body = @body
                @body = nil
                body.close(error)
        end
end
send_data(chunk, maximum_size) click to toggle source

Send up to `maximum_size` bytes of `chunk` on the underlying stream. If the chunk is larger than `maximum_size`, only the first `maximum_size` bytes are sent and the remainder is returned to the caller. This method does not itself signal `END_STREAM`; the output is finished separately via `close`. @param chunk [String] the data to send. @param maximum_size [Integer] send up to this many bytes of data. @return [String, nil] any data that could not be written.

# File lib/async/http/protocol/http2/output.rb, line 117
# Sends at most `maximum_size` bytes of `chunk` on the stream.
#
# @param chunk [String] the data to send.
# @param maximum_size [Integer] send up to this many bytes of data.
# @return [String, nil] the bytes that did not fit, or `nil` if the whole chunk was sent.
def send_data(chunk, maximum_size)
        total = chunk.bytesize
        
        if total > maximum_size
                # The window is not big enough for everything: send the head, hand back the tail.
                head = chunk.byteslice(0, maximum_size)
                @stream.send_data(head, maximum_size: maximum_size)
                
                chunk.byteslice(maximum_size, total - maximum_size)
        else
                @stream.send_data(chunk, maximum_size: maximum_size)
                
                nil
        end
end
stream(task) click to toggle source
# File lib/async/http/protocol/http2/output.rb, line 86
# Runs a streaming body: waits for the stream's input, then calls the body with a `Body::Stream`
# built from that input and this output.
#
# @param task the task passed in by `async`; only used here for annotation.
# @return the result of calling the body, or `nil` if the task was stopped.
def stream(task)
        task.annotate("Streaming #{@body} to #{@stream}.")
        
        source = @stream.wait_for_input
        duplex = Body::Stream.new(source, self)
        
        @body.call(duplex)
rescue Async::Stop
        # Being stopped is not an error here; it is deliberately ignored.
        nil
end