From 7b242533a217bd809e2c846c3e3fadf7bf6edee8 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 12 Sep 2018 10:59:01 +0200 Subject: [PATCH 1/2] apiserver: separate transport setting from storagebackend.Config --- .../app/options/options_test.go | 12 +++-- cmd/kube-apiserver/app/server.go | 4 +- .../test/integration/objectmeta_test.go | 8 +-- .../apiserver/pkg/server/options/etcd.go | 10 ++-- .../apiserver/pkg/server/options/etcd_test.go | 54 +++++++++++-------- .../pkg/server/storage/storage_factory.go | 12 ++--- .../server/storage/storage_factory_test.go | 16 +++--- .../pkg/storage/etcd/testing/utils.go | 10 ++-- .../pkg/storage/storagebackend/config.go | 21 +++++--- .../storage/storagebackend/factory/etcd3.go | 6 +-- .../storagebackend/factory/tls_test.go | 14 ++--- test/integration/examples/apiserver_test.go | 2 +- test/integration/examples/setup_test.go | 2 +- test/integration/framework/master_utils.go | 4 +- .../master/transformation_testcase.go | 2 +- test/integration/utils.go | 2 +- 16 files changed, 101 insertions(+), 78 deletions(-) diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index c3ef30e2c3..00ebd03a0a 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -141,12 +141,14 @@ func TestAddFlags(t *testing.T) { }, Etcd: &apiserveroptions.EtcdOptions{ StorageConfig: storagebackend.Config{ - Type: "etcd3", - ServerList: nil, + Type: "etcd3", + Transport: storagebackend.TransportConfig{ + ServerList: nil, + KeyFile: "/var/run/kubernetes/etcd.key", + CAFile: "/var/run/kubernetes/etcdca.crt", + CertFile: "/var/run/kubernetes/etcdce.crt", + }, Prefix: "/registry", - KeyFile: "/var/run/kubernetes/etcd.key", - CAFile: "/var/run/kubernetes/etcdca.crt", - CertFile: "/var/run/kubernetes/etcdce.crt", CompactionInterval: storagebackend.DefaultCompactInterval, CountMetricPollPeriod: time.Minute, }, diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 2b17c20d8b..cb88f75ed5 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -287,8 +287,8 @@ func CreateKubeAPIServerConfig( return } - if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.ServerList[0]); err == nil && port != "0" && len(port) != 0 { - if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil { + if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 { + if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil { lastErr = fmt.Errorf("error waiting for etcd connection: %v", err) return } diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/objectmeta_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/objectmeta_test.go index 1ce637aa70..123543be8d 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/objectmeta_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/objectmeta_test.go @@ -110,16 +110,16 @@ func TestInvalidObjectMetaInStorage(t *testing.T) { t.Fatal(err) } tlsInfo := transport.TLSInfo{ - CertFile: restOptions.StorageConfig.CertFile, - KeyFile: restOptions.StorageConfig.KeyFile, - CAFile: restOptions.StorageConfig.CAFile, + CertFile: restOptions.StorageConfig.Transport.CertFile, + KeyFile: restOptions.StorageConfig.Transport.KeyFile, + CAFile: restOptions.StorageConfig.Transport.CAFile, } tlsConfig, err := tlsInfo.ClientConfig() if err != nil { t.Fatal(err) } etcdConfig := clientv3.Config{ - Endpoints: restOptions.StorageConfig.ServerList, + Endpoints: restOptions.StorageConfig.Transport.ServerList, TLS: tlsConfig, } etcdclient, err := clientv3.New(etcdConfig) diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 26173eb72c..f1fe2c3b41 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -81,7 +81,7 @@ func (s *EtcdOptions) Validate() []error { } allErrors := []error{} - if len(s.StorageConfig.ServerList) == 0 { + if len(s.StorageConfig.Transport.ServerList) == 0 { allErrors = append(allErrors, fmt.Errorf("--etcd-servers must be specified")) } @@ -148,19 +148,19 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&dummyCacheSize, "deserialization-cache-size", 0, "Number of deserialized json objects to cache in memory.") fs.MarkDeprecated("deserialization-cache-size", "the deserialization cache was dropped in 1.13 with support for etcd2") - fs.StringSliceVar(&s.StorageConfig.ServerList, "etcd-servers", s.StorageConfig.ServerList, + fs.StringSliceVar(&s.StorageConfig.Transport.ServerList, "etcd-servers", s.StorageConfig.Transport.ServerList, "List of etcd servers to connect with (scheme://ip:port), comma separated.") fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix, "The prefix to prepend to all resource paths in etcd.") - fs.StringVar(&s.StorageConfig.KeyFile, "etcd-keyfile", s.StorageConfig.KeyFile, + fs.StringVar(&s.StorageConfig.Transport.KeyFile, "etcd-keyfile", s.StorageConfig.Transport.KeyFile, "SSL key file used to secure etcd communication.") - fs.StringVar(&s.StorageConfig.CertFile, "etcd-certfile", s.StorageConfig.CertFile, + fs.StringVar(&s.StorageConfig.Transport.CertFile, "etcd-certfile", s.StorageConfig.Transport.CertFile, "SSL certification file used to secure etcd communication.") - fs.StringVar(&s.StorageConfig.CAFile, "etcd-cafile", s.StorageConfig.CAFile, + fs.StringVar(&s.StorageConfig.Transport.CAFile, "etcd-cafile", s.StorageConfig.Transport.CAFile, "SSL Certificate Authority file used to secure etcd communication.") fs.StringVar(&s.EncryptionProviderConfigFilepath, "experimental-encryption-provider-config", s.EncryptionProviderConfigFilepath, diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go index 7f772f5b39..5600d5438a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go @@ -36,12 +36,14 @@ func TestEtcdOptionsValidate(t *testing.T) { name: "test when ServerList is not specified", testOptions: &EtcdOptions{ StorageConfig: storagebackend.Config{ - Type: "etcd3", - ServerList: nil, - Prefix: "/registry", - KeyFile: "/var/run/kubernetes/etcd.key", - CAFile: "/var/run/kubernetes/etcdca.crt", - CertFile: "/var/run/kubernetes/etcdce.crt", + Type: "etcd3", + Prefix: "/registry", + Transport: storagebackend.TransportConfig{ + ServerList: nil, + KeyFile: "/var/run/kubernetes/etcd.key", + CAFile: "/var/run/kubernetes/etcdca.crt", + CertFile: "/var/run/kubernetes/etcdce.crt", + }, CompactionInterval: storagebackend.DefaultCompactInterval, CountMetricPollPeriod: time.Minute, }, @@ -58,12 +60,14 @@ func TestEtcdOptionsValidate(t *testing.T) { name: "test when storage-backend is invalid", testOptions: &EtcdOptions{ StorageConfig: storagebackend.Config{ - Type: "etcd4", - ServerList: []string{"http://127.0.0.1"}, - Prefix: "/registry", - KeyFile: "/var/run/kubernetes/etcd.key", - CAFile: "/var/run/kubernetes/etcdca.crt", - CertFile: "/var/run/kubernetes/etcdce.crt", + Type: "etcd4", + Prefix: "/registry", + Transport: storagebackend.TransportConfig{ + ServerList: []string{"http://127.0.0.1"}, + KeyFile: "/var/run/kubernetes/etcd.key", + CAFile: "/var/run/kubernetes/etcdca.crt", + CertFile: "/var/run/kubernetes/etcdce.crt", + }, CompactionInterval: storagebackend.DefaultCompactInterval, CountMetricPollPeriod: time.Minute, }, @@ -80,12 +84,14 @@ func TestEtcdOptionsValidate(t *testing.T) { name: "test when etcd-servers-overrides is invalid", testOptions: &EtcdOptions{ StorageConfig: storagebackend.Config{ - Type: "etcd3", - ServerList: []string{"http://127.0.0.1"}, + Type: "etcd3", + Transport: storagebackend.TransportConfig{ + ServerList: []string{"http://127.0.0.1"}, + KeyFile: "/var/run/kubernetes/etcd.key", + CAFile: "/var/run/kubernetes/etcdca.crt", + CertFile: "/var/run/kubernetes/etcdce.crt", + }, Prefix: "/registry", - KeyFile: "/var/run/kubernetes/etcd.key", - CAFile: "/var/run/kubernetes/etcdca.crt", - CertFile: "/var/run/kubernetes/etcdce.crt", CompactionInterval: storagebackend.DefaultCompactInterval, CountMetricPollPeriod: time.Minute, }, @@ -102,12 +108,14 @@ func TestEtcdOptionsValidate(t *testing.T) { name: "test when EtcdOptions is valid", testOptions: &EtcdOptions{ StorageConfig: storagebackend.Config{ - Type: "etcd3", - ServerList: []string{"http://127.0.0.1"}, - Prefix: "/registry", - KeyFile: "/var/run/kubernetes/etcd.key", - CAFile: "/var/run/kubernetes/etcdca.crt", - CertFile: "/var/run/kubernetes/etcdce.crt", + Type: "etcd3", + Prefix: "/registry", + Transport: storagebackend.TransportConfig{ + ServerList: []string{"http://127.0.0.1"}, + KeyFile: "/var/run/kubernetes/etcd.key", + CAFile: "/var/run/kubernetes/etcdca.crt", + CertFile: "/var/run/kubernetes/etcdce.crt", + }, CompactionInterval: storagebackend.DefaultCompactInterval, CountMetricPollPeriod: time.Minute, }, diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go index a87ce4a5ef..c3bb6ecd6d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go @@ -121,7 +121,7 @@ type groupResourceOverrides struct { // Apply overrides the provided config and options if the override has a value in that position func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) { if len(o.etcdLocation) > 0 { - config.ServerList = o.etcdLocation + config.Transport.ServerList = o.etcdLocation } if len(o.etcdPrefix) > 0 { config.Prefix = o.etcdPrefix @@ -290,7 +290,7 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (* // Backends returns all backends for all registered storage destinations. // Used for getting all instances for health validations. func (s *DefaultStorageFactory) Backends() []Backend { - servers := sets.NewString(s.StorageConfig.ServerList...) + servers := sets.NewString(s.StorageConfig.Transport.ServerList...) for _, overrides := range s.Overrides { servers.Insert(overrides.etcdLocation...) @@ -299,16 +299,16 @@ func (s *DefaultStorageFactory) Backends() []Backend { tlsConfig := &tls.Config{ InsecureSkipVerify: true, } - if len(s.StorageConfig.CertFile) > 0 && len(s.StorageConfig.KeyFile) > 0 { - cert, err := tls.LoadX509KeyPair(s.StorageConfig.CertFile, s.StorageConfig.KeyFile) + if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 { + cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile) if err != nil { klog.Errorf("failed to load key pair while getting backends: %s", err) } else { tlsConfig.Certificates = []tls.Certificate{cert} } } - if len(s.StorageConfig.CAFile) > 0 { - if caCert, err := ioutil.ReadFile(s.StorageConfig.CAFile); err != nil { + if len(s.StorageConfig.Transport.CAFile) > 0 { + if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.CAFile); err != nil { klog.Errorf("failed to read ca file while getting backends: %s", err) } else { caPool := x509.NewCertPool() diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go index 197ff6b795..3c8c1c2d3c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go @@ -104,7 +104,7 @@ func TestConfigurableStorageFactory(t *testing.T) { if err != nil { t.Fatal(err) } - if config.Prefix != "/prefix_for_test" || !reflect.DeepEqual(config.ServerList, []string{"/server2"}) { + if config.Prefix != "/prefix_for_test" || !reflect.DeepEqual(config.Transport.ServerList, []string{"/server2"}) { t.Errorf("unexpected config %#v", config) } if !called { @@ -136,8 +136,10 @@ func TestUpdateEtcdOverrides(t *testing.T) { defaultEtcdLocation := []string{"http://127.0.0.1"} for i, test := range testCases { defaultConfig := storagebackend.Config{ - Prefix: "/registry", - ServerList: defaultEtcdLocation, + Prefix: "/registry", + Transport: storagebackend.TransportConfig{ + ServerList: defaultEtcdLocation, + }, } storageFactory := NewDefaultStorageFactory(defaultConfig, "", codecs, NewDefaultResourceEncodingConfig(scheme), NewResourceConfig(), nil) storageFactory.SetEtcdLocation(test.resource, test.servers) @@ -148,8 +150,8 @@ func TestUpdateEtcdOverrides(t *testing.T) { t.Errorf("%d: unexpected error %v", i, err) continue } - if !reflect.DeepEqual(config.ServerList, test.servers) { - t.Errorf("%d: expected %v, got %v", i, test.servers, config.ServerList) + if !reflect.DeepEqual(config.Transport.ServerList, test.servers) { + t.Errorf("%d: expected %v, got %v", i, test.servers, config.Transport.ServerList) continue } @@ -158,8 +160,8 @@ func TestUpdateEtcdOverrides(t *testing.T) { t.Errorf("%d: unexpected error %v", i, err) continue } - if !reflect.DeepEqual(config.ServerList, defaultEtcdLocation) { - t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, config.ServerList) + if !reflect.DeepEqual(config.Transport.ServerList, defaultEtcdLocation) { + t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, config.Transport.ServerList) continue } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/utils.go index 493abaa2fe..77a6db4525 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/utils.go @@ -293,10 +293,12 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storageb } server.V3Client = server.v3Cluster.RandClient() config := &storagebackend.Config{ - Type: "etcd3", - Prefix: etcdtest.PathPrefix(), - ServerList: server.V3Client.Endpoints(), - Paging: true, + Type: "etcd3", + Prefix: etcdtest.PathPrefix(), + Transport: storagebackend.TransportConfig{ + ServerList: server.V3Client.Endpoints(), + }, + Paging: true, } return server, config } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index f18ac76dd9..c36a103942 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -30,18 +30,26 @@ const ( DefaultCompactInterval = 5 * time.Minute ) -// Config is configuration for creating a storage backend. -type Config struct { - // Type defines the type of storage backend. Default ("") is "etcd3". - Type string - // Prefix is the prefix to all keys passed to storage.Interface methods. - Prefix string +// TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to. +type TransportConfig struct { // ServerList is the list of storage servers to connect with. ServerList []string // TLS credentials KeyFile string CertFile string CAFile string +} + +// Config is configuration for creating a storage backend. +type Config struct { + // Type defines the type of storage backend. Default ("") is "etcd3". + Type string + // Prefix is the prefix to all keys passed to storage.Interface methods. + Prefix string + // Transport holds all connection related info, i.e. equal TransportConfig means equal servers we talk to. + Transport TransportConfig + // Quorum indicates that whether read operations should be quorum-level consistent. + Quorum bool // Paging indicates whether the server implementation should allow paging (if it is // supported). This is generally configured by feature gating, or by a specific // resource type not wishing to allow paging, and is not intended for end users to @@ -55,7 +63,6 @@ type Config struct { // CompactionInterval is an interval of requesting compaction from apiserver. // If the value is 0, no compaction will be issued. CompactionInterval time.Duration - // CountMetricPollPeriod specifies how often should count metric be updated CountMetricPollPeriod time.Duration } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index e18fe9acde..a41f09de1f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -54,7 +54,7 @@ func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) { clientErrMsg.Store("etcd client connection not yet established") go wait.PollUntil(time.Second, func() (bool, error) { - client, err := newETCD3Client(c) + client, err := newETCD3Client(c.Transport) if err != nil { clientErrMsg.Store(err.Error()) return false, nil @@ -78,7 +78,7 @@ func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) { }, nil } -func newETCD3Client(c storagebackend.Config) (*clientv3.Client, error) { +func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) { tlsInfo := transport.TLSInfo{ CertFile: c.CertFile, KeyFile: c.KeyFile, @@ -109,7 +109,7 @@ func newETCD3Client(c storagebackend.Config) (*clientv3.Client, error) { } func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { - client, err := newETCD3Client(c) + client, err := newETCD3Client(c.Transport) if err != nil { return nil, nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go index 9286c7bc2c..0646de89bf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go @@ -66,12 +66,14 @@ func TestTLSConnection(t *testing.T) { defer cluster.Terminate(t) cfg := storagebackend.Config{ - Type: storagebackend.StorageTypeETCD3, - ServerList: []string{cluster.Members[0].GRPCAddr()}, - CertFile: certFile, - KeyFile: keyFile, - CAFile: caFile, - Codec: codec, + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{ + ServerList: []string{cluster.Members[0].GRPCAddr()}, + CertFile: certFile, + KeyFile: keyFile, + CAFile: caFile, + }, + Codec: codec, } storage, destroyFunc, err := newETCD3Storage(cfg) defer destroyFunc() diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 994a304bf2..e5119fd6be 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -98,7 +98,7 @@ func TestAggregatedAPIServer(t *testing.T) { kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1") kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir kubeAPIServerOptions.InsecureServing.BindPort = 0 - kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()} + kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()} kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} diff --git a/test/integration/examples/setup_test.go b/test/integration/examples/setup_test.go index be388126ed..3c9f2becb7 100644 --- a/test/integration/examples/setup_test.go +++ b/test/integration/examples/setup_test.go @@ -89,7 +89,7 @@ func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.Etcd.StorageConfig.Prefix = path.Join("/", uuid.New(), "registry") - kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()} + kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()} kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index d5512120b5..0515994b3e 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -242,7 +242,7 @@ func NewMasterConfig() *master.Config { // prefix code, so please don't change without ensuring // sufficient coverage in other ways. etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil)) - etcdOptions.StorageConfig.ServerList = []string{GetEtcdURL()} + etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()} info, _ := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON) ns := NewSingleContentTypeSerializer(legacyscheme.Scheme, info) @@ -341,7 +341,7 @@ func RunAMasterUsingServer(masterConfig *master.Config, s *httptest.Server, mast // SharedEtcd creates a storage config for a shared etcd instance, with a unique prefix. func SharedEtcd() *storagebackend.Config { cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New(), "registry"), nil) - cfg.ServerList = []string{GetEtcdURL()} + cfg.Transport.ServerList = []string{GetEtcdURL()} return cfg } diff --git a/test/integration/master/transformation_testcase.go b/test/integration/master/transformation_testcase.go index 3778b514cf..50d5c1ab5b 100644 --- a/test/integration/master/transformation_testcase.go +++ b/test/integration/master/transformation_testcase.go @@ -228,7 +228,7 @@ func (e *transformTest) createSecret(name, namespace string) (*corev1.Secret, er } func (e *transformTest) readRawRecordFromETCD(path string) (*clientv3.GetResponse, error) { - _, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig) + _, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport) if err != nil { return nil, fmt.Errorf("failed to create etcd client: %v", err) } diff --git a/test/integration/utils.go b/test/integration/utils.go index 5eb72c8655..533948c632 100644 --- a/test/integration/utils.go +++ b/test/integration/utils.go @@ -67,7 +67,7 @@ func WaitForPodToDisappear(podClient coreclient.PodInterface, podName string, in }) } -func GetEtcdClients(config storagebackend.Config) (*clientv3.Client, clientv3.KV, error) { +func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, clientv3.KV, error) { tlsInfo := transport.TLSInfo{ CertFile: config.CertFile, KeyFile: config.KeyFile, From 00a717b572f3582d0d20633644e827dd60991dce Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 12 Sep 2018 11:54:14 +0200 Subject: [PATCH 2/2] apiserver: start only one compactor per unique storage transport config --- .../storage/storagebackend/factory/etcd3.go | 83 +++++++++++++++++-- test/integration/etcd/server.go | 4 +- 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index a41f09de1f..6bac742438 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -19,6 +19,7 @@ package factory import ( "context" "fmt" + "sync" "sync/atomic" "time" @@ -108,16 +109,88 @@ func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) return clientv3.New(cfg) } +type runningCompactor struct { + interval time.Duration + cancel context.CancelFunc + client *clientv3.Client + refs int +} + +var ( + lock sync.Mutex + compactors = map[string]*runningCompactor{} +) + +// startCompactorOnce start one compactor per transport. If the interval get smaller on repeated calls, the +// compactor is replaced. A destroy func is returned. If all destroy funcs with the same transport are called, +// the compactor is stopped. +func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (func(), error) { + lock.Lock() + defer lock.Unlock() + + key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile} + if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval { + compactorClient, err := newETCD3Client(c) + if err != nil { + return nil, err + } + + if foundBefore { + // replace compactor + compactor.cancel() + compactor.client.Close() + } else { + // start new compactor + compactor = &runningCompactor{} + compactors[key] = compactor + } + + ctx, cancel := context.WithCancel(context.Background()) + + compactor.interval = interval + compactor.cancel = cancel + compactor.client = compactorClient + + etcd3.StartCompactor(ctx, compactorClient, interval) + } + + compactors[key].refs++ + + return func() { + lock.Lock() + defer lock.Unlock() + + compactor := compactors[key] + compactor.refs-- + if compactor.refs == 0 { + compactor.cancel() + compactor.client.Close() + delete(compactors, key) + } + }, nil +} + func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { - client, err := newETCD3Client(c.Transport) + stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) if err != nil { return nil, nil, err } - ctx, cancel := context.WithCancel(context.Background()) - etcd3.StartCompactor(ctx, client, c.CompactionInterval) + + client, err := newETCD3Client(c.Transport) + if err != nil { + stopCompactor() + return nil, nil, err + } + + var once sync.Once destroyFunc := func() { - cancel() - client.Close() + // we know that storage destroy funcs are called multiple times (due to reuse in subresources). + // Hence, we only destroy once. + // TODO: fix duplicated storage destroy calls higher level + once.Do(func() { + stopCompactor() + client.Close() + }) } transformer := c.Transformer if transformer == nil { diff --git a/test/integration/etcd/server.go b/test/integration/etcd/server.go index d86534ba29..8c43361801 100644 --- a/test/integration/etcd/server.go +++ b/test/integration/etcd/server.go @@ -74,7 +74,7 @@ func StartRealMasterOrDie(t *testing.T) *Master { kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.SecureServing.Listener = listener kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir - kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()} + kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()} kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // force json we can easily interpret the result in etcd kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"} @@ -88,7 +88,7 @@ func StartRealMasterOrDie(t *testing.T) *Master { } // get etcd client before starting API server - rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig) + rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig.Transport) if err != nil { t.Fatal(err) }