mirror of https://github.com/k3s-io/k3s
Merge pull request #65027 from liggitt/etcd-health-check
Automatic merge from submit-queue (batch tested with PRs 64140, 64898, 65022, 65037, 65027). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Use actual etcd client for /healthz/etcd checks * avoids redialing etcd on every health check (which makes slow DNS a false-positive healthz failure) * ensures etcd TLS setup is correct (errors verifying the etcd API or sending client credentials manifest as healthz failures) * ensures the etcd cluster is actually responsive fixes #64909 ```release-note Etcd health checks by the apiserver now ensure the apiserver can connect to and exercise the etcd API ```pull/8/head
commit
9d97913e75
|
@ -1430,10 +1430,6 @@
|
|||
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd3",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd3/preflight",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apiserver/pkg/storage/names",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
|
|
|
@ -52,8 +52,8 @@ go_library(
|
|||
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/server/resourceconfig:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
|
||||
|
|
|
@ -32,8 +32,8 @@ import (
|
|||
"k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/preflight"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
)
|
||||
|
||||
type EtcdOptions struct {
|
||||
|
@ -181,29 +181,30 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error {
|
|||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.addEtcdHealthEndpoint(c)
|
||||
if err := s.addEtcdHealthEndpoint(c); err != nil {
|
||||
return err
|
||||
}
|
||||
c.RESTOptionsGetter = &SimpleRestOptionsFactory{Options: *s}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
|
||||
s.addEtcdHealthEndpoint(c)
|
||||
if err := s.addEtcdHealthEndpoint(c); err != nil {
|
||||
return err
|
||||
}
|
||||
c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) {
|
||||
func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
|
||||
healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.HealthzChecks = append(c.HealthzChecks, healthz.NamedCheck("etcd", func(r *http.Request) error {
|
||||
done, err := preflight.EtcdConnection{ServerList: s.StorageConfig.ServerList}.CheckEtcdServers()
|
||||
if !done {
|
||||
return fmt.Errorf("etcd failed")
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return healthCheck()
|
||||
}))
|
||||
return nil
|
||||
}
|
||||
|
||||
type SimpleRestOptionsFactory struct {
|
||||
|
|
|
@ -37,6 +37,7 @@ go_library(
|
|||
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
||||
|
|
|
@ -17,6 +17,8 @@ limitations under the License.
|
|||
package factory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
@ -30,6 +32,29 @@ import (
|
|||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
func newETCD2HealthCheck(c storagebackend.Config) (func() error, error) {
|
||||
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := newETCD2Client(tr, c.ServerList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
members := etcd2client.NewMembersAPI(client)
|
||||
|
||||
return func() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
if _, err := members.List(ctx); err != nil {
|
||||
return fmt.Errorf("error listing etcd members: %v", err)
|
||||
}
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
|
||||
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
|
||||
if err != nil {
|
||||
|
|
|
@ -18,11 +18,14 @@ package factory
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
|
@ -38,7 +41,41 @@ var (
|
|||
dialTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
|
||||
func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
|
||||
// constructing the etcd v3 client blocks and times out if etcd is not available.
|
||||
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
|
||||
|
||||
clientValue := &atomic.Value{}
|
||||
|
||||
clientErrMsg := &atomic.Value{}
|
||||
clientErrMsg.Store("etcd client connection not yet established")
|
||||
|
||||
go wait.PollUntil(time.Second, func() (bool, error) {
|
||||
client, err := newETCD3Client(c)
|
||||
if err != nil {
|
||||
clientErrMsg.Store(err.Error())
|
||||
return false, nil
|
||||
}
|
||||
clientValue.Store(client)
|
||||
clientErrMsg.Store("")
|
||||
return true, nil
|
||||
}, wait.NeverStop)
|
||||
|
||||
return func() error {
|
||||
if errMsg := clientErrMsg.Load().(string); len(errMsg) > 0 {
|
||||
return fmt.Errorf(errMsg)
|
||||
}
|
||||
client := clientValue.Load().(*clientv3.Client)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
if _, err := client.Cluster.MemberList(ctx); err != nil {
|
||||
return fmt.Errorf("error listing etcd members: %v", err)
|
||||
}
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newETCD3Client(c storagebackend.Config) (*clientv3.Client, error) {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: c.CertFile,
|
||||
KeyFile: c.KeyFile,
|
||||
|
@ -46,7 +83,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
|
|||
}
|
||||
tlsConfig, err := tlsInfo.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
// NOTE: Client relies on nil tlsConfig
|
||||
// for non-secure connections, update the implicit variable
|
||||
|
@ -61,6 +98,11 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
|
|||
TLS: tlsConfig,
|
||||
}
|
||||
client, err := clientv3.New(cfg)
|
||||
return client, err
|
||||
}
|
||||
|
||||
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
|
||||
client, err := newETCD3Client(c)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -41,3 +41,15 @@ func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
|
|||
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateHealthCheck creates a healthcheck function based on given config.
|
||||
func CreateHealthCheck(c storagebackend.Config) (func() error, error) {
|
||||
switch c.Type {
|
||||
case storagebackend.StorageTypeETCD2:
|
||||
return newETCD2HealthCheck(c)
|
||||
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||
return newETCD3HealthCheck(c)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1102,10 +1102,6 @@
|
|||
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd3",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd3/preflight",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apiserver/pkg/storage/names",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
|
|
|
@ -1074,10 +1074,6 @@
|
|||
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd3",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd3/preflight",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apiserver/pkg/storage/names",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
|
|
Loading…
Reference in New Issue