From ae584d811420e494c61825d3473bc37b3c455ad9 Mon Sep 17 00:00:00 2001 From: Dominika Hodovska Date: Thu, 4 Aug 2016 08:31:23 +0200 Subject: [PATCH 1/2] kubectl: Convert port-forward cmd to complete/validate/run structure --- pkg/kubectl/cmd/portforward.go | 96 ++++++++++++++++++++--------- pkg/kubectl/cmd/portforward_test.go | 56 +++++++++++++---- 2 files changed, 113 insertions(+), 39 deletions(-) diff --git a/pkg/kubectl/cmd/portforward.go b/pkg/kubectl/cmd/portforward.go index adaa9c9512..2014b62f40 100644 --- a/pkg/kubectl/cmd/portforward.go +++ b/pkg/kubectl/cmd/portforward.go @@ -17,21 +17,32 @@ limitations under the License. package cmd import ( + "fmt" "io" "net/url" "os" "os/signal" - "github.com/golang/glog" "github.com/renstrom/dedent" "github.com/spf13/cobra" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/restclient" + client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/portforward" "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" ) +// PortForwardOptions contains all the options for running the port-forward cli command. +type PortForwardOptions struct { + Namespace string + PodName string + Config *restclient.Config + Client *client.Client + Ports []string + PortForwarder portForwarder +} + var ( portforward_example = dedent.Dedent(` # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in the pod @@ -48,18 +59,27 @@ var ( ) func NewCmdPortForward(f *cmdutil.Factory, cmdOut, cmdErr io.Writer) *cobra.Command { + opts := &PortForwardOptions{ + PortForwarder: &defaultPortForwarder{ + cmdOut: cmdOut, + cmdErr: cmdErr, + }, + } cmd := &cobra.Command{ Use: "port-forward POD [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]", Short: "Forward one or more local ports to a pod", Long: "Forward one or more local ports to a pod.", Example: portforward_example, Run: func(cmd *cobra.Command, args []string) { - pf := &defaultPortForwarder{ - cmdOut: cmdOut, - cmdErr: cmdErr, + if err := opts.Complete(f, cmd, args, cmdOut, cmdErr); err != nil { + cmdutil.CheckErr(err) + } + if err := opts.Validate(); err != nil { + cmdutil.CheckErr(cmdutil.UsageError(cmd, err.Error())) + } + if err := opts.RunPortForward(); err != nil { + cmdutil.CheckErr(err) } - err := RunPortForward(f, cmd, args, pf) - cmdutil.CheckErr(err) }, } cmd.Flags().StringP("pod", "p", "", "Pod name") @@ -87,45 +107,65 @@ func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, config return fw.ForwardPorts() } -func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string, fw portForwarder) error { - podName := cmdutil.GetFlagString(cmd, "pod") - if len(podName) == 0 && len(args) == 0 { +// Complete completes all the required options for port-forward cmd. +func (o *PortForwardOptions) Complete(f *cmdutil.Factory, cmd *cobra.Command, args []string, cmdOut io.Writer, cmdErr io.Writer) error { + var err error + o.PodName = cmdutil.GetFlagString(cmd, "pod") + if len(o.PodName) == 0 && len(args) == 0 { return cmdutil.UsageError(cmd, "POD is required for port-forward") } - if len(podName) != 0 { + if len(o.PodName) != 0 { printDeprecationWarning("port-forward POD", "-p POD") + o.Ports = args } else { - podName = args[0] - args = args[1:] + o.PodName = args[0] + o.Ports = args[1:] } - if len(args) < 1 { - return cmdutil.UsageError(cmd, "at least 1 PORT is required for port-forward") - } - - namespace, _, err := f.DefaultNamespace() + o.Namespace, _, err = f.DefaultNamespace() if err != nil { return err } - client, err := f.Client() + o.Client, err = f.Client() if err != nil { return err } - pod, err := client.Pods(namespace).Get(podName) + o.Config, err = f.ClientConfig() + if err != nil { + return err + } + + return nil +} + +// Validate validates all the required options for port-forward cmd. +func (o PortForwardOptions) Validate() error { + if len(o.PodName) == 0 { + return fmt.Errorf("pod name must be specified") + } + + if len(o.Ports) < 1 { + return fmt.Errorf("at least 1 PORT is required for port-forward") + } + + if o.PortForwarder == nil || o.Client == nil || o.Config == nil { + return fmt.Errorf("client, client config, and portforwarder must be provided") + } + return nil +} + +// RunPortForward implements all the necessary functionality for port-forward cmd. +func (o PortForwardOptions) RunPortForward() error { + pod, err := o.Client.Pods(o.Namespace).Get(o.PodName) if err != nil { return err } if pod.Status.Phase != api.PodRunning { - glog.Fatalf("Unable to execute command because pod is not running. Current status=%v", pod.Status.Phase) - } - - config, err := f.ClientConfig() - if err != nil { - return err + return fmt.Errorf("Unable to execute command because pod is not running. Current status=%v", pod.Status.Phase) } signals := make(chan os.Signal, 1) @@ -138,11 +178,11 @@ func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string, fw po close(stopCh) }() - req := client.RESTClient.Post(). + req := o.Client.RESTClient.Post(). Resource("pods"). - Namespace(namespace). + Namespace(o.Namespace). Name(pod.Name). SubResource("portforward") - return fw.ForwardPorts("POST", req.URL(), config, args, stopCh) + return o.PortForwarder.ForwardPorts("POST", req.URL(), o.Config, o.Ports, stopCh) } diff --git a/pkg/kubectl/cmd/portforward_test.go b/pkg/kubectl/cmd/portforward_test.go index 7e4195cbbc..888c32b96c 100644 --- a/pkg/kubectl/cmd/portforward_test.go +++ b/pkg/kubectl/cmd/portforward_test.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "net/url" + "os" "testing" "github.com/spf13/cobra" @@ -68,6 +69,7 @@ func TestPortForward(t *testing.T) { }, } for _, test := range tests { + var err error f, tf, codec, ns := NewAPIFactory() tf.Client = &fake.RESTClient{ NegotiatedSerializer: ns, @@ -89,9 +91,21 @@ func TestPortForward(t *testing.T) { if test.pfErr { ff.pfErr = fmt.Errorf("pf error") } - cmd := &cobra.Command{} - cmd.Flags().StringP("pod", "p", "", "Pod name") - err := RunPortForward(f, cmd, []string{"foo", ":5000", ":1000"}, ff) + + opts := &PortForwardOptions{} + cmd := NewCmdPortForward(f, os.Stdout, os.Stderr) + cmd.Run = func(cmd *cobra.Command, args []string) { + if err = opts.Complete(f, cmd, args, os.Stdout, os.Stderr); err != nil { + return + } + opts.PortForwarder = ff + if err = opts.Validate(); err != nil { + return + } + err = opts.RunPortForward() + } + + cmd.Run(cmd, []string{"foo", ":5000", ":1000"}) if test.pfErr && err != ff.pfErr { t.Errorf("%s: Unexpected exec error: %v", test.name, err) @@ -109,7 +123,6 @@ func TestPortForward(t *testing.T) { if ff.method != "POST" { t.Errorf("%s: Did not get method for attach request: %s", test.name, ff.method) } - } } @@ -138,6 +151,7 @@ func TestPortForwardWithPFlag(t *testing.T) { }, } for _, test := range tests { + var err error f, tf, codec, ns := NewAPIFactory() tf.Client = &fake.RESTClient{ NegotiatedSerializer: ns, @@ -159,18 +173,38 @@ func TestPortForwardWithPFlag(t *testing.T) { if test.pfErr { ff.pfErr = fmt.Errorf("pf error") } - cmd := &cobra.Command{} - podPtr := cmd.Flags().StringP("pod", "p", "", "Pod name") - *podPtr = "foo" - err := RunPortForward(f, cmd, []string{":5000", ":1000"}, ff) + + opts := &PortForwardOptions{} + cmd := NewCmdPortForward(f, os.Stdout, os.Stderr) + cmd.Run = func(cmd *cobra.Command, args []string) { + if err = opts.Complete(f, cmd, args, os.Stdout, os.Stderr); err != nil { + return + } + opts.PortForwarder = ff + if err = opts.Validate(); err != nil { + return + } + err = opts.RunPortForward() + } + cmd.Flags().Set("pod", "foo") + + cmd.Run(cmd, []string{":5000", ":1000"}) + if test.pfErr && err != ff.pfErr { t.Errorf("%s: Unexpected exec error: %v", test.name, err) } - if !test.pfErr && ff.url.Path != test.pfPath { - t.Errorf("%s: Did not get expected path for portforward request", test.name) - } if !test.pfErr && err != nil { t.Errorf("%s: Unexpected error: %v", test.name, err) } + if test.pfErr { + continue + } + + if ff.url.Path != test.pfPath { + t.Errorf("%s: Did not get expected path for portforward request", test.name) + } + if ff.method != "POST" { + t.Errorf("%s: Did not get method for attach request: %s", test.name, ff.method) + } } } From c5babe239614fa96e5458af7c87fd54e98ac01a2 Mon Sep 17 00:00:00 2001 From: Dominika Hodovska Date: Mon, 8 Aug 2016 14:31:15 +0200 Subject: [PATCH 2/2] expose ready/stop channel --- .../unversioned/portforward/portforward.go | 8 ++++--- .../portforward/portforward_test.go | 12 +++++++---- pkg/kubectl/cmd/portforward.go | 21 ++++++++++++------- pkg/kubectl/cmd/portforward_test.go | 6 +++--- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/pkg/client/unversioned/portforward/portforward.go b/pkg/client/unversioned/portforward/portforward.go index 6ecedb6df9..d746125d0f 100644 --- a/pkg/client/unversioned/portforward/portforward.go +++ b/pkg/client/unversioned/portforward/portforward.go @@ -108,7 +108,7 @@ func parsePorts(ports []string) ([]ForwardedPort, error) { } // New creates a new PortForwarder. -func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, out, errOut io.Writer) (*PortForwarder, error) { +func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) { if len(ports) == 0 { return nil, errors.New("You must specify at least 1 port") } @@ -120,7 +120,7 @@ func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, out dialer: dialer, ports: parsedPorts, stopChan: stopChan, - Ready: make(chan struct{}), + Ready: readyChan, out: out, errOut: errOut, }, nil @@ -164,7 +164,9 @@ func (pf *PortForwarder) forward() error { return fmt.Errorf("Unable to listen on any of the requested ports: %v", pf.ports) } - close(pf.Ready) + if pf.Ready != nil { + close(pf.Ready) + } // wait for interrupt or conn closure select { diff --git a/pkg/client/unversioned/portforward/portforward_test.go b/pkg/client/unversioned/portforward/portforward_test.go index 6bc49a902f..4bdd222c08 100644 --- a/pkg/client/unversioned/portforward/portforward_test.go +++ b/pkg/client/unversioned/portforward/portforward_test.go @@ -88,7 +88,8 @@ func TestParsePortsAndNew(t *testing.T) { dialer := &fakeDialer{} expectedStopChan := make(chan struct{}) - pf, err := New(dialer, test.input, expectedStopChan, os.Stdout, os.Stderr) + readyChan := make(chan struct{}) + pf, err := New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr) haveError = err != nil if e, a := test.expectNewError, haveError; e != a { t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err) @@ -305,8 +306,9 @@ func TestForwardPorts(t *testing.T) { } stopChan := make(chan struct{}, 1) + readyChan := make(chan struct{}) - pf, err := New(exec, test.ports, stopChan, os.Stdout, os.Stderr) + pf, err := New(exec, test.ports, stopChan, readyChan, os.Stdout, os.Stderr) if err != nil { t.Fatalf("%s: unexpected error calling New: %v", testName, err) } @@ -375,8 +377,9 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { stopChan1 := make(chan struct{}, 1) defer close(stopChan1) + readyChan1 := make(chan struct{}) - pf1, err := New(exec, []string{"5555"}, stopChan1, os.Stdout, os.Stderr) + pf1, err := New(exec, []string{"5555"}, stopChan1, readyChan1, os.Stdout, os.Stderr) if err != nil { t.Fatalf("error creating pf1: %v", err) } @@ -384,7 +387,8 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { <-pf1.Ready stopChan2 := make(chan struct{}, 1) - pf2, err := New(exec, []string{"5555"}, stopChan2, os.Stdout, os.Stderr) + readyChan2 := make(chan struct{}) + pf2, err := New(exec, []string{"5555"}, stopChan2, readyChan2, os.Stdout, os.Stderr) if err != nil { t.Fatalf("error creating pf2: %v", err) } diff --git a/pkg/kubectl/cmd/portforward.go b/pkg/kubectl/cmd/portforward.go index 2014b62f40..fae3758474 100644 --- a/pkg/kubectl/cmd/portforward.go +++ b/pkg/kubectl/cmd/portforward.go @@ -41,6 +41,8 @@ type PortForwardOptions struct { Client *client.Client Ports []string PortForwarder portForwarder + StopChannel chan struct{} + ReadyChannel chan struct{} } var ( @@ -88,19 +90,19 @@ func NewCmdPortForward(f *cmdutil.Factory, cmdOut, cmdErr io.Writer) *cobra.Comm } type portForwarder interface { - ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error + ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error } type defaultPortForwarder struct { cmdOut, cmdErr io.Writer } -func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error { - dialer, err := remotecommand.NewExecutor(config, method, url) +func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error { + dialer, err := remotecommand.NewExecutor(opts.Config, method, url) if err != nil { return err } - fw, err := portforward.New(dialer, ports, stopChan, f.cmdOut, f.cmdErr) + fw, err := portforward.New(dialer, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.cmdOut, f.cmdErr) if err != nil { return err } @@ -138,6 +140,8 @@ func (o *PortForwardOptions) Complete(f *cmdutil.Factory, cmd *cobra.Command, ar return err } + o.StopChannel = make(chan struct{}, 1) + o.ReadyChannel = make(chan struct{}) return nil } @@ -165,17 +169,18 @@ func (o PortForwardOptions) RunPortForward() error { } if pod.Status.Phase != api.PodRunning { - return fmt.Errorf("Unable to execute command because pod is not running. Current status=%v", pod.Status.Phase) + return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase) } signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) defer signal.Stop(signals) - stopCh := make(chan struct{}, 1) go func() { <-signals - close(stopCh) + if o.StopChannel != nil { + close(o.StopChannel) + } }() req := o.Client.RESTClient.Post(). @@ -184,5 +189,5 @@ func (o PortForwardOptions) RunPortForward() error { Name(pod.Name). SubResource("portforward") - return o.PortForwarder.ForwardPorts("POST", req.URL(), o.Config, o.Ports, stopCh) + return o.PortForwarder.ForwardPorts("POST", req.URL(), o) } diff --git a/pkg/kubectl/cmd/portforward_test.go b/pkg/kubectl/cmd/portforward_test.go index 888c32b96c..45c4597367 100644 --- a/pkg/kubectl/cmd/portforward_test.go +++ b/pkg/kubectl/cmd/portforward_test.go @@ -38,7 +38,7 @@ type fakePortForwarder struct { pfErr error } -func (f *fakePortForwarder) ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error { +func (f *fakePortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error { f.method = method f.url = url return f.pfErr @@ -108,7 +108,7 @@ func TestPortForward(t *testing.T) { cmd.Run(cmd, []string{"foo", ":5000", ":1000"}) if test.pfErr && err != ff.pfErr { - t.Errorf("%s: Unexpected exec error: %v", test.name, err) + t.Errorf("%s: Unexpected port-forward error: %v", test.name, err) } if !test.pfErr && err != nil { t.Errorf("%s: Unexpected error: %v", test.name, err) @@ -191,7 +191,7 @@ func TestPortForwardWithPFlag(t *testing.T) { cmd.Run(cmd, []string{":5000", ":1000"}) if test.pfErr && err != ff.pfErr { - t.Errorf("%s: Unexpected exec error: %v", test.name, err) + t.Errorf("%s: Unexpected port-forward error: %v", test.name, err) } if !test.pfErr && err != nil { t.Errorf("%s: Unexpected error: %v", test.name, err)