diff --git a/pkg/probe/http/http.go b/pkg/probe/http/http.go index 13e85d1ff1..d0744e792e 100644 --- a/pkg/probe/http/http.go +++ b/pkg/probe/http/http.go @@ -32,7 +32,12 @@ import ( func New() HTTPProber { tlsConfig := &tls.Config{InsecureSkipVerify: true} - transport := utilnet.SetTransportDefaults(&http.Transport{TLSClientConfig: tlsConfig, DisableKeepAlives: true}) + return NewWithTLSConfig(tlsConfig) +} + +// NewWithTLSConfig takes tls config as parameter. +func NewWithTLSConfig(config *tls.Config) HTTPProber { + transport := utilnet.SetTransportDefaults(&http.Transport{TLSClientConfig: config, DisableKeepAlives: true}) return httpProber{transport} } diff --git a/pkg/registry/core/componentstatus/rest.go b/pkg/registry/core/componentstatus/rest.go index 93ae7a1ec6..e0f0e18e4e 100644 --- a/pkg/registry/core/componentstatus/rest.go +++ b/pkg/registry/core/componentstatus/rest.go @@ -27,19 +27,16 @@ import ( "k8s.io/apiserver/pkg/registry/rest" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/probe" - httpprober "k8s.io/kubernetes/pkg/probe/http" ) type REST struct { - GetServersToValidate func() map[string]Server - prober httpprober.HTTPProber + GetServersToValidate func() map[string]*Server } // NewStorage returns a new REST. -func NewStorage(serverRetriever func() map[string]Server) *REST { +func NewStorage(serverRetriever func() map[string]*Server) *REST { return &REST{ GetServersToValidate: serverRetriever, - prober: httpprober.New(), } } @@ -60,7 +57,7 @@ func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion wait.Add(len(servers)) statuses := make(chan api.ComponentStatus, len(servers)) for k, v := range servers { - go func(name string, server Server) { + go func(name string, server *Server) { defer wait.Done() status := rs.getComponentStatus(name, server) statuses <- *status @@ -97,8 +94,8 @@ func ToConditionStatus(s probe.Result) api.ConditionStatus { } } -func (rs *REST) getComponentStatus(name string, server Server) *api.ComponentStatus { - status, msg, err := server.DoServerCheck(rs.prober) +func (rs *REST) getComponentStatus(name string, server *Server) *api.ComponentStatus { + status, msg, err := server.DoServerCheck() errorMsg := "" if err != nil { errorMsg = err.Error() diff --git a/pkg/registry/core/componentstatus/rest_test.go b/pkg/registry/core/componentstatus/rest_test.go index 1d505007fb..cf643d3690 100644 --- a/pkg/registry/core/componentstatus/rest_test.go +++ b/pkg/registry/core/componentstatus/rest_test.go @@ -50,17 +50,17 @@ type testResponse struct { } func NewTestREST(resp testResponse) *REST { + prober := &fakeHttpProber{ + result: resp.result, + body: resp.data, + err: resp.err, + } return &REST{ - GetServersToValidate: func() map[string]Server { - return map[string]Server{ - "test1": {Addr: "testserver1", Port: 8000, Path: "/healthz"}, + GetServersToValidate: func() map[string]*Server { + return map[string]*Server{ + "test1": {Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober}, } }, - prober: &fakeHttpProber{ - result: resp.result, - body: resp.data, - err: resp.err, - }, } } diff --git a/pkg/registry/core/componentstatus/validator.go b/pkg/registry/core/componentstatus/validator.go index 6683dc1c28..ae5ff62be6 100644 --- a/pkg/registry/core/componentstatus/validator.go +++ b/pkg/registry/core/componentstatus/validator.go @@ -17,8 +17,9 @@ limitations under the License. package componentstatus import ( + "crypto/tls" "net/http" - + "sync" "time" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -42,7 +43,10 @@ type Server struct { Port int Path string EnableHTTPS bool + TLSConfig *tls.Config Validate ValidatorFn + Prober httpprober.HTTPProber + Once sync.Once } type ServerStatus struct { @@ -58,14 +62,22 @@ type ServerStatus struct { Err string `json:"err,omitempty"` } -func (server *Server) DoServerCheck(prober httpprober.HTTPProber) (probe.Result, string, error) { +func (server *Server) DoServerCheck() (probe.Result, string, error) { + // setup the prober + server.Once.Do(func() { + if server.Prober != nil { + return + } + server.Prober = httpprober.NewWithTLSConfig(server.TLSConfig) + }) + scheme := "http" if server.EnableHTTPS { scheme = "https" } url := utilnet.FormatURL(scheme, server.Addr, server.Port, server.Path) - result, data, err := prober.Probe(url, nil, probeTimeOut) + result, data, err := server.Prober.Probe(url, nil, probeTimeOut) if err != nil { return probe.Unknown, "", err diff --git a/pkg/registry/core/componentstatus/validator_test.go b/pkg/registry/core/componentstatus/validator_test.go index 3c0f69edde..9ff62ba746 100644 --- a/pkg/registry/core/componentstatus/validator_test.go +++ b/pkg/registry/core/componentstatus/validator_test.go @@ -59,7 +59,8 @@ func TestValidate(t *testing.T) { } s.Validate = test.validator - result, data, err := s.DoServerCheck(fakeProber) + s.Prober = fakeProber + result, data, err := s.DoServerCheck() if test.expectErr && err == nil { t.Error("unexpected non-error") } diff --git a/pkg/registry/core/rest/BUILD b/pkg/registry/core/rest/BUILD index 545481a033..5c12c71800 100644 --- a/pkg/registry/core/rest/BUILD +++ b/pkg/registry/core/rest/BUILD @@ -15,6 +15,7 @@ go_test( tags = ["automanaged"], deps = [ "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", ], ) diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 757b64e7c5..e46a7cf333 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -242,14 +242,14 @@ type componentStatusStorage struct { storageFactory serverstorage.StorageFactory } -func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server { - serversToValidate := map[string]componentstatus.Server{ +func (s componentStatusStorage) serversToValidate() map[string]*componentstatus.Server { + serversToValidate := map[string]*componentstatus.Server{ "controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"}, "scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"}, } for ix, machine := range s.storageFactory.Backends() { - etcdUrl, err := url.Parse(machine) + etcdUrl, err := url.Parse(machine.Server) if err != nil { glog.Errorf("Failed to parse etcd url for validation: %v", err) continue @@ -269,9 +269,10 @@ func (s componentStatusStorage) serversToValidate() map[string]componentstatus.S port = 2379 } // TODO: etcd health checking should be abstracted in the storage tier - serversToValidate[fmt.Sprintf("etcd-%d", ix)] = componentstatus.Server{ + serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.Server{ Addr: addr, EnableHTTPS: etcdUrl.Scheme == "https", + TLSConfig: machine.TLSConfig, Port: port, Path: "/health", Validate: etcdutil.EtcdHealthCheck, diff --git a/pkg/registry/core/rest/storage_core_test.go b/pkg/registry/core/rest/storage_core_test.go index 348eb093d4..ba566ac30c 100644 --- a/pkg/registry/core/rest/storage_core_test.go +++ b/pkg/registry/core/rest/storage_core_test.go @@ -20,6 +20,7 @@ import ( "testing" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/storagebackend" ) @@ -47,6 +48,6 @@ func (f fakeStorageFactory) ResourcePrefix(groupResource schema.GroupResource) s return "" } -func (f fakeStorageFactory) Backends() []string { - return []string{"etcd-0"} +func (f fakeStorageFactory) Backends() []storage.Backend { + return []storage.Backend{{Server: "etcd-0"}} } diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go index 98761f1827..3a247c6dae 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go @@ -17,6 +17,9 @@ limitations under the License. package storage import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" "strings" "github.com/golang/glog" @@ -27,6 +30,15 @@ import ( "k8s.io/apiserver/pkg/storage/storagebackend" ) +// Backend describes the storage servers, the information here should be enough +// for health validations. +type Backend struct { + // the url of storage backend like: https://etcd.domain:2379 + Server string + // the required tls config + TLSConfig *tls.Config +} + // StorageFactory is the interface to locate the storage for a given GroupResource type StorageFactory interface { // New finds the storage destination for the given group and resource. It will @@ -40,7 +52,7 @@ type StorageFactory interface { // Backends gets all backends for all registered storage destinations. // Used for getting all instances for health validations. - Backends() []string + Backends() []Backend } // DefaultStorageFactory takes a GroupResource and returns back its storage interface. This result includes: @@ -252,15 +264,45 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (* return &storageConfig, nil } -// Get all backends for all registered storage destinations. +// Backends returns all backends for all registered storage destinations. // Used for getting all instances for health validations. -func (s *DefaultStorageFactory) Backends() []string { - backends := sets.NewString(s.StorageConfig.ServerList...) +func (s *DefaultStorageFactory) Backends() []Backend { + servers := sets.NewString(s.StorageConfig.ServerList...) for _, overrides := range s.Overrides { - backends.Insert(overrides.etcdLocation...) + servers.Insert(overrides.etcdLocation...) } - return backends.List() + + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + } + if len(s.StorageConfig.CertFile) > 0 && len(s.StorageConfig.KeyFile) > 0 { + cert, err := tls.LoadX509KeyPair(s.StorageConfig.CertFile, s.StorageConfig.KeyFile) + if err != nil { + glog.Errorf("failed to load key pair while getting backends: %s", err) + } else { + tlsConfig.Certificates = []tls.Certificate{cert} + } + } + if len(s.StorageConfig.CAFile) > 0 { + if caCert, err := ioutil.ReadFile(s.StorageConfig.CAFile); err != nil { + glog.Errorf("failed to read ca file while getting backends: %s", err) + } else { + caPool := x509.NewCertPool() + caPool.AppendCertsFromPEM(caCert) + tlsConfig.RootCAs = caPool + tlsConfig.InsecureSkipVerify = false + } + } + + backends := []Backend{} + for server := range servers { + backends = append(backends, Backend{ + Server: server, + TLSConfig: tlsConfig, + }) + } + return backends } func (s *DefaultStorageFactory) ResourcePrefix(groupResource schema.GroupResource) string {