From cc16d64368be028b1605ba09f20dbdc862fe24d9 Mon Sep 17 00:00:00 2001 From: Di Xu Date: Thu, 1 Feb 2018 15:29:23 +0800 Subject: [PATCH] exit kube-proxy when configuration file changes --- cmd/kube-proxy/app/BUILD | 2 + cmd/kube-proxy/app/server.go | 76 ++++++++++++++++- cmd/kube-proxy/app/server_test.go | 134 ++++++++++++++++++++++++++++++ 3 files changed, 209 insertions(+), 3 deletions(-) diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index acc9a87d6c..678e1f4057 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -31,6 +31,7 @@ go_library( "//pkg/proxy/ipvs:go_default_library", "//pkg/proxy/userspace:go_default_library", "//pkg/util/configz:go_default_library", + "//pkg/util/filesystem:go_default_library", "//pkg/util/flag:go_default_library", "//pkg/util/ipset:go_default_library", "//pkg/util/iptables:go_default_library", @@ -62,6 +63,7 @@ go_library( "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/component-base/config:go_default_library", "//staging/src/k8s.io/kube-proxy/config/v1alpha1:go_default_library", + "//vendor/github.com/fsnotify/fsnotify:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", "//vendor/github.com/spf13/pflag:go_default_library", diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 9074b7f99e..dfd912722b 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -62,6 +62,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/ipvs" "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/util/configz" + "k8s.io/kubernetes/pkg/util/filesystem" utilflag "k8s.io/kubernetes/pkg/util/flag" utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -73,6 +74,7 @@ import ( "k8s.io/utils/exec" utilpointer "k8s.io/utils/pointer" + "github.com/fsnotify/fsnotify" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -86,6 +88,11 @@ const ( proxyModeKernelspace = "kernelspace" ) +// proxyRun defines the interface to run a specified ProxyServer +type proxyRun interface { + Run() error +} + // Options contains everything necessary to create and run a proxy server. type Options struct { // ConfigFile is the location of the proxy server's configuration file. @@ -101,6 +108,12 @@ type Options struct { WindowsService bool // config is the proxy server's configuration object. config *kubeproxyconfig.KubeProxyConfiguration + // watcher is used to watch on the update change of ConfigFile + watcher filesystem.FSWatcher + // proxyServer is the interface to run the proxy server + proxyServer proxyRun + // errCh is the channel that errors will be sent + errCh chan error // The fields below here are placeholders for flags that can't be directly mapped into // config.KubeProxyConfiguration. @@ -190,6 +203,7 @@ func NewOptions() *Options { scheme: scheme.Scheme, codecs: scheme.Codecs, CleanupIPVS: true, + errCh: make(chan error), } } @@ -208,6 +222,10 @@ func (o *Options) Complete() error { } else { o.config = c } + + if err := o.initWatcher(); err != nil { + return err + } } if err := o.processHostnameOverrideFlag(); err != nil { @@ -217,10 +235,39 @@ func (o *Options) Complete() error { if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates); err != nil { return err } - return nil } +// Creates a new filesystem watcher and adds watches for the config file. +func (o *Options) initWatcher() error { + fswatcher := filesystem.NewFsnotifyWatcher() + err := fswatcher.Init(o.eventHandler, o.errorHandler) + if err != nil { + return err + } + err = fswatcher.AddWatch(o.ConfigFile) + if err != nil { + return err + } + o.watcher = fswatcher + return nil +} + +func (o *Options) eventHandler(ent fsnotify.Event) { + eventOpIs := func(Op fsnotify.Op) bool { + return ent.Op&Op == Op + } + if eventOpIs(fsnotify.Write) || eventOpIs(fsnotify.Rename) { + // error out when ConfigFile is updated + o.errCh <- fmt.Errorf("content of the proxy server's configuration file was updated") + } + o.errCh <- nil +} + +func (o *Options) errorHandler(err error) { + o.errCh <- err +} + // processHostnameOverrideFlag processes hostname-override flag func (o *Options) processHostnameOverrideFlag() error { // Check if hostname-override flag is set and use value since configFile always overrides @@ -240,7 +287,6 @@ func (o *Options) Validate(args []string) error { if len(args) != 0 { return errors.New("no arguments are supported") } - if errs := validation.Validate(o.config); len(errs) != 0 { return errs.ToAggregate() } @@ -249,6 +295,7 @@ func (o *Options) Validate(args []string) error { } func (o *Options) Run() error { + defer close(o.errCh) if len(o.WriteConfigTo) > 0 { return o.writeConfigFile() } @@ -257,8 +304,31 @@ func (o *Options) Run() error { if err != nil { return err } + o.proxyServer = proxyServer + return o.runLoop() +} - return proxyServer.Run() +// runLoop will watch on the update change of the proxy server's configuration file. +// Return an error when updated +func (o *Options) runLoop() error { + if o.watcher != nil { + o.watcher.Run() + } + + // run the proxy in goroutine + go func() { + err := o.proxyServer.Run() + o.errCh <- err + }() + + for { + select { + case err := <-o.errCh: + if err != nil { + return err + } + } + } } func (o *Options) writeConfigFile() error { diff --git a/cmd/kube-proxy/app/server_test.go b/cmd/kube-proxy/app/server_test.go index b45066f86f..e07cd18b4b 100644 --- a/cmd/kube-proxy/app/server_test.go +++ b/cmd/kube-proxy/app/server_test.go @@ -18,6 +18,8 @@ package app import ( "fmt" + "io/ioutil" + "os" "reflect" "runtime" "strings" @@ -410,3 +412,135 @@ func TestProcessHostnameOverrideFlag(t *testing.T) { }) } } + +func TestConfigChange(t *testing.T) { + setUp := func() (*os.File, string, error) { + tempDir := os.TempDir() + file, err := ioutil.TempFile(tempDir, "kube-proxy-config-") + if err != nil { + return nil, "", fmt.Errorf("unexpected error when creating temp file: %v", err) + } + + _, err = file.WriteString(`apiVersion: kubeproxy.config.k8s.io/v1alpha1 +bindAddress: 0.0.0.0 +clientConnection: + acceptContentTypes: "" + burst: 10 + contentType: application/vnd.kubernetes.protobuf + kubeconfig: /var/lib/kube-proxy/kubeconfig.conf + qps: 5 +clusterCIDR: 10.244.0.0/16 +configSyncPeriod: 15m0s +conntrack: + max: null + maxPerCore: 32768 + min: 131072 + tcpCloseWaitTimeout: 1h0m0s + tcpEstablishedTimeout: 24h0m0s +enableProfiling: false +healthzBindAddress: 0.0.0.0:10256 +hostnameOverride: "" +iptables: + masqueradeAll: false + masqueradeBit: 14 + minSyncPeriod: 0s + syncPeriod: 30s +ipvs: + excludeCIDRs: null + minSyncPeriod: 0s + scheduler: "" + syncPeriod: 30s +kind: KubeProxyConfiguration +metricsBindAddress: 127.0.0.1:10249 +mode: "" +nodePortAddresses: null +oomScoreAdj: -999 +portRange: "" +resourceContainer: /kube-proxy +udpIdleTimeout: 250ms`) + if err != nil { + return nil, "", fmt.Errorf("unexpected error when writing content to temp kube-proxy config file: %v", err) + } + + return file, tempDir, nil + } + + tearDown := func(file *os.File, tempDir string) { + file.Close() + os.RemoveAll(tempDir) + } + + testCases := []struct { + name string + proxyServer proxyRun + append bool + expectedErr string + }{ + { + name: "update config file", + proxyServer: new(fakeProxyServerLongRun), + append: true, + expectedErr: "content of the proxy server's configuration file was updated", + }, + { + name: "fake error", + proxyServer: new(fakeProxyServerError), + expectedErr: "mocking error from ProxyServer.Run()", + }, + } + + for _, tc := range testCases { + file, tempDir, err := setUp() + if err != nil { + t.Fatalf("unexpected error when setting up environment: %v", err) + } + + opt := NewOptions() + opt.ConfigFile = file.Name() + err = opt.Complete() + if err != nil { + t.Fatal(err) + } + opt.proxyServer = tc.proxyServer + + errCh := make(chan error) + go func() { + errCh <- opt.runLoop() + }() + + if tc.append { + file.WriteString("append fake content") + } + + select { + case err := <-errCh: + if err != nil { + if !strings.Contains(err.Error(), tc.expectedErr) { + t.Errorf("[%s] Expected error containing %v, got %v", tc.name, tc.expectedErr, err) + } + } + case <-time.After(10 * time.Second): + t.Errorf("[%s] Timeout: unable to get any events or internal timeout.", tc.name) + } + tearDown(file, tempDir) + } +} + +type fakeProxyServerLongRun struct{} + +// Run runs the specified ProxyServer. +func (s *fakeProxyServerLongRun) Run() error { + for { + time.Sleep(2 * time.Second) + } +} + +type fakeProxyServerError struct{} + +// Run runs the specified ProxyServer. +func (s *fakeProxyServerError) Run() error { + for { + time.Sleep(2 * time.Second) + return fmt.Errorf("mocking error from ProxyServer.Run()") + } +}