Add old endpoint cleanup function

pull/6/head
Daniel Smith 2015-04-24 14:16:27 -07:00
parent 221553b147
commit b49dd0ad1e
4 changed files with 88 additions and 0 deletions

@@ -132,6 +132,11 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
    for i := 0; i < workers; i++ {
        go util.Until(e.worker, time.Second, stopCh)
    }
    go func() {
        defer util.HandleCrash()
        time.Sleep(5 * time.Minute) // give time for our cache to fill
        e.checkLeftoverEndpoints()
    }()
    <-stopCh
    e.queue.ShutDown()
}
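
For readers unfamiliar with the startup flow: Run() keeps the usual worker loops going via util.Until, and the new goroutine sleeps five minutes before doing a single leftover-endpoint scan so the controller's caches have time to fill. A rough standalone sketch of that pattern (periodic workers plus a one-shot delayed task), with a stripped-down stand-in for util.Until, util.HandleCrash omitted, and much shorter delays for the demo:

package main

import (
    "fmt"
    "time"
)

// until calls f repeatedly, sleeping period between calls, until stopCh is
// closed. This is a simplified stand-in for util.Until; the real helper also
// recovers from panics in f, which is omitted here.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
    for {
        select {
        case <-stopCh:
            return
        default:
        }
        f()
        time.Sleep(period)
    }
}

func main() {
    stopCh := make(chan struct{})
    worker := func() { fmt.Println("processing one queue item") }

    // Long-running workers, as in Run().
    for i := 0; i < 2; i++ {
        go until(worker, time.Second, stopCh)
    }

    // One-shot delayed task, as with checkLeftoverEndpoints. The real code
    // sleeps 5 minutes so the cache can fill; shortened here for the demo.
    go func() {
        time.Sleep(3 * time.Second)
        fmt.Println("one-time scan for leftover endpoints")
    }()

    time.Sleep(5 * time.Second)
    close(stopCh)
}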
@@ -370,6 +375,29 @@ func (e *EndpointController) syncService(key string) {
    }
}

// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
// do this once on startup, because in steady-state these are detected (but
// some stragglers could have been left behind if the endpoint controller
// reboots).
func (e *EndpointController) checkLeftoverEndpoints() {
    list, err := e.client.Endpoints(api.NamespaceAll).List(labels.Everything())
    if err != nil {
        glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)
        return
    }
    for i := range list.Items {
        ep := &list.Items[i]
        key, err := keyFunc(ep)
        if err != nil {
            glog.Errorf("Unable to get key for endpoint %#v", ep)
            continue
        }
        e.queue.Add(key)
    }
}

func findDefaultPort(pod *api.Pod, servicePort int, proto api.Protocol) int {
    for _, container := range pod.Spec.Containers {
        for _, port := range container.Ports {
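
checkLeftoverEndpoints enqueues one work item per existing endpoints object, using keyFunc to build the key. The keys follow the usual namespace/name convention, which is why the test below expects "default/foo". A small illustrative sketch of that convention; makeKey and splitKey are hypothetical helpers for illustration only, not the real keyFunc (which comes from the shared cache code):

package main

import (
    "fmt"
    "strings"
)

// makeKey builds a "namespace/name" key; cluster-scoped objects use just the name.
func makeKey(namespace, name string) string {
    if namespace == "" {
        return name
    }
    return namespace + "/" + name
}

// splitKey is the inverse: it recovers the namespace and name from a key.
func splitKey(key string) (namespace, name string) {
    parts := strings.SplitN(key, "/", 2)
    if len(parts) == 1 {
        return "", parts[0]
    }
    return parts[0], parts[1]
}

func main() {
    key := makeKey("default", "foo")
    fmt.Println(key) // default/foo
    ns, name := splitKey(key)
    fmt.Println(ns, name) // default foo
}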

@@ -262,6 +262,41 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
    endpointsHandler.ValidateRequestCount(t, 0)
}

func TestCheckLeftoverEndpoints(t *testing.T) {
    ns := api.NamespaceDefault
    // Note that this requests *all* endpoints, therefore the NamespaceAll
    // below.
    testServer, _ := makeTestServer(t, api.NamespaceAll,
        serverResponse{http.StatusOK, &api.EndpointsList{
            ListMeta: api.ListMeta{
                ResourceVersion: "1",
            },
            Items: []api.Endpoints{{
                ObjectMeta: api.ObjectMeta{
                    Name:            "foo",
                    Namespace:       ns,
                    ResourceVersion: "1",
                },
                Subsets: []api.EndpointSubset{{
                    Addresses: []api.EndpointAddress{{IP: "6.7.8.9"}},
                    Ports:     []api.EndpointPort{{Port: 1000}},
                }},
            }},
        }})
    defer testServer.Close()
    client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
    endpoints := NewEndpointController(client)
    endpoints.checkLeftoverEndpoints()
    if e, a := 1, endpoints.queue.Len(); e != a {
        t.Fatalf("Expected %v, got %v", e, a)
    }
    got, _ := endpoints.queue.Get()
    if e, a := ns+"/foo", got; e != a {
        t.Errorf("Expected %v, got %v", e, a)
    }
}

func TestSyncEndpointsProtocolTCP(t *testing.T) {
    ns := "other"
    testServer, endpointsHandler := makeTestServer(t, ns,

@@ -85,6 +85,15 @@ func (q *Type) Add(item interface{}) {
    q.cond.Signal()
}

// Len returns the current queue length, for informational purposes only. You
// shouldn't, for example, gate a call to Add() or Get() on Len() returning a
// particular value; that can't be synchronized properly.
func (q *Type) Len() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    return len(q.queue)
}

// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
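
The doc comment on Len() is the operative part: the returned length is only a snapshot, so it must not be used to decide whether a later Add() or Get() is safe. A sketch of the race it warns about, assuming the New/Add/Get/Done/ShutDown API visible in this diff; this is not a test from the commit, and the import path is an assumption:

package workqueue_test

import (
    "testing"

    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue" // import path assumed
)

// Not part of this commit; an illustration of why Len() is advisory only.
func TestLenIsAdvisoryOnly(t *testing.T) {
    q := workqueue.New()
    q.Add("foo")

    done := make(chan struct{})
    go func() {
        defer close(done)
        if item, shutdown := q.Get(); !shutdown {
            q.Done(item) // this goroutine may take "foo" first
        }
    }()

    // Len() may still report 1 here even though the goroutine above is
    // about to (or already did) take the item, so gating another Get()
    // on this value could block forever.
    _ = q.Len()

    <-done
    q.ShutDown()
}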

@@ -113,3 +113,19 @@ func TestAddWhileProcessing(t *testing.T) {
    q.ShutDown()
    consumerWG.Wait()
}

func TestLen(t *testing.T) {
    q := workqueue.New()
    q.Add("foo")
    if e, a := 1, q.Len(); e != a {
        t.Errorf("Expected %v, got %v", e, a)
    }
    q.Add("bar")
    if e, a := 2, q.Len(); e != a {
        t.Errorf("Expected %v, got %v", e, a)
    }
    q.Add("foo") // should not increase the queue length.
    if e, a := 2, q.Len(); e != a {
        t.Errorf("Expected %v, got %v", e, a)
    }
}
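
The last case in TestLen relies on the queue de-duplicating items that are still waiting: re-adding a service key before a worker picks it up coalesces the work instead of growing the queue. A toy sketch of that behavior, not the real workqueue implementation (which also tracks in-flight items and provides Get/Done/ShutDown):

package main

import (
    "fmt"
    "sync"
)

// dedupQueue is a hypothetical, minimal FIFO that ignores duplicate adds.
type dedupQueue struct {
    mu    sync.Mutex
    order []string
    seen  map[string]bool
}

func newDedupQueue() *dedupQueue {
    return &dedupQueue{seen: map[string]bool{}}
}

func (q *dedupQueue) Add(item string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if q.seen[item] {
        return // already queued; coalesce duplicate work
    }
    q.seen[item] = true
    q.order = append(q.order, item)
}

func (q *dedupQueue) Len() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    return len(q.order)
}

func main() {
    q := newDedupQueue()
    q.Add("foo")
    q.Add("bar")
    q.Add("foo")         // duplicate, ignored
    fmt.Println(q.Len()) // 2
}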