mirror of https://github.com/k3s-io/k3s
Merge pull request #51168 from smarterclayton/fix_transport_wrap
Automatic merge from submit-queue (batch tested with PRs 50775, 51397, 51168, 51465, 51536) Allow bearer requests to be proxied by kubectl proxy Use a fake transport to capture changes to the request and then surface them back to the end user. Fixes #50466 @liggitt no tests yet, but works locallypull/6/head
commit
3019daa1dc
|
@ -158,7 +158,7 @@ func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
|||
|
||||
// makeUpgradeTransport creates a transport that explicitly bypasses HTTP2 support
|
||||
// for proxy connections that must upgrade.
|
||||
func makeUpgradeTransport(config *rest.Config) (http.RoundTripper, error) {
|
||||
func makeUpgradeTransport(config *rest.Config) (proxy.UpgradeRequestRoundTripper, error) {
|
||||
transportConfig, err := config.TransportConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -170,7 +170,11 @@ func makeUpgradeTransport(config *rest.Config) (http.RoundTripper, error) {
|
|||
rt := utilnet.SetOldTransportDefaults(&http.Transport{
|
||||
TLSClientConfig: tlsConfig,
|
||||
})
|
||||
return transport.HTTPWrappersForConfig(transportConfig, rt)
|
||||
upgrader, err := transport.HTTPWrappersForConfig(transportConfig, proxy.MirrorRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return proxy.NewUpgradeRequestRoundTripper(rt, upgrader), nil
|
||||
}
|
||||
|
||||
// NewServer creates and installs a new Server.
|
||||
|
|
|
@ -17,9 +17,11 @@ limitations under the License.
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
|
@ -37,6 +39,19 @@ import (
|
|||
"github.com/mxk/go-flowrate/flowrate"
|
||||
)
|
||||
|
||||
// UpgradeRequestRoundTripper provides an additional method to decorate a request
|
||||
// with any authentication or other protocol level information prior to performing
|
||||
// an upgrade on the server. Any response will be handled by the intercepting
|
||||
// proxy.
|
||||
type UpgradeRequestRoundTripper interface {
|
||||
http.RoundTripper
|
||||
// WrapRequest takes a valid HTTP request and returns a suitably altered version
|
||||
// of request with any HTTP level values required to complete the request half of
|
||||
// an upgrade on the server. It does not get a chance to see the response and
|
||||
// should bypass any request side logic that expects to see the response.
|
||||
WrapRequest(*http.Request) (*http.Request, error)
|
||||
}
|
||||
|
||||
// UpgradeAwareHandler is a handler for proxy requests that may require an upgrade
|
||||
type UpgradeAwareHandler struct {
|
||||
// UpgradeRequired will reject non-upgrade connections if true.
|
||||
|
@ -48,7 +63,7 @@ type UpgradeAwareHandler struct {
|
|||
Transport http.RoundTripper
|
||||
// UpgradeTransport, if specified, will be used as the backend transport when upgrade requests are provided.
|
||||
// This allows clients to disable HTTP/2.
|
||||
UpgradeTransport http.RoundTripper
|
||||
UpgradeTransport UpgradeRequestRoundTripper
|
||||
// WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting)
|
||||
WrapTransport bool
|
||||
// InterceptRedirects determines whether the proxy should sniff backend responses for redirects,
|
||||
|
@ -90,6 +105,60 @@ func (r simpleResponder) Error(w http.ResponseWriter, req *http.Request, err err
|
|||
r.responder.Error(err)
|
||||
}
|
||||
|
||||
// upgradeRequestRoundTripper implements proxy.UpgradeRequestRoundTripper.
|
||||
type upgradeRequestRoundTripper struct {
|
||||
http.RoundTripper
|
||||
upgrader http.RoundTripper
|
||||
}
|
||||
|
||||
var (
|
||||
_ UpgradeRequestRoundTripper = &upgradeRequestRoundTripper{}
|
||||
_ utilnet.RoundTripperWrapper = &upgradeRequestRoundTripper{}
|
||||
)
|
||||
|
||||
// WrappedRoundTripper returns the round tripper that a caller would use.
|
||||
func (rt *upgradeRequestRoundTripper) WrappedRoundTripper() http.RoundTripper {
|
||||
return rt.RoundTripper
|
||||
}
|
||||
|
||||
// WriteToRequest calls the nested upgrader and then copies the returned request
|
||||
// fields onto the passed request.
|
||||
func (rt *upgradeRequestRoundTripper) WrapRequest(req *http.Request) (*http.Request, error) {
|
||||
resp, err := rt.upgrader.RoundTrip(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp.Request, nil
|
||||
}
|
||||
|
||||
// onewayRoundTripper captures the provided request - which is assumed to have
|
||||
// been modified by other round trippers - and then returns a fake response.
|
||||
type onewayRoundTripper struct{}
|
||||
|
||||
// RoundTrip returns a simple 200 OK response that captures the provided request.
|
||||
func (onewayRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
return &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: http.StatusOK,
|
||||
Body: ioutil.NopCloser(&bytes.Buffer{}),
|
||||
Request: req,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// MirrorRequest is a round tripper that can be called to get back the calling request as
|
||||
// the core round tripper in a chain.
|
||||
var MirrorRequest http.RoundTripper = onewayRoundTripper{}
|
||||
|
||||
// NewUpgradeRequestRoundTripper takes two round trippers - one for the underlying TCP connection, and
|
||||
// one that is able to write headers to an HTTP request. The request rt is used to set the request headers
|
||||
// and that is written to the underlying connection rt.
|
||||
func NewUpgradeRequestRoundTripper(connection, request http.RoundTripper) UpgradeRequestRoundTripper {
|
||||
return &upgradeRequestRoundTripper{
|
||||
RoundTripper: connection,
|
||||
upgrader: request,
|
||||
}
|
||||
}
|
||||
|
||||
// NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning
|
||||
// errors to the caller.
|
||||
func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareHandler {
|
||||
|
@ -260,10 +329,14 @@ func (h *UpgradeAwareHandler) Dial(req *http.Request) (net.Conn, error) {
|
|||
}
|
||||
|
||||
func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) {
|
||||
if h.UpgradeTransport != nil {
|
||||
return dial(req, h.UpgradeTransport)
|
||||
if h.UpgradeTransport == nil {
|
||||
return dial(req, h.Transport)
|
||||
}
|
||||
return dial(req, h.Transport)
|
||||
updatedReq, err := h.UpgradeTransport.WrapRequest(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dial(updatedReq, h.UpgradeTransport)
|
||||
}
|
||||
|
||||
// dial dials the backend at req.URL and writes req to it.
|
||||
|
|
|
@ -333,6 +333,12 @@ func TestServeHTTP(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type RoundTripperFunc func(req *http.Request) (*http.Response, error)
|
||||
|
||||
func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
return fn(req)
|
||||
}
|
||||
|
||||
func TestProxyUpgrade(t *testing.T) {
|
||||
|
||||
localhostPool := x509.NewCertPool()
|
||||
|
@ -341,8 +347,10 @@ func TestProxyUpgrade(t *testing.T) {
|
|||
}
|
||||
|
||||
testcases := map[string]struct {
|
||||
ServerFunc func(http.Handler) *httptest.Server
|
||||
ProxyTransport http.RoundTripper
|
||||
ServerFunc func(http.Handler) *httptest.Server
|
||||
ProxyTransport http.RoundTripper
|
||||
UpgradeTransport UpgradeRequestRoundTripper
|
||||
ExpectedAuth string
|
||||
}{
|
||||
"http": {
|
||||
ServerFunc: httptest.NewServer,
|
||||
|
@ -393,6 +401,30 @@ func TestProxyUpgrade(t *testing.T) {
|
|||
},
|
||||
ProxyTransport: utilnet.SetTransportDefaults(&http.Transport{Dial: net.Dial, TLSClientConfig: &tls.Config{RootCAs: localhostPool}}),
|
||||
},
|
||||
"https (valid hostname + RootCAs + custom dialer + bearer token)": {
|
||||
ServerFunc: func(h http.Handler) *httptest.Server {
|
||||
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
|
||||
if err != nil {
|
||||
t.Errorf("https (valid hostname): proxy_test: %v", err)
|
||||
}
|
||||
ts := httptest.NewUnstartedServer(h)
|
||||
ts.TLS = &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
}
|
||||
ts.StartTLS()
|
||||
return ts
|
||||
},
|
||||
ProxyTransport: utilnet.SetTransportDefaults(&http.Transport{Dial: net.Dial, TLSClientConfig: &tls.Config{RootCAs: localhostPool}}),
|
||||
UpgradeTransport: NewUpgradeRequestRoundTripper(
|
||||
utilnet.SetOldTransportDefaults(&http.Transport{Dial: net.Dial, TLSClientConfig: &tls.Config{RootCAs: localhostPool}}),
|
||||
RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
|
||||
req = utilnet.CloneRequest(req)
|
||||
req.Header.Set("Authorization", "Bearer 1234")
|
||||
return MirrorRequest.RoundTrip(req)
|
||||
}),
|
||||
),
|
||||
ExpectedAuth: "Bearer 1234",
|
||||
},
|
||||
}
|
||||
|
||||
for k, tc := range testcases {
|
||||
|
@ -406,6 +438,12 @@ func TestProxyUpgrade(t *testing.T) {
|
|||
func() { // Cleanup after each test case.
|
||||
backend := http.NewServeMux()
|
||||
backend.Handle("/hello", websocket.Handler(func(ws *websocket.Conn) {
|
||||
if ws.Request().Header.Get("Authorization") != tc.ExpectedAuth {
|
||||
t.Errorf("%s: unexpected headers on request: %v", k, ws.Request().Header)
|
||||
defer ws.Close()
|
||||
ws.Write([]byte("you failed"))
|
||||
return
|
||||
}
|
||||
defer ws.Close()
|
||||
body := make([]byte, 5)
|
||||
ws.Read(body)
|
||||
|
@ -422,6 +460,7 @@ func TestProxyUpgrade(t *testing.T) {
|
|||
proxyHandler := &UpgradeAwareHandler{
|
||||
Location: serverURL,
|
||||
Transport: tc.ProxyTransport,
|
||||
UpgradeTransport: tc.UpgradeTransport,
|
||||
InterceptRedirects: redirect,
|
||||
Responder: &noErrorsAllowed{t: t},
|
||||
}
|
||||
|
|
|
@ -431,8 +431,6 @@ var _ = SIGDescribe("Kubectl client", func() {
|
|||
})
|
||||
|
||||
It("should support exec through kubectl proxy", func() {
|
||||
Skip("kubernetes/kubernetes#50466: This feature doesn't work for anything but client certs authentication.")
|
||||
|
||||
// Fail if the variable isn't set
|
||||
if framework.TestContext.Host == "" {
|
||||
framework.Failf("--host variable must be set to the full URI to the api server on e2e run.")
|
||||
|
|
Loading…
Reference in New Issue