mirror of https://github.com/k3s-io/k3s
Use the UpgradeAwareProxy in `kubectl proxy`
Requires a separate transport that is guaranteed not to be HTTP/2 for exec/attach/portforward, because otherwise the Go client attempts to upgrade us to HTTP/2 first.pull/6/head
parent
d2b8cdb3c4
commit
0daee3ad22
|
@ -19,7 +19,6 @@ go_test(
|
|||
"generate_test.go",
|
||||
"kubectl_test.go",
|
||||
"namespace_test.go",
|
||||
"proxy_server_test.go",
|
||||
"quota_test.go",
|
||||
"resource_filter_test.go",
|
||||
"rolebinding_test.go",
|
||||
|
@ -97,7 +96,6 @@ go_library(
|
|||
"kubectl.go",
|
||||
"namespace.go",
|
||||
"pdb.go",
|
||||
"proxy_server.go",
|
||||
"quota.go",
|
||||
"resource_filter.go",
|
||||
"rolebinding.go",
|
||||
|
@ -197,6 +195,7 @@ filegroup(
|
|||
"//pkg/kubectl/cmd:all-srcs",
|
||||
"//pkg/kubectl/metricsutil:all-srcs",
|
||||
"//pkg/kubectl/plugins:all-srcs",
|
||||
"//pkg/kubectl/proxy:all-srcs",
|
||||
"//pkg/kubectl/resource:all-srcs",
|
||||
"//pkg/kubectl/testing:all-srcs",
|
||||
"//pkg/kubectl/util:all-srcs",
|
||||
|
|
|
@ -91,6 +91,7 @@ go_library(
|
|||
"//pkg/kubectl/cmd/util/openapi:go_default_library",
|
||||
"//pkg/kubectl/metricsutil:go_default_library",
|
||||
"//pkg/kubectl/plugins:go_default_library",
|
||||
"//pkg/kubectl/proxy:go_default_library",
|
||||
"//pkg/kubectl/resource:go_default_library",
|
||||
"//pkg/kubectl/util:go_default_library",
|
||||
"//pkg/kubectl/util/term:go_default_library",
|
||||
|
|
|
@ -139,7 +139,7 @@ func RunProxy(f cmdutil.Factory, out io.Writer, cmd *cobra.Command) error {
|
|||
filter = nil
|
||||
}
|
||||
|
||||
server, err := proxy.NewProxyServer(staticDir, apiProxyPrefix, staticPrefix, filter, clientConfig)
|
||||
server, err := proxy.NewServer(staticDir, apiProxyPrefix, staticPrefix, filter, clientConfig)
|
||||
|
||||
// Separate listening from serving so we can report the bound port
|
||||
// when it is chosen by os (eg: port == 0)
|
||||
|
@ -152,7 +152,7 @@ func RunProxy(f cmdutil.Factory, out io.Writer, cmd *cobra.Command) error {
|
|||
if err != nil {
|
||||
glog.Fatal(err)
|
||||
}
|
||||
fmt.Fprintf(out, "Starting to serve on %s", l.Addr().String())
|
||||
fmt.Fprintf(out, "Starting to serve on %s\n", l.Addr().String())
|
||||
glog.Fatal(server.ServeOnListener(l))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["proxy_server_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/proxy:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["proxy_server.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/kubectl/util:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/proxy:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/k8s.io/client-go/transport:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
|
@ -20,7 +20,6 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"os"
|
||||
"regexp"
|
||||
|
@ -28,19 +27,26 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport"
|
||||
"k8s.io/kubernetes/pkg/kubectl/util"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultHostAcceptRE = "^localhost$,^127\\.0\\.0\\.1$,^\\[::1\\]$"
|
||||
DefaultPathAcceptRE = "^.*"
|
||||
DefaultPathRejectRE = "^/api/.*/pods/.*/exec,^/api/.*/pods/.*/attach"
|
||||
// DefaultHostAcceptRE is the default value for which hosts to accept.
|
||||
DefaultHostAcceptRE = "^localhost$,^127\\.0\\.0\\.1$,^\\[::1\\]$"
|
||||
// DefaultPathAcceptRE is the default path to accept.
|
||||
DefaultPathAcceptRE = "^.*"
|
||||
// DefaultPathRejectRE is the default set of paths to reject.
|
||||
DefaultPathRejectRE = "^/api/.*/pods/.*/exec,^/api/.*/pods/.*/attach"
|
||||
// DefaultMethodRejectRE is the set of HTTP methods to reject by default.
|
||||
DefaultMethodRejectRE = "^$"
|
||||
)
|
||||
|
||||
var (
|
||||
// The reverse proxy will periodically flush the io writer at this frequency.
|
||||
// ReverseProxyFlushInterval is the frequency to flush the reverse proxy.
|
||||
// Only matters for long poll connections like the one used to watch. With an
|
||||
// interval of 0 the reverse proxy will buffer content sent on any connection
|
||||
// with transfer-encoding=chunked.
|
||||
|
@ -63,7 +69,7 @@ type FilterServer struct {
|
|||
delegate http.Handler
|
||||
}
|
||||
|
||||
// Splits a comma separated list of regexps into an array of Regexp objects.
|
||||
// MakeRegexpArray splits a comma separated list of regexps into an array of Regexp objects.
|
||||
func MakeRegexpArray(str string) ([]*regexp.Regexp, error) {
|
||||
parts := strings.Split(str, ",")
|
||||
result := make([]*regexp.Regexp, len(parts))
|
||||
|
@ -77,6 +83,7 @@ func MakeRegexpArray(str string) ([]*regexp.Regexp, error) {
|
|||
return result, nil
|
||||
}
|
||||
|
||||
// MakeRegexpArrayOrDie creates an array of regular expression objects from a string or exits.
|
||||
func MakeRegexpArrayOrDie(str string) []*regexp.Regexp {
|
||||
result, err := MakeRegexpArray(str)
|
||||
if err != nil {
|
||||
|
@ -137,15 +144,38 @@ func (f *FilterServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
|||
rw.Write([]byte("<h3>Unauthorized</h3>"))
|
||||
}
|
||||
|
||||
// ProxyServer is a http.Handler which proxies Kubernetes APIs to remote API server.
|
||||
type ProxyServer struct {
|
||||
// Server is a http.Handler which proxies Kubernetes APIs to remote API server.
|
||||
type Server struct {
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
// NewProxyServer creates and installs a new ProxyServer.
|
||||
// It automatically registers the created ProxyServer to http.DefaultServeMux.
|
||||
type responder struct{}
|
||||
|
||||
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
glog.Errorf("Error while proxying request: %v", err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
// makeUpgradeTransport creates a transport that explicitly bypasses HTTP2 support
|
||||
// for proxy connections that must upgrade.
|
||||
func makeUpgradeTransport(config *rest.Config) (http.RoundTripper, error) {
|
||||
transportConfig, err := config.TransportConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig, err := transport.TLSConfigFor(transportConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rt := utilnet.SetOldTransportDefaults(&http.Transport{
|
||||
TLSClientConfig: tlsConfig,
|
||||
})
|
||||
return transport.HTTPWrappersForConfig(transportConfig, rt)
|
||||
}
|
||||
|
||||
// NewServer creates and installs a new Server.
|
||||
// 'filter', if non-nil, protects requests to the api only.
|
||||
func NewProxyServer(filebase string, apiProxyPrefix string, staticPrefix string, filter *FilterServer, cfg *restclient.Config) (*ProxyServer, error) {
|
||||
func NewServer(filebase string, apiProxyPrefix string, staticPrefix string, filter *FilterServer, cfg *rest.Config) (*Server, error) {
|
||||
host := cfg.Host
|
||||
if !strings.HasSuffix(host, "/") {
|
||||
host = host + "/"
|
||||
|
@ -154,10 +184,20 @@ func NewProxyServer(filebase string, apiProxyPrefix string, staticPrefix string,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
proxy := newProxy(target)
|
||||
if proxy.Transport, err = restclient.TransportFor(cfg); err != nil {
|
||||
|
||||
responder := &responder{}
|
||||
transport, err := rest.TransportFor(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
upgradeTransport, err := makeUpgradeTransport(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
proxy := proxy.NewUpgradeAwareHandler(target, transport, false, false, responder)
|
||||
proxy.UpgradeTransport = upgradeTransport
|
||||
proxy.UseRequestLocation = true
|
||||
|
||||
proxyServer := http.Handler(proxy)
|
||||
if filter != nil {
|
||||
proxyServer = filter.HandlerFor(proxyServer)
|
||||
|
@ -174,16 +214,16 @@ func NewProxyServer(filebase string, apiProxyPrefix string, staticPrefix string,
|
|||
// serving their working directory by default.
|
||||
mux.Handle(staticPrefix, newFileHandler(staticPrefix, filebase))
|
||||
}
|
||||
return &ProxyServer{handler: mux}, nil
|
||||
return &Server{handler: mux}, nil
|
||||
}
|
||||
|
||||
// Listen is a simple wrapper around net.Listen.
|
||||
func (s *ProxyServer) Listen(address string, port int) (net.Listener, error) {
|
||||
func (s *Server) Listen(address string, port int) (net.Listener, error) {
|
||||
return net.Listen("tcp", fmt.Sprintf("%s:%d", address, port))
|
||||
}
|
||||
|
||||
// ListenUnix does net.Listen for a unix socket
|
||||
func (s *ProxyServer) ListenUnix(path string) (net.Listener, error) {
|
||||
func (s *Server) ListenUnix(path string) (net.Listener, error) {
|
||||
// Remove any socket, stale or not, but fall through for other files
|
||||
fi, err := os.Stat(path)
|
||||
if err == nil && (fi.Mode()&os.ModeSocket) != 0 {
|
||||
|
@ -196,23 +236,14 @@ func (s *ProxyServer) ListenUnix(path string) (net.Listener, error) {
|
|||
return l, err
|
||||
}
|
||||
|
||||
// Serve starts the server using given listener, loops forever.
|
||||
func (s *ProxyServer) ServeOnListener(l net.Listener) error {
|
||||
// ServeOnListener starts the server using given listener, loops forever.
|
||||
func (s *Server) ServeOnListener(l net.Listener) error {
|
||||
server := http.Server{
|
||||
Handler: s.handler,
|
||||
}
|
||||
return server.Serve(l)
|
||||
}
|
||||
|
||||
func newProxy(target *url.URL) *httputil.ReverseProxy {
|
||||
director := func(req *http.Request) {
|
||||
req.URL.Scheme = target.Scheme
|
||||
req.URL.Host = target.Host
|
||||
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
|
||||
}
|
||||
return &httputil.ReverseProxy{Director: director, FlushInterval: ReverseProxyFlushInterval}
|
||||
}
|
||||
|
||||
func newFileHandler(prefix, base string) http.Handler {
|
||||
return http.StripPrefix(prefix, http.FileServer(http.Dir(base)))
|
||||
}
|
||||
|
|
|
@ -27,7 +27,8 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
func TestAccept(t *testing.T) {
|
||||
|
@ -340,6 +341,12 @@ func TestFileServing(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func newProxy(target *url.URL) http.Handler {
|
||||
p := proxy.NewUpgradeAwareHandler(target, http.DefaultTransport, false, false, &responder{})
|
||||
p.UseRequestLocation = true
|
||||
return p
|
||||
}
|
||||
|
||||
func TestAPIRequests(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
|
@ -353,6 +360,7 @@ func TestAPIRequests(t *testing.T) {
|
|||
|
||||
// httptest.NewServer should always generate a valid URL.
|
||||
target, _ := url.Parse(ts.URL)
|
||||
target.Path = "/"
|
||||
proxy := newProxy(target)
|
||||
|
||||
tests := []struct{ method, body string }{
|
||||
|
@ -404,13 +412,13 @@ func TestPathHandling(t *testing.T) {
|
|||
{"/custom/", "/custom/api/v1/pods/", "/api/v1/pods/"},
|
||||
}
|
||||
|
||||
cc := &restclient.Config{
|
||||
cc := &rest.Config{
|
||||
Host: ts.URL,
|
||||
}
|
||||
|
||||
for _, item := range table {
|
||||
func() {
|
||||
p, err := NewProxyServer("", item.prefix, "/not/used/for/this/test", nil, cc)
|
||||
p, err := NewServer("", item.prefix, "/not/used/for/this/test", nil, cc)
|
||||
if err != nil {
|
||||
t.Fatalf("%#v: %v", item, err)
|
||||
}
|
||||
|
|
|
@ -8,6 +8,14 @@ load(
|
|||
"go_test",
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["util_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
|
@ -23,14 +31,6 @@ go_library(
|
|||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["util_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
|
|
Loading…
Reference in New Issue