From 7146ec9d49c603d365c0186da3863d7c023d343c Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 27 Oct 2014 17:56:33 -0700 Subject: [PATCH] Implement kubernetes & kubernetes-ro services --- cmd/apiserver/apiserver.go | 39 ++++++++-- pkg/master/master.go | 77 +++++++++++++++++++ pkg/master/publish.go | 113 ++++++++++++++++++++++++++++ pkg/registry/service/rest.go | 3 + pkg/service/endpoints_controller.go | 6 ++ 5 files changed, 232 insertions(+), 6 deletions(-) create mode 100644 pkg/master/publish.go diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 9017630dda..7961316c68 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -42,9 +42,23 @@ import ( ) var ( - port = flag.Uint("port", 8080, "The port to listen on. Default 8080") + // Note: the weird ""+ in below lines seems to be the only way to get gofmt to + // arrange these text blocks sensibly. Grrr. + port = flag.Uint("port", 8080, ""+ + "The port to listen on. Default 8080. It is assumed that firewall rules are "+ + "set up such that this port is not reachable from outside of the cluster. It is "+ + "further assumed that port 443 on the cluster's public address is proxied to this "+ + "port. This is performed by nginx in the default setup.") address = util.IP(net.ParseIP("127.0.0.1")) - readOnlyPort = flag.Uint("read_only_port", 7080, "The port from which to serve read-only resources. If 0, don't serve on a read-only address.") + publicAddressOverride = flag.String("public_address_override", "", ""+ + "Public serving address. Read only port will be opened on this address, "+ + "and it is assumed that port 443 at this address will be proxied/redirected "+ + "to '-address':'-port'. If blank, the address in the first listed interface "+ + "will be used.") + readOnlyPort = flag.Uint("read_only_port", 7080, ""+ + "The port from which to serve read-only resources. If 0, don't serve on a "+ + "read-only address. It is assumed that firewall rules are set up such that "+ + "this port is not reachable from outside of the cluster.") apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.") storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") @@ -184,7 +198,7 @@ func main() { n := net.IPNet(portalNet) mux := http.NewServeMux() - m := master.New(&master.Config{ + config := &master.Config{ Client: client, Cloud: cloud, EtcdHelper: helper, @@ -207,13 +221,26 @@ func main() { APIPrefix: *apiPrefix, CorsAllowedOriginList: corsAllowedOriginList, TokenAuthFile: *tokenAuthFile, - }) + ReadOnlyPort: int(*readOnlyPort), + ReadWritePort: int(*port), + PublicAddress: *publicAddressOverride, + } + m := master.New(config) + + roLocation := "" if *readOnlyPort != 0 { + roLocation = net.JoinHostPort(config.PublicAddress, strconv.Itoa(config.ReadOnlyPort)) + } + rwLocation := net.JoinHostPort(address.String(), strconv.Itoa(int(*port))) + + // See the flag commentary to understand our assumptions when opening the read-only and read-write ports. + + if roLocation != "" { // Allow 1 read-only request per second, allow up to 20 in a burst before enforcing. rl := util.NewTokenBucketRateLimiter(1.0, 20) readOnlyServer := &http.Server{ - Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*readOnlyPort))), + Addr: roLocation, Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.Handler))), ReadTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute, @@ -226,7 +253,7 @@ func main() { } s := &http.Server{ - Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))), + Addr: rwLocation, Handler: apiserver.RecoverPanics(m.Handler), ReadTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute, diff --git a/pkg/master/master.go b/pkg/master/master.go index b14b3e8851..09b9420ebb 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -19,7 +19,9 @@ package master import ( "net" "net/http" + "strconv" "strings" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -34,6 +36,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/election" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" @@ -70,6 +73,16 @@ type Config struct { APIPrefix string CorsAllowedOriginList util.StringList TokenAuthFile string + + // The port on PublicAddress where a read-only server will be installed. + // Defaults to 7080 if not set. + ReadOnlyPort int + // The port on PublicAddress where a read-write server will be installed. + // Defaults to 443 if not set. + ReadWritePort int + + // If empty, the first result from net.InterfaceAddrs will be used. + PublicAddress string } // Master contains state for a Kubernetes cluster master/api server. @@ -91,8 +104,18 @@ type Master struct { apiPrefix string corsAllowedOriginList util.StringList tokenAuthFile string + // "Outputs" Handler http.Handler + + elector election.MasterElector + readOnlyServer string + readWriteServer string + electedMasterServices *util.Runner + + // lock must be held when accessing the below read-write members. + lock sync.RWMutex + electedMaster election.Master } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -108,8 +131,44 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil } +// setDefaults fills in any fields not set that are required to have valid data. +func setDefaults(c *Config) { + if c.ReadOnlyPort == 0 { + c.ReadOnlyPort = 7080 + } + if c.ReadWritePort == 0 { + c.ReadWritePort = 443 + } + if c.PublicAddress == "" { + addrs, err := net.InterfaceAddrs() + if err != nil { + glog.Fatalf("Unable to get network interfaces: error='%v'", err) + } + found := false + for i := range addrs { + ip, _, err := net.ParseCIDR(addrs[i].String()) + if err != nil { + glog.Errorf("Error parsing '%v': %v", addrs[i], err) + continue + } + if ip.IsLoopback() { + glog.Infof("'%v' (%v) is a loopback address, ignoring.", ip, addrs[i]) + continue + } + found = true + c.PublicAddress = ip.String() + glog.Infof("Will report %v as public IP address.", ip) + break + } + if !found { + glog.Fatalf("Unable to find suitible network address in list: %v", addrs) + } + } +} + // New returns a new instance of Master connected to the given etcd server. func New(c *Config) *Master { + setDefaults(c) minionRegistry := makeMinionRegistry(c) serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil) boundPodFactory := &pod.BasicBoundPodFactory{ @@ -131,7 +190,11 @@ func New(c *Config) *Master { apiPrefix: c.APIPrefix, corsAllowedOriginList: c.CorsAllowedOriginList, tokenAuthFile: c.TokenAuthFile, + elector: election.NewEtcdMasterElector(c.EtcdHelper.Client), + readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))), + readWriteServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadWritePort))), } + m.electedMasterServices = util.NewRunner(m.serviceWriterLoop, m.electionAnnounce) m.init(c) return m } @@ -188,6 +251,7 @@ func (m *Master) init(c *Config) { // TODO: should appear only in scheduler API group. "bindings": binding.NewREST(m.bindingRegistry), } + apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(m.mux, c.APIPrefix+"/v1beta1") apiserver.NewAPIGroup(m.API_v1beta2()).InstallREST(m.mux, c.APIPrefix+"/v1beta2") versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2") @@ -216,6 +280,19 @@ func (m *Master) init(c *Config) { m.mux.HandleFunc("/_whoami", handleWhoAmI(authenticator)) m.Handler = handler + + if m.readWriteServer != "" { + glog.Infof("Starting election services as %v", m.readWriteServer) + go election.Notify(m.elector, "/registry/elections/k8smaster", m.readWriteServer, m.electedMasterServices) + } + + // TODO: start a goroutine to report ourselves to the elected master. +} + +func (m *Master) electionAnnounce(stop chan struct{}) { + glog.Infof("Elected as master") + <-stop + glog.Info("Lost election for master") } // API_v1beta1 returns the resources and codec for API version v1beta1. diff --git a/pkg/master/publish.go b/pkg/master/publish.go new file mode 100644 index 0000000000..6fc19d26cb --- /dev/null +++ b/pkg/master/publish.go @@ -0,0 +1,113 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + + "github.com/golang/glog" +) + +func (m *Master) serviceWriterLoop(stop chan struct{}) { + for { + // Update service & endpoint records. + // TODO: when it becomes possible to change this stuff, + // stop polling and start watching. + // TODO: add endpoints of all replicas, not just the elected master. + if m.readWriteServer != "" { + if err := m.createMasterServiceIfNeeded("kubernetes", 443); err != nil { + glog.Errorf("Can't create rw service: %v", err) + } + if err := m.setEndpoints("kubernetes", []string{m.readWriteServer}); err != nil { + glog.Errorf("Can't create rw endpoints: %v", err) + } + } else { + m.deleteMasterService("kubernetes") + } + if m.readOnlyServer != "" { + if err := m.createMasterServiceIfNeeded("kubernetes-ro", 80); err != nil { + glog.Errorf("Can't create ro service: %v", err) + } + if err := m.setEndpoints("kubernetes-ro", []string{m.readOnlyServer}); err != nil { + glog.Errorf("Can't create rw endpoints: %v", err) + } + } else { + m.deleteMasterService("kubernetes-ro") + } + + select { + case <-stop: + return + case <-time.After(10 * time.Second): + } + } +} + +// createMasterServiceIfNeeded will create the specified service if it +// doesn't already exist. +func (m *Master) createMasterServiceIfNeeded(serviceName string, port int) error { + ctx := api.NewDefaultContext() + if _, err := m.serviceRegistry.GetService(ctx, serviceName); err == nil { + // The service already exists. + return nil + } + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: serviceName, + Namespace: "default", + }, + Port: port, + // We're going to add the endpoints by hand, so this selector is mainly to + // prevent identification of other pods. This selector will be useful when + // we start hosting apiserver in a pod. + Selector: map[string]string{"provider": "kubernetes", "component": "apiserver"}, + } + // Kids, don't do this at home: this is a hack. There's no good way to call the business + // logic which lives in the REST object from here. + c, err := m.storage["services"].Create(ctx, svc) + if err != nil { + return err + } + resp := <-c + if _, ok := resp.(*api.Service); ok { + // If all worked, we get back an *api.Service object. + return nil + } + return fmt.Errorf("Unexpected response: %#v", resp) +} + +func (m *Master) deleteMasterService(serviceName string) { + ctx := api.NewDefaultContext() + m.serviceRegistry.DeleteService(ctx, serviceName) +} + +// setEndpoints sets the endpoints for the given service. +func (m *Master) setEndpoints(serviceName string, endpoints []string) error { + ctx := api.NewDefaultContext() + e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) + if err != nil { + e = &api.Endpoints{} + // Fill in ID if it didn't exist already + e.ObjectMeta.Name = serviceName + e.ObjectMeta.Namespace = "default" + } + e.Endpoints = endpoints + return m.endpointRegistry.UpdateEndpoints(ctx, e) +} diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 6aefe50839..8bd1652caf 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -48,6 +48,9 @@ type REST struct { func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet) *REST { // TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd) ipa := newIPAllocator(portalNet) + if ipa == nil { + glog.Fatalf("Failed to create an IP allocator. Is subnet '%v' valid?", portalNet) + } reloadIPsFromStorage(ipa, registry) return &REST{ diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 75fb80dff8..f76cec32aa 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -51,6 +51,12 @@ func (e *EndpointController) SyncServiceEndpoints() error { } var resultErr error for _, service := range services.Items { + if service.Name == "kubernetes" || service.Name == "kubernetes-ro" { + // This is a temporary hack for supporting the master services + // until we actually start running apiserver in a pod. + continue + } + glog.Infof("About to update endpoints for service %v", service.Name) pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Selector).AsSelector()) if err != nil { glog.Errorf("Error syncing service: %#v, skipping.", service)