diff --git a/pkg/api/conversion.go b/pkg/api/conversion.go index 89bfdefff5..bd33db677d 100644 --- a/pkg/api/conversion.go +++ b/pkg/api/conversion.go @@ -35,6 +35,10 @@ func init() { obj.LabelSelector = labels.Everything() obj.FieldSelector = fields.Everything() }, + func(obj *PodExecOptions) { + obj.Stderr = true + obj.Stdout = true + }, ) Scheme.AddConversionFuncs( func(in *util.Time, out *util.Time, s conversion.Scope) error { diff --git a/pkg/api/latest/latest.go b/pkg/api/latest/latest.go index 63b131c54e..887bba1a5b 100644 --- a/pkg/api/latest/latest.go +++ b/pkg/api/latest/latest.go @@ -125,7 +125,14 @@ func init() { } // these kinds should be excluded from the list of resources - ignoredKinds := util.NewStringSet("ListOptions", "DeleteOptions", "Status", "ContainerManifest") + ignoredKinds := util.NewStringSet( + "ListOptions", + "DeleteOptions", + "Status", + "ContainerManifest", + "PodLogOptions", + "PodExecOptions", + "PodProxyOptions") // enumerate all supported versions, get the kinds, and register with the mapper how to address our resources for _, version := range versions { diff --git a/pkg/api/register.go b/pkg/api/register.go index 2918a15328..4a35296199 100644 --- a/pkg/api/register.go +++ b/pkg/api/register.go @@ -58,6 +58,8 @@ func init() { &DeleteOptions{}, &ListOptions{}, &PodLogOptions{}, + &PodExecOptions{}, + &PodProxyOptions{}, ) // Legacy names are supported Scheme.AddKnownTypeWithName("", "Minion", &Node{}) @@ -97,3 +99,5 @@ func (*PersistentVolumeClaimList) IsAnAPIObject() {} func (*DeleteOptions) IsAnAPIObject() {} func (*ListOptions) IsAnAPIObject() {} func (*PodLogOptions) IsAnAPIObject() {} +func (*PodExecOptions) IsAnAPIObject() {} +func (*PodProxyOptions) IsAnAPIObject() {} diff --git a/pkg/api/rest/rest.go b/pkg/api/rest/rest.go index 069ed3c2f8..967b92eade 100644 --- a/pkg/api/rest/rest.go +++ b/pkg/api/rest/rest.go @@ -171,6 +171,32 @@ type Redirector interface { ResourceLocation(ctx api.Context, id string) (remoteLocation *url.URL, transport http.RoundTripper, err error) } +// ConnectHandler is a handler for HTTP connection requests. It extends the standard +// http.Handler interface by adding a method that returns an error object if an error +// occurred during the handling of the request. +type ConnectHandler interface { + http.Handler + + // RequestError returns an error if one occurred during handling of an HTTP request + RequestError() error +} + +// Connecter is a storage object that responds to a connection request +type Connecter interface { + // Connect returns a ConnectHandler that will handle the request/response for a request + Connect(ctx api.Context, id string, options runtime.Object) (ConnectHandler, error) + + // NewConnectOptions returns an empty options object that will be used to pass + // options to the Connect method. If nil, then a nil options object is passed to + // Connect. It may return a bool and a string. If true, the value of the request + // path below the object will be included as the named string in the serialization + // of the runtime object. + NewConnectOptions() (runtime.Object, bool, string) + + // ConnectMethods returns the list of HTTP methods handled by Connect + ConnectMethods() []string +} + // ResourceStreamer is an interface implemented by objects that prefer to be streamed from the server // instead of decoded directly. type ResourceStreamer interface { diff --git a/pkg/api/serialization_test.go b/pkg/api/serialization_test.go index d8c02773de..ca79e6f940 100644 --- a/pkg/api/serialization_test.go +++ b/pkg/api/serialization_test.go @@ -129,7 +129,7 @@ func TestList(t *testing.T) { } var nonRoundTrippableTypes = util.NewStringSet("ContainerManifest", "ContainerManifestList") -var nonInternalRoundTrippableTypes = util.NewStringSet("List", "ListOptions") +var nonInternalRoundTrippableTypes = util.NewStringSet("List", "ListOptions", "PodExecOptions") func TestRoundTripTypes(t *testing.T) { // api.Scheme.Log(t) diff --git a/pkg/api/types.go b/pkg/api/types.go index cc91b969af..5868517c84 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1304,6 +1304,37 @@ type PodLogOptions struct { Follow bool } +// PodExecOptions is the query options to a Pod's remote exec call +type PodExecOptions struct { + TypeMeta + + // Stdin if true indicates that stdin is to be redirected for the exec call + Stdin bool + + // Stdout if true indicates that stdout is to be redirected for the exec call + Stdout bool + + // Stderr if true indicates that stderr is to be redirected for the exec call + Stderr bool + + // TTY if true indicates that a tty will be allocated for the exec call + TTY bool + + // Container in which to execute the command. + Container string + + // Command is the remote command to execute + Command string +} + +// PodProxyOptions is the query options to a Pod's proxy call +type PodProxyOptions struct { + TypeMeta + + // Path is the URL path to use for the current proxy request + Path string +} + // Status is a return value for calls that don't return other objects. // TODO: this could go in apiserver, but I'm including it here so clients needn't // import both. diff --git a/pkg/api/v1beta1/register.go b/pkg/api/v1beta1/register.go index 42d8be4dca..f2a137292e 100644 --- a/pkg/api/v1beta1/register.go +++ b/pkg/api/v1beta1/register.go @@ -66,6 +66,8 @@ func init() { &DeleteOptions{}, &ListOptions{}, &PodLogOptions{}, + &PodExecOptions{}, + &PodProxyOptions{}, ) // Future names are supported api.Scheme.AddKnownTypeWithName("v1beta1", "Node", &Minion{}) @@ -106,3 +108,5 @@ func (*PersistentVolumeClaimList) IsAnAPIObject() {} func (*DeleteOptions) IsAnAPIObject() {} func (*ListOptions) IsAnAPIObject() {} func (*PodLogOptions) IsAnAPIObject() {} +func (*PodExecOptions) IsAnAPIObject() {} +func (*PodProxyOptions) IsAnAPIObject() {} diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index 8ad1fb9ea9..57d41de70e 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -1159,6 +1159,37 @@ type PodLogOptions struct { Follow bool `json:"follow,omitempty" description:"follow the log stream of the pod; defaults to false"` } +// PodExecOptions is the query options to a Pod's remote exec call +type PodExecOptions struct { + TypeMeta `json:",inline"` + + // Stdin if true indicates that stdin is to be redirected for the exec call + Stdin bool `json:"stdin,omitempty" description:"redirect the standard input stream of the pod for this call; defaults to false"` + + // Stdout if true indicates that stdout is to be redirected for the exec call + Stdout bool `json:"stdout,omitempty" description:"redirect the standard output stream of the pod for this call; defaults to true"` + + // Stderr if true indicates that stderr is to be redirected for the exec call + Stderr bool `json:"stderr,omitempty" description:"redirect the standard error stream of the pod for this call; defaults to true"` + + // TTY if true indicates that a tty will be allocated for the exec call + TTY bool `json:"tty,omitempty" description:"allocate a terminal for this exec call; defaults to false"` + + // Container in which to execute the command. + Container string `json:"container,omitempty" description:"the container in which to execute the command. Defaults to only container if there is only one container in the pod."` + + // Command is the remote command to execute + Command string `json:"command" description:"the command to execute"` +} + +// PodProxyOptions is the query options to a Pod's proxy call +type PodProxyOptions struct { + TypeMeta `json:",inline"` + + // Path is the URL path to use for the current proxy request + Path string `json:"path,omitempty" description:"URL path to use in proxy request to pod"` +} + // Status is a return value for calls that don't return other objects. // TODO: this could go in apiserver, but I'm including it here so clients needn't // import both. diff --git a/pkg/api/v1beta2/register.go b/pkg/api/v1beta2/register.go index ccd8f06477..3cc2bc7d54 100644 --- a/pkg/api/v1beta2/register.go +++ b/pkg/api/v1beta2/register.go @@ -66,6 +66,8 @@ func init() { &DeleteOptions{}, &ListOptions{}, &PodLogOptions{}, + &PodExecOptions{}, + &PodProxyOptions{}, ) // Future names are supported api.Scheme.AddKnownTypeWithName("v1beta2", "Node", &Minion{}) @@ -106,3 +108,5 @@ func (*PersistentVolumeClaimList) IsAnAPIObject() {} func (*DeleteOptions) IsAnAPIObject() {} func (*ListOptions) IsAnAPIObject() {} func (*PodLogOptions) IsAnAPIObject() {} +func (*PodExecOptions) IsAnAPIObject() {} +func (*PodProxyOptions) IsAnAPIObject() {} diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index a76ea787be..52e1fc720c 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -1186,6 +1186,37 @@ type PodLogOptions struct { Follow bool `json:"follow,omitempty" description:"follow the log stream of the pod; defaults to false"` } +// PodExecOptions is the query options to a Pod's remote exec call +type PodExecOptions struct { + TypeMeta `json:",inline"` + + // Stdin if true indicates that stdin is to be redirected for the exec call + Stdin bool `json:"stdin,omitempty" description:"redirect the standard input stream of the pod for this call; defaults to false"` + + // Stdout if true indicates that stdout is to be redirected for the exec call + Stdout bool `json:"stdout,omitempty" description:"redirect the standard output stream of the pod for this call; defaults to true"` + + // Stderr if true indicates that stderr is to be redirected for the exec call + Stderr bool `json:"stderr,omitempty" description:"redirect the standard error stream of the pod for this call; defaults to true"` + + // TTY if true indicates that a tty will be allocated for the exec call + TTY bool `json:"tty,omitempty" description:"allocate a terminal for this exec call; defaults to false"` + + // Container in which to execute the command. + Container string `json:"container,omitempty" description:"the container in which to execute the command. Defaults to only container if there is only one container in the pod."` + + // Command is the remote command to execute + Command string `json:"command" description:"the command to execute"` +} + +// PodProxyOptions is the query options to a Pod's proxy call +type PodProxyOptions struct { + TypeMeta `json:",inline"` + + // Path is the URL path to use for the current proxy request + Path string `json:"path,omitempty" description:"URL path to use in proxy request to pod"` +} + // Status is a return value for calls that don't return other objects. // TODO: this could go in apiserver, but I'm including it here so clients needn't // import both. diff --git a/pkg/api/v1beta3/register.go b/pkg/api/v1beta3/register.go index dd35b5de23..538ea5a9ac 100644 --- a/pkg/api/v1beta3/register.go +++ b/pkg/api/v1beta3/register.go @@ -59,6 +59,8 @@ func init() { &DeleteOptions{}, &ListOptions{}, &PodLogOptions{}, + &PodExecOptions{}, + &PodProxyOptions{}, ) // Legacy names are supported api.Scheme.AddKnownTypeWithName("v1beta3", "Minion", &Node{}) @@ -98,3 +100,5 @@ func (*PersistentVolumeClaimList) IsAnAPIObject() {} func (*DeleteOptions) IsAnAPIObject() {} func (*ListOptions) IsAnAPIObject() {} func (*PodLogOptions) IsAnAPIObject() {} +func (*PodExecOptions) IsAnAPIObject() {} +func (*PodProxyOptions) IsAnAPIObject() {} diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 1c02a1ae59..a70316c06d 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -1291,6 +1291,37 @@ type PodLogOptions struct { Follow bool `json:"follow,omitempty" description:"follow the log stream of the pod; defaults to false"` } +// PodExecOptions is the query options to a Pod's remote exec call +type PodExecOptions struct { + TypeMeta `json:",inline"` + + // Stdin if true indicates that stdin is to be redirected for the exec call + Stdin bool `json:"stdin,omitempty" description:"redirect the standard input stream of the pod for this call; defaults to false"` + + // Stdout if true indicates that stdout is to be redirected for the exec call + Stdout bool `json:"stdout,omitempty" description:"redirect the standard output stream of the pod for this call; defaults to true"` + + // Stderr if true indicates that stderr is to be redirected for the exec call + Stderr bool `json:"stderr,omitempty" description:"redirect the standard error stream of the pod for this call; defaults to true"` + + // TTY if true indicates that a tty will be allocated for the exec call + TTY bool `json:"tty,omitempty" description:"allocate a terminal for this exec call; defaults to false"` + + // Container in which to execute the command. + Container string `json:"container,omitempty" description:"the container in which to execute the command. Defaults to only container if there is only one container in the pod."` + + // Command is the remote command to execute + Command string `json:"command" description:"the command to execute"` +} + +// PodProxyOptions is the query options to a Pod's proxy call +type PodProxyOptions struct { + TypeMeta `json:",inline"` + + // Path is the URL path to use for the current proxy request + Path string `json:"path,omitempty" description:"URL path to use in proxy request to pod"` +} + // Status is a return value for calls that don't return other objects. type Status struct { TypeMeta `json:",inline"` diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 10ff4c90c5..4215ef8741 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -139,6 +139,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag patcher, isPatcher := storage.(rest.Patcher) watcher, isWatcher := storage.(rest.Watcher) _, isRedirector := storage.(rest.Redirector) + connecter, isConnecter := storage.(rest.Connecter) storageMeta, isMetadata := storage.(rest.StorageMetadata) if !isMetadata { storageMeta = defaultStorageMetadata{} @@ -193,6 +194,22 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag isGetter = true } + var ( + connectOptions runtime.Object + connectOptionsKind string + connectSubpath bool + connectSubpathKey string + ) + if isConnecter { + connectOptions, connectSubpath, connectSubpathKey = connecter.NewConnectOptions() + if connectOptions != nil { + _, connectOptionsKind, err = a.group.Typer.ObjectVersionAndKind(connectOptions) + if err != nil { + return err + } + } + } + var ctxFn ContextFunc ctxFn = func(req *restful.Request) api.Context { if ctx, ok := context.Get(req.Request); ok { @@ -238,6 +255,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag actions = appendIf(actions, action{"REDIRECT", "redirect/" + itemPath, nameParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector) + actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter) + actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", nameParams, namer}, isConnecter && connectSubpath) } else { // v1beta3 format with namespace in path @@ -275,6 +294,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag actions = appendIf(actions, action{"REDIRECT", "redirect/" + itemPath, nameParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector) + actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter) + actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", nameParams, namer}, isConnecter && connectSubpath) // list across namespace. namer = scopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, itemPath), true} @@ -315,6 +336,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag actions = appendIf(actions, action{"REDIRECT", "redirect/" + itemPath, nameParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector) + actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter) + actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", nameParams, namer}, isConnecter && connectSubpath) } } @@ -480,6 +503,23 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) + case "CONNECT": + for _, method := range connecter.ConnectMethods() { + route := ws.Method(method).Path(action.Path). + To(ConnectResource(connecter, reqScope, connectOptionsKind, connectSubpath, connectSubpathKey)). + Filter(m). + Doc("connect " + method + " requests to " + kind). + Operation("connect" + method + kind). + Produces("*/*"). + Consumes("*/*"). + Writes("string") + if connectOptions != nil { + if err := addObjectParams(ws, route, connectOptions); err != nil { + return err + } + } + ws.Route(route) + } default: return fmt.Errorf("unrecognized action verb: %s", action.Verb) } diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index a15a4dbc70..fc2d2ebc8f 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -348,6 +348,19 @@ func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool, return s, false, s.contentType, s.err } +type SimpleConnectHandler struct { + response string + err error +} + +func (h *SimpleConnectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(h.response)) +} + +func (h *SimpleConnectHandler) RequestError() error { + return h.err +} + func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Object, error) { storage.checkContext(ctx) if id == "binary" { @@ -443,6 +456,39 @@ func (storage *SimpleRESTStorage) ResourceLocation(ctx api.Context, id string) ( return &locationCopy, nil, nil } +// Implement Connecter +type ConnecterRESTStorage struct { + connectHandler rest.ConnectHandler + emptyConnectOptions runtime.Object + receivedConnectOptions runtime.Object + receivedID string + takesPath string +} + +// Implement Connecter +var _ = rest.Connecter(&ConnecterRESTStorage{}) + +func (s *ConnecterRESTStorage) New() runtime.Object { + return &Simple{} +} + +func (s *ConnecterRESTStorage) Connect(ctx api.Context, id string, options runtime.Object) (rest.ConnectHandler, error) { + s.receivedConnectOptions = options + s.receivedID = id + return s.connectHandler, nil +} + +func (s *ConnecterRESTStorage) ConnectMethods() []string { + return []string{"GET", "POST", "PUT", "DELETE"} +} + +func (s *ConnecterRESTStorage) NewConnectOptions() (runtime.Object, bool, string) { + if len(s.takesPath) > 0 { + return s.emptyConnectOptions, true, s.takesPath + } + return s.emptyConnectOptions, false, "" +} + type LegacyRESTStorage struct { *SimpleRESTStorage } @@ -1108,6 +1154,135 @@ func TestGetMissing(t *testing.T) { } } +func TestConnect(t *testing.T) { + responseText := "Hello World" + itemID := "theID" + connectStorage := &ConnecterRESTStorage{ + connectHandler: &SimpleConnectHandler{ + response: responseText, + }, + } + storage := map[string]rest.Storage{ + "simple/connect": connectStorage, + } + handler := handle(storage) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/api/version/simple/" + itemID + "/connect") + + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected response: %#v", resp) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if connectStorage.receivedID != itemID { + t.Errorf("Unexpected item id. Expected: %s. Actual: %s.", itemID, connectStorage.receivedID) + } + if string(body) != responseText { + t.Errorf("Unexpected response. Expected: %s. Actual: %s.", responseText, string(body)) + } +} + +func TestConnectWithOptions(t *testing.T) { + responseText := "Hello World" + itemID := "theID" + connectStorage := &ConnecterRESTStorage{ + connectHandler: &SimpleConnectHandler{ + response: responseText, + }, + emptyConnectOptions: &SimpleGetOptions{}, + } + storage := map[string]rest.Storage{ + "simple/connect": connectStorage, + } + handler := handle(storage) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/api/version/simple/" + itemID + "/connect?param1=value1¶m2=value2") + + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected response: %#v", resp) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if connectStorage.receivedID != itemID { + t.Errorf("Unexpected item id. Expected: %s. Actual: %s.", itemID, connectStorage.receivedID) + } + if string(body) != responseText { + t.Errorf("Unexpected response. Expected: %s. Actual: %s.", responseText, string(body)) + } + opts, ok := connectStorage.receivedConnectOptions.(*SimpleGetOptions) + if !ok { + t.Errorf("Unexpected options type: %#v", connectStorage.receivedConnectOptions) + } + if opts.Param1 != "value1" && opts.Param2 != "value2" { + t.Errorf("Unexpected options value: %#v", opts) + } +} + +func TestConnectWithOptionsAndPath(t *testing.T) { + responseText := "Hello World" + itemID := "theID" + testPath := "a/b/c/def" + connectStorage := &ConnecterRESTStorage{ + connectHandler: &SimpleConnectHandler{ + response: responseText, + }, + emptyConnectOptions: &SimpleGetOptions{}, + takesPath: "atAPath", + } + storage := map[string]rest.Storage{ + "simple/connect": connectStorage, + } + handler := handle(storage) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/api/version/simple/" + itemID + "/connect/" + testPath + "?param1=value1¶m2=value2") + + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected response: %#v", resp) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if connectStorage.receivedID != itemID { + t.Errorf("Unexpected item id. Expected: %s. Actual: %s.", itemID, connectStorage.receivedID) + } + if string(body) != responseText { + t.Errorf("Unexpected response. Expected: %s. Actual: %s.", responseText, string(body)) + } + opts, ok := connectStorage.receivedConnectOptions.(*SimpleGetOptions) + if !ok { + t.Errorf("Unexpected options type: %#v", connectStorage.receivedConnectOptions) + } + if opts.Param1 != "value1" && opts.Param2 != "value2" { + t.Errorf("Unexpected options value: %#v", opts) + } + if opts.Path != testPath { + t.Errorf("Unexpected path value. Expected: %s. Actual: %s.", testPath, opts.Path) + } +} + func TestDelete(t *testing.T) { storage := map[string]rest.Storage{} simpleStorage := SimpleRESTStorage{} diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index 8b4336beed..1cdb700ba6 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -17,12 +17,9 @@ limitations under the License. package apiserver import ( - "bytes" - "compress/gzip" "crypto/tls" "fmt" "io" - "io/ioutil" "net" "net/http" "net/http/httputil" @@ -36,47 +33,13 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream" + proxyutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util/proxy" "github.com/GoogleCloudPlatform/kubernetes/third_party/golang/netutil" "github.com/golang/glog" - "golang.org/x/net/html" ) -// tagsToAttrs states which attributes of which tags require URL substitution. -// Sources: http://www.w3.org/TR/REC-html40/index/attributes.html -// http://www.w3.org/html/wg/drafts/html/master/index.html#attributes-1 -var tagsToAttrs = map[string]util.StringSet{ - "a": util.NewStringSet("href"), - "applet": util.NewStringSet("codebase"), - "area": util.NewStringSet("href"), - "audio": util.NewStringSet("src"), - "base": util.NewStringSet("href"), - "blockquote": util.NewStringSet("cite"), - "body": util.NewStringSet("background"), - "button": util.NewStringSet("formaction"), - "command": util.NewStringSet("icon"), - "del": util.NewStringSet("cite"), - "embed": util.NewStringSet("src"), - "form": util.NewStringSet("action"), - "frame": util.NewStringSet("longdesc", "src"), - "head": util.NewStringSet("profile"), - "html": util.NewStringSet("manifest"), - "iframe": util.NewStringSet("longdesc", "src"), - "img": util.NewStringSet("longdesc", "src", "usemap"), - "input": util.NewStringSet("src", "usemap", "formaction"), - "ins": util.NewStringSet("cite"), - "link": util.NewStringSet("href"), - "object": util.NewStringSet("classid", "codebase", "data", "usemap"), - "q": util.NewStringSet("cite"), - "script": util.NewStringSet("src"), - "source": util.NewStringSet("src"), - "video": util.NewStringSet("poster", "src"), - - // TODO: css URLs hidden in style elements. -} - // ProxyHandler provides a http.Handler which will proxy traffic to locations // specified by items implementing Redirector. type ProxyHandler struct { @@ -210,10 +173,10 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { if len(namespace) > 0 { prepend = path.Join(r.prefix, "namespaces", namespace, resource, id) } - transport = &proxyTransport{ - proxyScheme: req.URL.Scheme, - proxyHost: req.URL.Host, - proxyPathPrepend: prepend, + transport = &proxyutil.Transport{ + Scheme: req.URL.Scheme, + Host: req.URL.Host, + PathPrepend: prepend, } } proxy.Transport = transport @@ -321,149 +284,3 @@ func singleJoiningSlash(a, b string) string { } return a + b } - -type proxyTransport struct { - proxyScheme string - proxyHost string - proxyPathPrepend string -} - -func (t *proxyTransport) RoundTrip(req *http.Request) (*http.Response, error) { - // Add reverse proxy headers. - req.Header.Set("X-Forwarded-Uri", t.proxyPathPrepend+req.URL.Path) - req.Header.Set("X-Forwarded-Host", t.proxyHost) - req.Header.Set("X-Forwarded-Proto", t.proxyScheme) - - resp, err := http.DefaultTransport.RoundTrip(req) - - if err != nil { - message := fmt.Sprintf("Error: '%s'\nTrying to reach: '%v'", err.Error(), req.URL.String()) - resp = &http.Response{ - StatusCode: http.StatusServiceUnavailable, - Body: ioutil.NopCloser(strings.NewReader(message)), - } - return resp, nil - } - - if redirect := resp.Header.Get("Location"); redirect != "" { - resp.Header.Set("Location", t.rewriteURL(redirect, req.URL)) - } - - cType := resp.Header.Get("Content-Type") - cType = strings.TrimSpace(strings.SplitN(cType, ";", 2)[0]) - if cType != "text/html" { - // Do nothing, simply pass through - return resp, nil - } - - return t.fixLinks(req, resp) -} - -// rewriteURL rewrites a single URL to go through the proxy, if the URL refers -// to the same host as sourceURL, which is the page on which the target URL -// occurred. If any error occurs (e.g. parsing), it returns targetURL. -func (t *proxyTransport) rewriteURL(targetURL string, sourceURL *url.URL) string { - url, err := url.Parse(targetURL) - if err != nil { - return targetURL - } - if url.Host != "" && url.Host != sourceURL.Host { - return targetURL - } - - url.Scheme = t.proxyScheme - url.Host = t.proxyHost - origPath := url.Path - - if strings.HasPrefix(url.Path, "/") { - // The path is rooted at the host. Just add proxy prepend. - url.Path = path.Join(t.proxyPathPrepend, url.Path) - } else { - // The path is relative to sourceURL. - url.Path = path.Join(t.proxyPathPrepend, path.Dir(sourceURL.Path), url.Path) - } - - if strings.HasSuffix(origPath, "/") { - // Add back the trailing slash, which was stripped by path.Join(). - url.Path += "/" - } - - return url.String() -} - -// updateURLs checks and updates any of n's attributes that are listed in tagsToAttrs. -// Any URLs found are, if they're relative, updated with the necessary changes to make -// a visit to that URL also go through the proxy. -// sourceURL is the URL of the page which we're currently on; it's required to make -// relative links work. -func (t *proxyTransport) updateURLs(n *html.Node, sourceURL *url.URL) { - if n.Type != html.ElementNode { - return - } - attrs, ok := tagsToAttrs[n.Data] - if !ok { - return - } - for i, attr := range n.Attr { - if !attrs.Has(attr.Key) { - continue - } - n.Attr[i].Val = t.rewriteURL(attr.Val, sourceURL) - } -} - -// scan recursively calls f for every n and every subnode of n. -func (t *proxyTransport) scan(n *html.Node, f func(*html.Node)) { - f(n) - for c := n.FirstChild; c != nil; c = c.NextSibling { - t.scan(c, f) - } -} - -// fixLinks modifies links in an HTML file such that they will be redirected through the proxy if needed. -func (t *proxyTransport) fixLinks(req *http.Request, resp *http.Response) (*http.Response, error) { - origBody := resp.Body - defer origBody.Close() - - newContent := &bytes.Buffer{} - var reader io.Reader = origBody - var writer io.Writer = newContent - encoding := resp.Header.Get("Content-Encoding") - switch encoding { - case "gzip": - var err error - reader, err = gzip.NewReader(reader) - if err != nil { - return nil, fmt.Errorf("errorf making gzip reader: %v", err) - } - gzw := gzip.NewWriter(writer) - defer gzw.Close() - writer = gzw - // TODO: support flate, other encodings. - case "": - // This is fine - default: - // Some encoding we don't understand-- don't try to parse this - glog.Errorf("Proxy encountered encoding %v for text/html; can't understand this so not fixing links.", encoding) - return resp, nil - } - - doc, err := html.Parse(reader) - if err != nil { - glog.Errorf("Parse failed: %v", err) - return resp, err - } - - t.scan(doc, func(n *html.Node) { t.updateURLs(n, req.URL) }) - if err := html.Render(writer, doc); err != nil { - glog.Errorf("Failed to render: %v", err) - } - - resp.Body = ioutil.NopCloser(newContent) - // Update header node with new content-length - // TODO: Remove any hash/signature headers here? - resp.Header.Del("Content-Length") - resp.ContentLength = int64(newContent.Len()) - - return resp, err -} diff --git a/pkg/apiserver/proxy_test.go b/pkg/apiserver/proxy_test.go index 9d2f25ecd0..38509095bb 100644 --- a/pkg/apiserver/proxy_test.go +++ b/pkg/apiserver/proxy_test.go @@ -17,7 +17,6 @@ limitations under the License. package apiserver import ( - "bytes" "compress/gzip" "fmt" "io" @@ -29,207 +28,9 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" - "golang.org/x/net/html" "golang.org/x/net/websocket" ) -func parseURLOrDie(inURL string) *url.URL { - parsed, err := url.Parse(inURL) - if err != nil { - panic(err) - } - return parsed -} - -// fmtHTML parses and re-emits 'in', effectively canonicalizing it. -func fmtHTML(in string) string { - doc, err := html.Parse(strings.NewReader(in)) - if err != nil { - panic(err) - } - out := &bytes.Buffer{} - if err := html.Render(out, doc); err != nil { - panic(err) - } - return string(out.Bytes()) -} - -func TestProxyTransport(t *testing.T) { - testTransport := &proxyTransport{ - proxyScheme: "http", - proxyHost: "foo.com", - proxyPathPrepend: "/proxy/minion/minion1:10250", - } - testTransport2 := &proxyTransport{ - proxyScheme: "https", - proxyHost: "foo.com", - proxyPathPrepend: "/proxy/minion/minion1:8080", - } - type Item struct { - input string - sourceURL string - transport *proxyTransport - output string - contentType string - forwardedURI string - redirect string - redirectWant string - } - - table := map[string]Item{ - "normal": { - input: `
kubelet.loggoogle.log`, - sourceURL: "http://myminion.com/logs/log.log", - transport: testTransport, - output: `
kubelet.loggoogle.log`, - contentType: "text/html", - forwardedURI: "/proxy/minion/minion1:10250/logs/log.log", - }, - "trailing slash": { - input: `
kubelet.loggoogle.log`, - sourceURL: "http://myminion.com/logs/log.log", - transport: testTransport, - output: `
kubelet.loggoogle.log`, - contentType: "text/html", - forwardedURI: "/proxy/minion/minion1:10250/logs/log.log", - }, - "content-type charset": { - input: `
kubelet.loggoogle.log`, - sourceURL: "http://myminion.com/logs/log.log", - transport: testTransport, - output: `
kubelet.loggoogle.log`, - contentType: "text/html; charset=utf-8", - forwardedURI: "/proxy/minion/minion1:10250/logs/log.log", - }, - "content-type passthrough": { - input: `
kubelet.loggoogle.log`, - sourceURL: "http://myminion.com/logs/log.log", - transport: testTransport, - output: `
kubelet.loggoogle.log`, - contentType: "text/plain", - forwardedURI: "/proxy/minion/minion1:10250/logs/log.log", - }, - "subdir": { - input: `kubelet.loggoogle.log`, - sourceURL: "http://myminion.com/whatever/apt/somelog.log", - transport: testTransport2, - output: `kubelet.loggoogle.log`, - contentType: "text/html", - forwardedURI: "/proxy/minion/minion1:8080/whatever/apt/somelog.log", - }, - "image": { - input: `
`, - sourceURL: "http://myminion.com/", - transport: testTransport, - output: `
`, - contentType: "text/html", - forwardedURI: "/proxy/minion/minion1:10250/", - }, - "abs": { - input: ``, - sourceURL: "http://myminion.com/any/path/", - transport: testTransport, - output: ``, - contentType: "text/html", - forwardedURI: "/proxy/minion/minion1:10250/any/path/", - }, - "abs but same host": { - input: ``, - sourceURL: "http://myminion.com/any/path/", - transport: testTransport, - output: ``, - contentType: "text/html", - forwardedURI: "/proxy/minion/minion1:10250/any/path/", - }, - "redirect rel": { - sourceURL: "http://myminion.com/redirect", - transport: testTransport, - redirect: "/redirected/target/", - redirectWant: "http://foo.com/proxy/minion/minion1:10250/redirected/target/", - forwardedURI: "/proxy/minion/minion1:10250/redirect", - }, - "redirect abs same host": { - sourceURL: "http://myminion.com/redirect", - transport: testTransport, - redirect: "http://myminion.com/redirected/target/", - redirectWant: "http://foo.com/proxy/minion/minion1:10250/redirected/target/", - forwardedURI: "/proxy/minion/minion1:10250/redirect", - }, - "redirect abs other host": { - sourceURL: "http://myminion.com/redirect", - transport: testTransport, - redirect: "http://example.com/redirected/target/", - redirectWant: "http://example.com/redirected/target/", - forwardedURI: "/proxy/minion/minion1:10250/redirect", - }, - } - - testItem := func(name string, item *Item) { - // Canonicalize the html so we can diff. - item.input = fmtHTML(item.input) - item.output = fmtHTML(item.output) - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Check request headers. - if got, want := r.Header.Get("X-Forwarded-Uri"), item.forwardedURI; got != want { - t.Errorf("%v: X-Forwarded-Uri = %q, want %q", name, got, want) - } - if got, want := r.Header.Get("X-Forwarded-Host"), item.transport.proxyHost; got != want { - t.Errorf("%v: X-Forwarded-Host = %q, want %q", name, got, want) - } - if got, want := r.Header.Get("X-Forwarded-Proto"), item.transport.proxyScheme; got != want { - t.Errorf("%v: X-Forwarded-Proto = %q, want %q", name, got, want) - } - - // Send response. - if item.redirect != "" { - http.Redirect(w, r, item.redirect, http.StatusMovedPermanently) - return - } - w.Header().Set("Content-Type", item.contentType) - fmt.Fprint(w, item.input) - })) - defer server.Close() - - // Replace source URL with our test server address. - sourceURL := parseURLOrDie(item.sourceURL) - serverURL := parseURLOrDie(server.URL) - item.input = strings.Replace(item.input, sourceURL.Host, serverURL.Host, -1) - item.redirect = strings.Replace(item.redirect, sourceURL.Host, serverURL.Host, -1) - sourceURL.Host = serverURL.Host - - req, err := http.NewRequest("GET", sourceURL.String(), nil) - if err != nil { - t.Errorf("%v: Unexpected error: %v", name, err) - return - } - resp, err := item.transport.RoundTrip(req) - if err != nil { - t.Errorf("%v: Unexpected error: %v", name, err) - return - } - if item.redirect != "" { - // Check that redirect URLs get rewritten properly. - if got, want := resp.Header.Get("Location"), item.redirectWant; got != want { - t.Errorf("%v: Location header = %q, want %q", name, got, want) - } - return - } - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.Errorf("%v: Unexpected error: %v", name, err) - return - } - if e, a := item.output, string(body); e != a { - t.Errorf("%v: expected %v, but got %v", name, e, a) - } - } - - for name, item := range table { - testItem(name, &item) - } -} - func TestProxy(t *testing.T) { table := []struct { method string diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 26cd92384c..f449dc88b1 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -116,16 +116,7 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOptionsKind string, subpath bool, subpathKey string) restful.RouteFunction { return getResourceHandler(scope, func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) { - query := req.Request.URL.Query() - if subpath { - newQuery := make(url.Values) - for k, v := range query { - newQuery[k] = v - } - newQuery[subpathKey] = []string{req.PathParameter("path")} - query = newQuery - } - opts, err := queryToObject(query, scope, getOptionsKind) + opts, err := getRequestOptions(req, scope, getOptionsKind, subpath, subpathKey) if err != nil { return nil, err } @@ -133,6 +124,52 @@ func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOpt }) } +func getRequestOptions(req *restful.Request, scope RequestScope, kind string, subpath bool, subpathKey string) (runtime.Object, error) { + if len(kind) == 0 { + return nil, nil + } + query := req.Request.URL.Query() + if subpath { + newQuery := make(url.Values) + for k, v := range query { + newQuery[k] = v + } + newQuery[subpathKey] = []string{req.PathParameter("path")} + query = newQuery + } + return queryToObject(query, scope, kind) +} + +// ConnectResource returns a function that handles a connect request on a rest.Storage object. +func ConnectResource(connecter rest.Connecter, scope RequestScope, connectOptionsKind string, subpath bool, subpathKey string) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + namespace, name, err := scope.Namer.Name(req) + if err != nil { + errorJSON(err, scope.Codec, w) + return + } + ctx := scope.ContextFunc(req) + ctx = api.WithNamespace(ctx, namespace) + opts, err := getRequestOptions(req, scope, connectOptionsKind, subpath, subpathKey) + if err != nil { + errorJSON(err, scope.Codec, w) + return + } + handler, err := connecter.Connect(ctx, name, opts) + if err != nil { + errorJSON(err, scope.Codec, w) + return + } + handler.ServeHTTP(w, req.Request) + err = handler.RequestError() + if err != nil { + errorJSON(err, scope.Codec, w) + return + } + } +} + // ListResource returns a function that handles retrieving a list of resources from a rest.Storage object. func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) { diff --git a/pkg/master/master.go b/pkg/master/master.go index ca7c987df4..a310bd1766 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -387,11 +387,14 @@ func (m *Master) init(c *Config) { // TODO: Factor out the core API registration m.storage = map[string]rest.Storage{ - "pods": podStorage.Pod, - "pods/status": podStorage.Status, - "pods/log": podStorage.Log, - "pods/binding": podStorage.Binding, - "bindings": podStorage.Binding, + "pods": podStorage.Pod, + "pods/status": podStorage.Status, + "pods/log": podStorage.Log, + "pods/exec": podStorage.Exec, + "pods/portforward": podStorage.PortForward, + "pods/proxy": podStorage.Proxy, + "pods/binding": podStorage.Binding, + "bindings": podStorage.Binding, "replicationControllers": controllerStorage, "services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName), diff --git a/pkg/registry/generic/rest/proxy.go b/pkg/registry/generic/rest/proxy.go new file mode 100644 index 0000000000..fdee893c79 --- /dev/null +++ b/pkg/registry/generic/rest/proxy.go @@ -0,0 +1,197 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "net/http/httputil" + "net/url" + "strings" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/proxy" + + "github.com/GoogleCloudPlatform/kubernetes/third_party/golang/netutil" + "github.com/golang/glog" +) + +// UpgradeAwareProxyHandler is a handler for proxy requests that may require an upgrade +type UpgradeAwareProxyHandler struct { + UpgradeRequired bool + Location *url.URL + Transport http.RoundTripper + FlushInterval time.Duration + err error +} + +const defaultFlushInterval = 200 * time.Millisecond + +// NewUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval +func NewUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, upgradeRequired bool) *UpgradeAwareProxyHandler { + return &UpgradeAwareProxyHandler{ + Location: location, + Transport: transport, + UpgradeRequired: upgradeRequired, + FlushInterval: defaultFlushInterval, + } +} + +// RequestError returns an error that occurred while handling request +func (h *UpgradeAwareProxyHandler) RequestError() error { + return h.err +} + +// ServeHTTP handles the proxy request +func (h *UpgradeAwareProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + h.err = nil + if len(h.Location.Scheme) == 0 { + h.Location.Scheme = "http" + } + if h.tryUpgrade(w, req) { + return + } + if h.UpgradeRequired { + h.err = errors.NewBadRequest("Upgrade request required") + return + } + + if h.Transport == nil { + h.Transport = h.defaultProxyTransport(req.URL) + } + + loc := *h.Location + loc.RawQuery = req.URL.RawQuery + newReq, err := http.NewRequest(req.Method, loc.String(), req.Body) + if err != nil { + h.err = err + return + } + newReq.Header = req.Header + + proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}) + proxy.Transport = h.Transport + proxy.FlushInterval = h.FlushInterval + proxy.ServeHTTP(w, newReq) +} + +// tryUpgrade returns true if the request was handled. +func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool { + if !httpstream.IsUpgradeRequest(req) { + return false + } + + backendConn, err := h.dialURL() + if err != nil { + h.err = err + return true + } + defer backendConn.Close() + + requestHijackedConn, _, err := w.(http.Hijacker).Hijack() + if err != nil { + h.err = err + return true + } + defer requestHijackedConn.Close() + + newReq, err := http.NewRequest(req.Method, h.Location.String(), req.Body) + if err != nil { + h.err = err + return true + } + newReq.Header = req.Header + + if err = newReq.Write(backendConn); err != nil { + h.err = err + return true + } + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + _, err := io.Copy(backendConn, requestHijackedConn) + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + glog.Errorf("Error proxying data from client to backend: %v", err) + } + wg.Done() + }() + + go func() { + _, err := io.Copy(requestHijackedConn, backendConn) + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + glog.Errorf("Error proxying data from backend to client: %v", err) + } + wg.Done() + }() + + wg.Wait() + return true +} + +func (h *UpgradeAwareProxyHandler) dialURL() (net.Conn, error) { + dialAddr := netutil.CanonicalAddr(h.Location) + + switch h.Location.Scheme { + case "http": + return net.Dial("tcp", dialAddr) + case "https": + // Get the tls config from the transport if we recognize it + var tlsConfig *tls.Config + if h.Transport != nil { + httpTransport, ok := h.Transport.(*http.Transport) + if ok { + tlsConfig = httpTransport.TLSClientConfig + } + } + + // Dial + tlsConn, err := tls.Dial("tcp", dialAddr, tlsConfig) + if err != nil { + return nil, err + } + + // Verify + host, _, _ := net.SplitHostPort(dialAddr) + if err := tlsConn.VerifyHostname(host); err != nil { + tlsConn.Close() + return nil, err + } + + return tlsConn, nil + default: + return nil, fmt.Errorf("Unknown scheme: %s", h.Location.Scheme) + } +} + +func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL) http.RoundTripper { + scheme := url.Scheme + host := url.Host + pathPrepend := strings.TrimRight(url.Path, h.Location.Path) + return &proxy.Transport{ + Scheme: scheme, + Host: host, + PathPrepend: pathPrepend, + } +} diff --git a/pkg/registry/generic/rest/proxy_test.go b/pkg/registry/generic/rest/proxy_test.go new file mode 100644 index 0000000000..615312032c --- /dev/null +++ b/pkg/registry/generic/rest/proxy_test.go @@ -0,0 +1,242 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "bytes" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "golang.org/x/net/websocket" +) + +type SimpleBackendHandler struct { + requestURL url.URL + requestHeader http.Header + requestBody []byte + requestMethod string + responseBody string + responseHeader map[string]string + t *testing.T +} + +func (s *SimpleBackendHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.requestURL = *req.URL + s.requestHeader = req.Header + s.requestMethod = req.Method + var err error + s.requestBody, err = ioutil.ReadAll(req.Body) + if err != nil { + s.t.Errorf("Unexpected error: %v", err) + return + } + + for k, v := range s.responseHeader { + w.Header().Add(k, v) + } + w.Write([]byte(s.responseBody)) +} + +func validateParameters(t *testing.T, name string, actual url.Values, expected map[string]string) { + for k, v := range expected { + actualValue, ok := actual[k] + if !ok { + t.Errorf("%s: Expected parameter %s not received", name, k) + continue + } + if actualValue[0] != v { + t.Errorf("%s: Parameter %s values don't match. Actual: %#v, Expected: %s", + name, k, actualValue, v) + } + } +} + +func validateHeaders(t *testing.T, name string, actual http.Header, expected map[string]string) { + for k, v := range expected { + actualValue, ok := actual[k] + if !ok { + t.Errorf("%s: Expected header %s not received", name, k) + continue + } + if actualValue[0] != v { + t.Errorf("%s: Header %s values don't match. Actual: %s, Expected: %s", + name, k, actualValue, v) + } + } +} + +func TestServeHTTP(t *testing.T) { + tests := []struct { + name string + method string + requestPath string + requestBody string + requestParams map[string]string + requestHeader map[string]string + }{ + { + name: "root path, simple get", + method: "GET", + requestPath: "/", + }, + { + name: "simple path, get", + method: "GET", + requestPath: "/path/to/test", + }, + { + name: "request params", + method: "POST", + requestPath: "/some/path", + requestParams: map[string]string{"param1": "value/1", "param2": "value%2"}, + requestBody: "test request body", + }, + { + name: "request headers", + method: "PUT", + requestPath: "/some/path", + requestHeader: map[string]string{"Header1": "value1", "Header2": "value2"}, + }, + } + + for _, test := range tests { + func() { + backendResponse := "Hello" + backendHandler := &SimpleBackendHandler{ + responseBody: backendResponse, + responseHeader: map[string]string{"Content-Type": "text/html"}, + } + backendServer := httptest.NewServer(backendHandler) + defer backendServer.Close() + + backendURL, _ := url.Parse(backendServer.URL) + backendURL.Path = test.requestPath + proxyHandler := &UpgradeAwareProxyHandler{ + Location: backendURL, + } + proxyServer := httptest.NewServer(proxyHandler) + defer proxyServer.Close() + proxyURL, _ := url.Parse(proxyServer.URL) + proxyURL.Path = test.requestPath + paramValues := url.Values{} + for k, v := range test.requestParams { + paramValues[k] = []string{v} + } + proxyURL.RawQuery = paramValues.Encode() + var requestBody io.Reader + if test.requestBody != "" { + requestBody = bytes.NewBufferString(test.requestBody) + } + req, err := http.NewRequest(test.method, proxyURL.String(), requestBody) + if test.requestHeader != nil { + header := http.Header{} + for k, v := range test.requestHeader { + header.Add(k, v) + } + req.Header = header + } + if err != nil { + t.Errorf("Error creating client request: %v", err) + } + client := &http.Client{} + res, err := client.Do(req) + if err != nil { + t.Errorf("Error from proxy request: %v", err) + } + + // Validate backend request + // Method + if backendHandler.requestMethod != test.method { + t.Errorf("Unexpected request method: %s. Expected: %s", + backendHandler.requestMethod, test.method) + } + + // Body + if string(backendHandler.requestBody) != test.requestBody { + t.Errorf("Unexpected request body: %s. Expected: %s", + string(backendHandler.requestBody), test.requestBody) + } + + // Path + if backendHandler.requestURL.Path != test.requestPath { + t.Errorf("Unexpected request path: %s", backendHandler.requestURL.Path) + } + // Parameters + validateParameters(t, test.name, backendHandler.requestURL.Query(), test.requestParams) + + // Headers + validateHeaders(t, test.name+" backend request", backendHandler.requestHeader, + test.requestHeader) + + // Validate proxy response + // Validate Body + responseBody, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("Unexpected error reading response body: %v", err) + } + if rb := string(responseBody); rb != backendResponse { + t.Errorf("Did not get expected response body: %s. Expected: %s", rb, backendResponse) + } + + // Error + err = proxyHandler.RequestError() + if err != nil { + t.Errorf("Unexpected proxy handler error: %v", err) + } + }() + } +} + +func TestProxyUpgrade(t *testing.T) { + backendServer := httptest.NewServer(websocket.Handler(func(ws *websocket.Conn) { + defer ws.Close() + body := make([]byte, 5) + ws.Read(body) + ws.Write([]byte("hello " + string(body))) + })) + defer backendServer.Close() + + serverURL, _ := url.Parse(backendServer.URL) + proxyHandler := &UpgradeAwareProxyHandler{ + Location: serverURL, + } + proxy := httptest.NewServer(proxyHandler) + defer proxy.Close() + + ws, err := websocket.Dial("ws://"+proxy.Listener.Addr().String()+"/some/path", "", "http://127.0.0.1/") + if err != nil { + t.Fatalf("websocket dial err: %s", err) + } + defer ws.Close() + + if _, err := ws.Write([]byte("world")); err != nil { + t.Fatalf("write err: %s", err) + } + + response := make([]byte, 20) + n, err := ws.Read(response) + if err != nil { + t.Fatalf("read err: %s", err) + } + if e, a := "hello world", string(response[0:n]); e != a { + t.Fatalf("expected '%#v', got '%#v'", e, a) + } +} diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 90ba972851..a3ab8e7edc 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "net/url" + "path" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -39,10 +40,13 @@ import ( // PodStorage includes storage for pods and all sub resources type PodStorage struct { - Pod *REST - Binding *BindingREST - Status *StatusREST - Log *LogREST + Pod *REST + Binding *BindingREST + Status *StatusREST + Log *LogREST + Proxy *ProxyREST + Exec *ExecREST + PortForward *PortForwardREST } // REST implements a RESTStorage for pods against etcd @@ -85,10 +89,13 @@ func NewStorage(h tools.EtcdHelper, k client.ConnectionInfoGetter) PodStorage { statusStore.UpdateStrategy = pod.StatusStrategy return PodStorage{ - Pod: &REST{*store}, - Binding: &BindingREST{store: store}, - Status: &StatusREST{store: &statusStore}, - Log: &LogREST{store: store, kubeletConn: k}, + Pod: &REST{*store}, + Binding: &BindingREST{store: store}, + Status: &StatusREST{store: &statusStore}, + Log: &LogREST{store: store, kubeletConn: k}, + Proxy: &ProxyREST{store: store}, + Exec: &ExecREST{store: store, kubeletConn: k}, + PortForward: &PortForwardREST{store: store, kubeletConn: k}, } } @@ -193,6 +200,9 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object return r.store.Update(ctx, obj) } +// Implement GetterWithOptions +var _ = rest.GetterWithOptions(&LogREST{}) + // LogREST implements the log endpoint for a Pod type LogREST struct { store *etcdgeneric.Etcd @@ -204,7 +214,8 @@ var _ = rest.GetterWithOptions(&LogREST{}) // New creates a new Pod log options object func (r *LogREST) New() runtime.Object { - return &api.PodLogOptions{} + // TODO - return a resource that represents a log + return &api.Pod{} } // Get retrieves a runtime.Object that will stream the contents of the pod log @@ -229,3 +240,114 @@ func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtim func (r *LogREST) NewGetOptions() (runtime.Object, bool, string) { return &api.PodLogOptions{}, false, "" } + +// ProxyREST implements the proxy subresource for a Pod +type ProxyREST struct { + store *etcdgeneric.Etcd +} + +// Implement Connecter +var _ = rest.Connecter(&ProxyREST{}) + +var proxyMethods = []string{"GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS"} + +// New returns an empty pod resource +func (r *ProxyREST) New() runtime.Object { + return &api.Pod{} +} + +// ConnectMethods returns the list of HTTP methods that can be proxied +func (r *ProxyREST) ConnectMethods() []string { + return proxyMethods +} + +// NewConnectOptions returns versioned resource that represents proxy parameters +func (r *ProxyREST) NewConnectOptions() (runtime.Object, bool, string) { + return &api.PodProxyOptions{}, true, "path" +} + +// Connect returns a handler for the pod proxy +func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object) (rest.ConnectHandler, error) { + proxyOpts, ok := opts.(*api.PodProxyOptions) + if !ok { + return nil, fmt.Errorf("Invalid options object: %#v", opts) + } + location, _, err := pod.ResourceLocation(r.store, ctx, id) + if err != nil { + return nil, err + } + location.Path = path.Join(location.Path, proxyOpts.Path) + return genericrest.NewUpgradeAwareProxyHandler(location, nil, false), nil +} + +var upgradeableMethods = []string{"GET"} + +// ExecREST implements the exec subresource for a Pod +type ExecREST struct { + store *etcdgeneric.Etcd + kubeletConn client.ConnectionInfoGetter +} + +// Implement Connecter +var _ = rest.Connecter(&ExecREST{}) + +// New creates a new Pod object +func (r *ExecREST) New() runtime.Object { + return &api.Pod{} +} + +// Connect returns a handler for the pod exec proxy +func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object) (rest.ConnectHandler, error) { + execOpts, ok := opts.(*api.PodExecOptions) + if !ok { + return nil, fmt.Errorf("Invalid options object: %#v", opts) + } + location, transport, err := pod.ExecLocation(r.store, r.kubeletConn, ctx, name, execOpts) + if err != nil { + return nil, err + } + return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil +} + +// NewConnectOptions returns the versioned object that represents exec parameters +func (r *ExecREST) NewConnectOptions() (runtime.Object, bool, string) { + return &api.PodExecOptions{}, false, "" +} + +// ConnectMethods returns the methods supported by exec +func (r *ExecREST) ConnectMethods() []string { + return upgradeableMethods +} + +// PortForwardREST implements the portforward subresource for a Pod +type PortForwardREST struct { + store *etcdgeneric.Etcd + kubeletConn client.ConnectionInfoGetter +} + +// Implement Connecter +var _ = rest.Connecter(&PortForwardREST{}) + +// New returns an empty pod object +func (r *PortForwardREST) New() runtime.Object { + return &api.Pod{} +} + +// NewConnectOptions returns nil since portforward doesn't take additional parameters +func (r *PortForwardREST) NewConnectOptions() (runtime.Object, bool, string) { + return nil, false, "" +} + +// ConnectMethods returns the methods supported by portforward +func (r *PortForwardREST) ConnectMethods() []string { + return upgradeableMethods +} + +// Connect returns a handler for the pod portforward proxy +func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Object) (rest.ConnectHandler, error) { + location, transport, err := pod.PortForwardLocation(r.store, r.kubeletConn, ctx, name) + if err != nil { + return nil, err + } + return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil +} diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 6d70f66ce6..5f81e9c843 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -202,7 +202,7 @@ func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ct if len(pod.Spec.Containers) == 1 { container = pod.Spec.Containers[0].Name } else { - return nil, nil, fmt.Errorf("a container name must be specified for pod %s", name) + return nil, nil, errors.NewBadRequest(fmt.Sprintf("a container name must be specified for pod %s", name)) } } nodeHost := pod.Status.HostIP @@ -226,3 +226,78 @@ func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ct } return loc, nodeTransport, nil } + +// ExecLocation returns the exec URL for a pod container. If opts.Container is blank +// and only one container is present in the pod, that container is used. +func ExecLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodExecOptions) (*url.URL, http.RoundTripper, error) { + + pod, err := getPod(getter, ctx, name) + if err != nil { + return nil, nil, err + } + + // Try to figure out a container + container := opts.Container + if container == "" { + if len(pod.Spec.Containers) == 1 { + container = pod.Spec.Containers[0].Name + } else { + return nil, nil, errors.NewBadRequest(fmt.Sprintf("a container name must be specified for pod %s", name)) + } + } + nodeHost := pod.Status.HostIP + if len(nodeHost) == 0 { + // If pod has not been assigned a host, return an empty location + return nil, nil, fmt.Errorf("pod %s does not have a host assigned", name) + } + nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost) + if err != nil { + return nil, nil, err + } + params := url.Values{} + if opts.Stdin { + params.Add(api.ExecStdinParam, "1") + } + if opts.Stdout { + params.Add(api.ExecStdoutParam, "1") + } + if opts.Stderr { + params.Add(api.ExecStderrParam, "1") + } + if opts.TTY { + params.Add(api.ExecTTYParam, "1") + } + params.Add("command", opts.Command) + loc := &url.URL{ + Scheme: nodeScheme, + Host: fmt.Sprintf("%s:%d", nodeHost, nodePort), + Path: fmt.Sprintf("/exec/%s/%s/%s", pod.Namespace, name, container), + RawQuery: params.Encode(), + } + return loc, nodeTransport, nil +} + +// PortForwardLocation returns a the port-forward URL for a pod. +func PortForwardLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string) (*url.URL, http.RoundTripper, error) { + + pod, err := getPod(getter, ctx, name) + if err != nil { + return nil, nil, err + } + + nodeHost := pod.Status.HostIP + if len(nodeHost) == 0 { + // If pod has not been assigned a host, return an empty location + return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name)) + } + nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost) + if err != nil { + return nil, nil, err + } + loc := &url.URL{ + Scheme: nodeScheme, + Host: fmt.Sprintf("%s:%d", nodeHost, nodePort), + Path: fmt.Sprintf("/portForward/%s/%s", pod.Namespace, name), + } + return loc, nodeTransport, nil +} diff --git a/pkg/util/proxy/doc.go b/pkg/util/proxy/doc.go new file mode 100644 index 0000000000..9001817373 --- /dev/null +++ b/pkg/util/proxy/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package proxy provides transport and upgrade support for proxies +package proxy diff --git a/pkg/util/proxy/transport.go b/pkg/util/proxy/transport.go new file mode 100644 index 0000000000..bf8525c57e --- /dev/null +++ b/pkg/util/proxy/transport.go @@ -0,0 +1,217 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "strings" + + "github.com/golang/glog" + "golang.org/x/net/html" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// tagsToAttrs states which attributes of which tags require URL substitution. +// Sources: http://www.w3.org/TR/REC-html40/index/attributes.html +// http://www.w3.org/html/wg/drafts/html/master/index.html#attributes-1 +var tagsToAttrs = map[string]util.StringSet{ + "a": util.NewStringSet("href"), + "applet": util.NewStringSet("codebase"), + "area": util.NewStringSet("href"), + "audio": util.NewStringSet("src"), + "base": util.NewStringSet("href"), + "blockquote": util.NewStringSet("cite"), + "body": util.NewStringSet("background"), + "button": util.NewStringSet("formaction"), + "command": util.NewStringSet("icon"), + "del": util.NewStringSet("cite"), + "embed": util.NewStringSet("src"), + "form": util.NewStringSet("action"), + "frame": util.NewStringSet("longdesc", "src"), + "head": util.NewStringSet("profile"), + "html": util.NewStringSet("manifest"), + "iframe": util.NewStringSet("longdesc", "src"), + "img": util.NewStringSet("longdesc", "src", "usemap"), + "input": util.NewStringSet("src", "usemap", "formaction"), + "ins": util.NewStringSet("cite"), + "link": util.NewStringSet("href"), + "object": util.NewStringSet("classid", "codebase", "data", "usemap"), + "q": util.NewStringSet("cite"), + "script": util.NewStringSet("src"), + "source": util.NewStringSet("src"), + "video": util.NewStringSet("poster", "src"), + + // TODO: css URLs hidden in style elements. +} + +// Transport is a transport for text/html content that replaces URLs in html +// content with the prefix of the proxy server +type Transport struct { + Scheme string + Host string + PathPrepend string +} + +// RoundTrip implements the http.RoundTripper interface +func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { + // Add reverse proxy headers. + req.Header.Set("X-Forwarded-Uri", t.PathPrepend+req.URL.Path) + req.Header.Set("X-Forwarded-Host", t.Host) + req.Header.Set("X-Forwarded-Proto", t.Scheme) + + resp, err := http.DefaultTransport.RoundTrip(req) + + if err != nil { + message := fmt.Sprintf("Error: '%s'\nTrying to reach: '%v'", err.Error(), req.URL.String()) + resp = &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Body: ioutil.NopCloser(strings.NewReader(message)), + } + return resp, nil + } + + if redirect := resp.Header.Get("Location"); redirect != "" { + resp.Header.Set("Location", t.rewriteURL(redirect, req.URL)) + return resp, nil + } + + cType := resp.Header.Get("Content-Type") + cType = strings.TrimSpace(strings.SplitN(cType, ";", 2)[0]) + if cType != "text/html" { + // Do nothing, simply pass through + return resp, nil + } + + return t.fixLinks(req, resp) +} + +// rewriteURL rewrites a single URL to go through the proxy, if the URL refers +// to the same host as sourceURL, which is the page on which the target URL +// occurred. If any error occurs (e.g. parsing), it returns targetURL. +func (t *Transport) rewriteURL(targetURL string, sourceURL *url.URL) string { + url, err := url.Parse(targetURL) + if err != nil { + return targetURL + } + if url.Host != "" && url.Host != sourceURL.Host { + return targetURL + } + + url.Scheme = t.Scheme + url.Host = t.Host + origPath := url.Path + + if strings.HasPrefix(url.Path, "/") { + // The path is rooted at the host. Just add proxy prepend. + url.Path = path.Join(t.PathPrepend, url.Path) + } else { + // The path is relative to sourceURL. + url.Path = path.Join(t.PathPrepend, path.Dir(sourceURL.Path), url.Path) + } + + if strings.HasSuffix(origPath, "/") { + // Add back the trailing slash, which was stripped by path.Join(). + url.Path += "/" + } + + return url.String() +} + +// updateURLs checks and updates any of n's attributes that are listed in tagsToAttrs. +// Any URLs found are, if they're relative, updated with the necessary changes to make +// a visit to that URL also go through the proxy. +// sourceURL is the URL of the page which we're currently on; it's required to make +// relative links work. +func (t *Transport) updateURLs(n *html.Node, sourceURL *url.URL) { + if n.Type != html.ElementNode { + return + } + attrs, ok := tagsToAttrs[n.Data] + if !ok { + return + } + for i, attr := range n.Attr { + if !attrs.Has(attr.Key) { + continue + } + n.Attr[i].Val = t.rewriteURL(attr.Val, sourceURL) + } +} + +// scan recursively calls f for every n and every subnode of n. +func (t *Transport) scan(n *html.Node, f func(*html.Node)) { + f(n) + for c := n.FirstChild; c != nil; c = c.NextSibling { + t.scan(c, f) + } +} + +// fixLinks modifies links in an HTML file such that they will be redirected through the proxy if needed. +func (t *Transport) fixLinks(req *http.Request, resp *http.Response) (*http.Response, error) { + origBody := resp.Body + defer origBody.Close() + + newContent := &bytes.Buffer{} + var reader io.Reader = origBody + var writer io.Writer = newContent + encoding := resp.Header.Get("Content-Encoding") + switch encoding { + case "gzip": + var err error + reader, err = gzip.NewReader(reader) + if err != nil { + return nil, fmt.Errorf("errorf making gzip reader: %v", err) + } + gzw := gzip.NewWriter(writer) + defer gzw.Close() + writer = gzw + // TODO: support flate, other encodings. + case "": + // This is fine + default: + // Some encoding we don't understand-- don't try to parse this + glog.Errorf("Proxy encountered encoding %v for text/html; can't understand this so not fixing links.", encoding) + return resp, nil + } + + doc, err := html.Parse(reader) + if err != nil { + glog.Errorf("Parse failed: %v", err) + return resp, err + } + + t.scan(doc, func(n *html.Node) { t.updateURLs(n, req.URL) }) + if err := html.Render(writer, doc); err != nil { + glog.Errorf("Failed to render: %v", err) + } + + resp.Body = ioutil.NopCloser(newContent) + // Update header node with new content-length + // TODO: Remove any hash/signature headers here? + resp.Header.Del("Content-Length") + resp.ContentLength = int64(newContent.Len()) + + return resp, err +} diff --git a/pkg/util/proxy/transport_test.go b/pkg/util/proxy/transport_test.go new file mode 100644 index 0000000000..f22b733e2f --- /dev/null +++ b/pkg/util/proxy/transport_test.go @@ -0,0 +1,227 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "golang.org/x/net/html" +) + +func parseURLOrDie(inURL string) *url.URL { + parsed, err := url.Parse(inURL) + if err != nil { + panic(err) + } + return parsed +} + +// fmtHTML parses and re-emits 'in', effectively canonicalizing it. +func fmtHTML(in string) string { + doc, err := html.Parse(strings.NewReader(in)) + if err != nil { + panic(err) + } + out := &bytes.Buffer{} + if err := html.Render(out, doc); err != nil { + panic(err) + } + return string(out.Bytes()) +} + +func TestProxyTransport(t *testing.T) { + testTransport := &Transport{ + Scheme: "http", + Host: "foo.com", + PathPrepend: "/proxy/minion/minion1:10250", + } + testTransport2 := &Transport{ + Scheme: "https", + Host: "foo.com", + PathPrepend: "/proxy/minion/minion1:8080", + } + type Item struct { + input string + sourceURL string + transport *Transport + output string + contentType string + forwardedURI string + redirect string + redirectWant string + } + + table := map[string]Item{ + "normal": { + input: `
kubelet.loggoogle.log`, + sourceURL: "http://myminion.com/logs/log.log", + transport: testTransport, + output: `
kubelet.loggoogle.log`, + contentType: "text/html", + forwardedURI: "/proxy/minion/minion1:10250/logs/log.log", + }, + "trailing slash": { + input: `
kubelet.loggoogle.log`, + sourceURL: "http://myminion.com/logs/log.log", + transport: testTransport, + output: `
kubelet.loggoogle.log`, + contentType: "text/html", + forwardedURI: "/proxy/minion/minion1:10250/logs/log.log", + }, + "content-type charset": { + input: `
kubelet.loggoogle.log`, + sourceURL: "http://myminion.com/logs/log.log", + transport: testTransport, + output: `
kubelet.loggoogle.log`, + contentType: "text/html; charset=utf-8", + forwardedURI: "/proxy/minion/minion1:10250/logs/log.log", + }, + "content-type passthrough": { + input: `
kubelet.loggoogle.log`, + sourceURL: "http://myminion.com/logs/log.log", + transport: testTransport, + output: `
kubelet.loggoogle.log`, + contentType: "text/plain", + forwardedURI: "/proxy/minion/minion1:10250/logs/log.log", + }, + "subdir": { + input: `kubelet.loggoogle.log`, + sourceURL: "http://myminion.com/whatever/apt/somelog.log", + transport: testTransport2, + output: `kubelet.loggoogle.log`, + contentType: "text/html", + forwardedURI: "/proxy/minion/minion1:8080/whatever/apt/somelog.log", + }, + "image": { + input: `
`, + sourceURL: "http://myminion.com/", + transport: testTransport, + output: `
`, + contentType: "text/html", + forwardedURI: "/proxy/minion/minion1:10250/", + }, + "abs": { + input: ``, + sourceURL: "http://myminion.com/any/path/", + transport: testTransport, + output: ``, + contentType: "text/html", + forwardedURI: "/proxy/minion/minion1:10250/any/path/", + }, + "abs but same host": { + input: ``, + sourceURL: "http://myminion.com/any/path/", + transport: testTransport, + output: ``, + contentType: "text/html", + forwardedURI: "/proxy/minion/minion1:10250/any/path/", + }, + "redirect rel": { + sourceURL: "http://myminion.com/redirect", + transport: testTransport, + redirect: "/redirected/target/", + redirectWant: "http://foo.com/proxy/minion/minion1:10250/redirected/target/", + forwardedURI: "/proxy/minion/minion1:10250/redirect", + }, + "redirect abs same host": { + sourceURL: "http://myminion.com/redirect", + transport: testTransport, + redirect: "http://myminion.com/redirected/target/", + redirectWant: "http://foo.com/proxy/minion/minion1:10250/redirected/target/", + forwardedURI: "/proxy/minion/minion1:10250/redirect", + }, + "redirect abs other host": { + sourceURL: "http://myminion.com/redirect", + transport: testTransport, + redirect: "http://example.com/redirected/target/", + redirectWant: "http://example.com/redirected/target/", + forwardedURI: "/proxy/minion/minion1:10250/redirect", + }, + } + + testItem := func(name string, item *Item) { + // Canonicalize the html so we can diff. + item.input = fmtHTML(item.input) + item.output = fmtHTML(item.output) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Check request headers. + if got, want := r.Header.Get("X-Forwarded-Uri"), item.forwardedURI; got != want { + t.Errorf("%v: X-Forwarded-Uri = %q, want %q", name, got, want) + } + if got, want := r.Header.Get("X-Forwarded-Host"), item.transport.Host; got != want { + t.Errorf("%v: X-Forwarded-Host = %q, want %q", name, got, want) + } + if got, want := r.Header.Get("X-Forwarded-Proto"), item.transport.Scheme; got != want { + t.Errorf("%v: X-Forwarded-Proto = %q, want %q", name, got, want) + } + + // Send response. + if item.redirect != "" { + http.Redirect(w, r, item.redirect, http.StatusMovedPermanently) + return + } + w.Header().Set("Content-Type", item.contentType) + fmt.Fprint(w, item.input) + })) + defer server.Close() + + // Replace source URL with our test server address. + sourceURL := parseURLOrDie(item.sourceURL) + serverURL := parseURLOrDie(server.URL) + item.input = strings.Replace(item.input, sourceURL.Host, serverURL.Host, -1) + item.redirect = strings.Replace(item.redirect, sourceURL.Host, serverURL.Host, -1) + sourceURL.Host = serverURL.Host + + req, err := http.NewRequest("GET", sourceURL.String(), nil) + if err != nil { + t.Errorf("%v: Unexpected error: %v", name, err) + return + } + resp, err := item.transport.RoundTrip(req) + if err != nil { + t.Errorf("%v: Unexpected error: %v", name, err) + return + } + if item.redirect != "" { + // Check that redirect URLs get rewritten properly. + if got, want := resp.Header.Get("Location"), item.redirectWant; got != want { + t.Errorf("%v: Location header = %q, want %q", name, got, want) + } + return + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("%v: Unexpected error: %v", name, err) + return + } + if e, a := item.output, string(body); e != a { + t.Errorf("%v: expected %v, but got %v", name, e, a) + } + } + + for name, item := range table { + testItem(name, &item) + } +}