From b9ba3bd7c23a3045e35d304c5c105624a1b24a25 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Fri, 2 Oct 2015 02:45:32 -0400 Subject: [PATCH] Allow specifying scheme when proxying --- pkg/registry/generic/rest/proxy.go | 31 ++++++----- pkg/registry/generic/rest/proxy_test.go | 2 +- pkg/registry/node/strategy.go | 5 +- pkg/registry/pod/etcd/etcd.go | 15 +++--- pkg/registry/pod/strategy.go | 22 +++++--- pkg/registry/service/rest.go | 19 ++++--- pkg/registry/service/rest_test.go | 12 +++++ pkg/util/port_split.go | 59 ++++++++++++++++---- pkg/util/port_split_test.go | 72 ++++++++++++++++++++----- test/e2e/proxy.go | 66 +++++++++++++++++++---- test/images/porter/pod.json | 2 +- 11 files changed, 235 insertions(+), 70 deletions(-) diff --git a/pkg/registry/generic/rest/proxy.go b/pkg/registry/generic/rest/proxy.go index 45611931bd..14bd0e0e27 100644 --- a/pkg/registry/generic/rest/proxy.go +++ b/pkg/registry/generic/rest/proxy.go @@ -41,19 +41,23 @@ import ( type UpgradeAwareProxyHandler struct { UpgradeRequired bool Location *url.URL - Transport http.RoundTripper - FlushInterval time.Duration - MaxBytesPerSec int64 - err error + // Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used + Transport http.RoundTripper + // WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting) + WrapTransport bool + FlushInterval time.Duration + MaxBytesPerSec int64 + err error } const defaultFlushInterval = 200 * time.Millisecond // NewUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval -func NewUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, upgradeRequired bool) *UpgradeAwareProxyHandler { +func NewUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool) *UpgradeAwareProxyHandler { return &UpgradeAwareProxyHandler{ Location: location, Transport: transport, + WrapTransport: wrapTransport, UpgradeRequired: upgradeRequired, FlushInterval: defaultFlushInterval, } @@ -101,8 +105,8 @@ func (h *UpgradeAwareProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Re return } - if h.Transport == nil { - h.Transport = h.defaultProxyTransport(req.URL) + if h.Transport == nil || h.WrapTransport { + h.Transport = h.defaultProxyTransport(req.URL, h.Transport) } newReq, err := http.NewRequest(req.Method, loc.String(), req.Body) @@ -258,7 +262,7 @@ func (h *UpgradeAwareProxyHandler) dialURL() (net.Conn, error) { } } -func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL) http.RoundTripper { +func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL, internalTransport http.RoundTripper) http.RoundTripper { scheme := url.Scheme host := url.Host suffix := h.Location.Path @@ -266,13 +270,14 @@ func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL) http.Roun suffix += "/" } pathPrepend := strings.TrimSuffix(url.Path, suffix) - internalTransport := &proxy.Transport{ - Scheme: scheme, - Host: host, - PathPrepend: pathPrepend, + rewritingTransport := &proxy.Transport{ + Scheme: scheme, + Host: host, + PathPrepend: pathPrepend, + RoundTripper: internalTransport, } return &corsRemovingTransport{ - RoundTripper: internalTransport, + RoundTripper: rewritingTransport, } } diff --git a/pkg/registry/generic/rest/proxy_test.go b/pkg/registry/generic/rest/proxy_test.go index bb2e8b5615..1f20f3dac8 100644 --- a/pkg/registry/generic/rest/proxy_test.go +++ b/pkg/registry/generic/rest/proxy_test.go @@ -388,7 +388,7 @@ func TestDefaultProxyTransport(t *testing.T) { h := UpgradeAwareProxyHandler{ Location: locURL, } - result := h.defaultProxyTransport(URL) + result := h.defaultProxyTransport(URL, nil) transport := result.(*corsRemovingTransport).RoundTripper.(*proxy.Transport) if transport.Scheme != test.expectedScheme { t.Errorf("%s: unexpected scheme. Actual: %s, Expected: %s", test.name, transport.Scheme, test.expectedScheme) diff --git a/pkg/registry/node/strategy.go b/pkg/registry/node/strategy.go index 939fc106dc..76f77e0239 100644 --- a/pkg/registry/node/strategy.go +++ b/pkg/registry/node/strategy.go @@ -137,7 +137,7 @@ func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher { // ResourceLocation returns an URL and transport which one can use to send traffic for the specified node. func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { - name, portReq, valid := util.SplitPort(id) + schemeReq, name, portReq, valid := util.SplitSchemeNamePort(id) if !valid { return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid node request %q", id)) } @@ -154,6 +154,7 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet host := hostIP.String() if portReq == "" || strconv.Itoa(ports.KubeletPort) == portReq { + // Ignore requested scheme, use scheme provided by GetConnectionInfo scheme, port, transport, err := connection.GetConnectionInfo(host) if err != nil { return nil, nil, err @@ -168,5 +169,5 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet transport, nil } - return &url.URL{Host: net.JoinHostPort(host, portReq)}, nil, nil + return &url.URL{Scheme: schemeReq, Host: net.JoinHostPort(host, portReq)}, nil, nil } diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index d00129da44..c32399629d 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -285,12 +285,13 @@ func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object) (re if !ok { return nil, fmt.Errorf("Invalid options object: %#v", opts) } - location, _, err := pod.ResourceLocation(r.store, ctx, id) + location, transport, err := pod.ResourceLocation(r.store, ctx, id) if err != nil { return nil, err } location.Path = path.Join(location.Path, proxyOpts.Path) - return newUpgradeAwareProxyHandler(location, nil, false), nil + // Return a proxy handler that uses the desired transport, wrapped with additional proxy handling (to get URL rewriting, X-Forwarded-* headers, etc) + return newThrottledUpgradeAwareProxyHandler(location, transport, true, false), nil } // Support both GET and POST methods. Over time, we want to move all clients to start using POST and then stop supporting GET. @@ -321,7 +322,7 @@ func (r *AttachREST) Connect(ctx api.Context, name string, opts runtime.Object) if err != nil { return nil, err } - return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil + return newThrottledUpgradeAwareProxyHandler(location, transport, false, true), nil } // NewConnectOptions returns the versioned object that represents exec parameters @@ -359,7 +360,7 @@ func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object) (r if err != nil { return nil, err } - return newUpgradeAwareProxyHandler(location, transport, true), nil + return newThrottledUpgradeAwareProxyHandler(location, transport, false, true), nil } // NewConnectOptions returns the versioned object that represents exec parameters @@ -403,11 +404,11 @@ func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Obj if err != nil { return nil, err } - return newUpgradeAwareProxyHandler(location, transport, true), nil + return newThrottledUpgradeAwareProxyHandler(location, transport, false, true), nil } -func newUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, upgradeRequired bool) *genericrest.UpgradeAwareProxyHandler { - handler := genericrest.NewUpgradeAwareProxyHandler(location, transport, upgradeRequired) +func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool) *genericrest.UpgradeAwareProxyHandler { + handler := genericrest.NewUpgradeAwareProxyHandler(location, transport, wrapTransport, upgradeRequired) handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec return handler } diff --git a/pkg/registry/pod/strategy.go b/pkg/registry/pod/strategy.go index cc721099a5..4029715b97 100644 --- a/pkg/registry/pod/strategy.go +++ b/pkg/registry/pod/strategy.go @@ -17,6 +17,7 @@ limitations under the License. package pod import ( + "crypto/tls" "fmt" "net" "net/http" @@ -46,6 +47,13 @@ type podStrategy struct { // objects via the REST API. var Strategy = podStrategy{api.Scheme, api.SimpleNameGenerator} +// PodProxyTransport is used by the API proxy to connect to pods +// Exported to allow overriding TLS options (like adding a client certificate) +var PodProxyTransport = util.SetTransportDefaults(&http.Transport{ + // Turn off hostname verification, because connections are to assigned IPs, not deterministic + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, +}) + // NamespaceScoped is true for pods. func (podStrategy) NamespaceScoped() bool { return true @@ -188,9 +196,9 @@ func getPod(getter ResourceGetter, ctx api.Context, name string) (*api.Pod, erro // ResourceLocation returns a URL to which one can send traffic for the specified pod. func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { - // Allow ID as "podname" or "podname:port". If port is not specified, - // try to use the first defined port on the pod. - name, port, valid := util.SplitPort(id) + // Allow ID as "podname" or "podname:port" or "scheme:podname:port". + // If port is not specified, try to use the first defined port on the pod. + scheme, name, port, valid := util.SplitSchemeNamePort(id) if !valid { return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id)) } @@ -211,15 +219,15 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U } } - // We leave off the scheme ('http://') because we have no idea what sort of server - // is listening at this endpoint. - loc := &url.URL{} + loc := &url.URL{ + Scheme: scheme, + } if port == "" { loc.Host = pod.Status.PodIP } else { loc.Host = net.JoinHostPort(pod.Status.PodIP, port) } - return loc, nil, nil + return loc, PodProxyTransport, nil } // LogLocation returns the log URL for a pod container. If opts.Container is blank diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 075d3d84ac..3b17d72bab 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -17,6 +17,7 @@ limitations under the License. package service import ( + "crypto/tls" "fmt" "math/rand" "net" @@ -49,6 +50,13 @@ type REST struct { serviceNodePorts portallocator.Interface } +// ServiceProxyTransport is used by the API proxy to connect to services +// Exported to allow overriding TLS options (like adding a client certificate) +var ServiceProxyTransport = util.SetTransportDefaults(&http.Transport{ + // Turn off hostname verification, because connections are to assigned IPs, not deterministic + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, +}) + // NewStorage returns a new REST. func NewStorage(registry Registry, endpoints endpoint.Registry, serviceIPs ipallocator.Interface, serviceNodePorts portallocator.Interface) *REST { @@ -277,8 +285,8 @@ var _ = rest.Redirector(&REST{}) // ResourceLocation returns a URL to which one can send traffic for the specified service. func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { - // Allow ID as "svcname" or "svcname:port". - svcName, portStr, valid := util.SplitPort(id) + // Allow ID as "svcname", "svcname:port", or "scheme:svcname:port". + svcScheme, svcName, portStr, valid := util.SplitSchemeNamePort(id) if !valid { return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id)) } @@ -303,11 +311,10 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou // Pick a random address. ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP port := ss.Ports[i].Port - // We leave off the scheme ('http://') because we have no idea what sort of server - // is listening at this endpoint. return &url.URL{ - Host: net.JoinHostPort(ip, strconv.Itoa(port)), - }, nil, nil + Scheme: svcScheme, + Host: net.JoinHostPort(ip, strconv.Itoa(port)), + }, ServiceProxyTransport, nil } } } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 2f7e76e161..288b1d1257 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -491,6 +491,18 @@ func TestServiceRegistryResourceLocation(t *testing.T) { t.Errorf("Expected %v, but got %v", e, a) } + // Test a scheme + name + port. + location, _, err = redirector.ResourceLocation(ctx, "https:foo:p") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if location == nil { + t.Errorf("Unexpected nil: %v", location) + } + if e, a := "https://1.2.3.4:93", location.String(); e != a { + t.Errorf("Expected %v, but got %v", e, a) + } + // Test a non-existent name + port. location, _, err = redirector.ResourceLocation(ctx, "foo:q") if err == nil { diff --git a/pkg/util/port_split.go b/pkg/util/port_split.go index c991dc6758..176271189a 100644 --- a/pkg/util/port_split.go +++ b/pkg/util/port_split.go @@ -18,23 +18,60 @@ package util import ( "strings" + + "k8s.io/kubernetes/pkg/util/sets" ) -// Takes a string of the form "name:port" or "name". -// * If id is of the form "name" or "name:", then return (name, "", true) -// * If id is of the form "name:port", then return (name, port, true) -// * Otherwise, return ("", "", false) -// Additionally, name must be non-empty or valid will be returned false. +var validSchemes = sets.NewString("http", "https", "") + +// SplitSchemeNamePort takes a string of the following forms: +// * "", returns "", "","", true +// * ":", returns "", "","",true +// * "::", returns "","","",true // +// Name must be non-empty or valid will be returned false. +// Scheme must be "http" or "https" if specified // Port is returned as a string, and it is not required to be numeric (could be // used for a named port, for example). -func SplitPort(id string) (name, port string, valid bool) { +func SplitSchemeNamePort(id string) (scheme, name, port string, valid bool) { parts := strings.Split(id, ":") - if len(parts) > 2 { - return "", "", false + switch len(parts) { + case 1: + name = parts[0] + case 2: + name = parts[0] + port = parts[1] + case 3: + scheme = parts[0] + name = parts[1] + port = parts[2] + default: + return "", "", "", false } - if len(parts) == 2 { - return parts[0], parts[1], len(parts[0]) > 0 + + if len(name) > 0 && validSchemes.Has(scheme) { + return scheme, name, port, true + } else { + return "", "", "", false } - return id, "", len(id) > 0 +} + +// JoinSchemeNamePort returns a string that specifies the scheme, name, and port: +// * "" +// * ":" +// * "::" +// None of the parameters may contain a ':' character +// Name is required +// Scheme must be "", "http", or "https" +func JoinSchemeNamePort(scheme, name, port string) string { + if len(scheme) > 0 { + // Must include three segments to specify scheme + return scheme + ":" + name + ":" + port + } + if len(port) > 0 { + // Must include two segments to specify port + return name + ":" + port + } + // Return name alone + return name } diff --git a/pkg/util/port_split_test.go b/pkg/util/port_split_test.go index 468059cb5d..9d9e5fb0ff 100644 --- a/pkg/util/port_split_test.go +++ b/pkg/util/port_split_test.go @@ -20,11 +20,12 @@ import ( "testing" ) -func TestSplitPort(t *testing.T) { +func TestSplitSchemeNamePort(t *testing.T) { table := []struct { - in string - name, port string - valid bool + in string + name, port, scheme string + valid bool + normalized bool }{ { in: "aoeu:asdf", @@ -32,26 +33,50 @@ func TestSplitPort(t *testing.T) { port: "asdf", valid: true, }, { - in: "aoeu:", - name: "aoeu", - valid: true, + in: "http:aoeu:asdf", + scheme: "http", + name: "aoeu", + port: "asdf", + valid: true, }, { - in: ":asdf", - name: "", - port: "asdf", + in: "https:aoeu:", + scheme: "https", + name: "aoeu", + port: "", + valid: true, + normalized: false, }, { - in: "aoeu:asdf:htns", + in: "https:aoeu:asdf", + scheme: "https", + name: "aoeu", + port: "asdf", + valid: true, + }, { + in: "aoeu:", + name: "aoeu", + valid: true, + normalized: false, + }, { + in: ":asdf", + valid: false, + }, { + in: "aoeu:asdf:htns", + valid: false, }, { in: "aoeu", name: "aoeu", valid: true, }, { - in: "", + in: "", + valid: false, }, } for _, item := range table { - name, port, valid := SplitPort(item.in) + scheme, name, port, valid := SplitSchemeNamePort(item.in) + if e, a := item.scheme, scheme; e != a { + t.Errorf("%q: Wanted %q, got %q", item.in, e, a) + } if e, a := item.name, name; e != a { t.Errorf("%q: Wanted %q, got %q", item.in, e, a) } @@ -61,5 +86,26 @@ func TestSplitPort(t *testing.T) { if e, a := item.valid, valid; e != a { t.Errorf("%q: Wanted %t, got %t", item.in, e, a) } + + // Make sure valid items round trip through JoinSchemeNamePort + if item.valid { + out := JoinSchemeNamePort(scheme, name, port) + if item.normalized && out != item.in { + t.Errorf("%q: Wanted %s, got %s", item.in, item.in, out) + } + scheme, name, port, valid := SplitSchemeNamePort(out) + if e, a := item.scheme, scheme; e != a { + t.Errorf("%q: Wanted %q, got %q", item.in, e, a) + } + if e, a := item.name, name; e != a { + t.Errorf("%q: Wanted %q, got %q", item.in, e, a) + } + if e, a := item.port, port; e != a { + t.Errorf("%q: Wanted %q, got %q", item.in, e, a) + } + if e, a := item.valid, valid; e != a { + t.Errorf("%q: Wanted %t, got %t", item.in, e, a) + } + } } } diff --git a/test/e2e/proxy.go b/test/e2e/proxy.go index b1149d8159..5217fc06d6 100644 --- a/test/e2e/proxy.go +++ b/test/e2e/proxy.go @@ -78,6 +78,16 @@ func proxyContext(version string) { Port: 81, TargetPort: util.NewIntOrStringFromInt(162), }, + { + Name: "tlsportname1", + Port: 443, + TargetPort: util.NewIntOrStringFromString("tlsdest1"), + }, + { + Name: "tlsportname2", + Port: 444, + TargetPort: util.NewIntOrStringFromInt(462), + }, }, }, }) @@ -93,7 +103,7 @@ func proxyContext(version string) { pods := []*api.Pod{} cfg := RCConfig{ Client: f.Client, - Image: "gcr.io/google_containers/porter:59ad46ed2c56ba50fa7f1dc176c07c37", + Image: "gcr.io/google_containers/porter:cd5cb5791ebaa8641955f0e8c2a9bed669b1eaab", Name: service.Name, Namespace: f.Namespace.Name, Replicas: 1, @@ -102,10 +112,17 @@ func proxyContext(version string) { "SERVE_PORT_80": `test`, "SERVE_PORT_160": "foo", "SERVE_PORT_162": "bar", + + "SERVE_TLS_PORT_443": `test`, + "SERVE_TLS_PORT_460": `tls baz`, + "SERVE_TLS_PORT_462": `tls qux`, }, Ports: map[string]int{ "dest1": 160, "dest2": 162, + + "tlsdest1": 460, + "tlsdest2": 462, }, Labels: labels, CreatedPods: &pods, @@ -116,14 +133,44 @@ func proxyContext(version string) { Expect(f.WaitForAnEndpoint(service.Name)).NotTo(HaveOccurred()) // Try proxying through the service and directly to through the pod. - svcPrefix := prefix + "/proxy/namespaces/" + f.Namespace.Name + "/services/" + service.Name - podPrefix := prefix + "/proxy/namespaces/" + f.Namespace.Name + "/pods/" + pods[0].Name + svcProxyURL := func(scheme, port string) string { + return prefix + "/proxy/namespaces/" + f.Namespace.Name + "/services/" + util.JoinSchemeNamePort(scheme, service.Name, port) + } + podProxyURL := func(scheme, port string) string { + return prefix + "/proxy/namespaces/" + f.Namespace.Name + "/pods/" + util.JoinSchemeNamePort(scheme, pods[0].Name, port) + } + subresourcePodProxyURL := func(scheme, port string) string { + return prefix + "/namespaces/" + f.Namespace.Name + "/pods/" + util.JoinSchemeNamePort(scheme, pods[0].Name, port) + "/proxy" + } expectations := map[string]string{ - svcPrefix + ":portname1/": "foo", - svcPrefix + ":portname2/": "bar", - podPrefix + ":80/": `test`, - podPrefix + ":160/": "foo", - podPrefix + ":162/": "bar", + svcProxyURL("", "portname1") + "/": "foo", + svcProxyURL("", "portname2") + "/": "bar", + + svcProxyURL("http", "portname1") + "/": "foo", + svcProxyURL("http", "portname2") + "/": "bar", + + svcProxyURL("https", "tlsportname1") + "/": "tls baz", + svcProxyURL("https", "tlsportname2") + "/": "tls qux", + + podProxyURL("", "80") + "/": `test`, + podProxyURL("", "160") + "/": "foo", + podProxyURL("", "162") + "/": "bar", + + podProxyURL("http", "80") + "/": `test`, + podProxyURL("http", "160") + "/": "foo", + podProxyURL("http", "162") + "/": "bar", + + subresourcePodProxyURL("", "") + "/": `test`, + subresourcePodProxyURL("", "80") + "/": `test`, + subresourcePodProxyURL("http", "80") + "/": `test`, + subresourcePodProxyURL("", "160") + "/": "foo", + subresourcePodProxyURL("http", "160") + "/": "foo", + subresourcePodProxyURL("", "162") + "/": "bar", + subresourcePodProxyURL("http", "162") + "/": "bar", + + subresourcePodProxyURL("https", "443") + "/": `test`, + subresourcePodProxyURL("https", "460") + "/": "tls baz", + subresourcePodProxyURL("https", "462") + "/": "tls qux", // TODO: below entries don't work, but I believe we should make them work. // svcPrefix + ":80": "foo", // svcPrefix + ":81": "bar", @@ -159,7 +206,8 @@ func proxyContext(version string) { recordError(fmt.Sprintf("%v: path %v took %v > 15s", i, path, d)) } }(i, path, val) - time.Sleep(150 * time.Millisecond) + // default QPS is 5 + time.Sleep(200 * time.Millisecond) } } wg.Wait() diff --git a/test/images/porter/pod.json b/test/images/porter/pod.json index 894fc76a9a..2a94ee1e7e 100644 --- a/test/images/porter/pod.json +++ b/test/images/porter/pod.json @@ -8,7 +8,7 @@ "containers": [ { "name": "porter", - "image": "gcr.io/google_containers/porter:59ad46ed2c56ba50fa7f1dc176c07c37", + "image": "gcr.io/google_containers/porter:cd5cb5791ebaa8641955f0e8c2a9bed669b1eaab", "env": [ { "name": "SERVE_PORT_80",