k3s/pkg/cloudprovider/servicecontroller/servicecontroller.go

/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package servicecontroller

import (
	"fmt"
	"net"
	"sync"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"

	"github.com/golang/glog"
)

const (
	clientRetryCount    = 5
	clientRetryInterval = 5 * time.Second
	retryable           = true
	notRetryable        = false
)

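// ServiceController keeps the cloud provider's external load balancers in
// sync with the services known to the cluster.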
type ServiceController struct {
	cloud       cloudprovider.Interface
	kubeClient  client.Interface
	clusterName string
	balancer    cloudprovider.TCPLoadBalancer
	zone        cloudprovider.Zone
	mu          sync.Mutex              // protects serviceMap
	serviceMap  map[string]*api.Service // keys generated by cache.MetaNamespaceKeyFunc
}

// New returns a new service controller to keep cloud provider service resources
// (like external load balancers) in sync with the registry.
func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName string) *ServiceController {
	return &ServiceController{
		cloud:       cloud,
		kubeClient:  kubeClient,
		clusterName: clusterName,
		serviceMap:  make(map[string]*api.Service),
	}
}

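// A minimal wiring sketch for callers (hypothetical, not part of this file);
// the cloud provider, client, and cluster name are assumed to come from the
// caller's own configuration:
//
//	sc := servicecontroller.New(cloud, kubeClient, "my-cluster")
//	if err := sc.Run(); err != nil {
//		glog.Fatalf("failed to start service controller: %v", err)
//	}
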
// Run starts a background goroutine that watches for changes to services that
// have (or had) externalLoadBalancers=true and ensures that they have external
// load balancers created and deleted appropriately.
func (s *ServiceController) Run() error {
	if err := s.init(); err != nil {
		return err
	}

	// We have to make this check because the ListWatch that we use in
	// watchServices requires Client functions that aren't in the interface
	// for some reason.
	if _, ok := s.kubeClient.(*client.Client); !ok {
		return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the client Interface.")
	}

	go s.watchServices()
	return nil
}

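// init verifies that the configured cloud provider supports TCP load
// balancers and zone enumeration, and caches the balancer interface and the
// local zone for later use.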
func (s *ServiceController) init() error {
	if s.cloud == nil {
		return fmt.Errorf("ServiceController should not be run without a cloudprovider.")
	}

	balancer, ok := s.cloud.TCPLoadBalancer()
	if !ok {
		return fmt.Errorf("the cloud provider does not support external TCP load balancers.")
	}
	s.balancer = balancer

	zones, ok := s.cloud.Zones()
	if !ok {
		return fmt.Errorf("the cloud provider does not support zone enumeration, which is required for creating external load balancers.")
	}
	zone, err := zones.GetZone()
	if err != nil {
		return fmt.Errorf("failed to get zone from cloud provider, will not be able to create external load balancers: %v", err)
	}
	s.zone = zone
	return nil
}

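// watchServices loops forever, pulling service deltas off a DeltaFIFO fed by
// a reflector watching the apiserver, and creating or deleting external load
// balancers as needed.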
func (s *ServiceController) watchServices() {
	// Get the currently existing set of services and then all future creates
	// and updates of services.
	// TODO: Add a compressor that intelligently squashes together updates?
	keyLister := cache.KeyListerFunc(func() []string { return s.listKeys() })
	serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, keyLister)
	lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
	cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()

	// TODO: Add proper retries rather than just re-adding to the queue?
	for {
		newItem := serviceQueue.Pop()
		deltas, ok := newItem.(cache.Deltas)
		if !ok {
			glog.Errorf("Received object from service watcher that wasn't Deltas: %+v", newItem)
			continue
		}
		delta := deltas.Newest()
		if delta == nil {
			glog.Errorf("Received nil delta from watcher queue.")
			continue
		}
		err, shouldRetry := s.processDelta(delta)
		if shouldRetry {
			// Add the failed service back to the queue so we'll retry it.
			glog.Errorf("Failed to process service delta. Retrying: %v", err)
			time.Sleep(5 * time.Second)
			serviceQueue.AddIfNotPresent(deltas)
		} else if err != nil {
			glog.Errorf("Failed to process service delta. Not retrying: %v", err)
		}
	}
}

// Returns an error if processing the delta failed, along with a boolean
// indicator of whether the processing should be retried.
func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
	service, ok := delta.Object.(*api.Service)
	if !ok {
		// If the DeltaFIFO saw a key in our cache that it didn't know about, it
		// can send a deletion with an unknown state. Grab the service from our
		// cache for deleting.
		key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
		if !ok {
			return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable
		}
		service, ok = s.getService(key.Key)
		if !ok {
			return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key.Key), notRetryable
		}
		delta.Object = service
	}
	glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service)

	// TODO: Make this more parallel. The only things that need to be serialized
	// are changes to services with the same namespace and name.
	// TODO: Handle added, updated, and sync differently?
	switch delta.Type {
	case cache.Added:
		fallthrough
	case cache.Updated:
		fallthrough
	case cache.Sync:
		return s.createLoadBalancerIfNeeded(service)
	case cache.Deleted:
		return s.handleDelete(service)
	default:
		glog.Errorf("Unexpected delta type: %v", delta.Type)
	}
	return nil, notRetryable
}

// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (error, bool) {
	namespacedName, err := cache.MetaNamespaceKeyFunc(service)
	if err != nil {
		return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable
	}

	cachedService, cached := s.getService(namespacedName)
	if cached && !needsUpdate(cachedService, service) {
		glog.Infof("LB already exists and doesn't need update for service %s", namespacedName)
		return nil, notRetryable
	}
	if cached {
		// If the service already exists but needs to be updated, delete it so that
		// we can recreate it cleanly.
		if cachedService.Spec.CreateExternalLoadBalancer {
			glog.Infof("Deleting existing load balancer for service %s that needs an updated load balancer.", namespacedName)
			if err := s.ensureLBDeleted(cachedService); err != nil {
				return err, retryable
			}
		}
	} else {
		// If we don't have any cached memory of the load balancer and it already
		// exists, optimistically consider our work done.
		// TODO: If we could read the spec of the existing load balancer, we could
		// determine if an update is necessary.
		exists, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region)
		if err != nil {
			return fmt.Errorf("Error getting LB for service %s", namespacedName), retryable
		}
		if exists && len(service.Spec.PublicIPs) == 0 {
			// The load balancer exists, but we apparently don't know about its public
			// IPs, so just delete it and recreate it to get back to a sane state.
			// TODO: Ideally the cloud provider interface would return the IP for us.
			glog.Infof("Deleting old LB for service with no public IPs %s", namespacedName)
			if err := s.ensureLBDeleted(service); err != nil {
				return err, retryable
			}
		} else if exists {
			// TODO: Better handle updates for non-cached services, this is optimistic.
			glog.Infof("LB already exists for service %s", namespacedName)
			return nil, notRetryable
		}
	}

	if !service.Spec.CreateExternalLoadBalancer {
		glog.Infof("Not creating LB for service %s that doesn't want one.", namespacedName)
		return nil, notRetryable
	}

	glog.V(2).Infof("Creating LB for service %s", namespacedName)
	// The load balancer doesn't exist yet, so create it.
	publicIPstring := fmt.Sprint(service.Spec.PublicIPs)
	err = s.createExternalLoadBalancer(service)
	if err != nil {
		return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable
	}
	if publicIPstring == fmt.Sprint(service.Spec.PublicIPs) {
		glog.Infof("Not persisting unchanged service to registry.")
		return nil, notRetryable
	}

	s.setService(namespacedName, service)
	// If creating the load balancer succeeded, persist the updated service.
	if err = s.persistUpdate(service); err != nil {
		return fmt.Errorf("Failed to persist updated publicIPs to apiserver, even after retries. Giving up: %v", err), notRetryable
	}
	return nil, notRetryable
}

// TODO(a-robinson): Handle repeated failures due to ResourceVersion changes or
// the object having been deleted.
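// persistUpdate writes the updated service back to the apiserver, retrying up
// to clientRetryCount times before giving up.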
func (s *ServiceController) persistUpdate(service *api.Service) error {
	var err error
	for i := 0; i < clientRetryCount; i++ {
		_, err = s.kubeClient.Services(service.Namespace).Update(service)
		if err == nil {
			return nil
		}
		glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v",
			service.Name, err)
		time.Sleep(clientRetryInterval)
	}
	return err
}

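// createExternalLoadBalancer asks the cloud provider to create a TCP load
// balancer spanning the cluster's nodes and records the resulting endpoint in
// the service's PublicIPs.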
func (s *ServiceController) createExternalLoadBalancer(service *api.Service) error {
	ports, err := getTCPPorts(service)
	if err != nil {
		return err
	}
	nodes, err := s.kubeClient.Nodes().List(labels.Everything())
	if err != nil {
		return err
	}

	name := s.loadBalancerName(service)
	if len(service.Spec.PublicIPs) > 0 {
		for _, publicIP := range service.Spec.PublicIPs {
			// TODO: Make this actually work for multiple IPs by using different
			// names for each. For now, we'll just create the first and break.
			endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP),
				ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
			if err != nil {
				return err
			}
			service.Spec.PublicIPs = []string{endpoint}
			break
		}
	} else {
		endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil,
			ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
		if err != nil {
			return err
		}
		service.Spec.PublicIPs = []string{endpoint}
	}
	return nil
}

// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) handleDelete(service *api.Service) (error, bool) {
	if err := s.ensureLBDeleted(service); err != nil {
		return err, retryable
	}

	namespacedName, err := cache.MetaNamespaceKeyFunc(service)
	if err != nil {
		// This is panic-worthy, since the queue shouldn't have been able to
		// handle the service if it couldn't generate a name for it.
		return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable
	}
	s.deleteService(namespacedName)
	return nil, notRetryable
}

// Ensures that the load balancer associated with the given service is deleted,
// doing the deletion if necessary.
func (s *ServiceController) ensureLBDeleted(service *api.Service) error {
	// This is only needed because not all delete load balancer implementations
	// are currently idempotent to the LB not existing.
	if exists, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region); err != nil {
		return err
	} else if !exists {
		return nil
	}
	return s.balancer.DeleteTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region)
}

// listKeys implements the interface required by DeltaFIFO to list the keys we
// already know about.
func (s *ServiceController) listKeys() []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	keys := make([]string, 0, len(s.serviceMap))
	for k := range s.serviceMap {
		keys = append(keys, k)
	}
	return keys
}

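// getService returns the cached service for the given namespaced key, if any.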
func (s *ServiceController) getService(serviceName string) (*api.Service, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	info, ok := s.serviceMap[serviceName]
	return info, ok
}

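// setService caches the given service under the given namespaced key.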
func (s *ServiceController) setService(serviceName string, info *api.Service) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.serviceMap[serviceName] = info
}

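// deleteService removes the service with the given namespaced key from the cache.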
func (s *ServiceController) deleteService(serviceName string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.serviceMap, serviceName)
}

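// needsUpdate returns true if the load balancer for oldService must be
// recreated to match newService.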
func needsUpdate(oldService *api.Service, newService *api.Service) bool {
	if !oldService.Spec.CreateExternalLoadBalancer && !newService.Spec.CreateExternalLoadBalancer {
		return false
	}
	if oldService.Spec.CreateExternalLoadBalancer != newService.Spec.CreateExternalLoadBalancer {
		return true
	}
	if !portsEqual(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
		return true
	}
	if len(oldService.Spec.PublicIPs) != len(newService.Spec.PublicIPs) {
		return true
	}
	for i := range oldService.Spec.PublicIPs {
		if oldService.Spec.PublicIPs[i] != newService.Spec.PublicIPs[i] {
			return true
		}
	}
	return false
}

// TODO: Use a shorter name that's less likely to be longer than cloud
// providers' length limits.
func (s *ServiceController) loadBalancerName(service *api.Service) string {
	return s.cloud.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name)
}

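// getTCPPorts returns the service's port numbers, or an error if any of its
// ports is not a TCP port.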
func getTCPPorts(service *api.Service) ([]int, error) {
	ports := []int{}
	for i := range service.Spec.Ports {
		// TODO: Support UDP. Remove the check from the API validation package once
		// it's supported.
		sp := &service.Spec.Ports[i]
		if sp.Protocol != api.ProtocolTCP {
			return nil, fmt.Errorf("external load balancers for non-TCP services are not currently supported.")
		}
		ports = append(ports, sp.Port)
	}
	return ports, nil
}

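// portsEqual returns true if the two services expose the same multiset of TCP
// ports, ignoring order.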
func portsEqual(x, y *api.Service) bool {
	xPorts, err := getTCPPorts(x)
	if err != nil {
		return false
	}
	yPorts, err := getTCPPorts(y)
	if err != nil {
		return false
	}
	if len(xPorts) != len(yPorts) {
		return false
	}

	// Use a map for comparison since port slices aren't necessarily sorted.
	// Counting occurrences (rather than using a set) also compares correctly
	// in the unlikely case of duplicate ports.
	portCount := make(map[int]int)
	for _, xPort := range xPorts {
		portCount[xPort]++
	}
	for _, yPort := range yPorts {
		portCount[yPort]--
	}
	for _, count := range portCount {
		if count != 0 {
			return false
		}
	}
	return true
}

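// hostsFromNodeList returns the names of the nodes in the given list.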
func hostsFromNodeList(list *api.NodeList) []string {
	result := make([]string, len(list.Items))
	for ix := range list.Items {
		result[ix] = list.Items[ix].Name
	}
	return result
}