componentstatus: support client cert health check

etcd has support for client-cert-auth, which can be configured via the flag `--ca-file`,
when that is enabled, all the client requests must present with a client certificate,
however, the current component status check uses a single transport for all of the checks,
this is wrong, the checks should be different for each of different component, and make
each of them use different transport(tls configurations).
pull/6/head
zhouhaibing089 2017-01-11 14:40:09 +08:00 committed by haibzhou
parent dee81ed56a
commit b1040171b6
9 changed files with 93 additions and 33 deletions

View File

@ -32,7 +32,12 @@ import (
func New() HTTPProber { func New() HTTPProber {
tlsConfig := &tls.Config{InsecureSkipVerify: true} tlsConfig := &tls.Config{InsecureSkipVerify: true}
transport := utilnet.SetTransportDefaults(&http.Transport{TLSClientConfig: tlsConfig, DisableKeepAlives: true}) return NewWithTLSConfig(tlsConfig)
}
// NewWithTLSConfig takes tls config as parameter.
func NewWithTLSConfig(config *tls.Config) HTTPProber {
transport := utilnet.SetTransportDefaults(&http.Transport{TLSClientConfig: config, DisableKeepAlives: true})
return httpProber{transport} return httpProber{transport}
} }

View File

@ -27,19 +27,16 @@ import (
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/probe"
httpprober "k8s.io/kubernetes/pkg/probe/http"
) )
type REST struct { type REST struct {
GetServersToValidate func() map[string]Server GetServersToValidate func() map[string]*Server
prober httpprober.HTTPProber
} }
// NewStorage returns a new REST. // NewStorage returns a new REST.
func NewStorage(serverRetriever func() map[string]Server) *REST { func NewStorage(serverRetriever func() map[string]*Server) *REST {
return &REST{ return &REST{
GetServersToValidate: serverRetriever, GetServersToValidate: serverRetriever,
prober: httpprober.New(),
} }
} }
@ -60,7 +57,7 @@ func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion
wait.Add(len(servers)) wait.Add(len(servers))
statuses := make(chan api.ComponentStatus, len(servers)) statuses := make(chan api.ComponentStatus, len(servers))
for k, v := range servers { for k, v := range servers {
go func(name string, server Server) { go func(name string, server *Server) {
defer wait.Done() defer wait.Done()
status := rs.getComponentStatus(name, server) status := rs.getComponentStatus(name, server)
statuses <- *status statuses <- *status
@ -97,8 +94,8 @@ func ToConditionStatus(s probe.Result) api.ConditionStatus {
} }
} }
func (rs *REST) getComponentStatus(name string, server Server) *api.ComponentStatus { func (rs *REST) getComponentStatus(name string, server *Server) *api.ComponentStatus {
status, msg, err := server.DoServerCheck(rs.prober) status, msg, err := server.DoServerCheck()
errorMsg := "" errorMsg := ""
if err != nil { if err != nil {
errorMsg = err.Error() errorMsg = err.Error()

View File

@ -50,17 +50,17 @@ type testResponse struct {
} }
func NewTestREST(resp testResponse) *REST { func NewTestREST(resp testResponse) *REST {
prober := &fakeHttpProber{
result: resp.result,
body: resp.data,
err: resp.err,
}
return &REST{ return &REST{
GetServersToValidate: func() map[string]Server { GetServersToValidate: func() map[string]*Server {
return map[string]Server{ return map[string]*Server{
"test1": {Addr: "testserver1", Port: 8000, Path: "/healthz"}, "test1": {Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
} }
}, },
prober: &fakeHttpProber{
result: resp.result,
body: resp.data,
err: resp.err,
},
} }
} }

View File

@ -17,8 +17,9 @@ limitations under the License.
package componentstatus package componentstatus
import ( import (
"crypto/tls"
"net/http" "net/http"
"sync"
"time" "time"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
@ -42,7 +43,10 @@ type Server struct {
Port int Port int
Path string Path string
EnableHTTPS bool EnableHTTPS bool
TLSConfig *tls.Config
Validate ValidatorFn Validate ValidatorFn
Prober httpprober.HTTPProber
Once sync.Once
} }
type ServerStatus struct { type ServerStatus struct {
@ -58,14 +62,22 @@ type ServerStatus struct {
Err string `json:"err,omitempty"` Err string `json:"err,omitempty"`
} }
func (server *Server) DoServerCheck(prober httpprober.HTTPProber) (probe.Result, string, error) { func (server *Server) DoServerCheck() (probe.Result, string, error) {
// setup the prober
server.Once.Do(func() {
if server.Prober != nil {
return
}
server.Prober = httpprober.NewWithTLSConfig(server.TLSConfig)
})
scheme := "http" scheme := "http"
if server.EnableHTTPS { if server.EnableHTTPS {
scheme = "https" scheme = "https"
} }
url := utilnet.FormatURL(scheme, server.Addr, server.Port, server.Path) url := utilnet.FormatURL(scheme, server.Addr, server.Port, server.Path)
result, data, err := prober.Probe(url, nil, probeTimeOut) result, data, err := server.Prober.Probe(url, nil, probeTimeOut)
if err != nil { if err != nil {
return probe.Unknown, "", err return probe.Unknown, "", err

View File

@ -59,7 +59,8 @@ func TestValidate(t *testing.T) {
} }
s.Validate = test.validator s.Validate = test.validator
result, data, err := s.DoServerCheck(fakeProber) s.Prober = fakeProber
result, data, err := s.DoServerCheck()
if test.expectErr && err == nil { if test.expectErr && err == nil {
t.Error("unexpected non-error") t.Error("unexpected non-error")
} }

View File

@ -15,6 +15,7 @@ go_test(
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
], ],
) )

View File

@ -242,14 +242,14 @@ type componentStatusStorage struct {
storageFactory serverstorage.StorageFactory storageFactory serverstorage.StorageFactory
} }
func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server { func (s componentStatusStorage) serversToValidate() map[string]*componentstatus.Server {
serversToValidate := map[string]componentstatus.Server{ serversToValidate := map[string]*componentstatus.Server{
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"}, "controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"}, "scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
} }
for ix, machine := range s.storageFactory.Backends() { for ix, machine := range s.storageFactory.Backends() {
etcdUrl, err := url.Parse(machine) etcdUrl, err := url.Parse(machine.Server)
if err != nil { if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err) glog.Errorf("Failed to parse etcd url for validation: %v", err)
continue continue
@ -269,9 +269,10 @@ func (s componentStatusStorage) serversToValidate() map[string]componentstatus.S
port = 2379 port = 2379
} }
// TODO: etcd health checking should be abstracted in the storage tier // TODO: etcd health checking should be abstracted in the storage tier
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = componentstatus.Server{ serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.Server{
Addr: addr, Addr: addr,
EnableHTTPS: etcdUrl.Scheme == "https", EnableHTTPS: etcdUrl.Scheme == "https",
TLSConfig: machine.TLSConfig,
Port: port, Port: port,
Path: "/health", Path: "/health",
Validate: etcdutil.EtcdHealthCheck, Validate: etcdutil.EtcdHealthCheck,

View File

@ -20,6 +20,7 @@ import (
"testing" "testing"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
) )
@ -47,6 +48,6 @@ func (f fakeStorageFactory) ResourcePrefix(groupResource schema.GroupResource) s
return "" return ""
} }
func (f fakeStorageFactory) Backends() []string { func (f fakeStorageFactory) Backends() []storage.Backend {
return []string{"etcd-0"} return []storage.Backend{{Server: "etcd-0"}}
} }

View File

@ -17,6 +17,9 @@ limitations under the License.
package storage package storage
import ( import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"strings" "strings"
"github.com/golang/glog" "github.com/golang/glog"
@ -27,6 +30,15 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
) )
// Backend describes the storage servers, the information here should be enough
// for health validations.
type Backend struct {
// the url of storage backend like: https://etcd.domain:2379
Server string
// the required tls config
TLSConfig *tls.Config
}
// StorageFactory is the interface to locate the storage for a given GroupResource // StorageFactory is the interface to locate the storage for a given GroupResource
type StorageFactory interface { type StorageFactory interface {
// New finds the storage destination for the given group and resource. It will // New finds the storage destination for the given group and resource. It will
@ -40,7 +52,7 @@ type StorageFactory interface {
// Backends gets all backends for all registered storage destinations. // Backends gets all backends for all registered storage destinations.
// Used for getting all instances for health validations. // Used for getting all instances for health validations.
Backends() []string Backends() []Backend
} }
// DefaultStorageFactory takes a GroupResource and returns back its storage interface. This result includes: // DefaultStorageFactory takes a GroupResource and returns back its storage interface. This result includes:
@ -252,15 +264,45 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
return &storageConfig, nil return &storageConfig, nil
} }
// Get all backends for all registered storage destinations. // Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations. // Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []string { func (s *DefaultStorageFactory) Backends() []Backend {
backends := sets.NewString(s.StorageConfig.ServerList...) servers := sets.NewString(s.StorageConfig.ServerList...)
for _, overrides := range s.Overrides { for _, overrides := range s.Overrides {
backends.Insert(overrides.etcdLocation...) servers.Insert(overrides.etcdLocation...)
} }
return backends.List()
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
if len(s.StorageConfig.CertFile) > 0 && len(s.StorageConfig.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(s.StorageConfig.CertFile, s.StorageConfig.KeyFile)
if err != nil {
glog.Errorf("failed to load key pair while getting backends: %s", err)
} else {
tlsConfig.Certificates = []tls.Certificate{cert}
}
}
if len(s.StorageConfig.CAFile) > 0 {
if caCert, err := ioutil.ReadFile(s.StorageConfig.CAFile); err != nil {
glog.Errorf("failed to read ca file while getting backends: %s", err)
} else {
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caPool
tlsConfig.InsecureSkipVerify = false
}
}
backends := []Backend{}
for server := range servers {
backends = append(backends, Backend{
Server: server,
TLSConfig: tlsConfig,
})
}
return backends
} }
func (s *DefaultStorageFactory) ResourcePrefix(groupResource schema.GroupResource) string { func (s *DefaultStorageFactory) ResourcePrefix(groupResource schema.GroupResource) string {