@@ -41,8 +41,15 @@ import (
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/credentials"
snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -55,7 +62,7 @@ import (
)
const (
	testTimeout          = time.Second * 30
	statusTimeout        = time.Second * 30
	manageTickerTime     = time.Second * 15
	learnerMaxStallTime  = time.Minute * 5
	memberRemovalTimeout = time.Minute * 1
@@ -206,35 +213,40 @@ func (e *ETCD) Test(ctx context.Context) error {
		return errors.New("etcd datastore is not started")
	}

	ctx, cancel := context.WithTimeout(ctx, testTimeout)
	defer cancel()

	endpoints := getEndpoints(e.config)
	status, err := e.client.Status(ctx, endpoints[0])
	status, err := e.status(ctx)
	if err != nil {
		return err
		return errors.Wrap(err, "failed to get etcd status")
	} else if status.IsLearner {
		return errors.New("this server has not yet been promoted from learner to voting member")
	} else if status.Leader == 0 {
		return etcdserver.ErrNoLeader
	}

	if status.IsLearner {
		return errors.New("this server has not yet been promoted from learner to voting member")
	logrus.Infof("Connected to etcd v%s - datastore using %d of %d bytes", status.Version, status.DbSizeInUse, status.DbSize)
	if len(status.Errors) > 0 {
		logrus.Warnf("Errors present on etcd cluster: %s", strings.Join(status.Errors, ","))
	}
	// defrag this node to reclaim freed space from compacted revisions
	if err := e.defragment(ctx); err != nil {
		return errors.Wrap(err, "failed to defragment etcd database")
	}

	if err := e.clearAlarms(ctx); err != nil {
		return errors.Wrap(err, "failed to report and disarm etcd alarms")
	// clear alarms on this node
	if err := e.clearAlarms(ctx, status.Header.MemberId); err != nil {
		return errors.Wrap(err, "failed to disarm etcd alarms")
	}

	// refresh status to see if any errors remain after clearing alarms
	status, err = e.client.Status(ctx, endpoints[0])
	// refresh status - note that errors may remain on other nodes, but this
	// should not prevent us from continuing with startup.
	status, err = e.status(ctx)
	if err != nil {
		return err
		return errors.Wrap(err, "failed to get etcd status")
	}

	logrus.Infof("Datastore using %d of %d bytes after defragment", status.DbSizeInUse, status.DbSize)
	if len(status.Errors) > 0 {
		return fmt.Errorf("etcd cluster errors: %s", strings.Join(status.Errors, ", "))
		logrus.Warnf("Errors present on etcd cluster after defragment: %s", strings.Join(status.Errors, ", "))
	}

	members, err := e.client.MemberList(ctx)
@@ -242,6 +254,7 @@ func (e *ETCD) Test(ctx context.Context) error {
		return err
	}

	// Ensure that there is a cluster member with our peerURL and name
	var memberNameUrls []string
	for _, member := range members.Members {
		for _, peerURL := range member.PeerURLs {
@@ -253,6 +266,8 @@ func (e *ETCD) Test(ctx context.Context) error {
			memberNameUrls = append(memberNameUrls, member.Name+"="+member.PeerURLs[0])
		}
	}

	// no matching PeerURL on any Member, return an error that indicates what was expected vs what we found.
	return &membershipError{members: memberNameUrls, self: e.name + "=" + e.peerURL()}
}
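
// membershipError (used above) is defined elsewhere in this package and is not
// part of this hunk. The sketch below is only an illustration of how such an
// error type could present the expected-vs-found membership data; the Error()
// wording and method body are assumed, not taken from the actual implementation.
// The field names match the struct literal used in the return statement above.
type membershipError struct {
	self    string
	members []string
}

func (e *membershipError) Error() string {
	// report the peerURL/name we expected to find alongside the members actually seen
	return fmt.Sprintf("this server is not a member of the etcd cluster: expected %s, found %v", e.self, e.members)
}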
@@ -523,7 +538,7 @@ func (e *ETCD) startClient(ctx context.Context) error {
	e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert
	e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey

	client, err := getClient(ctx, e.config, endpoints...)
	client, conn, err := getClient(ctx, e.config, endpoints...)
	if err != nil {
		return err
	}
@@ -531,9 +546,8 @@ func (e *ETCD) startClient(ctx context.Context) error {
	go func() {
		<-ctx.Done()
		client := e.client
		e.client = nil
		client.Close()
		conn.Close()
	}()

	return nil
@@ -554,11 +568,11 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
		return err
	}

	client, err := getClient(clientCtx, e.config, clientURLs...)
	client, conn, err := getClient(clientCtx, e.config, clientURLs...)
	if err != nil {
		return err
	}
	defer client.Close()
	defer conn.Close()

	for _, member := range memberList.Members {
		for _, peer := range member.PeerURLs {
@@ -725,13 +739,53 @@ func (e *ETCD) infoHandler() http.Handler {
// If the runtime config does not list any endpoints, the default endpoint is used.
// The returned client should be closed when no longer needed, in order to avoid leaking GRPC
// client goroutines.
func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, error) {
func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, *grpc.ClientConn, error) {
	logger, err := logutil.CreateDefaultZapLogger(zapcore.DebugLevel)
	if err != nil {
		return nil, nil, err
	}

	cfg, err := getClientConfig(ctx, control, endpoints...)
	if err != nil {
		return nil, err
		return nil, nil, err
	}

	// Set up dialer and resolver options.
	// This is normally handled by clientv3.New() but that wraps all the GRPC
	// service with retry handlers and uses deprecated grpc.DialContext() which
	// tries to establish a connection even when one isn't wanted.
	if cfg.DialKeepAliveTime > 0 {
		params := keepalive.ClientParameters{
			Time:                cfg.DialKeepAliveTime,
			Timeout:             cfg.DialKeepAliveTimeout,
			PermitWithoutStream: cfg.PermitWithoutStream,
		}
		cfg.DialOptions = append(cfg.DialOptions, grpc.WithKeepaliveParams(params))
	}

	if cfg.TLS != nil {
		creds := credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
		cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(creds))
	} else {
		cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
	}

	return clientv3.New(*cfg)
	cfg.DialOptions = append(cfg.DialOptions, grpc.WithResolvers(NewSimpleResolver(cfg.Endpoints[0])))
	target := fmt.Sprintf("%s://%p/%s", scheme, cfg, authority(cfg.Endpoints[0]))
	conn, err := grpc.NewClient(target, cfg.DialOptions...)
	if err != nil {
		return nil, nil, err
	}

	// Create a new client and wire up the GRPC service interfaces.
	// Ref: https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/client.go#L87
	client := clientv3.NewCtxClient(ctx, clientv3.WithZapLogger(logger.Named(version.Program+"-etcd-client")))
	client.Cluster = clientv3.NewClusterFromClusterClient(etcdserverpb.NewClusterClient(conn), client)
	client.KV = clientv3.NewKVFromKVClient(etcdserverpb.NewKVClient(conn), client)
	client.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(etcdserverpb.NewMaintenanceClient(conn), client)
	return client, conn, nil
}
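
// NewSimpleResolver, scheme, and authority used above are helpers defined
// elsewhere in this package and are not shown in this hunk. As a rough
// illustration of the idea (a gRPC resolver that always reports one fixed
// address, so that grpc.NewClient never performs DNS or passthrough
// resolution), a minimal resolver.Builder might look like the sketch below.
// This is an assumption for clarity, not the actual implementation; it needs
// "google.golang.org/grpc/resolver" in the import block and assumes the
// address is already in host:port form.
type simpleResolver struct {
	address string
}

func NewSimpleResolver(address string) resolver.Builder {
	return &simpleResolver{address: address}
}

// Build pushes the fixed address to the ClientConn once; no re-resolution is
// ever needed because the endpoint never changes.
func (s *simpleResolver) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	if err := cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: s.address}}}); err != nil {
		return nil, err
	}
	return s, nil
}

// Scheme must match the scheme used to build the grpc.NewClient target above.
func (s *simpleResolver) Scheme() string { return scheme }

// ResolveNow and Close are no-ops; the address never changes.
func (s *simpleResolver) ResolveNow(resolver.ResolveNowOptions) {}

func (s *simpleResolver) Close() {}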
// getClientConfig generates an etcd client config connected to the specified endpoints.
@@ -851,11 +905,11 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error {
	}
	defer sqliteClient.Close()

	etcdClient, err := getClient(ctx, e.config)
	etcdClient, conn, err := getClient(ctx, e.config)
	if err != nil {
		return err
	}
	defer etcdClient.Close()
	defer conn.Close()

	values, err := sqliteClient.List(ctx, "/registry/", 0)
	if err != nil {
@@ -984,7 +1038,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
		return errors.New("etcd datastore already started")
	}

	client, err := getClient(ctx, e.config)
	client, conn, err := getClient(ctx, e.config)
	if err != nil {
		return err
	}
@@ -992,9 +1046,8 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
	go func() {
		<-ctx.Done()
		client := e.client
		e.client = nil
		client.Close()
		conn.Close()
	}()

	if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil {
@@ -1251,8 +1304,6 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgress) error {
}

func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
	defer cancel()

	resp, err := e.client.Status(ctx, url)
	if err != nil {
		return resp, errors.Wrap(err, "failed to check etcd member status")
@@ -1363,12 +1414,10 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress) error {
	return err
}

// clearAlarms checks for any alarms on the local etcd member. If found, they are
// reported and the alarm state is cleared.
func (e *ETCD) clearAlarms(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, testTimeout)
	defer cancel()

// clearAlarms checks for any NOSPACE alarms on the local etcd member.
// If found, they are reported and the alarm state is cleared.
// Other alarm types are not handled.
func (e *ETCD) clearAlarms(ctx context.Context, memberID uint64) error {
	if e.client == nil {
		return errors.New("etcd client was nil")
	}
@@ -1379,22 +1428,37 @@ func (e *ETCD) clearAlarms(ctx context.Context) error {
	}

	for _, alarm := range alarmList.Alarms {
		logrus.Warnf("Alarm on etcd member %d: %s", alarm.MemberID, alarm.Alarm)
	}

	if len(alarmList.Alarms) > 0 {
		if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{}); err != nil {
			return fmt.Errorf("etcd alarm disarm failed: %v", err)
		if alarm.MemberID != memberID {
			// ignore alarms on other cluster members, they should manage their own problems
			continue
		}
		if alarm.Alarm == etcdserverpb.AlarmType_NOSPACE {
			if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{MemberID: alarm.MemberID, Alarm: alarm.Alarm}); err != nil {
				return fmt.Errorf("%s disarm failed: %v", alarm.Alarm, err)
			}
			logrus.Infof("%s disarmed successfully", alarm.Alarm)
		} else {
			return fmt.Errorf("%s alarm must be disarmed manually", alarm.Alarm)
		}
		logrus.Infof("Alarms disarmed on etcd server")
	}
	return nil
}
func (e *ETCD) defragment(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, testTimeout)
// status returns status using the first etcd endpoint.
func (e *ETCD) status(ctx context.Context) (*clientv3.StatusResponse, error) {
	if e.client == nil {
		return nil, errors.New("etcd client was nil")
	}

	ctx, cancel := context.WithTimeout(ctx, statusTimeout)
	defer cancel()

	endpoints := getEndpoints(e.config)
	return e.client.Status(ctx, endpoints[0])
}

// defragment defragments the etcd datastore using the first etcd endpoint
func (e *ETCD) defragment(ctx context.Context) error {
	if e.client == nil {
		return errors.New("etcd client was nil")
	}
@@ -1550,11 +1614,11 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error)
// GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd
// and unmarshal it to a list of apiserver endpoints.
func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) {
	cl, err := getClient(ctx, cfg)
	cl, conn, err := getClient(ctx, cfg)
	if err != nil {
		return nil, err
	}
	defer cl.Close()
	defer conn.Close()

	etcdResp, err := cl.KV.Get(ctx, AddressKey)
	if err != nil {
@@ -1576,9 +1640,6 @@ func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) {
// GetMembersClientURLs will list through the member lists in etcd and return
// back a combined list of client urls for each member in the cluster
func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
	ctx, cancel := context.WithTimeout(ctx, testTimeout)
	defer cancel()

	members, err := e.client.MemberList(ctx)
	if err != nil {
		return nil, err
@@ -1593,24 +1654,6 @@ func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
	return clientURLs, nil
}

// GetMembersNames will list through the member lists in etcd and return
// back a combined list of member names
func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) {
	ctx, cancel := context.WithTimeout(ctx, testTimeout)
	defer cancel()

	members, err := e.client.MemberList(ctx)
	if err != nil {
		return nil, err
	}

	var memberNames []string
	for _, member := range members.Members {
		memberNames = append(memberNames, member.Name)
	}
	return memberNames, nil
}

// RemoveSelf will remove the member if it exists in the cluster. This should
// only be called on a node that may have previously run etcd, but will not
// currently run etcd, to ensure that it is not a member of the cluster.