diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 5a519bdcff..f875a24ad1 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "path/filepath" + "sync" "testing" "time" @@ -15,11 +16,23 @@ import ( testutil "github.com/k3s-io/k3s/tests" "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" + "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" utilnet "k8s.io/apimachinery/pkg/util/net" ) +func init() { + logrus.SetLevel(logrus.DebugLevel) +} + func mustGetAddress() string { ipAddr, err := utilnet.ChooseHostInterface() if err != nil { @@ -76,7 +89,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { wantErr bool }{ { - name: "Directory exists", + name: "directory exists", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -95,7 +108,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { want: true, }, { - name: "Directory does not exist", + name: "directory does not exist", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -117,9 +130,6 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { }, } - // enable logging - logrus.SetLevel(logrus.DebugLevel) - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { e := NewETCD() @@ -159,7 +169,7 @@ func Test_UnitETCD_Register(t *testing.T) { wantErr bool }{ { - name: "Call Register with standard config", + name: "standard config", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -174,7 +184,7 @@ func Test_UnitETCD_Register(t *testing.T) { }, }, { - name: "Call Register with a tombstone file created", + name: "with a tombstone file created", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -249,7 +259,7 @@ func Test_UnitETCD_Start(t *testing.T) { wantErr bool }{ { - name: "Start etcd without clientAccessInfo and without snapshots", + name: "nil clientAccessInfo and nil cron", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -266,17 +276,18 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, { - name: "Start etcd without clientAccessInfo on", + name: "nil clientAccessInfo", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -293,17 +304,18 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, { - name: "Start etcd with an existing cluster", + name: "existing cluster", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -322,13 +334,14 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) os.Remove(walDir(e.config)) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, @@ -353,8 +366,478 @@ func Test_UnitETCD_Start(t *testing.T) { } if err := tt.teardown(e, &tt.fields.context); err != nil { t.Errorf("Teardown for ETCD.Start() failed = %v", err) + } + }) + } +} + +func Test_UnitETCD_Test(t *testing.T) { + type contextInfo struct { + ctx context.Context + cancel context.CancelFunc + } + type fields struct { + context contextInfo + client *clientv3.Client + config *config.Control + name string + address string + } + type args struct { + clientAccessInfo *clientaccess.Info + } + tests := []struct { + name string + fields fields + setup func(e *ETCD, ctxInfo *contextInfo) error + teardown func(e *ETCD, ctxInfo *contextInfo) error + wantErr bool + }{ + { + name: "no server running", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "unreachable server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + e.config.Runtime.EtcdConfig.Endpoints = []string{"https://192.0.2.0:2379"} // RFC5737 + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "learner server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, true, false, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "corrupt server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, true, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "leaderless server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, true, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "normal server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + { + name: "alarm on other server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + extraAlarm := &etcdserverpb.AlarmMember{MemberID: 2, Alarm: etcdserverpb.AlarmType_NOSPACE} + if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second, extraAlarm); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + { + name: "slow defrag", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, false, 40*time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &ETCD{ + client: tt.fields.client, + config: tt.fields.config, + name: tt.fields.name, + address: tt.fields.address, + } + + if err := tt.setup(e, &tt.fields.context); err != nil { + t.Errorf("Setup for ETCD.Test() failed = %v", err) return } + start := time.Now() + err := e.Test(tt.fields.context.ctx) + duration := time.Now().Sub(start) + t.Logf("ETCD.Test() completed in %v with err=%v", duration, err) + if (err != nil) != tt.wantErr { + t.Errorf("ETCD.Test() error = %v, wantErr %v", err, tt.wantErr) + } + if err := tt.teardown(e, &tt.fields.context); err != nil { + t.Errorf("Teardown for ETCD.Test() failed = %v", err) + } }) } } + +// startMock starts up a mock etcd grpc service with canned responses +// that can be used to test specific scenarios. +func startMock(ctx context.Context, e *ETCD, isLearner, isCorrupt, noLeader bool, defragDelay time.Duration, extraAlarms ...*etcdserverpb.AlarmMember) error { + address := authority(getEndpoints(e.config)[0]) + // listen on endpoint and close listener on context cancel + listener, err := net.Listen("tcp", address) + if err != nil { + return err + } + + // set up tls if enabled + gopts := []grpc.ServerOption{} + if e.config.Datastore.ServerTLSConfig.CertFile != "" && e.config.Datastore.ServerTLSConfig.KeyFile != "" { + creds, err := credentials.NewServerTLSFromFile(e.config.Datastore.ServerTLSConfig.CertFile, e.config.Datastore.ServerTLSConfig.KeyFile) + if err != nil { + return err + } + gopts = append(gopts, grpc.Creds(creds)) + } + server := grpc.NewServer(gopts...) + + mock := &mockEtcd{ + e: e, + mu: &sync.RWMutex{}, + isLearner: isLearner, + isCorrupt: isCorrupt, + noLeader: noLeader, + defragDelay: defragDelay, + extraAlarms: extraAlarms, + } + + // register grpc services + etcdserverpb.RegisterKVServer(server, mock) + etcdserverpb.RegisterClusterServer(server, mock) + etcdserverpb.RegisterMaintenanceServer(server, mock) + + hsrv := health.NewServer() + hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) + healthpb.RegisterHealthServer(server, hsrv) + + reflection.Register(server) + + // shutdown on context cancel + go func() { + <-ctx.Done() + server.GracefulStop() + listener.Close() + }() + + // start serving + go func() { + logrus.Infof("Mock etcd server starting on %s", listener.Addr()) + logrus.Infof("Mock etcd server exited: %v", server.Serve(listener)) + }() + + return nil +} + +type mockEtcd struct { + e *ETCD + mu *sync.RWMutex + calls map[string]int + isLearner bool + isCorrupt bool + noLeader bool + defragDelay time.Duration + extraAlarms []*etcdserverpb.AlarmMember +} + +// increment call counter for this function +func (m *mockEtcd) inc(call string) { + m.mu.Lock() + defer m.mu.Unlock() + if m.calls == nil { + m.calls = map[string]int{} + } + m.calls[call] = m.calls[call] + 1 +} + +// get call counter for this function +func (m *mockEtcd) get(call string) int { + m.mu.RLock() + defer m.mu.RUnlock() + return m.calls[call] +} + +// get alarm list +func (m *mockEtcd) alarms() []*etcdserverpb.AlarmMember { + alarms := m.extraAlarms + if m.get("alarm") < 2 { + // on the first check, return NOSPACE so that we can clear it after defragging + alarms = append(alarms, &etcdserverpb.AlarmMember{ + Alarm: etcdserverpb.AlarmType_NOSPACE, + MemberID: 1, + }) + } + if m.isCorrupt { + // return CORRUPT if so requested + alarms = append(alarms, &etcdserverpb.AlarmMember{ + Alarm: etcdserverpb.AlarmType_CORRUPT, + MemberID: 1, + }) + } + return alarms +} + +// KV mocks +func (m *mockEtcd) Range(context.Context, *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) { + m.inc("range") + return nil, unsupported("range") +} +func (m *mockEtcd) Put(context.Context, *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) { + m.inc("put") + return nil, unsupported("put") +} +func (m *mockEtcd) DeleteRange(context.Context, *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) { + m.inc("deleterange") + return nil, unsupported("deleterange") +} +func (m *mockEtcd) Txn(context.Context, *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { + m.inc("txn") + return nil, unsupported("txn") +} +func (m *mockEtcd) Compact(context.Context, *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) { + m.inc("compact") + return nil, unsupported("compact") +} + +// Maintenance mocks +func (m *mockEtcd) Alarm(ctx context.Context, r *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) { + m.inc("alarm") + res := &etcdserverpb.AlarmResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + } + if r.Action == etcdserverpb.AlarmRequest_GET { + res.Alarms = m.alarms() + } + return res, nil +} +func (m *mockEtcd) Status(context.Context, *etcdserverpb.StatusRequest) (*etcdserverpb.StatusResponse, error) { + m.inc("status") + res := &etcdserverpb.StatusResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + Leader: 1, + Version: "v3.5.0-mock0", + DbSize: 1024, + DbSizeInUse: 512, + IsLearner: m.isLearner, + } + if m.noLeader { + res.Leader = 0 + res.Errors = append(res.Errors, etcdserver.ErrNoLeader.Error()) + } + for _, a := range m.alarms() { + res.Errors = append(res.Errors, a.String()) + } + return res, nil +} +func (m *mockEtcd) Defragment(ctx context.Context, r *etcdserverpb.DefragmentRequest) (*etcdserverpb.DefragmentResponse, error) { + m.inc("defragment") + // delay defrag response by configured time, or until the request is cancelled + select { + case <-ctx.Done(): + case <-time.After(m.defragDelay): + } + return &etcdserverpb.DefragmentResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + }, nil +} +func (m *mockEtcd) Hash(context.Context, *etcdserverpb.HashRequest) (*etcdserverpb.HashResponse, error) { + m.inc("hash") + return nil, unsupported("hash") +} +func (m *mockEtcd) HashKV(context.Context, *etcdserverpb.HashKVRequest) (*etcdserverpb.HashKVResponse, error) { + m.inc("hashkv") + return nil, unsupported("hashkv") +} +func (m *mockEtcd) Snapshot(*etcdserverpb.SnapshotRequest, etcdserverpb.Maintenance_SnapshotServer) error { + m.inc("snapshot") + return unsupported("snapshot") +} +func (m *mockEtcd) MoveLeader(context.Context, *etcdserverpb.MoveLeaderRequest) (*etcdserverpb.MoveLeaderResponse, error) { + m.inc("moveleader") + return nil, unsupported("moveleader") +} +func (m *mockEtcd) Downgrade(context.Context, *etcdserverpb.DowngradeRequest) (*etcdserverpb.DowngradeResponse, error) { + m.inc("downgrade") + return nil, unsupported("downgrade") +} + +// Cluster mocks +func (m *mockEtcd) MemberAdd(context.Context, *etcdserverpb.MemberAddRequest) (*etcdserverpb.MemberAddResponse, error) { + m.inc("memberadd") + return nil, unsupported("memberadd") +} +func (m *mockEtcd) MemberRemove(context.Context, *etcdserverpb.MemberRemoveRequest) (*etcdserverpb.MemberRemoveResponse, error) { + m.inc("memberremove") + return nil, etcdserver.ErrNotEnoughStartedMembers +} +func (m *mockEtcd) MemberUpdate(context.Context, *etcdserverpb.MemberUpdateRequest) (*etcdserverpb.MemberUpdateResponse, error) { + m.inc("memberupdate") + return nil, unsupported("memberupdate") +} +func (m *mockEtcd) MemberList(context.Context, *etcdserverpb.MemberListRequest) (*etcdserverpb.MemberListResponse, error) { + m.inc("memberlist") + scheme := "http" + if m.e.config.Datastore.ServerTLSConfig.CertFile != "" { + scheme = "https" + } + + return &etcdserverpb.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + Members: []*etcdserverpb.Member{ + { + ID: 1, + Name: m.e.name, + IsLearner: m.isLearner, + ClientURLs: []string{scheme + "://127.0.0.1:2379"}, + PeerURLs: []string{scheme + "://" + m.e.address + ":2380"}, + }, + }, + }, nil +} + +func (m *mockEtcd) MemberPromote(context.Context, *etcdserverpb.MemberPromoteRequest) (*etcdserverpb.MemberPromoteResponse, error) { + m.inc("memberpromote") + return nil, unsupported("memberpromote") +} + +func unsupported(field string) error { + return status.New(codes.Unimplemented, field+" is not implemented").Err() +} diff --git a/tests/unit.go b/tests/unit.go index ee76af0385..5bc40cea3b 100644 --- a/tests/unit.go +++ b/tests/unit.go @@ -54,6 +54,10 @@ func GenerateRuntime(cnf *config.Control) error { deps.CreateRuntimeCertFiles(cnf) + cnf.Datastore.ServerTLSConfig.CAFile = cnf.Runtime.ETCDServerCA + cnf.Datastore.ServerTLSConfig.CertFile = cnf.Runtime.ServerETCDCert + cnf.Datastore.ServerTLSConfig.KeyFile = cnf.Runtime.ServerETCDKey + return deps.GenServerDeps(cnf) }