class Plum::Stream
Attributes
children[R]
The child (depending on this stream) streams.
connection[R]
exclusive[R]
id[R]
parent[RW]
state[R]
weight[R]
Public Class Methods
new(con, id, state: :idle, weight: 16, parent: nil, exclusive: false)
click to toggle source
# File lib/plum/stream.rb, line 15 def initialize(con, id, state: :idle, weight: 16, parent: nil, exclusive: false) @connection = con @id = id @state = state @continuation = [] @children = Set.new initialize_flow_control(send: @connection.remote_settings[:initial_window_size], recv: @connection.local_settings[:initial_window_size]) update_dependency(weight: weight, parent: parent, exclusive: exclusive) end
Public Instance Methods
close()
click to toggle source
Closes this stream. Sends RST_STREAM frame to the peer.
# File lib/plum/stream.rb, line 60 def close @state = :closed callback(:close) end
receive_frame(frame)
click to toggle source
Processes received frames for this stream. Internal use. @private
# File lib/plum/stream.rb, line 29 def receive_frame(frame) validate_received_frame(frame) consume_recv_window(frame) case frame.type when :data receive_data(frame) when :headers receive_headers(frame) when :priority receive_priority(frame) when :rst_stream receive_rst_stream(frame) when :window_update receive_window_update(frame) when :continuation receive_continuation(frame) when :push_promise receive_push_promise(frame) when :ping, :goaway, :settings raise RemoteConnectionError.new(:protocol_error) # stream_id MUST be 0x00 else # MUST ignore unknown frame end rescue RemoteStreamError => e callback(:stream_error, e) send_immediately Frame.rst_stream(id, e.http2_error_type) close end
set_state(state)
click to toggle source
@api private
# File lib/plum/stream.rb, line 66 def set_state(state) @state = state end
update_dependency(weight: nil, parent: nil, exclusive: nil)
click to toggle source
@api private
# File lib/plum/stream.rb, line 71 def update_dependency(weight: nil, parent: nil, exclusive: nil) raise RemoteStreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self if weight @weight = weight end if parent @parent = parent @parent.children << self end if exclusive != nil @exclusive = exclusive if @parent && exclusive @parent.children.to_a.each do |child| next if child == self @parent.children.delete(child) child.parent = self @children << child end end end end
Private Instance Methods
callback(name, *args)
click to toggle source
override EventEmitter
Calls superclass method
Plum::EventEmitter#callback
# File lib/plum/stream.rb, line 243 def callback(name, *args) super(name, *args) @connection.callback(name, self, *args) end
receive_complete_headers(frames)
click to toggle source
# File lib/plum/stream.rb, line 134 def receive_complete_headers(frames) first = frames.shift payload = first.payload if first.padded? padding_length = payload.uint8 payload = payload.byteslice(1, payload.bytesize - padding_length - 1) else padding_length = 0 payload = payload.dup end if first.priority? receive_priority_payload(payload.byteshift(5)) end if padding_length > payload.bytesize raise RemoteConnectionError.new(:protocol_error, "padding is too long") end frames.each do |frame| payload << frame.payload end begin decoded_headers = @connection.hpack_decoder.decode(payload) rescue => e raise RemoteConnectionError.new(:compression_error, e) end callback(:headers, decoded_headers) receive_end_stream if first.end_stream? end
receive_continuation(frame)
click to toggle source
# File lib/plum/stream.rb, line 204 def receive_continuation(frame) # state error mustn't happen: server_connection validates @continuation << frame if frame.end_headers? receive_complete_headers(@continuation) @continuation.clear end end
receive_data(frame)
click to toggle source
# File lib/plum/stream.rb, line 116 def receive_data(frame) if @state != :open && @state != :half_closed_local raise RemoteStreamError.new(:stream_closed) end if frame.padded? padding_length = frame.payload.uint8 if padding_length >= frame.length raise RemoteConnectionError.new(:protocol_error, "padding is too long") end callback(:data, frame.payload.byteslice(1, frame.length - padding_length - 1)) else callback(:data, frame.payload) end receive_end_stream if frame.end_stream? end
receive_end_stream()
click to toggle source
# File lib/plum/stream.rb, line 111 def receive_end_stream callback(:end_stream) @state = :half_closed_remote end
receive_headers(frame)
click to toggle source
# File lib/plum/stream.rb, line 169 def receive_headers(frame) if @state == :reserved_local raise RemoteConnectionError.new(:protocol_error) elsif @state == :half_closed_remote raise RemoteStreamError.new(:stream_closed) elsif @state == :closed raise RemoteConnectionError.new(:stream_closed) elsif @state == :closed_implicitly raise RemoteConnectionError.new(:protocol_error) elsif @state == :idle && self.id.even? raise RemoteConnectionError.new(:protocol_error) end @state = :open callback(:open) if frame.end_headers? receive_complete_headers([frame]) else @continuation << frame end end
receive_priority(frame)
click to toggle source
# File lib/plum/stream.rb, line 214 def receive_priority(frame) if frame.length != 5 raise RemoteStreamError.new(:frame_size_error) end receive_priority_payload(frame.payload) end
receive_priority_payload(payload)
click to toggle source
# File lib/plum/stream.rb, line 221 def receive_priority_payload(payload) esd = payload.uint32 e = (esd >> 31) == 1 dependency_id = esd & ~(1 << 31) weight = payload.uint8(4) update_dependency(weight: weight, parent: @connection.streams[dependency_id], exclusive: e) end
receive_push_promise(frame)
click to toggle source
# File lib/plum/stream.rb, line 192 def receive_push_promise(frame) raise NotImplementedError if promised_stream.state == :closed_implicitly # 5.1.1 An endpoint that receives an unexpected stream identifier MUST respond with a connection error of type PROTOCOL_ERROR. raise RemoteConnectionError.new(:protocol_error) elsif promised_id.odd? # 5.1.1 Streams initiated by the server MUST use even-numbered stream identifiers. raise RemoteConnectionError.new(:protocol_error) end end
receive_rst_stream(frame)
click to toggle source
# File lib/plum/stream.rb, line 230 def receive_rst_stream(frame) if frame.length != 4 raise RemoteConnectionError.new(:frame_size_error) elsif @state == :idle raise RemoteConnectionError.new(:protocol_error) end @state = :closed # MUST NOT send RST_STREAM error_code = frame.payload.uint32 callback(:rst_stream, HTTPError::ERROR_CODES.key(error_code)) end
send_immediately(frame)
click to toggle source
# File lib/plum/stream.rb, line 97 def send_immediately(frame) @connection.send(frame) end
validate_received_frame(frame)
click to toggle source
# File lib/plum/stream.rb, line 101 def validate_received_frame(frame) if frame.length > @connection.local_settings[:max_frame_size] if [:headers, :push_promise, :continuation].include?(frame.type) raise RemoteConnectionError.new(:frame_size_error) else raise RemoteStreamError.new(:frame_size_error) end end end