class Plum::Rack::Session

Attributes

app[R]
plum[R]

Public Class Methods

new(svc, sock, plum) click to toggle source
# File lib/plum/rack/session.rb, line 10
def initialize(svc, sock, plum)
  @svc = svc
  @app = svc.app
  @sock = sock
  @plum = plum
  @logger = svc.logger
  @config = svc.config
  @remote_addr = sock.peeraddr.last
  @threadpool = svc.threadpool

  setup_plum
rescue Errno::ENOTCONN
  # TCP connection reset while doing TLS handshake
end

Public Instance Methods

run() click to toggle source
# File lib/plum/rack/session.rb, line 33
def run
  while !@sock.closed? && !@sock.eof?
    @plum << @sock.readpartial(1024)
  end
ensure
  stop
end
stop() click to toggle source
# File lib/plum/rack/session.rb, line 25
def stop
  @plum.close
end
to_io() click to toggle source
# File lib/plum/rack/session.rb, line 29
def to_io
  @sock.to_io
end

Private Instance Methods

check_window(stream) click to toggle source
# File lib/plum/rack/session.rb, line 84
def check_window(stream)
  ws = @plum.local_settings[:initial_window_size]
  stream.window_update(ws) if stream.recv_remaining_window < (ws / 2)
  @plum.window_update(ws) if @plum.recv_remaining_window < (ws / 2)
end
extract_headers(r_status, r_h) click to toggle source
# File lib/plum/rack/session.rb, line 203
def extract_headers(r_status, r_h)
  rbase = {
    ":status" => r_status,
    "server" => "plum/#{::Plum::VERSION}",
  }
  rext = {}

  r_h.each do |key, v_|
    if key.include?(".")
      rext[key] = v_
    else
      key = key.downcase

      if "set-cookie" == key
        rbase[key] = v_.gsub("\n", "; ") # RFC 7540 8.1.2.5
      elsif !INVALID_HEADERS.member?(key)
        rbase[key] = v_.tr("\n", ",") # RFC 7230 7
      end
    end
  end

  [rbase, rext]
end
extract_push(reqheaders, extheaders) click to toggle source
# File lib/plum/rack/session.rb, line 107
def extract_push(reqheaders, extheaders)
  pushs = extheaders["plum.serverpush"]
  return nil unless pushs

  authority = reqheaders.find { |k, v| k == ":authority" }[1]
  scheme = reqheaders.find { |k, v| k == ":scheme" }[1]

  pushs.split(";").map { |push|
    method, path = push.split(" ", 2)
    {
      ":authority" => authority,
      ":method" => method.upcase,
      ":scheme" => scheme,
      ":path" => path
    }
  }
end
handle_request(stream, headers, data) click to toggle source
# File lib/plum/rack/session.rb, line 125
def handle_request(stream, headers, data)
  env = new_env(headers, data)
  r_status, r_rawheaders, r_body = @app.call(env)
  r_headers, r_extheaders = extract_headers(r_status, r_rawheaders)
  if @config[:server_push] && @plum.push_enabled?
    push_preqs = extract_push(headers, r_extheaders)
  end

  no_body = r_body.respond_to?(:empty?) && r_body.empty?

  stream.send_headers(r_headers, end_stream: no_body)

  if push_preqs
    push_preqs.map! { |preq|
      [stream.promise(preq), preq]
    }
  end

  send_body(stream, r_body) unless no_body

  if push_preqs
    push_preqs.each { |st, preq|
      penv = new_env(preq, "".b)
      p_status, p_h, p_body = @app.call(penv)
      p_headers, _ = extract_headers(p_status, p_h)
      pno_body = p_body.respond_to?(:empty?) && p_body.empty?
      st.send_headers(p_headers, end_stream: pno_body)
      send_body(st, p_body) unless pno_body
    }
  end
end
new_env(h, data) click to toggle source
# File lib/plum/rack/session.rb, line 157
def new_env(h, data)
  ebase = {
    "rack.version"      => ::Rack::VERSION,
    "rack.input"        => StringIO.new(data),
    "rack.errors"       => $stderr,
    "rack.multithread"  => true,
    "rack.multiprocess" => false,
    "rack.run_once"     => false,
    "rack.hijack?"      => false,
    "SCRIPT_NAME"       => "",
    "REMOTE_ADDR"       => @remote_addr,
    "HTTP_VERSION"      => "HTTP/2.0", # Rack::CommonLogger uses
  }

  h.each { |k, v|
    case k
    when ":method"
      ebase["REQUEST_METHOD"] = v
    when ":path"
      cpath_name, cpath_query = v.split("?", 2)
      ebase["PATH_INFO"] = cpath_name
      ebase["QUERY_STRING"] = cpath_query || ""
    when ":authority"
      chost, cport = v.split(":", 2)
      ebase["SERVER_NAME"] = chost
      ebase["SERVER_PORT"] = cport || "443"
    when ":scheme"
      ebase["rack.url_scheme"] = v
    else
      unless k.start_with?(":") # ignore unknown pseudo-headers
        if k == "cookie" && ebase["HTTP_COOKIE"]
          if ebase["HTTP_COOKIE"].frozen?
            (ebase["HTTP_COOKIE"] += "; ") << v
          else
            ebase["HTTP_COOKIE"] << "; " << v
          end
        else
          ebase["HTTP_" + k.tr("-", "_").upcase!] = v
        end
      end
    end
  }

  ebase
end
send_body(stream, body) click to toggle source
# File lib/plum/rack/session.rb, line 90
def send_body(stream, body)
  begin
    if body.respond_to?(:to_str)
      stream.send_data(body, end_stream: true)
    elsif body.respond_to?(:readpartial) && body.respond_to?(:eof?)
      until body.eof?
        stream.send_data(body.readpartial(65536), end_stream: body.eof?)
      end
    else
      body.each { |part| stream.send_data(part, end_stream: false) }
      stream.send_data(end_stream: true)
    end
  ensure
    body.close if body.respond_to?(:close)
  end
end
setup_plum() click to toggle source
# File lib/plum/rack/session.rb, line 42
def setup_plum
  @plum.on(:connection_error) { |ex| @logger.error(ex) }

  # @plum.on(:stream) { |stream| @logger.debug("new stream: #{stream}") }
  @plum.on(:stream_error) { |stream, ex|
    if [:cancel, :refused_stream].include?(ex.http2_error_type)
      @logger.debug("stream was cancelled: #{stream}")
    else
      @logger.error(ex)
    end
  }

  reqs = {}
  @plum.on(:headers) { |stream, h|
    reqs[stream] = { headers: h, data: String.new.force_encoding(Encoding::BINARY) }
  }

  @plum.on(:data) { |stream, d|
    reqs[stream][:data] << d # TODO: store to file?
    check_window(stream)
  }

  @plum.on(:end_stream) { |stream|
    req = reqs.delete(stream)
    err = proc { |err|
      stream.send_headers({ ":status" => 500 }, end_stream: true)
      @logger.error(err)
    }
    if @threadpool
      @threadpool.acquire(err) {
        handle_request(stream, req[:headers], req[:data])
      }
    else
      begin
        handle_request(stream, req[:headers], req[:data])
      rescue
        err.call($!)
      end
    end
  }
end