exit kube-proxy when configuration file changes

pull/564/head
Di Xu 2018-02-01 15:29:23 +08:00
parent 6cfd7beb86
commit cc16d64368
3 changed files with 209 additions and 3 deletions

View File

@ -31,6 +31,7 @@ go_library(
"//pkg/proxy/ipvs:go_default_library",
"//pkg/proxy/userspace:go_default_library",
"//pkg/util/configz:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//pkg/util/flag:go_default_library",
"//pkg/util/ipset:go_default_library",
"//pkg/util/iptables:go_default_library",
@ -62,6 +63,7 @@ go_library(
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/component-base/config:go_default_library",
"//staging/src/k8s.io/kube-proxy/config/v1alpha1:go_default_library",
"//vendor/github.com/fsnotify/fsnotify:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",

View File

@ -62,6 +62,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/ipvs"
"k8s.io/kubernetes/pkg/proxy/userspace"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/filesystem"
utilflag "k8s.io/kubernetes/pkg/util/flag"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@ -73,6 +74,7 @@ import (
"k8s.io/utils/exec"
utilpointer "k8s.io/utils/pointer"
"github.com/fsnotify/fsnotify"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
@ -86,6 +88,11 @@ const (
proxyModeKernelspace = "kernelspace"
)
// proxyRun defines the interface to run a specified ProxyServer
type proxyRun interface {
Run() error
}
// Options contains everything necessary to create and run a proxy server.
type Options struct {
// ConfigFile is the location of the proxy server's configuration file.
@ -101,6 +108,12 @@ type Options struct {
WindowsService bool
// config is the proxy server's configuration object.
config *kubeproxyconfig.KubeProxyConfiguration
// watcher is used to watch on the update change of ConfigFile
watcher filesystem.FSWatcher
// proxyServer is the interface to run the proxy server
proxyServer proxyRun
// errCh is the channel that errors will be sent
errCh chan error
// The fields below here are placeholders for flags that can't be directly mapped into
// config.KubeProxyConfiguration.
@ -190,6 +203,7 @@ func NewOptions() *Options {
scheme: scheme.Scheme,
codecs: scheme.Codecs,
CleanupIPVS: true,
errCh: make(chan error),
}
}
@ -208,6 +222,10 @@ func (o *Options) Complete() error {
} else {
o.config = c
}
if err := o.initWatcher(); err != nil {
return err
}
}
if err := o.processHostnameOverrideFlag(); err != nil {
@ -217,10 +235,39 @@ func (o *Options) Complete() error {
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates); err != nil {
return err
}
return nil
}
// Creates a new filesystem watcher and adds watches for the config file.
func (o *Options) initWatcher() error {
fswatcher := filesystem.NewFsnotifyWatcher()
err := fswatcher.Init(o.eventHandler, o.errorHandler)
if err != nil {
return err
}
err = fswatcher.AddWatch(o.ConfigFile)
if err != nil {
return err
}
o.watcher = fswatcher
return nil
}
func (o *Options) eventHandler(ent fsnotify.Event) {
eventOpIs := func(Op fsnotify.Op) bool {
return ent.Op&Op == Op
}
if eventOpIs(fsnotify.Write) || eventOpIs(fsnotify.Rename) {
// error out when ConfigFile is updated
o.errCh <- fmt.Errorf("content of the proxy server's configuration file was updated")
}
o.errCh <- nil
}
func (o *Options) errorHandler(err error) {
o.errCh <- err
}
// processHostnameOverrideFlag processes hostname-override flag
func (o *Options) processHostnameOverrideFlag() error {
// Check if hostname-override flag is set and use value since configFile always overrides
@ -240,7 +287,6 @@ func (o *Options) Validate(args []string) error {
if len(args) != 0 {
return errors.New("no arguments are supported")
}
if errs := validation.Validate(o.config); len(errs) != 0 {
return errs.ToAggregate()
}
@ -249,6 +295,7 @@ func (o *Options) Validate(args []string) error {
}
func (o *Options) Run() error {
defer close(o.errCh)
if len(o.WriteConfigTo) > 0 {
return o.writeConfigFile()
}
@ -257,8 +304,31 @@ func (o *Options) Run() error {
if err != nil {
return err
}
o.proxyServer = proxyServer
return o.runLoop()
}
return proxyServer.Run()
// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
if o.watcher != nil {
o.watcher.Run()
}
// run the proxy in goroutine
go func() {
err := o.proxyServer.Run()
o.errCh <- err
}()
for {
select {
case err := <-o.errCh:
if err != nil {
return err
}
}
}
}
func (o *Options) writeConfigFile() error {

View File

@ -18,6 +18,8 @@ package app
import (
"fmt"
"io/ioutil"
"os"
"reflect"
"runtime"
"strings"
@ -410,3 +412,135 @@ func TestProcessHostnameOverrideFlag(t *testing.T) {
})
}
}
func TestConfigChange(t *testing.T) {
setUp := func() (*os.File, string, error) {
tempDir := os.TempDir()
file, err := ioutil.TempFile(tempDir, "kube-proxy-config-")
if err != nil {
return nil, "", fmt.Errorf("unexpected error when creating temp file: %v", err)
}
_, err = file.WriteString(`apiVersion: kubeproxy.config.k8s.io/v1alpha1
bindAddress: 0.0.0.0
clientConnection:
acceptContentTypes: ""
burst: 10
contentType: application/vnd.kubernetes.protobuf
kubeconfig: /var/lib/kube-proxy/kubeconfig.conf
qps: 5
clusterCIDR: 10.244.0.0/16
configSyncPeriod: 15m0s
conntrack:
max: null
maxPerCore: 32768
min: 131072
tcpCloseWaitTimeout: 1h0m0s
tcpEstablishedTimeout: 24h0m0s
enableProfiling: false
healthzBindAddress: 0.0.0.0:10256
hostnameOverride: ""
iptables:
masqueradeAll: false
masqueradeBit: 14
minSyncPeriod: 0s
syncPeriod: 30s
ipvs:
excludeCIDRs: null
minSyncPeriod: 0s
scheduler: ""
syncPeriod: 30s
kind: KubeProxyConfiguration
metricsBindAddress: 127.0.0.1:10249
mode: ""
nodePortAddresses: null
oomScoreAdj: -999
portRange: ""
resourceContainer: /kube-proxy
udpIdleTimeout: 250ms`)
if err != nil {
return nil, "", fmt.Errorf("unexpected error when writing content to temp kube-proxy config file: %v", err)
}
return file, tempDir, nil
}
tearDown := func(file *os.File, tempDir string) {
file.Close()
os.RemoveAll(tempDir)
}
testCases := []struct {
name string
proxyServer proxyRun
append bool
expectedErr string
}{
{
name: "update config file",
proxyServer: new(fakeProxyServerLongRun),
append: true,
expectedErr: "content of the proxy server's configuration file was updated",
},
{
name: "fake error",
proxyServer: new(fakeProxyServerError),
expectedErr: "mocking error from ProxyServer.Run()",
},
}
for _, tc := range testCases {
file, tempDir, err := setUp()
if err != nil {
t.Fatalf("unexpected error when setting up environment: %v", err)
}
opt := NewOptions()
opt.ConfigFile = file.Name()
err = opt.Complete()
if err != nil {
t.Fatal(err)
}
opt.proxyServer = tc.proxyServer
errCh := make(chan error)
go func() {
errCh <- opt.runLoop()
}()
if tc.append {
file.WriteString("append fake content")
}
select {
case err := <-errCh:
if err != nil {
if !strings.Contains(err.Error(), tc.expectedErr) {
t.Errorf("[%s] Expected error containing %v, got %v", tc.name, tc.expectedErr, err)
}
}
case <-time.After(10 * time.Second):
t.Errorf("[%s] Timeout: unable to get any events or internal timeout.", tc.name)
}
tearDown(file, tempDir)
}
}
type fakeProxyServerLongRun struct{}
// Run runs the specified ProxyServer.
func (s *fakeProxyServerLongRun) Run() error {
for {
time.Sleep(2 * time.Second)
}
}
type fakeProxyServerError struct{}
// Run runs the specified ProxyServer.
func (s *fakeProxyServerError) Run() error {
for {
time.Sleep(2 * time.Second)
return fmt.Errorf("mocking error from ProxyServer.Run()")
}
}