Merge pull request #68557 from sttts/sttts-storage-compaction-once

apiserver: start only one compactor per unique storagebackend transport config
pull/564/head
Kubernetes Prow Robot 2019-01-03 14:37:39 -08:00 committed by GitHub
commit 73bca32cf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 180 additions and 84 deletions

View File

@ -141,12 +141,14 @@ func TestAddFlags(t *testing.T) {
}, },
Etcd: &apiserveroptions.EtcdOptions{ Etcd: &apiserveroptions.EtcdOptions{
StorageConfig: storagebackend.Config{ StorageConfig: storagebackend.Config{
Type: "etcd3", Type: "etcd3",
ServerList: nil, Transport: storagebackend.TransportConfig{
ServerList: nil,
KeyFile: "/var/run/kubernetes/etcd.key",
CAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
},
Prefix: "/registry", Prefix: "/registry",
KeyFile: "/var/run/kubernetes/etcd.key",
CAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
CompactionInterval: storagebackend.DefaultCompactInterval, CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute, CountMetricPollPeriod: time.Minute,
}, },

View File

@ -287,8 +287,8 @@ func CreateKubeAPIServerConfig(
return return
} }
if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.ServerList[0]); err == nil && port != "0" && len(port) != 0 { if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 {
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil { if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil {
lastErr = fmt.Errorf("error waiting for etcd connection: %v", err) lastErr = fmt.Errorf("error waiting for etcd connection: %v", err)
return return
} }

View File

@ -110,16 +110,16 @@ func TestInvalidObjectMetaInStorage(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
tlsInfo := transport.TLSInfo{ tlsInfo := transport.TLSInfo{
CertFile: restOptions.StorageConfig.CertFile, CertFile: restOptions.StorageConfig.Transport.CertFile,
KeyFile: restOptions.StorageConfig.KeyFile, KeyFile: restOptions.StorageConfig.Transport.KeyFile,
CAFile: restOptions.StorageConfig.CAFile, CAFile: restOptions.StorageConfig.Transport.CAFile,
} }
tlsConfig, err := tlsInfo.ClientConfig() tlsConfig, err := tlsInfo.ClientConfig()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
etcdConfig := clientv3.Config{ etcdConfig := clientv3.Config{
Endpoints: restOptions.StorageConfig.ServerList, Endpoints: restOptions.StorageConfig.Transport.ServerList,
TLS: tlsConfig, TLS: tlsConfig,
} }
etcdclient, err := clientv3.New(etcdConfig) etcdclient, err := clientv3.New(etcdConfig)

View File

@ -81,7 +81,7 @@ func (s *EtcdOptions) Validate() []error {
} }
allErrors := []error{} allErrors := []error{}
if len(s.StorageConfig.ServerList) == 0 { if len(s.StorageConfig.Transport.ServerList) == 0 {
allErrors = append(allErrors, fmt.Errorf("--etcd-servers must be specified")) allErrors = append(allErrors, fmt.Errorf("--etcd-servers must be specified"))
} }
@ -148,19 +148,19 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&dummyCacheSize, "deserialization-cache-size", 0, "Number of deserialized json objects to cache in memory.") fs.IntVar(&dummyCacheSize, "deserialization-cache-size", 0, "Number of deserialized json objects to cache in memory.")
fs.MarkDeprecated("deserialization-cache-size", "the deserialization cache was dropped in 1.13 with support for etcd2") fs.MarkDeprecated("deserialization-cache-size", "the deserialization cache was dropped in 1.13 with support for etcd2")
fs.StringSliceVar(&s.StorageConfig.ServerList, "etcd-servers", s.StorageConfig.ServerList, fs.StringSliceVar(&s.StorageConfig.Transport.ServerList, "etcd-servers", s.StorageConfig.Transport.ServerList,
"List of etcd servers to connect with (scheme://ip:port), comma separated.") "List of etcd servers to connect with (scheme://ip:port), comma separated.")
fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix, fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix,
"The prefix to prepend to all resource paths in etcd.") "The prefix to prepend to all resource paths in etcd.")
fs.StringVar(&s.StorageConfig.KeyFile, "etcd-keyfile", s.StorageConfig.KeyFile, fs.StringVar(&s.StorageConfig.Transport.KeyFile, "etcd-keyfile", s.StorageConfig.Transport.KeyFile,
"SSL key file used to secure etcd communication.") "SSL key file used to secure etcd communication.")
fs.StringVar(&s.StorageConfig.CertFile, "etcd-certfile", s.StorageConfig.CertFile, fs.StringVar(&s.StorageConfig.Transport.CertFile, "etcd-certfile", s.StorageConfig.Transport.CertFile,
"SSL certification file used to secure etcd communication.") "SSL certification file used to secure etcd communication.")
fs.StringVar(&s.StorageConfig.CAFile, "etcd-cafile", s.StorageConfig.CAFile, fs.StringVar(&s.StorageConfig.Transport.CAFile, "etcd-cafile", s.StorageConfig.Transport.CAFile,
"SSL Certificate Authority file used to secure etcd communication.") "SSL Certificate Authority file used to secure etcd communication.")
fs.StringVar(&s.EncryptionProviderConfigFilepath, "experimental-encryption-provider-config", s.EncryptionProviderConfigFilepath, fs.StringVar(&s.EncryptionProviderConfigFilepath, "experimental-encryption-provider-config", s.EncryptionProviderConfigFilepath,

View File

@ -36,12 +36,14 @@ func TestEtcdOptionsValidate(t *testing.T) {
name: "test when ServerList is not specified", name: "test when ServerList is not specified",
testOptions: &EtcdOptions{ testOptions: &EtcdOptions{
StorageConfig: storagebackend.Config{ StorageConfig: storagebackend.Config{
Type: "etcd3", Type: "etcd3",
ServerList: nil, Prefix: "/registry",
Prefix: "/registry", Transport: storagebackend.TransportConfig{
KeyFile: "/var/run/kubernetes/etcd.key", ServerList: nil,
CAFile: "/var/run/kubernetes/etcdca.crt", KeyFile: "/var/run/kubernetes/etcd.key",
CertFile: "/var/run/kubernetes/etcdce.crt", CAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
},
CompactionInterval: storagebackend.DefaultCompactInterval, CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute, CountMetricPollPeriod: time.Minute,
}, },
@ -58,12 +60,14 @@ func TestEtcdOptionsValidate(t *testing.T) {
name: "test when storage-backend is invalid", name: "test when storage-backend is invalid",
testOptions: &EtcdOptions{ testOptions: &EtcdOptions{
StorageConfig: storagebackend.Config{ StorageConfig: storagebackend.Config{
Type: "etcd4", Type: "etcd4",
ServerList: []string{"http://127.0.0.1"}, Prefix: "/registry",
Prefix: "/registry", Transport: storagebackend.TransportConfig{
KeyFile: "/var/run/kubernetes/etcd.key", ServerList: []string{"http://127.0.0.1"},
CAFile: "/var/run/kubernetes/etcdca.crt", KeyFile: "/var/run/kubernetes/etcd.key",
CertFile: "/var/run/kubernetes/etcdce.crt", CAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
},
CompactionInterval: storagebackend.DefaultCompactInterval, CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute, CountMetricPollPeriod: time.Minute,
}, },
@ -80,12 +84,14 @@ func TestEtcdOptionsValidate(t *testing.T) {
name: "test when etcd-servers-overrides is invalid", name: "test when etcd-servers-overrides is invalid",
testOptions: &EtcdOptions{ testOptions: &EtcdOptions{
StorageConfig: storagebackend.Config{ StorageConfig: storagebackend.Config{
Type: "etcd3", Type: "etcd3",
ServerList: []string{"http://127.0.0.1"}, Transport: storagebackend.TransportConfig{
ServerList: []string{"http://127.0.0.1"},
KeyFile: "/var/run/kubernetes/etcd.key",
CAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
},
Prefix: "/registry", Prefix: "/registry",
KeyFile: "/var/run/kubernetes/etcd.key",
CAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
CompactionInterval: storagebackend.DefaultCompactInterval, CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute, CountMetricPollPeriod: time.Minute,
}, },
@ -102,12 +108,14 @@ func TestEtcdOptionsValidate(t *testing.T) {
name: "test when EtcdOptions is valid", name: "test when EtcdOptions is valid",
testOptions: &EtcdOptions{ testOptions: &EtcdOptions{
StorageConfig: storagebackend.Config{ StorageConfig: storagebackend.Config{
Type: "etcd3", Type: "etcd3",
ServerList: []string{"http://127.0.0.1"}, Prefix: "/registry",
Prefix: "/registry", Transport: storagebackend.TransportConfig{
KeyFile: "/var/run/kubernetes/etcd.key", ServerList: []string{"http://127.0.0.1"},
CAFile: "/var/run/kubernetes/etcdca.crt", KeyFile: "/var/run/kubernetes/etcd.key",
CertFile: "/var/run/kubernetes/etcdce.crt", CAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
},
CompactionInterval: storagebackend.DefaultCompactInterval, CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute, CountMetricPollPeriod: time.Minute,
}, },

View File

@ -121,7 +121,7 @@ type groupResourceOverrides struct {
// Apply overrides the provided config and options if the override has a value in that position // Apply overrides the provided config and options if the override has a value in that position
func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) { func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) {
if len(o.etcdLocation) > 0 { if len(o.etcdLocation) > 0 {
config.ServerList = o.etcdLocation config.Transport.ServerList = o.etcdLocation
} }
if len(o.etcdPrefix) > 0 { if len(o.etcdPrefix) > 0 {
config.Prefix = o.etcdPrefix config.Prefix = o.etcdPrefix
@ -290,7 +290,7 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
// Backends returns all backends for all registered storage destinations. // Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations. // Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []Backend { func (s *DefaultStorageFactory) Backends() []Backend {
servers := sets.NewString(s.StorageConfig.ServerList...) servers := sets.NewString(s.StorageConfig.Transport.ServerList...)
for _, overrides := range s.Overrides { for _, overrides := range s.Overrides {
servers.Insert(overrides.etcdLocation...) servers.Insert(overrides.etcdLocation...)
@ -299,16 +299,16 @@ func (s *DefaultStorageFactory) Backends() []Backend {
tlsConfig := &tls.Config{ tlsConfig := &tls.Config{
InsecureSkipVerify: true, InsecureSkipVerify: true,
} }
if len(s.StorageConfig.CertFile) > 0 && len(s.StorageConfig.KeyFile) > 0 { if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(s.StorageConfig.CertFile, s.StorageConfig.KeyFile) cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile)
if err != nil { if err != nil {
klog.Errorf("failed to load key pair while getting backends: %s", err) klog.Errorf("failed to load key pair while getting backends: %s", err)
} else { } else {
tlsConfig.Certificates = []tls.Certificate{cert} tlsConfig.Certificates = []tls.Certificate{cert}
} }
} }
if len(s.StorageConfig.CAFile) > 0 { if len(s.StorageConfig.Transport.CAFile) > 0 {
if caCert, err := ioutil.ReadFile(s.StorageConfig.CAFile); err != nil { if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.CAFile); err != nil {
klog.Errorf("failed to read ca file while getting backends: %s", err) klog.Errorf("failed to read ca file while getting backends: %s", err)
} else { } else {
caPool := x509.NewCertPool() caPool := x509.NewCertPool()

View File

@ -104,7 +104,7 @@ func TestConfigurableStorageFactory(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if config.Prefix != "/prefix_for_test" || !reflect.DeepEqual(config.ServerList, []string{"/server2"}) { if config.Prefix != "/prefix_for_test" || !reflect.DeepEqual(config.Transport.ServerList, []string{"/server2"}) {
t.Errorf("unexpected config %#v", config) t.Errorf("unexpected config %#v", config)
} }
if !called { if !called {
@ -136,8 +136,10 @@ func TestUpdateEtcdOverrides(t *testing.T) {
defaultEtcdLocation := []string{"http://127.0.0.1"} defaultEtcdLocation := []string{"http://127.0.0.1"}
for i, test := range testCases { for i, test := range testCases {
defaultConfig := storagebackend.Config{ defaultConfig := storagebackend.Config{
Prefix: "/registry", Prefix: "/registry",
ServerList: defaultEtcdLocation, Transport: storagebackend.TransportConfig{
ServerList: defaultEtcdLocation,
},
} }
storageFactory := NewDefaultStorageFactory(defaultConfig, "", codecs, NewDefaultResourceEncodingConfig(scheme), NewResourceConfig(), nil) storageFactory := NewDefaultStorageFactory(defaultConfig, "", codecs, NewDefaultResourceEncodingConfig(scheme), NewResourceConfig(), nil)
storageFactory.SetEtcdLocation(test.resource, test.servers) storageFactory.SetEtcdLocation(test.resource, test.servers)
@ -148,8 +150,8 @@ func TestUpdateEtcdOverrides(t *testing.T) {
t.Errorf("%d: unexpected error %v", i, err) t.Errorf("%d: unexpected error %v", i, err)
continue continue
} }
if !reflect.DeepEqual(config.ServerList, test.servers) { if !reflect.DeepEqual(config.Transport.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, config.ServerList) t.Errorf("%d: expected %v, got %v", i, test.servers, config.Transport.ServerList)
continue continue
} }
@ -158,8 +160,8 @@ func TestUpdateEtcdOverrides(t *testing.T) {
t.Errorf("%d: unexpected error %v", i, err) t.Errorf("%d: unexpected error %v", i, err)
continue continue
} }
if !reflect.DeepEqual(config.ServerList, defaultEtcdLocation) { if !reflect.DeepEqual(config.Transport.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, config.ServerList) t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, config.Transport.ServerList)
continue continue
} }

View File

@ -293,10 +293,12 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storageb
} }
server.V3Client = server.v3Cluster.RandClient() server.V3Client = server.v3Cluster.RandClient()
config := &storagebackend.Config{ config := &storagebackend.Config{
Type: "etcd3", Type: "etcd3",
Prefix: etcdtest.PathPrefix(), Prefix: etcdtest.PathPrefix(),
ServerList: server.V3Client.Endpoints(), Transport: storagebackend.TransportConfig{
Paging: true, ServerList: server.V3Client.Endpoints(),
},
Paging: true,
} }
return server, config return server, config
} }

View File

@ -30,18 +30,26 @@ const (
DefaultCompactInterval = 5 * time.Minute DefaultCompactInterval = 5 * time.Minute
) )
// Config is configuration for creating a storage backend. // TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
type Config struct { type TransportConfig struct {
// Type defines the type of storage backend. Default ("") is "etcd3".
Type string
// Prefix is the prefix to all keys passed to storage.Interface methods.
Prefix string
// ServerList is the list of storage servers to connect with. // ServerList is the list of storage servers to connect with.
ServerList []string ServerList []string
// TLS credentials // TLS credentials
KeyFile string KeyFile string
CertFile string CertFile string
CAFile string CAFile string
}
// Config is configuration for creating a storage backend.
type Config struct {
// Type defines the type of storage backend. Default ("") is "etcd3".
Type string
// Prefix is the prefix to all keys passed to storage.Interface methods.
Prefix string
// Transport holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
Transport TransportConfig
// Quorum indicates that whether read operations should be quorum-level consistent.
Quorum bool
// Paging indicates whether the server implementation should allow paging (if it is // Paging indicates whether the server implementation should allow paging (if it is
// supported). This is generally configured by feature gating, or by a specific // supported). This is generally configured by feature gating, or by a specific
// resource type not wishing to allow paging, and is not intended for end users to // resource type not wishing to allow paging, and is not intended for end users to
@ -55,7 +63,6 @@ type Config struct {
// CompactionInterval is an interval of requesting compaction from apiserver. // CompactionInterval is an interval of requesting compaction from apiserver.
// If the value is 0, no compaction will be issued. // If the value is 0, no compaction will be issued.
CompactionInterval time.Duration CompactionInterval time.Duration
// CountMetricPollPeriod specifies how often should count metric be updated // CountMetricPollPeriod specifies how often should count metric be updated
CountMetricPollPeriod time.Duration CountMetricPollPeriod time.Duration
} }

View File

@ -19,6 +19,7 @@ package factory
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -54,7 +55,7 @@ func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
clientErrMsg.Store("etcd client connection not yet established") clientErrMsg.Store("etcd client connection not yet established")
go wait.PollUntil(time.Second, func() (bool, error) { go wait.PollUntil(time.Second, func() (bool, error) {
client, err := newETCD3Client(c) client, err := newETCD3Client(c.Transport)
if err != nil { if err != nil {
clientErrMsg.Store(err.Error()) clientErrMsg.Store(err.Error())
return false, nil return false, nil
@ -78,7 +79,7 @@ func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
}, nil }, nil
} }
func newETCD3Client(c storagebackend.Config) (*clientv3.Client, error) { func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) {
tlsInfo := transport.TLSInfo{ tlsInfo := transport.TLSInfo{
CertFile: c.CertFile, CertFile: c.CertFile,
KeyFile: c.KeyFile, KeyFile: c.KeyFile,
@ -108,16 +109,88 @@ func newETCD3Client(c storagebackend.Config) (*clientv3.Client, error) {
return clientv3.New(cfg) return clientv3.New(cfg)
} }
type runningCompactor struct {
interval time.Duration
cancel context.CancelFunc
client *clientv3.Client
refs int
}
var (
lock sync.Mutex
compactors = map[string]*runningCompactor{}
)
// startCompactorOnce start one compactor per transport. If the interval get smaller on repeated calls, the
// compactor is replaced. A destroy func is returned. If all destroy funcs with the same transport are called,
// the compactor is stopped.
func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (func(), error) {
lock.Lock()
defer lock.Unlock()
key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile}
if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval {
compactorClient, err := newETCD3Client(c)
if err != nil {
return nil, err
}
if foundBefore {
// replace compactor
compactor.cancel()
compactor.client.Close()
} else {
// start new compactor
compactor = &runningCompactor{}
compactors[key] = compactor
}
ctx, cancel := context.WithCancel(context.Background())
compactor.interval = interval
compactor.cancel = cancel
compactor.client = compactorClient
etcd3.StartCompactor(ctx, compactorClient, interval)
}
compactors[key].refs++
return func() {
lock.Lock()
defer lock.Unlock()
compactor := compactors[key]
compactor.refs--
if compactor.refs == 0 {
compactor.cancel()
compactor.client.Close()
delete(compactors, key)
}
}, nil
}
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
client, err := newETCD3Client(c) stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
ctx, cancel := context.WithCancel(context.Background())
etcd3.StartCompactor(ctx, client, c.CompactionInterval) client, err := newETCD3Client(c.Transport)
if err != nil {
stopCompactor()
return nil, nil, err
}
var once sync.Once
destroyFunc := func() { destroyFunc := func() {
cancel() // we know that storage destroy funcs are called multiple times (due to reuse in subresources).
client.Close() // Hence, we only destroy once.
// TODO: fix duplicated storage destroy calls higher level
once.Do(func() {
stopCompactor()
client.Close()
})
} }
transformer := c.Transformer transformer := c.Transformer
if transformer == nil { if transformer == nil {

View File

@ -66,12 +66,14 @@ func TestTLSConnection(t *testing.T) {
defer cluster.Terminate(t) defer cluster.Terminate(t)
cfg := storagebackend.Config{ cfg := storagebackend.Config{
Type: storagebackend.StorageTypeETCD3, Type: storagebackend.StorageTypeETCD3,
ServerList: []string{cluster.Members[0].GRPCAddr()}, Transport: storagebackend.TransportConfig{
CertFile: certFile, ServerList: []string{cluster.Members[0].GRPCAddr()},
KeyFile: keyFile, CertFile: certFile,
CAFile: caFile, KeyFile: keyFile,
Codec: codec, CAFile: caFile,
},
Codec: codec,
} }
storage, destroyFunc, err := newETCD3Storage(cfg) storage, destroyFunc, err := newETCD3Storage(cfg)
defer destroyFunc() defer destroyFunc()

View File

@ -74,7 +74,7 @@ func StartRealMasterOrDie(t *testing.T) *Master {
kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.SecureServing.Listener = listener kubeAPIServerOptions.SecureServing.Listener = listener
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()} kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // force json we can easily interpret the result in etcd kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // force json we can easily interpret the result in etcd
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"} kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
@ -88,7 +88,7 @@ func StartRealMasterOrDie(t *testing.T) *Master {
} }
// get etcd client before starting API server // get etcd client before starting API server
rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig) rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig.Transport)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -98,7 +98,7 @@ func TestAggregatedAPIServer(t *testing.T) {
kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1") kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()} kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}

View File

@ -89,7 +89,7 @@ func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.Prefix = path.Join("/", uuid.New(), "registry") kubeAPIServerOptions.Etcd.StorageConfig.Prefix = path.Join("/", uuid.New(), "registry")
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()} kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}

View File

@ -242,7 +242,7 @@ func NewMasterConfig() *master.Config {
// prefix code, so please don't change without ensuring // prefix code, so please don't change without ensuring
// sufficient coverage in other ways. // sufficient coverage in other ways.
etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil)) etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil))
etcdOptions.StorageConfig.ServerList = []string{GetEtcdURL()} etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
info, _ := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON) info, _ := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
ns := NewSingleContentTypeSerializer(legacyscheme.Scheme, info) ns := NewSingleContentTypeSerializer(legacyscheme.Scheme, info)
@ -341,7 +341,7 @@ func RunAMasterUsingServer(masterConfig *master.Config, s *httptest.Server, mast
// SharedEtcd creates a storage config for a shared etcd instance, with a unique prefix. // SharedEtcd creates a storage config for a shared etcd instance, with a unique prefix.
func SharedEtcd() *storagebackend.Config { func SharedEtcd() *storagebackend.Config {
cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New(), "registry"), nil) cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New(), "registry"), nil)
cfg.ServerList = []string{GetEtcdURL()} cfg.Transport.ServerList = []string{GetEtcdURL()}
return cfg return cfg
} }

View File

@ -228,7 +228,7 @@ func (e *transformTest) createSecret(name, namespace string) (*corev1.Secret, er
} }
func (e *transformTest) readRawRecordFromETCD(path string) (*clientv3.GetResponse, error) { func (e *transformTest) readRawRecordFromETCD(path string) (*clientv3.GetResponse, error) {
_, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig) _, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %v", err) return nil, fmt.Errorf("failed to create etcd client: %v", err)
} }

View File

@ -67,7 +67,7 @@ func WaitForPodToDisappear(podClient coreclient.PodInterface, podName string, in
}) })
} }
func GetEtcdClients(config storagebackend.Config) (*clientv3.Client, clientv3.KV, error) { func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, clientv3.KV, error) {
tlsInfo := transport.TLSInfo{ tlsInfo := transport.TLSInfo{
CertFile: config.CertFile, CertFile: config.CertFile,
KeyFile: config.KeyFile, KeyFile: config.KeyFile,