Fix issues with defragment and alarm clear on etcd startup

* Use clientv3.NewCtxClient instead of New to avoid automatic retry of all RPCs
* Only timeout status requests; allow defrag and alarm clear requests to run to completion.
* Only clear alarms on the local cluster member, not ALL cluster members

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 095e34d816)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/11326/head
Brad Davidson 2024-10-24 00:16:34 +00:00 committed by Brad Davidson
parent 3a5fa71914
commit 8c6d5a17e0
4 changed files with 197 additions and 74 deletions

4
go.mod
View File

@ -138,9 +138,11 @@ require (
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/yl2chen/cidranger v1.0.2
go.etcd.io/etcd/api/v3 v3.5.16
go.etcd.io/etcd/client/pkg/v3 v3.5.16
go.etcd.io/etcd/client/v3 v3.5.16
go.etcd.io/etcd/etcdutl/v3 v3.5.13
go.etcd.io/etcd/server/v3 v3.5.16
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.27.0
golang.org/x/net v0.28.0
golang.org/x/sync v0.8.0
@ -434,7 +436,6 @@ require (
github.com/xlab/treeprint v1.2.0 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
go.etcd.io/bbolt v1.3.11 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.16 // indirect
go.etcd.io/etcd/client/v2 v2.305.16 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.16 // indirect
go.etcd.io/etcd/raft/v3 v3.5.16 // indirect
@ -456,7 +457,6 @@ require (
go.uber.org/fx v1.20.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect

View File

@ -41,8 +41,15 @@ import (
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/credentials"
snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -55,7 +62,7 @@ import (
)
const (
testTimeout = time.Second * 30
statusTimeout = time.Second * 30
manageTickerTime = time.Second * 15
learnerMaxStallTime = time.Minute * 5
memberRemovalTimeout = time.Minute * 1
@ -206,35 +213,40 @@ func (e *ETCD) Test(ctx context.Context) error {
return errors.New("etcd datastore is not started")
}
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()
endpoints := getEndpoints(e.config)
status, err := e.client.Status(ctx, endpoints[0])
status, err := e.status(ctx)
if err != nil {
return err
}
if status.IsLearner {
return errors.Wrap(err, "failed to get etcd status")
} else if status.IsLearner {
return errors.New("this server has not yet been promoted from learner to voting member")
} else if status.Leader == 0 {
return etcdserver.ErrNoLeader
}
logrus.Infof("Connected to etcd v%s - datastore using %d of %d bytes", status.Version, status.DbSizeInUse, status.DbSize)
if len(status.Errors) > 0 {
logrus.Warnf("Errors present on etcd cluster: %s", strings.Join(status.Errors, ","))
}
// defrag this node to reclaim freed space from compacted revisions
if err := e.defragment(ctx); err != nil {
return errors.Wrap(err, "failed to defragment etcd database")
}
if err := e.clearAlarms(ctx); err != nil {
return errors.Wrap(err, "failed to report and disarm etcd alarms")
// clear alarms on this node
if err := e.clearAlarms(ctx, status.Header.MemberId); err != nil {
return errors.Wrap(err, "failed to disarm etcd alarms")
}
// refresh status to see if any errors remain after clearing alarms
status, err = e.client.Status(ctx, endpoints[0])
// refresh status - note that errors may remain on other nodes, but this
// should not prevent us from continuing with startup.
status, err = e.status(ctx)
if err != nil {
return err
return errors.Wrap(err, "failed to get etcd status")
}
logrus.Infof("Datastore using %d of %d bytes after defragment", status.DbSizeInUse, status.DbSize)
if len(status.Errors) > 0 {
return fmt.Errorf("etcd cluster errors: %s", strings.Join(status.Errors, ", "))
logrus.Warnf("Errors present on etcd cluster after defragment: %s", strings.Join(status.Errors, ","))
}
members, err := e.client.MemberList(ctx)
@ -242,6 +254,7 @@ func (e *ETCD) Test(ctx context.Context) error {
return err
}
// Ensure that there is a cluster member with our peerURL and name
var memberNameUrls []string
for _, member := range members.Members {
for _, peerURL := range member.PeerURLs {
@ -253,6 +266,8 @@ func (e *ETCD) Test(ctx context.Context) error {
memberNameUrls = append(memberNameUrls, member.Name+"="+member.PeerURLs[0])
}
}
// no matching PeerURL on any Member, return an error that indicates what was expected vs what we found.
return &membershipError{members: memberNameUrls, self: e.name + "=" + e.peerURL()}
}
@ -523,7 +538,7 @@ func (e *ETCD) startClient(ctx context.Context) error {
e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert
e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey
client, err := getClient(ctx, e.config, endpoints...)
client, conn, err := getClient(ctx, e.config, endpoints...)
if err != nil {
return err
}
@ -531,9 +546,8 @@ func (e *ETCD) startClient(ctx context.Context) error {
go func() {
<-ctx.Done()
client := e.client
e.client = nil
client.Close()
conn.Close()
}()
return nil
@ -554,11 +568,11 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
return err
}
client, err := getClient(clientCtx, e.config, clientURLs...)
client, conn, err := getClient(clientCtx, e.config, clientURLs...)
if err != nil {
return err
}
defer client.Close()
defer conn.Close()
for _, member := range memberList.Members {
for _, peer := range member.PeerURLs {
@ -725,13 +739,53 @@ func (e *ETCD) infoHandler() http.Handler {
// If the runtime config does not list any endpoints, the default endpoint is used.
// The returned client should be closed when no longer needed, in order to avoid leaking GRPC
// client goroutines.
func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, error) {
cfg, err := getClientConfig(ctx, control, endpoints...)
func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, *grpc.ClientConn, error) {
logger, err := logutil.CreateDefaultZapLogger(zapcore.DebugLevel)
if err != nil {
return nil, err
return nil, nil, err
}
return clientv3.New(*cfg)
cfg, err := getClientConfig(ctx, control, endpoints...)
if err != nil {
return nil, nil, err
}
// Set up dialer and resolver options.
// This is normally handled by clientv3.New() but that wraps all the GRPC
// service with retry handlers and uses deprecated grpc.DialContext() which
// tries to establish a connection even when one isn't wanted.
if cfg.DialKeepAliveTime > 0 {
params := keepalive.ClientParameters{
Time: cfg.DialKeepAliveTime,
Timeout: cfg.DialKeepAliveTimeout,
PermitWithoutStream: cfg.PermitWithoutStream,
}
cfg.DialOptions = append(cfg.DialOptions, grpc.WithKeepaliveParams(params))
}
if cfg.TLS != nil {
creds := credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(creds))
} else {
cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
cfg.DialOptions = append(cfg.DialOptions, grpc.WithResolvers(NewSimpleResolver(cfg.Endpoints[0])))
target := fmt.Sprintf("%s://%p/%s", scheme, cfg, authority(cfg.Endpoints[0]))
conn, err := grpc.NewClient(target, cfg.DialOptions...)
if err != nil {
return nil, nil, err
}
// Create a new client and wire up the GRPC service interfaces.
// Ref: https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/client.go#L87
client := clientv3.NewCtxClient(ctx, clientv3.WithZapLogger(logger.Named(version.Program+"-etcd-client")))
client.Cluster = clientv3.NewClusterFromClusterClient(etcdserverpb.NewClusterClient(conn), client)
client.KV = clientv3.NewKVFromKVClient(etcdserverpb.NewKVClient(conn), client)
client.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(etcdserverpb.NewMaintenanceClient(conn), client)
return client, conn, nil
}
// getClientConfig generates an etcd client config connected to the specified endpoints.
@ -851,11 +905,11 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error {
}
defer sqliteClient.Close()
etcdClient, err := getClient(ctx, e.config)
etcdClient, conn, err := getClient(ctx, e.config)
if err != nil {
return err
}
defer etcdClient.Close()
defer conn.Close()
values, err := sqliteClient.List(ctx, "/registry/", 0)
if err != nil {
@ -984,7 +1038,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
return errors.New("etcd datastore already started")
}
client, err := getClient(ctx, e.config)
client, conn, err := getClient(ctx, e.config)
if err != nil {
return err
}
@ -992,9 +1046,8 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
go func() {
<-ctx.Done()
client := e.client
e.client = nil
client.Close()
conn.Close()
}()
if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil {
@ -1251,8 +1304,6 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre
}
func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) {
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
resp, err := e.client.Status(ctx, url)
if err != nil {
return resp, errors.Wrap(err, "failed to check etcd member status")
@ -1363,12 +1414,10 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress)
return err
}
// clearAlarms checks for any alarms on the local etcd member. If found, they are
// reported and the alarm state is cleared.
func (e *ETCD) clearAlarms(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()
// clearAlarms checks for any NOSPACE alarms on the local etcd member.
// If found, they are reported and the alarm state is cleared.
// Other alarm types are not handled.
func (e *ETCD) clearAlarms(ctx context.Context, memberID uint64) error {
if e.client == nil {
return errors.New("etcd client was nil")
}
@ -1379,22 +1428,37 @@ func (e *ETCD) clearAlarms(ctx context.Context) error {
}
for _, alarm := range alarmList.Alarms {
logrus.Warnf("Alarm on etcd member %d: %s", alarm.MemberID, alarm.Alarm)
}
if len(alarmList.Alarms) > 0 {
if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{}); err != nil {
return fmt.Errorf("etcd alarm disarm failed: %v", err)
if alarm.MemberID != memberID {
// ignore alarms on other cluster members, they should manage their own problems
continue
}
if alarm.Alarm == etcdserverpb.AlarmType_NOSPACE {
if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{MemberID: alarm.MemberID, Alarm: alarm.Alarm}); err != nil {
return fmt.Errorf("%s disarm failed: %v", alarm.Alarm, err)
}
logrus.Infof("%s disarmed successfully", alarm.Alarm)
} else {
return fmt.Errorf("%s alarm must be disarmed manually", alarm.Alarm)
}
logrus.Infof("Alarms disarmed on etcd server")
}
return nil
}
func (e *ETCD) defragment(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
// status returns status using the first etcd endpoint.
func (e *ETCD) status(ctx context.Context) (*clientv3.StatusResponse, error) {
if e.client == nil {
return nil, errors.New("etcd client was nil")
}
ctx, cancel := context.WithTimeout(ctx, statusTimeout)
defer cancel()
endpoints := getEndpoints(e.config)
return e.client.Status(ctx, endpoints[0])
}
// defragment defragments the etcd datastore using the first etcd endpoint
func (e *ETCD) defragment(ctx context.Context) error {
if e.client == nil {
return errors.New("etcd client was nil")
}
@ -1550,11 +1614,11 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error)
// GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd
// and unmarshal it to a list of apiserver endpoints.
func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) {
cl, err := getClient(ctx, cfg)
cl, conn, err := getClient(ctx, cfg)
if err != nil {
return nil, err
}
defer cl.Close()
defer conn.Close()
etcdResp, err := cl.KV.Get(ctx, AddressKey)
if err != nil {
@ -1576,9 +1640,6 @@ func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]strin
// GetMembersClientURLs will list through the member lists in etcd and return
// back a combined list of client urls for each member in the cluster
func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()
members, err := e.client.MemberList(ctx)
if err != nil {
return nil, err
@ -1593,24 +1654,6 @@ func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
return clientURLs, nil
}
// GetMembersNames will list through the member lists in etcd and return
// back a combined list of member names
func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()
members, err := e.client.MemberList(ctx)
if err != nil {
return nil, err
}
var memberNames []string
for _, member := range members.Members {
memberNames = append(memberNames, member.Name)
}
return memberNames, nil
}
// RemoveSelf will remove the member if it exists in the cluster. This should
// only be called on a node that may have previously run etcd, but will not
// currently run etcd, to ensure that it is not a member of the cluster.

80
pkg/etcd/resolver.go Normal file
View File

@ -0,0 +1,80 @@
package etcd
import (
"net/url"
"path"
"strings"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)
const scheme = "etcd-endpoint"
type EtcdSimpleResolver struct {
*manual.Resolver
endpoint string
}
// Cribbed from https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/internal/resolver/resolver.go
// but only supports a single fixed endpoint. We use this instead of the internal etcd client resolver
// because the agent loadbalancer handles failover and we don't want etcd or grpc's special behavior.
func NewSimpleResolver(endpoint string) *EtcdSimpleResolver {
r := manual.NewBuilderWithScheme(scheme)
return &EtcdSimpleResolver{Resolver: r, endpoint: endpoint}
}
func (r *EtcdSimpleResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
res, err := r.Resolver.Build(target, cc, opts)
if err != nil {
return nil, err
}
if r.CC != nil {
addr, serverName := interpret(r.endpoint)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr, ServerName: serverName}},
})
}
return res, nil
}
func interpret(ep string) (string, string) {
if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") {
if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") {
_, absolutePath, _ := strings.Cut(ep, "://")
return "unix://" + absolutePath, path.Base(absolutePath)
}
if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") {
_, localPath, _ := strings.Cut(ep, "://")
return "unix:" + localPath, path.Base(localPath)
}
_, localPath, _ := strings.Cut(ep, ":")
return "unix:" + localPath, path.Base(localPath)
}
if strings.Contains(ep, "://") {
url, err := url.Parse(ep)
if err != nil {
return ep, ep
}
if url.Scheme == "http" || url.Scheme == "https" {
return url.Host, url.Host
}
return ep, url.Host
}
return ep, ep
}
func authority(ep string) string {
if _, authority, ok := strings.Cut(ep, "://"); ok {
return authority
}
if suff, ok := strings.CutPrefix(ep, "unix:"); ok {
return suff
}
if suff, ok := strings.CutPrefix(ep, "unixs:"); ok {
return suff
}
return ep
}

View File

@ -27,7 +27,7 @@ import (
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot"
snapshotv3 "go.etcd.io/etcd/client/v3/snapshot"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -243,7 +243,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
var sf *snapshot.File
if err := snapshotv3.NewV3(e.client.GetLogger()).Save(ctx, *cfg, snapshotPath); err != nil {
if err := snapshotv3.Save(ctx, e.client.GetLogger(), *cfg, snapshotPath); err != nil {
sf = &snapshot.File{
Name: snapshotName,
Location: "",