class Plum::Stream

Attributes

children[R]

The child (depending on this stream) streams.

connection[R]
exclusive[R]
id[R]
parent[RW]
state[R]
weight[R]

Public Class Methods

new(con, id, state: :idle, weight: 16, parent: nil, exclusive: false) click to toggle source
# File lib/plum/stream.rb, line 15
def initialize(con, id, state: :idle, weight: 16, parent: nil, exclusive: false)
  @connection = con
  @id = id
  @state = state
  @continuation = []
  @children = Set.new

  initialize_flow_control(send: @connection.remote_settings[:initial_window_size],
                          recv: @connection.local_settings[:initial_window_size])
  update_dependency(weight: weight, parent: parent, exclusive: exclusive)
end

Public Instance Methods

close() click to toggle source

Closes this stream. Sends RST_STREAM frame to the peer.

# File lib/plum/stream.rb, line 60
def close
  @state = :closed
  callback(:close)
end
receive_frame(frame) click to toggle source

Processes received frames for this stream. Internal use. @private

# File lib/plum/stream.rb, line 29
def receive_frame(frame)
  validate_received_frame(frame)
  consume_recv_window(frame)

  case frame.type
  when :data
    receive_data(frame)
  when :headers
    receive_headers(frame)
  when :priority
    receive_priority(frame)
  when :rst_stream
    receive_rst_stream(frame)
  when :window_update
    receive_window_update(frame)
  when :continuation
    receive_continuation(frame)
  when :push_promise
    receive_push_promise(frame)
  when :ping, :goaway, :settings
    raise RemoteConnectionError.new(:protocol_error) # stream_id MUST be 0x00
  else
    # MUST ignore unknown frame
  end
rescue RemoteStreamError => e
  callback(:stream_error, e)
  send_immediately Frame.rst_stream(id, e.http2_error_type)
  close
end
set_state(state) click to toggle source

@api private

# File lib/plum/stream.rb, line 66
def set_state(state)
  @state = state
end
update_dependency(weight: nil, parent: nil, exclusive: nil) click to toggle source

@api private

# File lib/plum/stream.rb, line 71
def update_dependency(weight: nil, parent: nil, exclusive: nil)
  raise RemoteStreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self

  if weight
    @weight = weight
  end

  if parent
    @parent = parent
    @parent.children << self
  end

  if exclusive != nil
    @exclusive = exclusive
    if @parent && exclusive
      @parent.children.to_a.each do |child|
        next if child == self
        @parent.children.delete(child)
        child.parent = self
        @children << child
      end
    end
  end
end

Private Instance Methods

callback(name, *args) click to toggle source

override EventEmitter

Calls superclass method Plum::EventEmitter#callback
# File lib/plum/stream.rb, line 243
def callback(name, *args)
  super(name, *args)
  @connection.callback(name, self, *args)
end
receive_complete_headers(frames) click to toggle source
# File lib/plum/stream.rb, line 134
def receive_complete_headers(frames)
  first = frames.shift
  payload = first.payload

  if first.padded?
    padding_length = payload.uint8
    payload = payload.byteslice(1, payload.bytesize - padding_length - 1)
  else
    padding_length = 0
    payload = payload.dup
  end

  if first.priority?
    receive_priority_payload(payload.byteshift(5))
  end

  if padding_length > payload.bytesize
    raise RemoteConnectionError.new(:protocol_error, "padding is too long")
  end

  frames.each do |frame|
    payload << frame.payload
  end

  begin
    decoded_headers = @connection.hpack_decoder.decode(payload)
  rescue => e
    raise RemoteConnectionError.new(:compression_error, e)
  end

  callback(:headers, decoded_headers)

  receive_end_stream if first.end_stream?
end
receive_continuation(frame) click to toggle source
# File lib/plum/stream.rb, line 204
def receive_continuation(frame)
  # state error mustn't happen: server_connection validates
  @continuation << frame

  if frame.end_headers?
    receive_complete_headers(@continuation)
    @continuation.clear
  end
end
receive_data(frame) click to toggle source
# File lib/plum/stream.rb, line 116
def receive_data(frame)
  if @state != :open && @state != :half_closed_local
    raise RemoteStreamError.new(:stream_closed)
  end

  if frame.padded?
    padding_length = frame.payload.uint8
    if padding_length >= frame.length
      raise RemoteConnectionError.new(:protocol_error, "padding is too long")
    end
    callback(:data, frame.payload.byteslice(1, frame.length - padding_length - 1))
  else
    callback(:data, frame.payload)
  end

  receive_end_stream if frame.end_stream?
end
receive_end_stream() click to toggle source
# File lib/plum/stream.rb, line 111
def receive_end_stream
  callback(:end_stream)
  @state = :half_closed_remote
end
receive_headers(frame) click to toggle source
# File lib/plum/stream.rb, line 169
def receive_headers(frame)
  if @state == :reserved_local
    raise RemoteConnectionError.new(:protocol_error)
  elsif @state == :half_closed_remote
    raise RemoteStreamError.new(:stream_closed)
  elsif @state == :closed
    raise RemoteConnectionError.new(:stream_closed)
  elsif @state == :closed_implicitly
    raise RemoteConnectionError.new(:protocol_error)
  elsif @state == :idle && self.id.even?
    raise RemoteConnectionError.new(:protocol_error)
  end

  @state = :open
  callback(:open)

  if frame.end_headers?
    receive_complete_headers([frame])
  else
    @continuation << frame
  end
end
receive_priority(frame) click to toggle source
# File lib/plum/stream.rb, line 214
def receive_priority(frame)
  if frame.length != 5
    raise RemoteStreamError.new(:frame_size_error)
  end
  receive_priority_payload(frame.payload)
end
receive_priority_payload(payload) click to toggle source
# File lib/plum/stream.rb, line 221
def receive_priority_payload(payload)
  esd = payload.uint32
  e = (esd >> 31) == 1
  dependency_id = esd & ~(1 << 31)
  weight = payload.uint8(4)

  update_dependency(weight: weight, parent: @connection.streams[dependency_id], exclusive: e)
end
receive_push_promise(frame) click to toggle source
# File lib/plum/stream.rb, line 192
def receive_push_promise(frame)
  raise NotImplementedError

  if promised_stream.state == :closed_implicitly
    # 5.1.1 An endpoint that receives an unexpected stream identifier MUST respond with a connection error of type PROTOCOL_ERROR.
    raise RemoteConnectionError.new(:protocol_error)
  elsif promised_id.odd?
    # 5.1.1 Streams initiated by the server MUST use even-numbered stream identifiers.
    raise RemoteConnectionError.new(:protocol_error)
  end
end
receive_rst_stream(frame) click to toggle source
# File lib/plum/stream.rb, line 230
def receive_rst_stream(frame)
  if frame.length != 4
    raise RemoteConnectionError.new(:frame_size_error)
  elsif @state == :idle
    raise RemoteConnectionError.new(:protocol_error)
  end
  @state = :closed # MUST NOT send RST_STREAM

  error_code = frame.payload.uint32
  callback(:rst_stream, HTTPError::ERROR_CODES.key(error_code))
end
send_immediately(frame) click to toggle source
# File lib/plum/stream.rb, line 97
def send_immediately(frame)
  @connection.send(frame)
end
validate_received_frame(frame) click to toggle source
# File lib/plum/stream.rb, line 101
def validate_received_frame(frame)
  if frame.length > @connection.local_settings[:max_frame_size]
    if [:headers, :push_promise, :continuation].include?(frame.type)
      raise RemoteConnectionError.new(:frame_size_error)
    else
      raise RemoteStreamError.new(:frame_size_error)
    end
  end
end