From 8ed0bc12504b05bac0f1f4334ba26708207d9987 Mon Sep 17 00:00:00 2001
From: Ferran Rodenas
Date: Thu, 19 Oct 2017 15:44:29 +0200
Subject: [PATCH] Send events on ip and port allocator repair controller errors

Signed-off-by: Ferran Rodenas
---
 pkg/master/controller.go                           |  8 ++++---
 pkg/master/master.go                               |  2 +-
 .../core/service/ipallocator/controller/BUILD     |  3 +++
 .../service/ipallocator/controller/repair.go      | 24 ++++++++++++++-----
 .../ipallocator/controller/repair_test.go         |  8 +++----
 .../service/portallocator/controller/BUILD        |  3 +++
 .../portallocator/controller/repair.go            | 23 +++++++++++++-----
 .../portallocator/controller/repair_test.go       |  8 +++----
 8 files changed, 55 insertions(+), 24 deletions(-)

diff --git a/pkg/master/controller.go b/pkg/master/controller.go
index ef919b2963..291078b271 100644
--- a/pkg/master/controller.go
+++ b/pkg/master/controller.go
@@ -48,6 +48,7 @@ const kubernetesServiceName = "kubernetes"
 type Controller struct {
     ServiceClient   coreclient.ServicesGetter
     NamespaceClient coreclient.NamespacesGetter
+    EventClient     coreclient.EventsGetter
 
     ServiceClusterIPRegistry rangeallocation.RangeRegistry
     ServiceClusterIPInterval time.Duration
@@ -77,10 +78,11 @@ type Controller struct {
 }
 
 // NewBootstrapController returns a controller for watching the core capabilities of the master
-func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter) *Controller {
+func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter, eventClient coreclient.EventsGetter) *Controller {
     return &Controller{
         ServiceClient:   serviceClient,
         NamespaceClient: nsClient,
+        EventClient:     eventClient,
 
         EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
         EndpointInterval:   c.ExtraConfig.EndpointReconcilerConfig.Interval,
@@ -124,8 +126,8 @@ func (c *Controller) Start() {
         return
     }
 
-    repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
-    repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
+    repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
+    repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
 
     // run all of the controllers once prior to returning from Start.
     if err := repairClusterIPs.RunOnce(); err != nil {
diff --git a/pkg/master/master.go b/pkg/master/master.go
index b20a6f5e92..27353e4d9c 100644
--- a/pkg/master/master.go
+++ b/pkg/master/master.go
@@ -374,7 +374,7 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.
     if c.ExtraConfig.EnableCoreControllers {
         controllerName := "bootstrap-controller"
         coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
-        bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient)
+        bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
         m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
         m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
     }
diff --git a/pkg/registry/core/service/ipallocator/controller/BUILD b/pkg/registry/core/service/ipallocator/controller/BUILD
index b6a8f0a64d..43498a212e 100644
--- a/pkg/registry/core/service/ipallocator/controller/BUILD
+++ b/pkg/registry/core/service/ipallocator/controller/BUILD
@@ -11,15 +11,18 @@ go_library(
     srcs = ["repair.go"],
     importpath = "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller",
     deps = [
+        "//pkg/api/legacyscheme:go_default_library",
         "//pkg/apis/core:go_default_library",
         "//pkg/apis/core/helper:go_default_library",
         "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
         "//pkg/registry/core/rangeallocation:go_default_library",
         "//pkg/registry/core/service/ipallocator:go_default_library",
+        "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//vendor/k8s.io/client-go/tools/record:go_default_library",
         "//vendor/k8s.io/client-go/util/retry:go_default_library",
     ],
 )
diff --git a/pkg/registry/core/service/ipallocator/controller/repair.go b/pkg/registry/core/service/ipallocator/controller/repair.go
index fa2cae7f3a..b4aaf1c289 100644
--- a/pkg/registry/core/service/ipallocator/controller/repair.go
+++ b/pkg/registry/core/service/ipallocator/controller/repair.go
@@ -21,11 +21,14 @@ import (
     "net"
     "time"
 
+    "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/retry"
+    "k8s.io/kubernetes/pkg/api/legacyscheme"
     api "k8s.io/kubernetes/pkg/apis/core"
     "k8s.io/kubernetes/pkg/apis/core/helper"
     coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
@@ -54,6 +57,7 @@ type Repair struct {
     network       *net.IPNet
     alloc         rangeallocation.RangeRegistry
     leaks         map[string]int // counter per leaked IP
+    recorder      record.EventRecorder
 }
 
 // How many times we need to detect a leak before we clean up.  This is to
@@ -62,13 +66,18 @@ const numRepairsBeforeLeakCleanup = 3
 
 // NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
 // and generates informational warnings for a cluster that is not in sync.
-func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
+func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
+    eventBroadcaster := record.NewBroadcaster()
+    eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")})
+    recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"})
+
     return &Repair{
         interval:      interval,
         serviceClient: serviceClient,
         network:       network,
         alloc:         alloc,
         leaks:         map[string]int{},
+        recorder:      recorder,
     }
 }
 
@@ -136,6 +145,7 @@ func (c *Repair) runOnce() error {
         ip := net.ParseIP(svc.Spec.ClusterIP)
         if ip == nil {
             // cluster IP is corrupt
+            c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotValid", "Cluster IP %s is not a valid IP; please recreate service", svc.Spec.ClusterIP)
             runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
             continue
         }
@@ -147,22 +157,24 @@ func (c *Repair) runOnce() error {
                 stored.Release(ip)
             } else {
                 // cluster IP doesn't seem to be allocated
-                runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
+                c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotAllocated", "Cluster IP %s is not allocated; repairing", ip)
+                runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", ip, svc.Name, svc.Namespace))
             }
             delete(c.leaks, ip.String()) // it is used, so it can't be leaked
         case ipallocator.ErrAllocated:
-            // TODO: send event
             // cluster IP is duplicate
+            c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "Cluster IP %s was assigned to multiple services; please recreate service", ip)
             runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace))
         case err.(*ipallocator.ErrNotInRange):
-            // TODO: send event
             // cluster IP is out of range
+            c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPOutOfRange", "Cluster IP %s is not within the service CIDR %s; please recreate service", ip, c.network)
             runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network))
         case ipallocator.ErrFull:
-            // TODO: send event
             // somehow we are out of IPs
-            return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", rebuilt)
+            c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %s is full; you must widen the CIDR in order to create new services", c.network)
+            return fmt.Errorf("the service CIDR %s is full; you must widen the CIDR in order to create new services", c.network)
         default:
+            c.recorder.Eventf(&svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate cluster IP %s due to an unknown error", ip)
             return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err)
         }
     }
diff --git a/pkg/registry/core/service/ipallocator/controller/repair_test.go b/pkg/registry/core/service/ipallocator/controller/repair_test.go
index 51392b843a..0bbd6f5959 100644
--- a/pkg/registry/core/service/ipallocator/controller/repair_test.go
+++ b/pkg/registry/core/service/ipallocator/controller/repair_test.go
@@ -55,7 +55,7 @@ func TestRepair(t *testing.T) {
         item: &api.RangeAllocation{Range: "192.168.1.0/24"},
     }
     _, cidr, _ := net.ParseCIDR(ipregistry.item.Range)
-    r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
+    r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
 
     if err := r.RunOnce(); err != nil {
         t.Fatal(err)
@@ -68,7 +68,7 @@ func TestRepair(t *testing.T) {
         item:      &api.RangeAllocation{Range: "192.168.1.0/24"},
         updateErr: fmt.Errorf("test error"),
     }
-    r = NewRepair(0, fakeClient.Core(), cidr, ipregistry)
+    r = NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
     if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
         t.Fatal(err)
     }
@@ -96,7 +96,7 @@ func TestRepairLeak(t *testing.T) {
         },
     }
 
-    r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
+    r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
     // Run through the "leak detection holdoff" loops.
     for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
         if err := r.RunOnce(); err != nil {
@@ -169,7 +169,7 @@ func TestRepairWithExisting(t *testing.T) {
             Data:  dst.Data,
         },
     }
-    r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
+    r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
     if err := r.RunOnce(); err != nil {
         t.Fatal(err)
     }
diff --git a/pkg/registry/core/service/portallocator/controller/BUILD b/pkg/registry/core/service/portallocator/controller/BUILD
index 01408f6fba..3483044bcd 100644
--- a/pkg/registry/core/service/portallocator/controller/BUILD
+++ b/pkg/registry/core/service/portallocator/controller/BUILD
@@ -11,16 +11,19 @@ go_library(
     srcs = ["repair.go"],
    importpath = "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller",
     deps = [
+        "//pkg/api/legacyscheme:go_default_library",
         "//pkg/apis/core:go_default_library",
         "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
         "//pkg/registry/core/rangeallocation:go_default_library",
         "//pkg/registry/core/service:go_default_library",
         "//pkg/registry/core/service/portallocator:go_default_library",
+        "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//vendor/k8s.io/client-go/tools/record:go_default_library",
         "//vendor/k8s.io/client-go/util/retry:go_default_library",
     ],
 )
diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go
index e02ca3ef1e..e7024ded05 100644
--- a/pkg/registry/core/service/portallocator/controller/repair.go
+++ b/pkg/registry/core/service/portallocator/controller/repair.go
@@ -20,12 +20,15 @@ import (
     "fmt"
     "time"
 
+    "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/net"
     "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/retry"
+    "k8s.io/kubernetes/pkg/api/legacyscheme"
     api "k8s.io/kubernetes/pkg/apis/core"
     coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
     "k8s.io/kubernetes/pkg/registry/core/rangeallocation"
@@ -40,6 +43,7 @@ type Repair struct {
     portRange     net.PortRange
     alloc         rangeallocation.RangeRegistry
     leaks         map[int]int // counter per leaked port
+    recorder      record.EventRecorder
 }
 
 // How many times we need to detect a leak before we clean up.  This is to
@@ -48,13 +52,18 @@ const numRepairsBeforeLeakCleanup = 3
 
 // NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
 // and generates informational warnings for a cluster that is not in sync.
-func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
+func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
+    eventBroadcaster := record.NewBroadcaster()
+    eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")})
+    recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "portallocator-repair-controller"})
+
     return &Repair{
         interval:      interval,
         serviceClient: serviceClient,
         portRange:     portRange,
         alloc:         alloc,
         leaks:         map[int]int{},
+        recorder:      recorder,
     }
 }
 
@@ -130,22 +139,24 @@ func (c *Repair) runOnce() error {
                 stored.Release(port)
             } else {
                 // doesn't seem to be allocated
+                c.recorder.Eventf(svc, v1.EventTypeWarning, "PortNotAllocated", "Port %d is not allocated; repairing", port)
                 runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
             }
             delete(c.leaks, port) // it is used, so it can't be leaked
         case portallocator.ErrAllocated:
-            // TODO: send event
             // port is duplicate, reallocate
+            c.recorder.Eventf(svc, v1.EventTypeWarning, "PortAlreadyAllocated", "Port %d was assigned to multiple services; please recreate service", port)
             runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
         case err.(*portallocator.ErrNotInRange):
-            // TODO: send event
             // port is out of range, reallocate
-            runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
+            c.recorder.Eventf(svc, v1.EventTypeWarning, "PortOutOfRange", "Port %d is not within the port range %s; please recreate service", port, c.portRange)
+            runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %s; please recreate", port, svc.Name, svc.Namespace, c.portRange))
         case portallocator.ErrFull:
-            // TODO: send event
             // somehow we are out of ports
-            return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)
+            c.recorder.Eventf(svc, v1.EventTypeWarning, "PortRangeFull", "Port range %s is full; you must widen the port range in order to create new services", c.portRange)
+            return fmt.Errorf("the port range %s is full; you must widen the port range in order to create new services", c.portRange)
         default:
+            c.recorder.Eventf(svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate port %d due to an unknown error", port)
             return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
         }
     }
diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go
index a011606253..151c791cc3 100644
--- a/pkg/registry/core/service/portallocator/controller/repair_test.go
+++ b/pkg/registry/core/service/portallocator/controller/repair_test.go
@@ -55,7 +55,7 @@ func TestRepair(t *testing.T) {
         item: &api.RangeAllocation{Range: "100-200"},
     }
     pr, _ := net.ParsePortRange(registry.item.Range)
-    r := NewRepair(0, fakeClient.Core(), *pr, registry)
+    r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
 
     if err := r.RunOnce(); err != nil {
         t.Fatal(err)
@@ -68,7 +68,7 @@ func TestRepair(t *testing.T) {
         item:      &api.RangeAllocation{Range: "100-200"},
         updateErr: fmt.Errorf("test error"),
     }
-    r = NewRepair(0, fakeClient.Core(), *pr, registry)
+    r = NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
     if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
         t.Fatal(err)
     }
@@ -96,7 +96,7 @@ func TestRepairLeak(t *testing.T) {
         },
     }
 
-    r := NewRepair(0, fakeClient.Core(), *pr, registry)
+    r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
     // Run through the "leak detection holdoff" loops.
     for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
         if err := r.RunOnce(); err != nil {
@@ -175,7 +175,7 @@ func TestRepairWithExisting(t *testing.T) {
             Data:  dst.Data,
         },
     }
-    r := NewRepair(0, fakeClient.Core(), *pr, registry)
+    r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
     if err := r.RunOnce(); err != nil {
         t.Fatal(err)
     }