mirror of https://github.com/k3s-io/k3s
Merge pull request #30890 from timothysc/etcd3_v3client
Automatic merge from submit-queue Enable v3 Client as the default on UTs Updates the default initialization to use clientv3 interface to etcd3, and fixes the UTs. This PR includes a cherry-pick of https://github.com/kubernetes/kubernetes/pull/30634 so we can validate the tests, so do not merge until that PR is complete.pull/6/head
commit
9484b2c0a0
|
@ -44,7 +44,7 @@ import (
|
|||
|
||||
// setUp is a convience function for setting up for (most) tests.
|
||||
func setUp(t *testing.T) (GenericAPIServer, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
|
||||
etcdServer := etcdtesting.NewEtcdTestClientServer(t)
|
||||
etcdServer, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
|
||||
|
||||
genericapiserver := GenericAPIServer{}
|
||||
config := Config{}
|
||||
|
|
|
@ -59,7 +59,6 @@ import (
|
|||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
"k8s.io/kubernetes/pkg/storage/storagebackend"
|
||||
"k8s.io/kubernetes/pkg/util/intstr"
|
||||
utilnet "k8s.io/kubernetes/pkg/util/net"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
@ -74,7 +73,7 @@ import (
|
|||
|
||||
// setUp is a convience function for setting up for (most) tests.
|
||||
func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
|
||||
server := etcdtesting.NewUnsecuredEtcdTestClientServer(t)
|
||||
server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
|
||||
|
||||
master := &Master{
|
||||
GenericAPIServer: &genericapiserver.GenericAPIServer{},
|
||||
|
@ -83,16 +82,6 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
|
|||
Config: &genericapiserver.Config{},
|
||||
}
|
||||
|
||||
storageConfig := storagebackend.Config{
|
||||
Prefix: etcdtest.PathPrefix(),
|
||||
CAFile: server.CAFile,
|
||||
KeyFile: server.KeyFile,
|
||||
CertFile: server.CertFile,
|
||||
}
|
||||
for _, url := range server.ClientURLs {
|
||||
storageConfig.ServerList = append(storageConfig.ServerList, url.String())
|
||||
}
|
||||
|
||||
resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
|
||||
resourceEncoding.SetVersionEncoding(api.GroupName, *testapi.Default.GroupVersion(), unversioned.GroupVersion{Group: api.GroupName, Version: runtime.APIVersionInternal})
|
||||
resourceEncoding.SetVersionEncoding(autoscaling.GroupName, *testapi.Autoscaling.GroupVersion(), unversioned.GroupVersion{Group: autoscaling.GroupName, Version: runtime.APIVersionInternal})
|
||||
|
@ -101,7 +90,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
|
|||
resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal})
|
||||
resourceEncoding.SetVersionEncoding(rbac.GroupName, *testapi.Rbac.GroupVersion(), unversioned.GroupVersion{Group: rbac.GroupName, Version: runtime.APIVersionInternal})
|
||||
resourceEncoding.SetVersionEncoding(certificates.GroupName, *testapi.Certificates.GroupVersion(), unversioned.GroupVersion{Group: certificates.GroupName, Version: runtime.APIVersionInternal})
|
||||
storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
|
||||
storageFactory := genericapiserver.NewDefaultStorageFactory(*storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
|
||||
|
||||
config.StorageFactory = storageFactory
|
||||
config.APIResourceConfigSource = DefaultAPIResourceConfigSource()
|
||||
|
|
|
@ -38,8 +38,8 @@ import (
|
|||
"k8s.io/kubernetes/pkg/selection"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
"k8s.io/kubernetes/pkg/storage/storagebackend/factory"
|
||||
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/validation/field"
|
||||
|
@ -1002,10 +1002,13 @@ func TestStoreWatch(t *testing.T) {
|
|||
|
||||
func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesting.EtcdTestServer, *Store) {
|
||||
podPrefix := "/pods"
|
||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||
server, sc := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
|
||||
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
|
||||
codec := testapi.Default.StorageCodec()
|
||||
s := etcdstorage.NewEtcdStorage(server.Client, codec, etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
|
||||
sc.Codec = testapi.Default.StorageCodec()
|
||||
s, err := factory.Create(*sc)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating storage: %v", err)
|
||||
}
|
||||
if hasCacheEnabled {
|
||||
config := storage.CacherConfig{
|
||||
CacheCapacity: 10,
|
||||
|
@ -1015,7 +1018,7 @@ func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesti
|
|||
ResourcePrefix: podPrefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
|
||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||
Codec: codec,
|
||||
Codec: sc.Codec,
|
||||
}
|
||||
s = storage.NewCacherFromConfig(config)
|
||||
}
|
||||
|
|
|
@ -30,21 +30,14 @@ import (
|
|||
"k8s.io/kubernetes/pkg/registry/generic/registry"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
"k8s.io/kubernetes/pkg/storage/storagebackend"
|
||||
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
|
||||
)
|
||||
|
||||
func NewEtcdStorage(t *testing.T, group string) (*storagebackend.Config, *etcdtesting.EtcdTestServer) {
|
||||
server := etcdtesting.NewUnsecuredEtcdTestClientServer(t)
|
||||
config := &storagebackend.Config{
|
||||
Type: "etcd2",
|
||||
Prefix: etcdtest.PathPrefix(),
|
||||
ServerList: server.Client.Endpoints(),
|
||||
DeserializationCacheSize: etcdtest.DeserializationCacheSize,
|
||||
Codec: testapi.Groups[group].StorageCodec(),
|
||||
}
|
||||
server, config := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
|
||||
config.Codec = testapi.Groups[group].StorageCodec()
|
||||
return config, server
|
||||
}
|
||||
|
||||
|
|
|
@ -27,12 +27,16 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/testing/testingcert"
|
||||
"k8s.io/kubernetes/pkg/storage/storagebackend"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
|
@ -42,6 +46,8 @@ import (
|
|||
|
||||
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
|
||||
type EtcdTestServer struct {
|
||||
// The following are lumped etcd2 test server params
|
||||
// TODO: Deprecate in a post 1.5 release
|
||||
etcdserver.ServerConfig
|
||||
PeerListeners, ClientListeners []net.Listener
|
||||
Client etcd.Client
|
||||
|
@ -54,6 +60,10 @@ type EtcdTestServer struct {
|
|||
raftHandler http.Handler
|
||||
s *etcdserver.EtcdServer
|
||||
hss []*httptest.Server
|
||||
|
||||
// The following are lumped etcd3 test server params
|
||||
v3Cluster *integration.ClusterV3
|
||||
v3Client *clientv3.Client
|
||||
}
|
||||
|
||||
// newLocalListener opens a port localhost using any port
|
||||
|
@ -219,30 +229,34 @@ func (m *EtcdTestServer) waitUntilUp() error {
|
|||
|
||||
// Terminate will shutdown the running etcd server
|
||||
func (m *EtcdTestServer) Terminate(t *testing.T) {
|
||||
m.Client = nil
|
||||
m.s.Stop()
|
||||
// TODO: This is a pretty ugly hack to workaround races during closing
|
||||
// in-memory etcd server in unit tests - see #18928 for more details.
|
||||
// We should get rid of it as soon as we have a proper fix - etcd clients
|
||||
// have overwritten transport counting opened connections (probably by
|
||||
// overwriting Dial function) and termination function waiting for all
|
||||
// connections to be closed and stopping accepting new ones.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
for _, hs := range m.hss {
|
||||
hs.CloseClientConnections()
|
||||
hs.Close()
|
||||
}
|
||||
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(m.CertificatesDir) > 0 {
|
||||
if err := os.RemoveAll(m.CertificatesDir); err != nil {
|
||||
if m.v3Cluster != nil {
|
||||
m.v3Cluster.Terminate(t)
|
||||
} else {
|
||||
m.Client = nil
|
||||
m.s.Stop()
|
||||
// TODO: This is a pretty ugly hack to workaround races during closing
|
||||
// in-memory etcd server in unit tests - see #18928 for more details.
|
||||
// We should get rid of it as soon as we have a proper fix - etcd clients
|
||||
// have overwritten transport counting opened connections (probably by
|
||||
// overwriting Dial function) and termination function waiting for all
|
||||
// connections to be closed and stopping accepting new ones.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
for _, hs := range m.hss {
|
||||
hs.CloseClientConnections()
|
||||
hs.Close()
|
||||
}
|
||||
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(m.CertificatesDir) > 0 {
|
||||
if err := os.RemoveAll(m.CertificatesDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewEtcdTestClientServer creates a new client and server for testing
|
||||
// NewEtcdTestClientServer DEPRECATED creates a new client and server for testing
|
||||
func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
||||
server := configureTestCluster(t, "foo", true)
|
||||
err := server.launch(t)
|
||||
|
@ -269,7 +283,7 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
|||
return server
|
||||
}
|
||||
|
||||
// NewUnsecuredEtcdTestClientServer creates a new client and server for testing
|
||||
// NewUnsecuredEtcdTestClientServer DEPRECATED creates a new client and server for testing
|
||||
func NewUnsecuredEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
||||
server := configureTestCluster(t, "foo", false)
|
||||
err := server.launch(t)
|
||||
|
@ -294,3 +308,18 @@ func NewUnsecuredEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
|||
}
|
||||
return server
|
||||
}
|
||||
|
||||
// NewEtcd3TestClientServer creates a new client and server for testing
|
||||
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
|
||||
server := &EtcdTestServer{
|
||||
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
|
||||
}
|
||||
server.v3Client = server.v3Cluster.RandClient()
|
||||
config := &storagebackend.Config{
|
||||
Type: "etcd3",
|
||||
Prefix: etcdtest.PathPrefix(),
|
||||
ServerList: server.v3Client.Endpoints(),
|
||||
DeserializationCacheSize: etcdtest.DeserializationCacheSize,
|
||||
}
|
||||
return server, config
|
||||
}
|
||||
|
|
|
@ -36,7 +36,11 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// NOTE: Client relies on nil tlsConfig
|
||||
// for non-secure connections, update the implicit variable
|
||||
if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.CAFile) == 0 {
|
||||
tlsConfig = nil
|
||||
}
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: c.ServerList,
|
||||
TLS: tlsConfig,
|
||||
|
|
Loading…
Reference in New Issue