2014-06-06 23:40:48 +00:00
/ *
2016-06-03 00:25:58 +00:00
Copyright 2014 The Kubernetes Authors .
2014-06-06 23:40:48 +00:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
2014-06-18 23:01:49 +00:00
2015-06-11 13:13:19 +00:00
// CAUTION: If you update code in this file, you may need to also update code
2015-06-11 19:34:04 +00:00
// in contrib/mesos/pkg/service/endpoints_controller.go
2015-10-10 03:58:57 +00:00
package endpoint
2014-06-06 23:40:48 +00:00
import (
2015-03-20 21:24:43 +00:00
"reflect"
2016-05-06 20:15:49 +00:00
"strconv"
2015-04-16 23:18:02 +00:00
"time"
2014-06-06 23:40:48 +00:00
2016-02-02 18:59:54 +00:00
"encoding/json"
"github.com/golang/glog"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/errors"
2015-11-06 11:34:42 +00:00
podutil "k8s.io/kubernetes/pkg/api/pod"
2016-02-02 18:59:54 +00:00
utilpod "k8s.io/kubernetes/pkg/api/pod"
2015-09-03 21:40:58 +00:00
"k8s.io/kubernetes/pkg/client/cache"
2016-02-05 21:58:03 +00:00
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
2015-10-06 09:12:00 +00:00
"k8s.io/kubernetes/pkg/controller"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/controller/framework"
2016-04-14 18:00:52 +00:00
"k8s.io/kubernetes/pkg/controller/framework/informers"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
2016-04-13 18:38:32 +00:00
"k8s.io/kubernetes/pkg/util/metrics"
2016-01-15 07:32:10 +00:00
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
2015-09-09 17:45:01 +00:00
"k8s.io/kubernetes/pkg/util/sets"
2016-02-02 10:57:06 +00:00
"k8s.io/kubernetes/pkg/util/wait"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
2014-06-06 23:40:48 +00:00
)
2015-04-16 23:18:02 +00:00
const (
	// FullServiceResyncPeriod is the interval at which the endpoints of
	// EVERY service are recomputed at least once. A larger value lowers the
	// CPU/network load; a smaller value shortens the time a mistaken
	// endpoint can survive before being corrected.
	FullServiceResyncPeriod = 30 * time.Second

	// PodStoreSyncedPollPeriod is how long a service sync waits before
	// re-queuing itself when the pod store has not synced yet. Services must
	// not be synced before that point, and the pause avoids a hot loop.
	PodStoreSyncedPollPeriod = 100 * time.Millisecond

	// TolerateUnreadyEndpointsAnnotation is a Service annotation telling the
	// endpoints controller to publish unready pods as regular endpoints. It
	// is currently only used by PetSets, where the pet must be DNS
	// resolvable during initialization. For that purpose a headless service
	// is created just for the PetSet; clients shouldn't be using this
	// Service for anything, so unready endpoints don't matter.
	TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
)
// keyFunc produces the work-queue key for an object delivered by an
// informer. It is the framework's deletion-handling variant, so it is also
// used on the objects handed to delete handlers.
var keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
// NewEndpointController returns a new *EndpointController.
2016-04-07 12:15:21 +00:00
func NewEndpointController ( podInformer framework . SharedIndexInformer , client * clientset . Clientset ) * EndpointController {
2016-04-13 18:38:32 +00:00
if client != nil && client . Core ( ) . GetRESTClient ( ) . GetRateLimiter ( ) != nil {
metrics . RegisterMetricAndTrackRateLimiterUsage ( "endpoint_controller" , client . Core ( ) . GetRESTClient ( ) . GetRateLimiter ( ) )
}
2015-04-16 23:18:02 +00:00
e := & EndpointController {
client : client ,
queue : workqueue . New ( ) ,
}
e . serviceStore . Store , e . serviceController = framework . NewInformer (
& cache . ListWatch {
2015-12-10 09:39:03 +00:00
ListFunc : func ( options api . ListOptions ) ( runtime . Object , error ) {
2016-02-03 21:21:05 +00:00
return e . client . Core ( ) . Services ( api . NamespaceAll ) . List ( options )
2015-04-16 23:18:02 +00:00
} ,
2015-12-10 09:39:03 +00:00
WatchFunc : func ( options api . ListOptions ) ( watch . Interface , error ) {
2016-02-03 21:21:05 +00:00
return e . client . Core ( ) . Services ( api . NamespaceAll ) . Watch ( options )
2015-04-16 23:18:02 +00:00
} ,
} ,
& api . Service { } ,
2015-10-06 09:12:00 +00:00
// TODO: Can we have much longer period here?
2015-04-16 23:18:02 +00:00
FullServiceResyncPeriod ,
framework . ResourceEventHandlerFuncs {
AddFunc : e . enqueueService ,
UpdateFunc : func ( old , cur interface { } ) {
e . enqueueService ( cur )
} ,
DeleteFunc : e . enqueueService ,
} ,
)
2016-04-14 18:00:52 +00:00
podInformer . AddEventHandler ( framework . ResourceEventHandlerFuncs {
AddFunc : e . addPod ,
UpdateFunc : e . updatePod ,
DeleteFunc : e . deletePod ,
} )
2016-04-07 12:15:21 +00:00
e . podStore . Indexer = podInformer . GetIndexer ( )
2016-04-14 18:00:52 +00:00
e . podController = podInformer . GetController ( )
e . podStoreSynced = podInformer . HasSynced
return e
}
// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
func NewEndpointControllerFromClient ( client * clientset . Clientset , resyncPeriod controller . ResyncPeriodFunc ) * EndpointController {
2016-05-02 04:35:18 +00:00
podInformer := informers . CreateSharedPodIndexInformer ( client , resyncPeriod ( ) )
2016-04-14 18:00:52 +00:00
e := NewEndpointController ( podInformer , client )
e . internalPodInformer = podInformer
2015-04-16 23:18:02 +00:00
return e
}
2015-02-04 19:21:33 +00:00
// EndpointController manages selector-based service endpoints.
2014-06-06 23:40:48 +00:00
type EndpointController struct {
2016-01-29 06:34:08 +00:00
client * clientset . Clientset
2015-04-16 23:18:02 +00:00
serviceStore cache . StoreToServiceLister
podStore cache . StoreToPodLister
2016-04-14 18:00:52 +00:00
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewEndpointController(passing SharedInformer), this
// will be null
2016-04-07 12:15:21 +00:00
internalPodInformer framework . SharedIndexInformer
2016-04-14 18:00:52 +00:00
2015-04-16 23:18:02 +00:00
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue * workqueue . Type
// Since we join two objects, we'll watch both of them with
// controllers.
serviceController * framework . Controller
2016-04-14 18:00:52 +00:00
podController framework . ControllerInterface
2016-03-16 08:47:30 +00:00
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func ( ) bool
2014-06-06 23:40:48 +00:00
}
2015-04-16 23:18:02 +00:00
// Runs e; will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func ( e * EndpointController ) Run ( workers int , stopCh <- chan struct { } ) {
2016-01-15 07:32:10 +00:00
defer utilruntime . HandleCrash ( )
2015-04-16 23:18:02 +00:00
go e . serviceController . Run ( stopCh )
go e . podController . Run ( stopCh )
for i := 0 ; i < workers ; i ++ {
2016-02-02 10:57:06 +00:00
go wait . Until ( e . worker , time . Second , stopCh )
2014-07-12 07:15:30 +00:00
}
2015-04-24 21:16:27 +00:00
go func ( ) {
2016-01-15 07:32:10 +00:00
defer utilruntime . HandleCrash ( )
2015-04-24 21:16:27 +00:00
time . Sleep ( 5 * time . Minute ) // give time for our cache to fill
e . checkLeftoverEndpoints ( )
} ( )
2016-04-14 18:00:52 +00:00
if e . internalPodInformer != nil {
go e . internalPodInformer . Run ( stopCh )
}
2015-04-16 23:18:02 +00:00
<- stopCh
e . queue . ShutDown ( )
2014-07-12 07:15:30 +00:00
}
2015-09-09 17:45:01 +00:00
func ( e * EndpointController ) getPodServiceMemberships ( pod * api . Pod ) ( sets . String , error ) {
set := sets . String { }
2015-04-16 23:18:02 +00:00
services , err := e . serviceStore . GetPodServices ( pod )
2014-06-06 23:40:48 +00:00
if err != nil {
2015-04-16 23:18:02 +00:00
// don't log this error because this function makes pointless
// errors when no services match.
return set , nil
2014-06-06 23:40:48 +00:00
}
2015-04-16 23:18:02 +00:00
for i := range services {
key , err := keyFunc ( & services [ i ] )
if err != nil {
return nil , err
2014-10-28 00:56:33 +00:00
}
2015-04-16 23:18:02 +00:00
set . Insert ( key )
}
return set , nil
}
// addPod handles a pod-added event: it works out which services the pod is a
// member of and enqueues each of them. obj must have *api.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
	added := obj.(*api.Pod)

	memberships, err := e.getPodServiceMemberships(added)
	if err != nil {
		glog.Errorf("Unable to get pod %v/%v's service memberships: %v", added.Namespace, added.Name, err)
		return
	}

	for svcKey := range memberships {
		e.queue.Add(svcKey)
	}
}
// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *api.Pod types.
func ( e * EndpointController ) updatePod ( old , cur interface { } ) {
if api . Semantic . DeepEqual ( old , cur ) {
return
}
newPod := old . ( * api . Pod )
services , err := e . getPodServiceMemberships ( newPod )
if err != nil {
glog . Errorf ( "Unable to get pod %v/%v's service memberships: %v" , newPod . Namespace , newPod . Name , err )
return
}
2014-11-18 17:49:00 +00:00
2015-04-16 23:18:02 +00:00
oldPod := cur . ( * api . Pod )
// Only need to get the old services if the labels changed.
2016-02-02 18:59:54 +00:00
if ! reflect . DeepEqual ( newPod . Labels , oldPod . Labels ) ||
2016-04-14 17:45:29 +00:00
! hostNameAndDomainAreEqual ( newPod , oldPod ) {
2015-04-16 23:18:02 +00:00
oldServices , err := e . getPodServiceMemberships ( oldPod )
2014-06-06 23:40:48 +00:00
if err != nil {
2015-04-16 23:18:02 +00:00
glog . Errorf ( "Unable to get pod %v/%v's service memberships: %v" , oldPod . Namespace , oldPod . Name , err )
return
2014-06-06 23:40:48 +00:00
}
2015-04-16 23:18:02 +00:00
services = services . Union ( oldServices )
}
for key := range services {
e . queue . Add ( key )
}
}
2014-11-13 15:52:13 +00:00
2016-04-14 17:45:29 +00:00
// hostNameAndDomainAreEqual reports whether both pods resolve to the same
// hostname and the same subdomain.
func hostNameAndDomainAreEqual(pod1, pod2 *api.Pod) bool {
	if getHostname(pod1) != getHostname(pod2) {
		return false
	}
	return getSubdomain(pod1) == getSubdomain(pod2)
}
func getHostname ( pod * api . Pod ) string {
if len ( pod . Spec . Hostname ) > 0 {
return pod . Spec . Hostname
2016-02-02 18:59:54 +00:00
}
2016-04-14 17:45:29 +00:00
if pod . Annotations != nil {
return pod . Annotations [ utilpod . PodHostnameAnnotation ]
2016-02-02 18:59:54 +00:00
}
2016-04-14 17:45:29 +00:00
return ""
}
func getSubdomain ( pod * api . Pod ) string {
if len ( pod . Spec . Subdomain ) > 0 {
return pod . Spec . Subdomain
}
if pod . Annotations != nil {
return pod . Annotations [ utilpod . PodSubdomainAnnotation ]
}
return ""
2016-02-02 18:59:54 +00:00
}
2015-04-16 23:18:02 +00:00
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func ( e * EndpointController ) deletePod ( obj interface { } ) {
if _ , ok := obj . ( * api . Pod ) ; ok {
// Enqueue all the services that the pod used to be a member
// of. This happens to be exactly the same thing we do when a
// pod is added.
e . addPod ( obj )
return
}
podKey , err := keyFunc ( obj )
if err != nil {
glog . Errorf ( "Couldn't get key for object %+v: %v" , obj , err )
2016-05-29 12:44:20 +00:00
return
2015-04-16 23:18:02 +00:00
}
glog . Infof ( "Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records." , podKey , FullServiceResyncPeriod )
2015-02-02 18:51:52 +00:00
2015-04-16 23:18:02 +00:00
// TODO: keep a map of pods to services to handle this condition.
}
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
func ( e * EndpointController ) enqueueService ( obj interface { } ) {
key , err := keyFunc ( obj )
if err != nil {
glog . Errorf ( "Couldn't get key for object %+v: %v" , obj , err )
2016-05-29 12:44:20 +00:00
return
2015-04-16 23:18:02 +00:00
}
2015-02-02 18:51:52 +00:00
2015-04-16 23:18:02 +00:00
e . queue . Add ( key )
}
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func ( e * EndpointController ) worker ( ) {
for {
func ( ) {
key , quit := e . queue . Get ( )
if quit {
return
2015-03-13 15:16:41 +00:00
}
2015-04-16 23:18:02 +00:00
// Use defer: in the unlikely event that there's a
// panic, we'd still like this to get marked done--
// otherwise the controller will not be able to sync
// this service again until it is restarted.
defer e . queue . Done ( key )
e . syncService ( key . ( string ) )
} ( )
}
}
2015-03-20 21:24:43 +00:00
2015-04-16 23:18:02 +00:00
func ( e * EndpointController ) syncService ( key string ) {
startTime := time . Now ( )
defer func ( ) {
glog . V ( 4 ) . Infof ( "Finished syncing service %q endpoints. (%v)" , key , time . Now ( ) . Sub ( startTime ) )
} ( )
2016-03-16 08:47:30 +00:00
if ! e . podStoreSynced ( ) {
// Sleep so we give the pod reflector goroutine a chance to run.
time . Sleep ( PodStoreSyncedPollPeriod )
2016-06-17 09:05:50 +00:00
glog . Infof ( "Waiting for pods controller to sync, requeuing service %v" , key )
2016-03-16 08:47:30 +00:00
e . queue . Add ( key )
return
}
2015-04-16 23:18:02 +00:00
obj , exists , err := e . serviceStore . Store . GetByKey ( key )
if err != nil || ! exists {
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a
// service is deleted. However, if we're down at the time when
// the service is deleted, we will miss that deletion, so this
// doesn't completely solve the problem. See #6877.
namespace , name , err := cache . SplitMetaNamespaceKey ( key )
2014-09-26 20:34:55 +00:00
if err != nil {
2015-04-16 23:18:02 +00:00
glog . Errorf ( "Need to delete endpoint with key %q, but couldn't understand the key: %v" , key , err )
// Don't retry, as the key isn't going to magically become understandable.
return
}
2016-01-29 06:34:08 +00:00
err = e . client . Endpoints ( namespace ) . Delete ( name , nil )
2015-04-16 23:18:02 +00:00
if err != nil && ! errors . IsNotFound ( err ) {
glog . Errorf ( "Error deleting endpoint %q: %v" , key , err )
e . queue . Add ( key ) // Retry
}
return
}
service := obj . ( * api . Service )
if service . Spec . Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
return
}
glog . V ( 5 ) . Infof ( "About to update endpoints for service %q" , key )
pods , err := e . podStore . Pods ( service . Namespace ) . List ( labels . Set ( service . Spec . Selector ) . AsSelector ( ) )
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
glog . Errorf ( "Error syncing service %q: %v" , key , err )
e . queue . Add ( key ) // Retry
return
}
subsets := [ ] api . EndpointSubset { }
2016-02-02 18:59:54 +00:00
podHostNames := map [ string ] endpoints . HostRecord { }
2016-05-06 20:15:49 +00:00
var tolerateUnreadyEndpoints bool
if v , ok := service . Annotations [ TolerateUnreadyEndpointsAnnotation ] ; ok {
b , err := strconv . ParseBool ( v )
if err == nil {
tolerateUnreadyEndpoints = b
} else {
glog . Errorf ( "Failed to parse annotation %v: %v" , TolerateUnreadyEndpointsAnnotation , err )
}
}
2015-04-16 23:18:02 +00:00
for i := range pods . Items {
pod := & pods . Items [ i ]
for i := range service . Spec . Ports {
servicePort := & service . Spec . Ports [ i ]
portName := servicePort . Name
portProto := servicePort . Protocol
2015-11-06 11:34:42 +00:00
portNum , err := podutil . FindPort ( pod , servicePort )
2015-04-16 23:18:02 +00:00
if err != nil {
2015-05-16 23:37:13 +00:00
glog . V ( 4 ) . Infof ( "Failed to find port for service %s/%s: %v" , service . Namespace , service . Name , err )
2015-04-16 23:18:02 +00:00
continue
}
if len ( pod . Status . PodIP ) == 0 {
2015-08-20 01:52:34 +00:00
glog . V ( 5 ) . Infof ( "Failed to find an IP for pod %s/%s" , pod . Namespace , pod . Name )
continue
}
if pod . DeletionTimestamp != nil {
glog . V ( 5 ) . Infof ( "Pod is being deleted %s/%s" , pod . Namespace , pod . Name )
2015-04-16 23:18:02 +00:00
continue
}
2016-04-27 04:35:14 +00:00
epp := api . EndpointPort { Name : portName , Port : int32 ( portNum ) , Protocol : portProto }
2016-02-02 18:59:54 +00:00
epa := api . EndpointAddress {
IP : pod . Status . PodIP ,
TargetRef : & api . ObjectReference {
Kind : "Pod" ,
Namespace : pod . ObjectMeta . Namespace ,
Name : pod . ObjectMeta . Name ,
UID : pod . ObjectMeta . UID ,
ResourceVersion : pod . ObjectMeta . ResourceVersion ,
} }
2016-04-14 17:45:29 +00:00
hostname := getHostname ( pod )
if len ( hostname ) > 0 &&
getSubdomain ( pod ) == service . Name &&
service . Namespace == pod . Namespace {
hostRecord := endpoints . HostRecord {
HostName : hostname ,
}
// TODO: stop populating podHostNames annotation in 1.4
podHostNames [ string ( pod . Status . PodIP ) ] = hostRecord
epa . Hostname = hostname
}
2016-05-06 20:15:49 +00:00
if tolerateUnreadyEndpoints || api . IsPodReady ( pod ) {
2015-09-10 01:28:53 +00:00
subsets = append ( subsets , api . EndpointSubset {
Addresses : [ ] api . EndpointAddress { epa } ,
Ports : [ ] api . EndpointPort { epp } ,
} )
} else {
glog . V ( 5 ) . Infof ( "Pod is out of service: %v/%v" , pod . Namespace , pod . Name )
subsets = append ( subsets , api . EndpointSubset {
NotReadyAddresses : [ ] api . EndpointAddress { epa } ,
Ports : [ ] api . EndpointPort { epp } ,
} )
}
2014-09-26 20:34:55 +00:00
}
2015-04-16 23:18:02 +00:00
}
subsets = endpoints . RepackSubsets ( subsets )
2014-09-26 20:34:55 +00:00
2015-04-16 23:18:02 +00:00
// See if there's actually an update here.
currentEndpoints , err := e . client . Endpoints ( service . Namespace ) . Get ( service . Name )
if err != nil {
if errors . IsNotFound ( err ) {
currentEndpoints = & api . Endpoints {
ObjectMeta : api . ObjectMeta {
Name : service . Name ,
Labels : service . Labels ,
} ,
}
2014-09-26 20:34:55 +00:00
} else {
2015-04-16 23:18:02 +00:00
glog . Errorf ( "Error getting endpoints: %v" , err )
e . queue . Add ( key ) // Retry
return
2014-06-06 23:40:48 +00:00
}
}
2016-02-02 18:59:54 +00:00
serializedPodHostNames := ""
if len ( podHostNames ) > 0 {
b , err := json . Marshal ( podHostNames )
if err != nil {
glog . Errorf ( "Error updating endpoints. Marshalling of hostnames failed.: %v" , err )
e . queue . Add ( key ) // Retry
return
}
serializedPodHostNames = string ( b )
}
newAnnotations := make ( map [ string ] string )
newAnnotations [ endpoints . PodHostnamesAnnotation ] = serializedPodHostNames
if reflect . DeepEqual ( currentEndpoints . Subsets , subsets ) &&
2016-04-14 17:45:29 +00:00
reflect . DeepEqual ( currentEndpoints . Labels , service . Labels ) {
2015-04-16 23:18:02 +00:00
glog . V ( 5 ) . Infof ( "endpoints are equal for %s/%s, skipping update" , service . Namespace , service . Name )
return
}
newEndpoints := currentEndpoints
newEndpoints . Subsets = subsets
newEndpoints . Labels = service . Labels
2016-02-02 18:59:54 +00:00
if newEndpoints . Annotations == nil {
newEndpoints . Annotations = make ( map [ string ] string )
}
if len ( serializedPodHostNames ) == 0 {
delete ( newEndpoints . Annotations , endpoints . PodHostnamesAnnotation )
} else {
newEndpoints . Annotations [ endpoints . PodHostnamesAnnotation ] = serializedPodHostNames
}
2015-04-16 23:18:02 +00:00
if len ( currentEndpoints . ResourceVersion ) == 0 {
// No previous endpoints, create them
_ , err = e . client . Endpoints ( service . Namespace ) . Create ( newEndpoints )
} else {
// Pre-existing
_ , err = e . client . Endpoints ( service . Namespace ) . Update ( newEndpoints )
}
if err != nil {
glog . Errorf ( "Error updating endpoints: %v" , err )
e . queue . Add ( key ) // Retry
}
2014-06-06 23:40:48 +00:00
}
2014-08-11 07:34:59 +00:00
2015-04-24 21:16:27 +00:00
// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
// do this once on startup, because in steady-state these are detected (but
// some stragglers could have been left behind if the endpoint controller
// reboots).
func ( e * EndpointController ) checkLeftoverEndpoints ( ) {
2015-12-10 09:39:03 +00:00
list , err := e . client . Endpoints ( api . NamespaceAll ) . List ( api . ListOptions { } )
2015-04-24 21:16:27 +00:00
if err != nil {
glog . Errorf ( "Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)" , err )
return
}
for i := range list . Items {
ep := & list . Items [ i ]
key , err := keyFunc ( ep )
if err != nil {
glog . Errorf ( "Unable to get key for endpoint %#v" , ep )
continue
}
e . queue . Add ( key )
}
}