Add tests for ETCD.Test()

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/10368/merge
Brad Davidson 1 month ago committed by Brad Davidson
parent 095e34d816
commit a39e191906

@ -6,6 +6,7 @@ import (
"net/http"
"os"
"path/filepath"
"sync"
"testing"
"time"
@ -15,11 +16,23 @@ import (
testutil "github.com/k3s-io/k3s/tests"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
utilnet "k8s.io/apimachinery/pkg/util/net"
)
func init() {
logrus.SetLevel(logrus.DebugLevel)
}
func mustGetAddress() string {
ipAddr, err := utilnet.ChooseHostInterface()
if err != nil {
@ -76,7 +89,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) {
wantErr bool
}{
{
name: "Directory exists",
name: "directory exists",
args: args{
ctx: context.TODO(),
config: generateTestConfig(),
@ -95,7 +108,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) {
want: true,
},
{
name: "Directory does not exist",
name: "directory does not exist",
args: args{
ctx: context.TODO(),
config: generateTestConfig(),
@ -117,9 +130,6 @@ func Test_UnitETCD_IsInitialized(t *testing.T) {
},
}
// enable logging
logrus.SetLevel(logrus.DebugLevel)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := NewETCD()
@ -159,7 +169,7 @@ func Test_UnitETCD_Register(t *testing.T) {
wantErr bool
}{
{
name: "Call Register with standard config",
name: "standard config",
args: args{
ctx: context.TODO(),
config: generateTestConfig(),
@ -174,7 +184,7 @@ func Test_UnitETCD_Register(t *testing.T) {
},
},
{
name: "Call Register with a tombstone file created",
name: "with a tombstone file created",
args: args{
ctx: context.TODO(),
config: generateTestConfig(),
@ -249,7 +259,7 @@ func Test_UnitETCD_Start(t *testing.T) {
wantErr bool
}{
{
name: "Start etcd without clientAccessInfo and without snapshots",
name: "nil clientAccessInfo and nil cron",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
@ -266,17 +276,18 @@ func Test_UnitETCD_Start(t *testing.T) {
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
// RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes
if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
return err
}
err := e.RemoveSelf(ctxInfo.ctx)
ctxInfo.cancel()
time.Sleep(10 * time.Second)
time.Sleep(5 * time.Second)
testutil.CleanupDataDir(e.config)
if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
return err
}
return nil
},
},
{
name: "Start etcd without clientAccessInfo on",
name: "nil clientAccessInfo",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
@ -293,17 +304,18 @@ func Test_UnitETCD_Start(t *testing.T) {
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
// RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes
if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
return err
}
err := e.RemoveSelf(ctxInfo.ctx)
ctxInfo.cancel()
time.Sleep(5 * time.Second)
testutil.CleanupDataDir(e.config)
if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
return err
}
return nil
},
},
{
name: "Start etcd with an existing cluster",
name: "existing cluster",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
@ -322,13 +334,14 @@ func Test_UnitETCD_Start(t *testing.T) {
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
// RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes
if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
return err
}
err := e.RemoveSelf(ctxInfo.ctx)
ctxInfo.cancel()
time.Sleep(5 * time.Second)
testutil.CleanupDataDir(e.config)
os.Remove(walDir(e.config))
if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
return err
}
return nil
},
},
@ -353,8 +366,478 @@ func Test_UnitETCD_Start(t *testing.T) {
}
if err := tt.teardown(e, &tt.fields.context); err != nil {
t.Errorf("Teardown for ETCD.Start() failed = %v", err)
}
})
}
}
func Test_UnitETCD_Test(t *testing.T) {
type contextInfo struct {
ctx context.Context
cancel context.CancelFunc
}
type fields struct {
context contextInfo
client *clientv3.Client
config *config.Control
name string
address string
}
type args struct {
clientAccessInfo *clientaccess.Info
}
tests := []struct {
name string
fields fields
setup func(e *ETCD, ctxInfo *contextInfo) error
teardown func(e *ETCD, ctxInfo *contextInfo) error
wantErr bool
}{
{
name: "no server running",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
name: "default",
},
setup: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
testutil.GenerateRuntime(e.config)
return e.startClient(ctxInfo.ctx)
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(1 * time.Second)
testutil.CleanupDataDir(e.config)
return nil
},
wantErr: true,
},
{
name: "unreachable server",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
name: "default",
},
setup: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
testutil.GenerateRuntime(e.config)
e.config.Runtime.EtcdConfig.Endpoints = []string{"https://192.0.2.0:2379"} // RFC5737
return e.startClient(ctxInfo.ctx)
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(1 * time.Second)
testutil.CleanupDataDir(e.config)
return nil
},
wantErr: true,
},
{
name: "learner server",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
name: "default",
},
setup: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
testutil.GenerateRuntime(e.config)
if err := startMock(ctxInfo.ctx, e, true, false, false, time.Second); err != nil {
return err
}
return e.startClient(ctxInfo.ctx)
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(1 * time.Second)
testutil.CleanupDataDir(e.config)
return nil
},
wantErr: true,
},
{
name: "corrupt server",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
name: "default",
},
setup: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
testutil.GenerateRuntime(e.config)
if err := startMock(ctxInfo.ctx, e, false, true, false, time.Second); err != nil {
return err
}
return e.startClient(ctxInfo.ctx)
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(1 * time.Second)
testutil.CleanupDataDir(e.config)
return nil
},
wantErr: true,
},
{
name: "leaderless server",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
name: "default",
},
setup: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
testutil.GenerateRuntime(e.config)
if err := startMock(ctxInfo.ctx, e, false, false, true, time.Second); err != nil {
return err
}
return e.startClient(ctxInfo.ctx)
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(1 * time.Second)
testutil.CleanupDataDir(e.config)
return nil
},
wantErr: true,
},
{
name: "normal server",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
name: "default",
},
setup: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
testutil.GenerateRuntime(e.config)
if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second); err != nil {
return err
}
return e.startClient(ctxInfo.ctx)
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(1 * time.Second)
testutil.CleanupDataDir(e.config)
return nil
},
wantErr: false,
},
{
name: "alarm on other server",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
name: "default",
},
setup: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
testutil.GenerateRuntime(e.config)
extraAlarm := &etcdserverpb.AlarmMember{MemberID: 2, Alarm: etcdserverpb.AlarmType_NOSPACE}
if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second, extraAlarm); err != nil {
return err
}
return e.startClient(ctxInfo.ctx)
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(1 * time.Second)
testutil.CleanupDataDir(e.config)
return nil
},
wantErr: false,
},
{
name: "slow defrag",
fields: fields{
config: generateTestConfig(),
address: mustGetAddress(),
name: "default",
},
setup: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
testutil.GenerateRuntime(e.config)
if err := startMock(ctxInfo.ctx, e, false, false, false, 40*time.Second); err != nil {
return err
}
return e.startClient(ctxInfo.ctx)
},
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(1 * time.Second)
testutil.CleanupDataDir(e.config)
return nil
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &ETCD{
client: tt.fields.client,
config: tt.fields.config,
name: tt.fields.name,
address: tt.fields.address,
}
if err := tt.setup(e, &tt.fields.context); err != nil {
t.Errorf("Setup for ETCD.Test() failed = %v", err)
return
}
start := time.Now()
err := e.Test(tt.fields.context.ctx)
duration := time.Now().Sub(start)
t.Logf("ETCD.Test() completed in %v with err=%v", duration, err)
if (err != nil) != tt.wantErr {
t.Errorf("ETCD.Test() error = %v, wantErr %v", err, tt.wantErr)
}
if err := tt.teardown(e, &tt.fields.context); err != nil {
t.Errorf("Teardown for ETCD.Test() failed = %v", err)
}
})
}
}
// startMock starts up a mock etcd grpc service with canned responses
// that can be used to test specific scenarios.
func startMock(ctx context.Context, e *ETCD, isLearner, isCorrupt, noLeader bool, defragDelay time.Duration, extraAlarms ...*etcdserverpb.AlarmMember) error {
address := authority(getEndpoints(e.config)[0])
// listen on endpoint and close listener on context cancel
listener, err := net.Listen("tcp", address)
if err != nil {
return err
}
// set up tls if enabled
gopts := []grpc.ServerOption{}
if e.config.Datastore.ServerTLSConfig.CertFile != "" && e.config.Datastore.ServerTLSConfig.KeyFile != "" {
creds, err := credentials.NewServerTLSFromFile(e.config.Datastore.ServerTLSConfig.CertFile, e.config.Datastore.ServerTLSConfig.KeyFile)
if err != nil {
return err
}
gopts = append(gopts, grpc.Creds(creds))
}
server := grpc.NewServer(gopts...)
mock := &mockEtcd{
e: e,
mu: &sync.RWMutex{},
isLearner: isLearner,
isCorrupt: isCorrupt,
noLeader: noLeader,
defragDelay: defragDelay,
extraAlarms: extraAlarms,
}
// register grpc services
etcdserverpb.RegisterKVServer(server, mock)
etcdserverpb.RegisterClusterServer(server, mock)
etcdserverpb.RegisterMaintenanceServer(server, mock)
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(server, hsrv)
reflection.Register(server)
// shutdown on context cancel
go func() {
<-ctx.Done()
server.GracefulStop()
listener.Close()
}()
// start serving
go func() {
logrus.Infof("Mock etcd server starting on %s", listener.Addr())
logrus.Infof("Mock etcd server exited: %v", server.Serve(listener))
}()
return nil
}
type mockEtcd struct {
e *ETCD
mu *sync.RWMutex
calls map[string]int
isLearner bool
isCorrupt bool
noLeader bool
defragDelay time.Duration
extraAlarms []*etcdserverpb.AlarmMember
}
// increment call counter for this function
func (m *mockEtcd) inc(call string) {
m.mu.Lock()
defer m.mu.Unlock()
if m.calls == nil {
m.calls = map[string]int{}
}
m.calls[call] = m.calls[call] + 1
}
// get call counter for this function
func (m *mockEtcd) get(call string) int {
m.mu.RLock()
defer m.mu.RUnlock()
return m.calls[call]
}
// get alarm list
func (m *mockEtcd) alarms() []*etcdserverpb.AlarmMember {
alarms := m.extraAlarms
if m.get("alarm") < 2 {
// on the first check, return NOSPACE so that we can clear it after defragging
alarms = append(alarms, &etcdserverpb.AlarmMember{
Alarm: etcdserverpb.AlarmType_NOSPACE,
MemberID: 1,
})
}
if m.isCorrupt {
// return CORRUPT if so requested
alarms = append(alarms, &etcdserverpb.AlarmMember{
Alarm: etcdserverpb.AlarmType_CORRUPT,
MemberID: 1,
})
}
return alarms
}
// KV mocks
func (m *mockEtcd) Range(context.Context, *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) {
m.inc("range")
return nil, unsupported("range")
}
func (m *mockEtcd) Put(context.Context, *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) {
m.inc("put")
return nil, unsupported("put")
}
func (m *mockEtcd) DeleteRange(context.Context, *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) {
m.inc("deleterange")
return nil, unsupported("deleterange")
}
func (m *mockEtcd) Txn(context.Context, *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
m.inc("txn")
return nil, unsupported("txn")
}
func (m *mockEtcd) Compact(context.Context, *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) {
m.inc("compact")
return nil, unsupported("compact")
}
// Maintenance mocks
func (m *mockEtcd) Alarm(ctx context.Context, r *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) {
m.inc("alarm")
res := &etcdserverpb.AlarmResponse{
Header: &etcdserverpb.ResponseHeader{
MemberId: 1,
},
}
if r.Action == etcdserverpb.AlarmRequest_GET {
res.Alarms = m.alarms()
}
return res, nil
}
func (m *mockEtcd) Status(context.Context, *etcdserverpb.StatusRequest) (*etcdserverpb.StatusResponse, error) {
m.inc("status")
res := &etcdserverpb.StatusResponse{
Header: &etcdserverpb.ResponseHeader{
MemberId: 1,
},
Leader: 1,
Version: "v3.5.0-mock0",
DbSize: 1024,
DbSizeInUse: 512,
IsLearner: m.isLearner,
}
if m.noLeader {
res.Leader = 0
res.Errors = append(res.Errors, etcdserver.ErrNoLeader.Error())
}
for _, a := range m.alarms() {
res.Errors = append(res.Errors, a.String())
}
return res, nil
}
func (m *mockEtcd) Defragment(ctx context.Context, r *etcdserverpb.DefragmentRequest) (*etcdserverpb.DefragmentResponse, error) {
m.inc("defragment")
// delay defrag response by configured time, or until the request is cancelled
select {
case <-ctx.Done():
case <-time.After(m.defragDelay):
}
return &etcdserverpb.DefragmentResponse{
Header: &etcdserverpb.ResponseHeader{
MemberId: 1,
},
}, nil
}
func (m *mockEtcd) Hash(context.Context, *etcdserverpb.HashRequest) (*etcdserverpb.HashResponse, error) {
m.inc("hash")
return nil, unsupported("hash")
}
func (m *mockEtcd) HashKV(context.Context, *etcdserverpb.HashKVRequest) (*etcdserverpb.HashKVResponse, error) {
m.inc("hashkv")
return nil, unsupported("hashkv")
}
func (m *mockEtcd) Snapshot(*etcdserverpb.SnapshotRequest, etcdserverpb.Maintenance_SnapshotServer) error {
m.inc("snapshot")
return unsupported("snapshot")
}
func (m *mockEtcd) MoveLeader(context.Context, *etcdserverpb.MoveLeaderRequest) (*etcdserverpb.MoveLeaderResponse, error) {
m.inc("moveleader")
return nil, unsupported("moveleader")
}
func (m *mockEtcd) Downgrade(context.Context, *etcdserverpb.DowngradeRequest) (*etcdserverpb.DowngradeResponse, error) {
m.inc("downgrade")
return nil, unsupported("downgrade")
}
// Cluster mocks
func (m *mockEtcd) MemberAdd(context.Context, *etcdserverpb.MemberAddRequest) (*etcdserverpb.MemberAddResponse, error) {
m.inc("memberadd")
return nil, unsupported("memberadd")
}
func (m *mockEtcd) MemberRemove(context.Context, *etcdserverpb.MemberRemoveRequest) (*etcdserverpb.MemberRemoveResponse, error) {
m.inc("memberremove")
return nil, etcdserver.ErrNotEnoughStartedMembers
}
func (m *mockEtcd) MemberUpdate(context.Context, *etcdserverpb.MemberUpdateRequest) (*etcdserverpb.MemberUpdateResponse, error) {
m.inc("memberupdate")
return nil, unsupported("memberupdate")
}
func (m *mockEtcd) MemberList(context.Context, *etcdserverpb.MemberListRequest) (*etcdserverpb.MemberListResponse, error) {
m.inc("memberlist")
scheme := "http"
if m.e.config.Datastore.ServerTLSConfig.CertFile != "" {
scheme = "https"
}
return &etcdserverpb.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{
MemberId: 1,
},
Members: []*etcdserverpb.Member{
{
ID: 1,
Name: m.e.name,
IsLearner: m.isLearner,
ClientURLs: []string{scheme + "://127.0.0.1:2379"},
PeerURLs: []string{scheme + "://" + m.e.address + ":2380"},
},
},
}, nil
}
func (m *mockEtcd) MemberPromote(context.Context, *etcdserverpb.MemberPromoteRequest) (*etcdserverpb.MemberPromoteResponse, error) {
m.inc("memberpromote")
return nil, unsupported("memberpromote")
}
func unsupported(field string) error {
return status.New(codes.Unimplemented, field+" is not implemented").Err()
}

@ -54,6 +54,10 @@ func GenerateRuntime(cnf *config.Control) error {
deps.CreateRuntimeCertFiles(cnf)
cnf.Datastore.ServerTLSConfig.CAFile = cnf.Runtime.ETCDServerCA
cnf.Datastore.ServerTLSConfig.CertFile = cnf.Runtime.ServerETCDCert
cnf.Datastore.ServerTLSConfig.KeyFile = cnf.Runtime.ServerETCDKey
return deps.GenServerDeps(cnf)
}

Loading…
Cancel
Save