Revert "Allow specifying scheme when proxying"

pull/6/head
Filip Grzadkowski 2015-10-07 13:02:35 +02:00
parent 887aeaa101
commit c0532a432a
11 changed files with 70 additions and 235 deletions

View File

@ -41,23 +41,19 @@ import (
type UpgradeAwareProxyHandler struct {
UpgradeRequired bool
Location *url.URL
// Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used
Transport http.RoundTripper
// WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting)
WrapTransport bool
FlushInterval time.Duration
MaxBytesPerSec int64
err error
Transport http.RoundTripper
FlushInterval time.Duration
MaxBytesPerSec int64
err error
}
const defaultFlushInterval = 200 * time.Millisecond
// NewUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval
func NewUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool) *UpgradeAwareProxyHandler {
func NewUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, upgradeRequired bool) *UpgradeAwareProxyHandler {
return &UpgradeAwareProxyHandler{
Location: location,
Transport: transport,
WrapTransport: wrapTransport,
UpgradeRequired: upgradeRequired,
FlushInterval: defaultFlushInterval,
}
@ -105,8 +101,8 @@ func (h *UpgradeAwareProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Re
return
}
if h.Transport == nil || h.WrapTransport {
h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
if h.Transport == nil {
h.Transport = h.defaultProxyTransport(req.URL)
}
newReq, err := http.NewRequest(req.Method, loc.String(), req.Body)
@ -262,7 +258,7 @@ func (h *UpgradeAwareProxyHandler) dialURL() (net.Conn, error) {
}
}
func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL, internalTransport http.RoundTripper) http.RoundTripper {
func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL) http.RoundTripper {
scheme := url.Scheme
host := url.Host
suffix := h.Location.Path
@ -270,14 +266,13 @@ func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL, internalT
suffix += "/"
}
pathPrepend := strings.TrimSuffix(url.Path, suffix)
rewritingTransport := &proxy.Transport{
Scheme: scheme,
Host: host,
PathPrepend: pathPrepend,
RoundTripper: internalTransport,
internalTransport := &proxy.Transport{
Scheme: scheme,
Host: host,
PathPrepend: pathPrepend,
}
return &corsRemovingTransport{
RoundTripper: rewritingTransport,
RoundTripper: internalTransport,
}
}

View File

@ -388,7 +388,7 @@ func TestDefaultProxyTransport(t *testing.T) {
h := UpgradeAwareProxyHandler{
Location: locURL,
}
result := h.defaultProxyTransport(URL, nil)
result := h.defaultProxyTransport(URL)
transport := result.(*corsRemovingTransport).RoundTripper.(*proxy.Transport)
if transport.Scheme != test.expectedScheme {
t.Errorf("%s: unexpected scheme. Actual: %s, Expected: %s", test.name, transport.Scheme, test.expectedScheme)

View File

@ -137,7 +137,7 @@ func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher {
// ResourceLocation returns an URL and transport which one can use to send traffic for the specified node.
func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
schemeReq, name, portReq, valid := util.SplitSchemeNamePort(id)
name, portReq, valid := util.SplitPort(id)
if !valid {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid node request %q", id))
}
@ -154,7 +154,6 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet
host := hostIP.String()
if portReq == "" || strconv.Itoa(ports.KubeletPort) == portReq {
// Ignore requested scheme, use scheme provided by GetConnectionInfo
scheme, port, transport, err := connection.GetConnectionInfo(host)
if err != nil {
return nil, nil, err
@ -169,5 +168,5 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet
transport,
nil
}
return &url.URL{Scheme: schemeReq, Host: net.JoinHostPort(host, portReq)}, nil, nil
return &url.URL{Host: net.JoinHostPort(host, portReq)}, nil, nil
}

View File

@ -285,13 +285,12 @@ func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object) (re
if !ok {
return nil, fmt.Errorf("Invalid options object: %#v", opts)
}
location, transport, err := pod.ResourceLocation(r.store, ctx, id)
location, _, err := pod.ResourceLocation(r.store, ctx, id)
if err != nil {
return nil, err
}
location.Path = path.Join(location.Path, proxyOpts.Path)
// Return a proxy handler that uses the desired transport, wrapped with additional proxy handling (to get URL rewriting, X-Forwarded-* headers, etc)
return newThrottledUpgradeAwareProxyHandler(location, transport, true, false), nil
return newUpgradeAwareProxyHandler(location, nil, false), nil
}
// Support both GET and POST methods. Over time, we want to move all clients to start using POST and then stop supporting GET.
@ -322,7 +321,7 @@ func (r *AttachREST) Connect(ctx api.Context, name string, opts runtime.Object)
if err != nil {
return nil, err
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true), nil
return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil
}
// NewConnectOptions returns the versioned object that represents exec parameters
@ -360,7 +359,7 @@ func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object) (r
if err != nil {
return nil, err
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true), nil
return newUpgradeAwareProxyHandler(location, transport, true), nil
}
// NewConnectOptions returns the versioned object that represents exec parameters
@ -404,11 +403,11 @@ func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Obj
if err != nil {
return nil, err
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true), nil
return newUpgradeAwareProxyHandler(location, transport, true), nil
}
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool) *genericrest.UpgradeAwareProxyHandler {
handler := genericrest.NewUpgradeAwareProxyHandler(location, transport, wrapTransport, upgradeRequired)
func newUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, upgradeRequired bool) *genericrest.UpgradeAwareProxyHandler {
handler := genericrest.NewUpgradeAwareProxyHandler(location, transport, upgradeRequired)
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
return handler
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package pod
import (
"crypto/tls"
"fmt"
"net"
"net/http"
@ -47,13 +46,6 @@ type podStrategy struct {
// objects via the REST API.
var Strategy = podStrategy{api.Scheme, api.SimpleNameGenerator}
// PodProxyTransport is used by the API proxy to connect to pods
// Exported to allow overriding TLS options (like adding a client certificate)
var PodProxyTransport = util.SetTransportDefaults(&http.Transport{
// Turn off hostname verification, because connections are to assigned IPs, not deterministic
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
})
// NamespaceScoped is true for pods.
func (podStrategy) NamespaceScoped() bool {
return true
@ -196,9 +188,9 @@ func getPod(getter ResourceGetter, ctx api.Context, name string) (*api.Pod, erro
// ResourceLocation returns a URL to which one can send traffic for the specified pod.
func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
// Allow ID as "podname" or "podname:port" or "scheme:podname:port".
// If port is not specified, try to use the first defined port on the pod.
scheme, name, port, valid := util.SplitSchemeNamePort(id)
// Allow ID as "podname" or "podname:port". If port is not specified,
// try to use the first defined port on the pod.
name, port, valid := util.SplitPort(id)
if !valid {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id))
}
@ -219,15 +211,15 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U
}
}
loc := &url.URL{
Scheme: scheme,
}
// We leave off the scheme ('http://') because we have no idea what sort of server
// is listening at this endpoint.
loc := &url.URL{}
if port == "" {
loc.Host = pod.Status.PodIP
} else {
loc.Host = net.JoinHostPort(pod.Status.PodIP, port)
}
return loc, PodProxyTransport, nil
return loc, nil, nil
}
// LogLocation returns the log URL for a pod container. If opts.Container is blank

View File

@ -17,7 +17,6 @@ limitations under the License.
package service
import (
"crypto/tls"
"fmt"
"math/rand"
"net"
@ -50,13 +49,6 @@ type REST struct {
serviceNodePorts portallocator.Interface
}
// ServiceProxyTransport is used by the API proxy to connect to services
// Exported to allow overriding TLS options (like adding a client certificate)
var ServiceProxyTransport = util.SetTransportDefaults(&http.Transport{
// Turn off hostname verification, because connections are to assigned IPs, not deterministic
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
})
// NewStorage returns a new REST.
func NewStorage(registry Registry, endpoints endpoint.Registry, serviceIPs ipallocator.Interface,
serviceNodePorts portallocator.Interface) *REST {
@ -285,8 +277,8 @@ var _ = rest.Redirector(&REST{})
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port".
svcScheme, svcName, portStr, valid := util.SplitSchemeNamePort(id)
// Allow ID as "svcname" or "svcname:port".
svcName, portStr, valid := util.SplitPort(id)
if !valid {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
}
@ -311,10 +303,11 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou
// Pick a random address.
ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP
port := ss.Ports[i].Port
// We leave off the scheme ('http://') because we have no idea what sort of server
// is listening at this endpoint.
return &url.URL{
Scheme: svcScheme,
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, ServiceProxyTransport, nil
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, nil, nil
}
}
}

View File

@ -491,18 +491,6 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
t.Errorf("Expected %v, but got %v", e, a)
}
// Test a scheme + name + port.
location, _, err = redirector.ResourceLocation(ctx, "https:foo:p")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if location == nil {
t.Errorf("Unexpected nil: %v", location)
}
if e, a := "https://1.2.3.4:93", location.String(); e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
// Test a non-existent name + port.
location, _, err = redirector.ResourceLocation(ctx, "foo:q")
if err == nil {

View File

@ -18,60 +18,23 @@ package util
import (
"strings"
"k8s.io/kubernetes/pkg/util/sets"
)
var validSchemes = sets.NewString("http", "https", "")
// SplitSchemeNamePort takes a string of the following forms:
// * "<name>", returns "", "<name>","", true
// * "<name>:<port>", returns "", "<name>","<port>",true
// * "<scheme>:<name>:<port>", returns "<scheme>","<name>","<port>",true
// Takes a string of the form "name:port" or "name".
// * If id is of the form "name" or "name:", then return (name, "", true)
// * If id is of the form "name:port", then return (name, port, true)
// * Otherwise, return ("", "", false)
// Additionally, name must be non-empty or valid will be returned false.
//
// Name must be non-empty or valid will be returned false.
// Scheme must be "http" or "https" if specified
// Port is returned as a string, and it is not required to be numeric (could be
// used for a named port, for example).
func SplitSchemeNamePort(id string) (scheme, name, port string, valid bool) {
func SplitPort(id string) (name, port string, valid bool) {
parts := strings.Split(id, ":")
switch len(parts) {
case 1:
name = parts[0]
case 2:
name = parts[0]
port = parts[1]
case 3:
scheme = parts[0]
name = parts[1]
port = parts[2]
default:
return "", "", "", false
if len(parts) > 2 {
return "", "", false
}
if len(name) > 0 && validSchemes.Has(scheme) {
return scheme, name, port, true
} else {
return "", "", "", false
if len(parts) == 2 {
return parts[0], parts[1], len(parts[0]) > 0
}
}
// JoinSchemeNamePort returns a string that specifies the scheme, name, and port:
// * "<name>"
// * "<name>:<port>"
// * "<scheme>:<name>:<port>"
// None of the parameters may contain a ':' character
// Name is required
// Scheme must be "", "http", or "https"
func JoinSchemeNamePort(scheme, name, port string) string {
if len(scheme) > 0 {
// Must include three segments to specify scheme
return scheme + ":" + name + ":" + port
}
if len(port) > 0 {
// Must include two segments to specify port
return name + ":" + port
}
// Return name alone
return name
return id, "", len(id) > 0
}

View File

@ -20,12 +20,11 @@ import (
"testing"
)
func TestSplitSchemeNamePort(t *testing.T) {
func TestSplitPort(t *testing.T) {
table := []struct {
in string
name, port, scheme string
valid bool
normalized bool
in string
name, port string
valid bool
}{
{
in: "aoeu:asdf",
@ -33,50 +32,26 @@ func TestSplitSchemeNamePort(t *testing.T) {
port: "asdf",
valid: true,
}, {
in: "http:aoeu:asdf",
scheme: "http",
name: "aoeu",
port: "asdf",
valid: true,
in: "aoeu:",
name: "aoeu",
valid: true,
}, {
in: "https:aoeu:",
scheme: "https",
name: "aoeu",
port: "",
valid: true,
normalized: false,
in: ":asdf",
name: "",
port: "asdf",
}, {
in: "https:aoeu:asdf",
scheme: "https",
name: "aoeu",
port: "asdf",
valid: true,
}, {
in: "aoeu:",
name: "aoeu",
valid: true,
normalized: false,
}, {
in: ":asdf",
valid: false,
}, {
in: "aoeu:asdf:htns",
valid: false,
in: "aoeu:asdf:htns",
}, {
in: "aoeu",
name: "aoeu",
valid: true,
}, {
in: "",
valid: false,
in: "",
},
}
for _, item := range table {
scheme, name, port, valid := SplitSchemeNamePort(item.in)
if e, a := item.scheme, scheme; e != a {
t.Errorf("%q: Wanted %q, got %q", item.in, e, a)
}
name, port, valid := SplitPort(item.in)
if e, a := item.name, name; e != a {
t.Errorf("%q: Wanted %q, got %q", item.in, e, a)
}
@ -86,26 +61,5 @@ func TestSplitSchemeNamePort(t *testing.T) {
if e, a := item.valid, valid; e != a {
t.Errorf("%q: Wanted %t, got %t", item.in, e, a)
}
// Make sure valid items round trip through JoinSchemeNamePort
if item.valid {
out := JoinSchemeNamePort(scheme, name, port)
if item.normalized && out != item.in {
t.Errorf("%q: Wanted %s, got %s", item.in, item.in, out)
}
scheme, name, port, valid := SplitSchemeNamePort(out)
if e, a := item.scheme, scheme; e != a {
t.Errorf("%q: Wanted %q, got %q", item.in, e, a)
}
if e, a := item.name, name; e != a {
t.Errorf("%q: Wanted %q, got %q", item.in, e, a)
}
if e, a := item.port, port; e != a {
t.Errorf("%q: Wanted %q, got %q", item.in, e, a)
}
if e, a := item.valid, valid; e != a {
t.Errorf("%q: Wanted %t, got %t", item.in, e, a)
}
}
}
}

View File

@ -78,16 +78,6 @@ func proxyContext(version string) {
Port: 81,
TargetPort: util.NewIntOrStringFromInt(162),
},
{
Name: "tlsportname1",
Port: 443,
TargetPort: util.NewIntOrStringFromString("tlsdest1"),
},
{
Name: "tlsportname2",
Port: 444,
TargetPort: util.NewIntOrStringFromInt(462),
},
},
},
})
@ -103,7 +93,7 @@ func proxyContext(version string) {
pods := []*api.Pod{}
cfg := RCConfig{
Client: f.Client,
Image: "gcr.io/google_containers/porter:cd5cb5791ebaa8641955f0e8c2a9bed669b1eaab",
Image: "gcr.io/google_containers/porter:59ad46ed2c56ba50fa7f1dc176c07c37",
Name: service.Name,
Namespace: f.Namespace.Name,
Replicas: 1,
@ -112,17 +102,10 @@ func proxyContext(version string) {
"SERVE_PORT_80": `<a href="/rewriteme">test</a>`,
"SERVE_PORT_160": "foo",
"SERVE_PORT_162": "bar",
"SERVE_TLS_PORT_443": `<a href="/tlsrewriteme">test</a>`,
"SERVE_TLS_PORT_460": `tls baz`,
"SERVE_TLS_PORT_462": `tls qux`,
},
Ports: map[string]int{
"dest1": 160,
"dest2": 162,
"tlsdest1": 460,
"tlsdest2": 462,
},
Labels: labels,
CreatedPods: &pods,
@ -133,44 +116,14 @@ func proxyContext(version string) {
Expect(f.WaitForAnEndpoint(service.Name)).NotTo(HaveOccurred())
// Try proxying through the service and directly to through the pod.
svcProxyURL := func(scheme, port string) string {
return prefix + "/proxy/namespaces/" + f.Namespace.Name + "/services/" + util.JoinSchemeNamePort(scheme, service.Name, port)
}
podProxyURL := func(scheme, port string) string {
return prefix + "/proxy/namespaces/" + f.Namespace.Name + "/pods/" + util.JoinSchemeNamePort(scheme, pods[0].Name, port)
}
subresourcePodProxyURL := func(scheme, port string) string {
return prefix + "/namespaces/" + f.Namespace.Name + "/pods/" + util.JoinSchemeNamePort(scheme, pods[0].Name, port) + "/proxy"
}
svcPrefix := prefix + "/proxy/namespaces/" + f.Namespace.Name + "/services/" + service.Name
podPrefix := prefix + "/proxy/namespaces/" + f.Namespace.Name + "/pods/" + pods[0].Name
expectations := map[string]string{
svcProxyURL("", "portname1") + "/": "foo",
svcProxyURL("", "portname2") + "/": "bar",
svcProxyURL("http", "portname1") + "/": "foo",
svcProxyURL("http", "portname2") + "/": "bar",
svcProxyURL("https", "tlsportname1") + "/": "tls baz",
svcProxyURL("https", "tlsportname2") + "/": "tls qux",
podProxyURL("", "80") + "/": `<a href="` + podProxyURL("", "80") + `/rewriteme">test</a>`,
podProxyURL("", "160") + "/": "foo",
podProxyURL("", "162") + "/": "bar",
podProxyURL("http", "80") + "/": `<a href="` + podProxyURL("http", "80") + `/rewriteme">test</a>`,
podProxyURL("http", "160") + "/": "foo",
podProxyURL("http", "162") + "/": "bar",
subresourcePodProxyURL("", "") + "/": `<a href="` + subresourcePodProxyURL("", "") + `/rewriteme">test</a>`,
subresourcePodProxyURL("", "80") + "/": `<a href="` + subresourcePodProxyURL("", "80") + `/rewriteme">test</a>`,
subresourcePodProxyURL("http", "80") + "/": `<a href="` + subresourcePodProxyURL("http", "80") + `/rewriteme">test</a>`,
subresourcePodProxyURL("", "160") + "/": "foo",
subresourcePodProxyURL("http", "160") + "/": "foo",
subresourcePodProxyURL("", "162") + "/": "bar",
subresourcePodProxyURL("http", "162") + "/": "bar",
subresourcePodProxyURL("https", "443") + "/": `<a href="` + subresourcePodProxyURL("https", "443") + `/tlsrewriteme">test</a>`,
subresourcePodProxyURL("https", "460") + "/": "tls baz",
subresourcePodProxyURL("https", "462") + "/": "tls qux",
svcPrefix + ":portname1/": "foo",
svcPrefix + ":portname2/": "bar",
podPrefix + ":80/": `<a href="` + podPrefix + `:80/rewriteme">test</a>`,
podPrefix + ":160/": "foo",
podPrefix + ":162/": "bar",
// TODO: below entries don't work, but I believe we should make them work.
// svcPrefix + ":80": "foo",
// svcPrefix + ":81": "bar",
@ -206,8 +159,7 @@ func proxyContext(version string) {
recordError(fmt.Sprintf("%v: path %v took %v > 15s", i, path, d))
}
}(i, path, val)
// default QPS is 5
time.Sleep(200 * time.Millisecond)
time.Sleep(150 * time.Millisecond)
}
}
wg.Wait()

View File

@ -8,7 +8,7 @@
"containers": [
{
"name": "porter",
"image": "gcr.io/google_containers/porter:cd5cb5791ebaa8641955f0e8c2a9bed669b1eaab",
"image": "gcr.io/google_containers/porter:59ad46ed2c56ba50fa7f1dc176c07c37",
"env": [
{
"name": "SERVE_PORT_80",