Send events on IP and port allocator repair controller errors

Signed-off-by: Ferran Rodenas <rodenasf@vmware.com>
Ferran Rodenas 2017-10-19 15:44:29 +02:00
parent 64ccd7665f
commit 8ed0bc1250
8 changed files with 55 additions and 24 deletions
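A minimal, self-contained sketch of the event-recording pattern this commit wires into both repair controllers. It is written against the public client-go API with a fake clientset rather than the tree's internal clientset used below; the component name, service name, and IP value are illustrative only, not taken from the diff.

package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

func main() {
	// Stand-in for the EventsGetter that the bootstrap controller now passes
	// down to NewRepair in the diff below.
	client := fake.NewSimpleClientset()

	// The broadcaster fans events out to sinks; this sink writes them to the API.
	broadcaster := record.NewBroadcaster()
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events("")})

	// The recorder stamps each event with the emitting component's name.
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"})

	// Attach a warning to the offending Service, as the repair loop does when
	// it finds a corrupt, duplicated, or out-of-range cluster IP.
	svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "demo-service", Namespace: "default"}}
	recorder.Eventf(svc, v1.EventTypeWarning, "ClusterIPNotValid",
		"Cluster IP %s is not a valid IP; please recreate service", "not-an-ip")
}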


@@ -48,6 +48,7 @@ const kubernetesServiceName = "kubernetes"
type Controller struct {
ServiceClient coreclient.ServicesGetter
NamespaceClient coreclient.NamespacesGetter
EventClient coreclient.EventsGetter
ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPInterval time.Duration
@@ -77,10 +78,11 @@ type Controller struct {
}
// NewBootstrapController returns a controller for watching the core capabilities of the master
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter) *Controller {
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter, eventClient coreclient.EventsGetter) *Controller {
return &Controller{
ServiceClient: serviceClient,
NamespaceClient: nsClient,
EventClient: eventClient,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
@@ -124,8 +126,8 @@ func (c *Controller) Start() {
return
}
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
// run all of the controllers once prior to returning from Start.
if err := repairClusterIPs.RunOnce(); err != nil {


@@ -374,7 +374,7 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.
if c.ExtraConfig.EnableCoreControllers {
controllerName := "bootstrap-controller"
coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
}


@@ -11,15 +11,18 @@ go_library(
srcs = ["repair.go"],
importpath = "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/helper:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/service/ipallocator:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
],
)


@@ -21,11 +21,14 @@ import (
"net"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
@@ -54,6 +57,7 @@ type Repair struct {
network *net.IPNet
alloc rangeallocation.RangeRegistry
leaks map[string]int // counter per leaked IP
recorder record.EventRecorder
}
// How many times we need to detect a leak before we clean up. This is to
@@ -62,13 +66,18 @@ const numRepairsBeforeLeakCleanup = 3
// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"})
return &Repair{
interval: interval,
serviceClient: serviceClient,
network: network,
alloc: alloc,
leaks: map[string]int{},
recorder: recorder,
}
}
@@ -136,6 +145,7 @@ func (c *Repair) runOnce() error {
ip := net.ParseIP(svc.Spec.ClusterIP)
if ip == nil {
// cluster IP is corrupt
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotValid", "Cluster IP %s is not a valid IP; please recreate service", svc.Spec.ClusterIP)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
continue
}
@@ -147,22 +157,24 @@ func (c *Repair) runOnce() error {
stored.Release(ip)
} else {
// cluster IP doesn't seem to be allocated
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotAllocated", "Cluster IP %s is not allocated; repairing", ip)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", ip, svc.Name, svc.Namespace))
}
delete(c.leaks, ip.String()) // it is used, so it can't be leaked
case ipallocator.ErrAllocated:
// TODO: send event
// cluster IP is duplicate
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "Cluster IP %s was assigned to multiple services; please recreate service", ip)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace))
case err.(*ipallocator.ErrNotInRange):
// TODO: send event
// cluster IP is out of range
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPOutOfRange", "Cluster IP %s is not within the service CIDR %s; please recreate service", ip, c.network)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network))
case ipallocator.ErrFull:
// TODO: send event
// somehow we are out of IPs
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", rebuilt)
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %s is full; you must widen the CIDR in order to create new services", c.network)
return fmt.Errorf("the service CIDR %s is full; you must widen the CIDR in order to create new services", c.network)
default:
c.recorder.Eventf(&svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate cluster IP %s due to an unknown error", ip)
return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err)
}
}


@@ -55,7 +55,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
}
_, cidr, _ := net.ParseCIDR(ipregistry.item.Range)
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
@@ -68,7 +68,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, fakeClient.Core(), cidr, ipregistry)
r = NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
@@ -96,7 +96,7 @@ func TestRepairLeak(t *testing.T) {
},
}
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
// Run through the "leak detection holdoff" loops.
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
if err := r.RunOnce(); err != nil {
@@ -169,7 +169,7 @@ func TestRepairWithExisting(t *testing.T) {
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}


@@ -11,16 +11,19 @@ go_library(
srcs = ["repair.go"],
importpath = "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/service:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
],
)


@@ -20,12 +20,15 @@ import (
"fmt"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
@@ -40,6 +43,7 @@ type Repair struct {
portRange net.PortRange
alloc rangeallocation.RangeRegistry
leaks map[int]int // counter per leaked port
recorder record.EventRecorder
}
// How many times we need to detect a leak before we clean up. This is to
@@ -48,13 +52,18 @@ const numRepairsBeforeLeakCleanup = 3
// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "portallocator-repair-controller"})
return &Repair{
interval: interval,
serviceClient: serviceClient,
portRange: portRange,
alloc: alloc,
leaks: map[int]int{},
recorder: recorder,
}
}
@@ -130,22 +139,24 @@ func (c *Repair) runOnce() error {
stored.Release(port)
} else {
// doesn't seem to be allocated
c.recorder.Eventf(svc, v1.EventTypeWarning, "PortNotAllocated", "Port %d is not allocated; repairing", port)
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
}
delete(c.leaks, port) // it is used, so it can't be leaked
case portallocator.ErrAllocated:
// TODO: send event
// port is duplicate, reallocate
c.recorder.Eventf(svc, v1.EventTypeWarning, "PortAlreadyAllocated", "Port %d was assigned to multiple services; please recreate service", port)
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
case err.(*portallocator.ErrNotInRange):
// TODO: send event
// port is out of range, reallocate
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
c.recorder.Eventf(svc, v1.EventTypeWarning, "PortOutOfRange", "Port %d is not within the port range %s; please recreate service", port, c.portRange)
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %s; please recreate", port, svc.Name, svc.Namespace, c.portRange))
case portallocator.ErrFull:
// TODO: send event
// somehow we are out of ports
return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)
c.recorder.Eventf(svc, v1.EventTypeWarning, "PortRangeFull", "Port range %s is full; you must widen the port range in order to create new services", c.portRange)
return fmt.Errorf("the port range %s is full; you must widen the port range in order to create new services", c.portRange)
default:
c.recorder.Eventf(svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate port %d due to an unknown error", port)
return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
}
}


@@ -55,7 +55,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "100-200"},
}
pr, _ := net.ParsePortRange(registry.item.Range)
r := NewRepair(0, fakeClient.Core(), *pr, registry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
@@ -68,7 +68,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "100-200"},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, fakeClient.Core(), *pr, registry)
r = NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
@@ -96,7 +96,7 @@ func TestRepairLeak(t *testing.T) {
},
}
r := NewRepair(0, fakeClient.Core(), *pr, registry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
// Run through the "leak detection holdoff" loops.
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
if err := r.RunOnce(); err != nil {
@@ -175,7 +175,7 @@ func TestRepairWithExisting(t *testing.T) {
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.Core(), *pr, registry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}