Implement kubernetes & kubernetes-ro services

pull/6/head
Daniel Smith 2014-10-27 17:56:33 -07:00
parent 3045311398
commit 7146ec9d49
5 changed files with 232 additions and 6 deletions

View File

@ -42,9 +42,23 @@ import (
) )
var ( var (
port = flag.Uint("port", 8080, "The port to listen on. Default 8080") // Note: the weird ""+ in below lines seems to be the only way to get gofmt to
// arrange these text blocks sensibly. Grrr.
port = flag.Uint("port", 8080, ""+
"The port to listen on. Default 8080. It is assumed that firewall rules are "+
"set up such that this port is not reachable from outside of the cluster. It is "+
"further assumed that port 443 on the cluster's public address is proxied to this "+
"port. This is performed by nginx in the default setup.")
address = util.IP(net.ParseIP("127.0.0.1")) address = util.IP(net.ParseIP("127.0.0.1"))
readOnlyPort = flag.Uint("read_only_port", 7080, "The port from which to serve read-only resources. If 0, don't serve on a read-only address.") publicAddressOverride = flag.String("public_address_override", "", ""+
"Public serving address. Read only port will be opened on this address, "+
"and it is assumed that port 443 at this address will be proxied/redirected "+
"to '-address':'-port'. If blank, the address in the first listed interface "+
"will be used.")
readOnlyPort = flag.Uint("read_only_port", 7080, ""+
"The port from which to serve read-only resources. If 0, don't serve on a "+
"read-only address. It is assumed that firewall rules are set up such that "+
"this port is not reachable from outside of the cluster.")
apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.") apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.")
storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred") storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred")
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
@ -184,7 +198,7 @@ func main() {
n := net.IPNet(portalNet) n := net.IPNet(portalNet)
mux := http.NewServeMux() mux := http.NewServeMux()
m := master.New(&master.Config{ config := &master.Config{
Client: client, Client: client,
Cloud: cloud, Cloud: cloud,
EtcdHelper: helper, EtcdHelper: helper,
@ -207,13 +221,26 @@ func main() {
APIPrefix: *apiPrefix, APIPrefix: *apiPrefix,
CorsAllowedOriginList: corsAllowedOriginList, CorsAllowedOriginList: corsAllowedOriginList,
TokenAuthFile: *tokenAuthFile, TokenAuthFile: *tokenAuthFile,
})
ReadOnlyPort: int(*readOnlyPort),
ReadWritePort: int(*port),
PublicAddress: *publicAddressOverride,
}
m := master.New(config)
roLocation := ""
if *readOnlyPort != 0 { if *readOnlyPort != 0 {
roLocation = net.JoinHostPort(config.PublicAddress, strconv.Itoa(config.ReadOnlyPort))
}
rwLocation := net.JoinHostPort(address.String(), strconv.Itoa(int(*port)))
// See the flag commentary to understand our assumptions when opening the read-only and read-write ports.
if roLocation != "" {
// Allow 1 read-only request per second, allow up to 20 in a burst before enforcing. // Allow 1 read-only request per second, allow up to 20 in a burst before enforcing.
rl := util.NewTokenBucketRateLimiter(1.0, 20) rl := util.NewTokenBucketRateLimiter(1.0, 20)
readOnlyServer := &http.Server{ readOnlyServer := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*readOnlyPort))), Addr: roLocation,
Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.Handler))), Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.Handler))),
ReadTimeout: 5 * time.Minute, ReadTimeout: 5 * time.Minute,
WriteTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute,
@ -226,7 +253,7 @@ func main() {
} }
s := &http.Server{ s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))), Addr: rwLocation,
Handler: apiserver.RecoverPanics(m.Handler), Handler: apiserver.RecoverPanics(m.Handler),
ReadTimeout: 5 * time.Minute, ReadTimeout: 5 * time.Minute,
WriteTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute,

View File

@ -19,7 +19,9 @@ package master
import ( import (
"net" "net"
"net/http" "net/http"
"strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -34,6 +36,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/election"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
@ -70,6 +73,16 @@ type Config struct {
APIPrefix string APIPrefix string
CorsAllowedOriginList util.StringList CorsAllowedOriginList util.StringList
TokenAuthFile string TokenAuthFile string
// The port on PublicAddress where a read-only server will be installed.
// Defaults to 7080 if not set.
ReadOnlyPort int
// The port on PublicAddress where a read-write server will be installed.
// Defaults to 443 if not set.
ReadWritePort int
// If empty, the first result from net.InterfaceAddrs will be used.
PublicAddress string
} }
// Master contains state for a Kubernetes cluster master/api server. // Master contains state for a Kubernetes cluster master/api server.
@ -91,8 +104,18 @@ type Master struct {
apiPrefix string apiPrefix string
corsAllowedOriginList util.StringList corsAllowedOriginList util.StringList
tokenAuthFile string tokenAuthFile string
// "Outputs" // "Outputs"
Handler http.Handler Handler http.Handler
elector election.MasterElector
readOnlyServer string
readWriteServer string
electedMasterServices *util.Runner
// lock must be held when accessing the below read-write members.
lock sync.RWMutex
electedMaster election.Master
} }
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
@ -108,8 +131,44 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe
return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil
} }
// setDefaults fills in any fields not set that are required to have valid data.
func setDefaults(c *Config) {
if c.ReadOnlyPort == 0 {
c.ReadOnlyPort = 7080
}
if c.ReadWritePort == 0 {
c.ReadWritePort = 443
}
if c.PublicAddress == "" {
addrs, err := net.InterfaceAddrs()
if err != nil {
glog.Fatalf("Unable to get network interfaces: error='%v'", err)
}
found := false
for i := range addrs {
ip, _, err := net.ParseCIDR(addrs[i].String())
if err != nil {
glog.Errorf("Error parsing '%v': %v", addrs[i], err)
continue
}
if ip.IsLoopback() {
glog.Infof("'%v' (%v) is a loopback address, ignoring.", ip, addrs[i])
continue
}
found = true
c.PublicAddress = ip.String()
glog.Infof("Will report %v as public IP address.", ip)
break
}
if !found {
glog.Fatalf("Unable to find suitible network address in list: %v", addrs)
}
}
}
// New returns a new instance of Master connected to the given etcd server. // New returns a new instance of Master connected to the given etcd server.
func New(c *Config) *Master { func New(c *Config) *Master {
setDefaults(c)
minionRegistry := makeMinionRegistry(c) minionRegistry := makeMinionRegistry(c)
serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil) serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
boundPodFactory := &pod.BasicBoundPodFactory{ boundPodFactory := &pod.BasicBoundPodFactory{
@ -131,7 +190,11 @@ func New(c *Config) *Master {
apiPrefix: c.APIPrefix, apiPrefix: c.APIPrefix,
corsAllowedOriginList: c.CorsAllowedOriginList, corsAllowedOriginList: c.CorsAllowedOriginList,
tokenAuthFile: c.TokenAuthFile, tokenAuthFile: c.TokenAuthFile,
elector: election.NewEtcdMasterElector(c.EtcdHelper.Client),
readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))),
readWriteServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadWritePort))),
} }
m.electedMasterServices = util.NewRunner(m.serviceWriterLoop, m.electionAnnounce)
m.init(c) m.init(c)
return m return m
} }
@ -188,6 +251,7 @@ func (m *Master) init(c *Config) {
// TODO: should appear only in scheduler API group. // TODO: should appear only in scheduler API group.
"bindings": binding.NewREST(m.bindingRegistry), "bindings": binding.NewREST(m.bindingRegistry),
} }
apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(m.mux, c.APIPrefix+"/v1beta1") apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(m.mux, c.APIPrefix+"/v1beta1")
apiserver.NewAPIGroup(m.API_v1beta2()).InstallREST(m.mux, c.APIPrefix+"/v1beta2") apiserver.NewAPIGroup(m.API_v1beta2()).InstallREST(m.mux, c.APIPrefix+"/v1beta2")
versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2") versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2")
@ -216,6 +280,19 @@ func (m *Master) init(c *Config) {
m.mux.HandleFunc("/_whoami", handleWhoAmI(authenticator)) m.mux.HandleFunc("/_whoami", handleWhoAmI(authenticator))
m.Handler = handler m.Handler = handler
if m.readWriteServer != "" {
glog.Infof("Starting election services as %v", m.readWriteServer)
go election.Notify(m.elector, "/registry/elections/k8smaster", m.readWriteServer, m.electedMasterServices)
}
// TODO: start a goroutine to report ourselves to the elected master.
}
func (m *Master) electionAnnounce(stop chan struct{}) {
glog.Infof("Elected as master")
<-stop
glog.Info("Lost election for master")
} }
// API_v1beta1 returns the resources and codec for API version v1beta1. // API_v1beta1 returns the resources and codec for API version v1beta1.

113
pkg/master/publish.go Normal file
View File

@ -0,0 +1,113 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
)
func (m *Master) serviceWriterLoop(stop chan struct{}) {
for {
// Update service & endpoint records.
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
// TODO: add endpoints of all replicas, not just the elected master.
if m.readWriteServer != "" {
if err := m.createMasterServiceIfNeeded("kubernetes", 443); err != nil {
glog.Errorf("Can't create rw service: %v", err)
}
if err := m.setEndpoints("kubernetes", []string{m.readWriteServer}); err != nil {
glog.Errorf("Can't create rw endpoints: %v", err)
}
} else {
m.deleteMasterService("kubernetes")
}
if m.readOnlyServer != "" {
if err := m.createMasterServiceIfNeeded("kubernetes-ro", 80); err != nil {
glog.Errorf("Can't create ro service: %v", err)
}
if err := m.setEndpoints("kubernetes-ro", []string{m.readOnlyServer}); err != nil {
glog.Errorf("Can't create rw endpoints: %v", err)
}
} else {
m.deleteMasterService("kubernetes-ro")
}
select {
case <-stop:
return
case <-time.After(10 * time.Second):
}
}
}
// createMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (m *Master) createMasterServiceIfNeeded(serviceName string, port int) error {
ctx := api.NewDefaultContext()
if _, err := m.serviceRegistry.GetService(ctx, serviceName); err == nil {
// The service already exists.
return nil
}
svc := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: serviceName,
Namespace: "default",
},
Port: port,
// We're going to add the endpoints by hand, so this selector is mainly to
// prevent identification of other pods. This selector will be useful when
// we start hosting apiserver in a pod.
Selector: map[string]string{"provider": "kubernetes", "component": "apiserver"},
}
// Kids, don't do this at home: this is a hack. There's no good way to call the business
// logic which lives in the REST object from here.
c, err := m.storage["services"].Create(ctx, svc)
if err != nil {
return err
}
resp := <-c
if _, ok := resp.(*api.Service); ok {
// If all worked, we get back an *api.Service object.
return nil
}
return fmt.Errorf("Unexpected response: %#v", resp)
}
func (m *Master) deleteMasterService(serviceName string) {
ctx := api.NewDefaultContext()
m.serviceRegistry.DeleteService(ctx, serviceName)
}
// setEndpoints sets the endpoints for the given service.
func (m *Master) setEndpoints(serviceName string, endpoints []string) error {
ctx := api.NewDefaultContext()
e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName)
if err != nil {
e = &api.Endpoints{}
// Fill in ID if it didn't exist already
e.ObjectMeta.Name = serviceName
e.ObjectMeta.Namespace = "default"
}
e.Endpoints = endpoints
return m.endpointRegistry.UpdateEndpoints(ctx, e)
}

View File

@ -48,6 +48,9 @@ type REST struct {
func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet) *REST { func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet) *REST {
// TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd) // TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd)
ipa := newIPAllocator(portalNet) ipa := newIPAllocator(portalNet)
if ipa == nil {
glog.Fatalf("Failed to create an IP allocator. Is subnet '%v' valid?", portalNet)
}
reloadIPsFromStorage(ipa, registry) reloadIPsFromStorage(ipa, registry)
return &REST{ return &REST{

View File

@ -51,6 +51,12 @@ func (e *EndpointController) SyncServiceEndpoints() error {
} }
var resultErr error var resultErr error
for _, service := range services.Items { for _, service := range services.Items {
if service.Name == "kubernetes" || service.Name == "kubernetes-ro" {
// This is a temporary hack for supporting the master services
// until we actually start running apiserver in a pod.
continue
}
glog.Infof("About to update endpoints for service %v", service.Name)
pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Selector).AsSelector()) pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Selector).AsSelector())
if err != nil { if err != nil {
glog.Errorf("Error syncing service: %#v, skipping.", service) glog.Errorf("Error syncing service: %#v, skipping.", service)