class Plum::Connection

Constants

CLIENT_CONNECTION_PREFACE
DEFAULT_SETTINGS

Attributes

hpack_decoder[R]
hpack_encoder[R]
local_settings[R]
remote_settings[R]
state[R]
streams[R]

Public Class Methods

new(writer, local_settings = {}) click to toggle source
# File lib/plum/connection.rb, line 24
# Builds a connection that starts in the :open state with an empty receive
# buffer and no known streams.
#
# @param writer [Object] stored in @writer for later use when sending frames
# @param local_settings [Hash] overrides merged on top of the local settings;
#   keys that are absent fall back to DEFAULT_SETTINGS
def initialize(writer, local_settings = {})
  @writer = writer
  @state = :open
  @buffer = String.new
  @streams = {}

  # Both settings tables answer unknown keys from DEFAULT_SETTINGS without
  # storing the looked-up value.
  @local_settings = Hash.new { |_settings, key| DEFAULT_SETTINGS[key] }
  @local_settings.merge!(local_settings)
  @remote_settings = Hash.new { |_settings, key| DEFAULT_SETTINGS[key] }

  # The decoder is sized from our own settings, the encoder from the peer's.
  @hpack_decoder = HPACK::Decoder.new(@local_settings[:header_table_size])
  @hpack_encoder = HPACK::Encoder.new(@remote_settings[:header_table_size])

  initialize_flow_control(
    send: @remote_settings[:initial_window_size],
    recv: @local_settings[:initial_window_size]
  )
  @max_stream_ids = [0, -1] # [even, odd]
end

Public Instance Methods

<<(new_data)
Alias for: receive
close() click to toggle source

Emits the :close event. This does not actually close the underlying socket.

# File lib/plum/connection.rb, line 39
# Marks the connection as closed and fires the :close callback.
# Does nothing (and returns nil) when the connection is already closed.
def close
  unless @state == :closed
    @state = :closed
    # TODO: server MAY wait streams
    callback(:close)
  end
end
receive(new_data) click to toggle source

Receives the specified data and processes it. @param new_data [String] The data received from the peer.

# File lib/plum/connection.rb, line 48
# Appends incoming data to the internal buffer and processes whatever the
# buffer now holds. Input is ignored when the connection is closed or the
# data is empty.
#
# A RemoteConnectionError raised while processing is reported through the
# :connection_error callback, followed by a GOAWAY carrying the error's
# HTTP/2 error type, after which the connection is closed.
#
# @param new_data [String] the data received from the peer
def receive(new_data)
  begin
    unless @state == :closed || new_data.empty?
      @buffer << new_data
      consume_buffer
    end
  rescue RemoteConnectionError => connection_error
    callback(:connection_error, connection_error)
    goaway(connection_error.http2_error_type)
    close
  end
end
Also aliased as: <<
stream(stream_id, update_max_id = true) click to toggle source

Returns a Stream object with the specified ID. @param stream_id [Integer] the stream id @return [Stream] the stream

# File lib/plum/connection.rb, line 63
# Looks up, or creates, the Stream object for the given stream ID.
#
# * A known stream is returned as is, except that an :idle stream whose ID
#   is below the highest ID recorded for its parity is first switched to
#   :closed_implicitly.
# * An unknown ID above the recorded maximum yields a new :idle stream that
#   is registered in @streams (after the :stream callback has fired).
# * Any other unknown ID yields a new :closed_implicitly stream that is
#   announced via the :stream callback but not registered.
#
# @param stream_id [Integer] the stream id
# @param update_max_id [Boolean] whether creating a new :idle stream also
#   raises the recorded maximum ID for that parity
# @return [Stream] the stream
# @raise [ArgumentError] if stream_id is 0
def stream(stream_id, update_max_id = true)
  raise ArgumentError, "stream_id can't be 0" if stream_id == 0

  known = @streams[stream_id]
  if known
    skipped = known.state == :idle && stream_id < @max_stream_ids[stream_id % 2]
    known.set_state(:closed_implicitly) if skipped
    return known
  end

  parity = stream_id % 2 # 0 => even IDs, 1 => odd IDs
  if stream_id > @max_stream_ids[parity]
    @max_stream_ids[parity] = stream_id if update_max_id
    created = Stream.new(self, stream_id, state: :idle)
    callback(:stream, created)
    @streams[stream_id] = created
  else
    created = Stream.new(self, stream_id, state: :closed_implicitly)
    callback(:stream, created)
  end

  created
end

Private Instance Methods

apply_remote_settings(old_remote_settings) click to toggle source
# File lib/plum/connection.rb, line 181
# Propagates the current remote settings to dependent components: the HPACK
# encoder limit is set from :header_table_size, and the send window is
# adjusted by the change in :initial_window_size.
#
# @param old_remote_settings [Hash] the remote settings before the update
def apply_remote_settings(old_remote_settings)
  table_size = @remote_settings[:header_table_size]
  @hpack_encoder.limit = table_size

  window_delta = @remote_settings[:initial_window_size] - old_remote_settings[:initial_window_size]
  update_send_initial_window_size(window_delta)
end
consume_buffer() click to toggle source
# File lib/plum/connection.rb, line 85
# Repeatedly asks Frame.parse! for a frame from @buffer until it returns a
# falsy value. Each parsed frame is announced via the :frame callback and
# then handed to receive_frame.
def consume_buffer
  parsed = Frame.parse!(@buffer)
  while parsed
    callback(:frame, parsed)
    receive_frame(parsed)
    parsed = Frame.parse!(@buffer)
  end
end
receive_control_frame(frame) click to toggle source
# File lib/plum/connection.rb, line 137
# Dispatches a frame to the handler for its type (SETTINGS, WINDOW_UPDATE,
# PING or GOAWAY).
#
# @param frame [Frame] the received frame
# @raise [RemoteConnectionError] :frame_size_error when the frame length
#   exceeds the local :max_frame_size setting
# @raise [Plum::RemoteConnectionError] :protocol_error for DATA, HEADERS,
#   PRIORITY, RST_STREAM, PUSH_PROMISE and CONTINUATION frames
def receive_control_frame(frame)
  too_large = frame.length > @local_settings[:max_frame_size]
  raise RemoteConnectionError.new(:frame_size_error) if too_large

  case frame.type
  when :settings      then receive_settings(frame)
  when :window_update then receive_window_update(frame)
  when :ping          then receive_ping(frame)
  when :goaway        then receive_goaway(frame)
  when :data, :headers, :priority, :rst_stream, :push_promise, :continuation
    raise Plum::RemoteConnectionError.new(:protocol_error)
  end
  # Any other type falls through: MUST ignore unknown frame type.
end
receive_frame(frame) click to toggle source
# File lib/plum/connection.rb, line 126
# Validates a received frame, accounts for it in the receive window, and
# routes it: frames on stream 0 go to receive_control_frame, all others to
# the Stream object for their stream ID.
#
# @param frame [Frame] the received frame
def receive_frame(frame)
  validate_received_frame(frame)
  consume_recv_window(frame)

  return receive_control_frame(frame) if frame.stream_id == 0

  # Only a HEADERS frame is allowed to raise the highest-seen stream ID.
  target = stream(frame.stream_id, frame.type == :headers)
  target.receive_frame(frame)
end
receive_goaway(frame) click to toggle source
# File lib/plum/connection.rb, line 198
# Handles a GOAWAY frame: fires the :goaway callback, sends our own GOAWAY,
# and closes the connection. Only then is the payload inspected.
#
# The payload is read as a 32-bit value at offset 0 (currently unused), a
# 32-bit error code at offset 4, and a message made of the remaining bytes.
#
# @param frame [Frame] the received GOAWAY frame
# @raise [LocalConnectionError] when the peer's error code is greater than 0
def receive_goaway(frame)
  callback(:goaway, frame)
  goaway
  close

  _last_id = frame.payload.uint32(0) # read to mirror the wire layout; not used
  error_code = frame.payload.uint32(4)
  debug_message = frame.payload.byteslice(8, frame.length - 8)
  return unless error_code > 0

  raise LocalConnectionError.new(HTTPError::ERROR_CODES.key(error_code), debug_message)
end
receive_ping(frame) click to toggle source
# File lib/plum/connection.rb, line 186
# Handles a PING frame. An ACK fires the :ping_ack callback. Any other PING
# fires the :ping callback with the frame payload and is answered right
# away with a PING ACK carrying the same payload.
#
# @param frame [Frame] the received PING frame
# @raise [Plum::RemoteConnectionError] :frame_size_error unless the frame
#   length is exactly 8
def receive_ping(frame)
  raise Plum::RemoteConnectionError.new(:frame_size_error) unless frame.length == 8

  return callback(:ping_ack) if frame.ack?

  echoed = frame.payload
  callback(:ping, echoed)
  send_immediately Frame.ping(:ack, echoed)
end
receive_settings(frame, send_ack: true) click to toggle source
# File lib/plum/connection.rb, line 158
# Handles a SETTINGS frame.
#
# An ACK only fires the :settings_ack callback. Otherwise the parsed
# settings are merged into @remote_settings, applied, and reported via the
# :remote_settings callback (new settings first, previous settings second).
# A connection in the :waiting_settings state then moves to :open and fires
# :negotiated.
#
# @param frame [Frame] the received SETTINGS frame
# @param send_ack [Boolean] whether to answer with a SETTINGS ACK
# @raise [RemoteConnectionError] :frame_size_error when an ACK has a
#   non-zero length, or a non-ACK length is not a multiple of 6
def receive_settings(frame, send_ack: true)
  if frame.ack?
    raise RemoteConnectionError.new(:frame_size_error) if frame.length != 0
    callback(:settings_ack)
    return
  end

  raise RemoteConnectionError.new(:frame_size_error) if frame.length % 6 != 0

  # Snapshot before merging so the delta can be applied and reported.
  previous = @remote_settings.dup
  @remote_settings.merge!(frame.parse_settings)
  apply_remote_settings(previous)

  callback(:remote_settings, @remote_settings, previous)

  send_immediately Frame.settings(:ack) if send_ack

  return unless @state == :waiting_settings

  @state = :open
  callback(:negotiated)
end
send_immediately(frame) click to toggle source
# File lib/plum/connection.rb, line 92
# Fires the :send_frame callback, then passes the assembled frame to
# @writer. A frame longer than the remote :max_frame_size setting is split
# with Frame#split, and each piece is assembled and written on its own.
#
# @param frame [Frame] the frame to send
def send_immediately(frame)
  callback(:send_frame, frame)

  limit = @remote_settings[:max_frame_size]
  if frame.length > limit
    frame.split(limit) { |piece| @writer.call(piece.assemble) }
  else
    @writer.call(frame.assemble)
  end
end
validate_received_frame(frame) click to toggle source
# File lib/plum/connection.rb, line 104
# Checks that a received frame is allowed in the current connection state
# and tracks whether a header block is still open.
#
# * While :waiting_settings, only SETTINGS frames are accepted.
# * While :waiting_continuation, only CONTINUATION frames on the remembered
#   stream are accepted; one with END_HEADERS returns the state to :open.
# * A HEADERS or PUSH_PROMISE frame without END_HEADERS switches the state
#   to :waiting_continuation and remembers its stream ID.
#
# @param frame [Frame] the received frame
# @raise [RemoteConnectionError] :protocol_error when the frame is not
#   permitted in the current state
def validate_received_frame(frame)
  if @state == :waiting_settings && frame.type != :settings
    raise RemoteConnectionError.new(:protocol_error)
  end

  if @state == :waiting_continuation
    continues_block = frame.type == :continuation && frame.stream_id == @continuation_id
    raise RemoteConnectionError.new(:protocol_error) unless continues_block

    @state = :open if frame.end_headers?
  end

  opens_header_block = frame.type == :headers || frame.type == :push_promise
  if opens_header_block && !frame.end_headers?
    @state = :waiting_continuation
    @continuation_id = frame.stream_id
  end
end