diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index f1814a9f33..93612354e8 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -18,10 +18,13 @@ package cache import ( "errors" + "fmt" "io" "net" "net/url" "reflect" + goruntime "runtime" + "strings" "sync" "syscall" "time" @@ -45,6 +48,9 @@ type ListerWatcher interface { // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { + // name identifies this reflector. By default it will be a file:line if possible. + name string + // The type of object we expect to place in the store. expectedType reflect.Type // The destination to sync up with the watch source @@ -78,7 +84,13 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa // so that you can use reflectors to periodically process everything as well as // incrementally processing the things that change. func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { + return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod) +} + +// NewNamedReflector same as NewReflector, but with a specified name for logging +func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { r := &Reflector{ + name: name, listerWatcher: lw, store: store, expectedType: reflect.TypeOf(expectedType), @@ -88,6 +100,38 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn return r } +// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common +// call chains to NewReflector, so they'd be low entropy names for reflectors +var internalPackages = []string{"kubernetes/pkg/client/cache/", "kubernetes/pkg/controller/framework/"} + +// getDefaultReflectorName walks back through the call stack until we find a caller from outside of the ignoredPackages +// it returns back a shortpath/filename:line to aid in identification of this reflector when it starts logging +func getDefaultReflectorName(ignoredPackages ...string) string { + name := "????" +outer: + for i := 1; i < 10; i++ { + _, file, line, ok := goruntime.Caller(i) + if !ok { + break + } + for _, ignoredPackage := range ignoredPackages { + if strings.Contains(file, ignoredPackage) { + continue outer + } + + } + + pkgLocation := strings.LastIndex(file, "/pkg/") + if pkgLocation >= 0 { + file = file[pkgLocation+1:] + } + name = fmt.Sprintf("%s:%d", file, line) + break + } + + return name +} + // Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run starts a goroutine and returns immediately. func (r *Reflector) Run() { @@ -133,22 +177,22 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { list, err := r.listerWatcher.List() if err != nil { - glog.Errorf("Failed to list %v: %v", r.expectedType, err) + util.HandleError(fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)) return } meta, err := meta.Accessor(list) if err != nil { - glog.Errorf("Unable to understand list result %#v", list) + util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v", r.name, list)) return } resourceVersion = meta.ResourceVersion() items, err := runtime.ExtractList(list) if err != nil { - glog.Errorf("Unable to understand list result %#v (%v)", list, err) + util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)) return } if err := r.syncWith(items); err != nil { - glog.Errorf("Unable to sync list result: %v", err) + util.HandleError(fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)) return } r.setLastSyncResourceVersion(resourceVersion) @@ -160,9 +204,9 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { case io.EOF: // watch closed normally case io.ErrUnexpectedEOF: - glog.V(1).Infof("Watch for %v closed with unexpected EOF: %v", r.expectedType, err) + glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) default: - glog.Errorf("Failed to watch %v: %v", r.expectedType, err) + util.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) } // If this is "connection refused" error, it means that most likely apiserver is not responsive. // It doesn't make sense to re-list all objects because most likely we will be able to restart @@ -180,7 +224,7 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { } if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil { if err != errorResyncRequested && err != errorStopRequested { - glog.Errorf("watch of %v ended with: %v", r.expectedType, err) + util.HandleError(fmt.Errorf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)) } return } @@ -221,12 +265,12 @@ loop: return apierrs.FromObject(event.Object) } if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { - glog.Errorf("expected type %v, but watch event object had type %v", e, a) + util.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) continue } meta, err := meta.Accessor(event.Object) if err != nil { - glog.Errorf("unable to understand watch event %#v", event) + util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) continue } switch event.Type { @@ -240,7 +284,7 @@ loop: // to change this. r.store.Delete(event.Object) default: - glog.Errorf("unable to understand watch event %#v", event) + util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } *resourceVersion = meta.ResourceVersion() r.setLastSyncResourceVersion(*resourceVersion) @@ -250,10 +294,10 @@ loop: watchDuration := time.Now().Sub(start) if watchDuration < 1*time.Second && eventCount == 0 { - glog.V(4).Infof("Unexpected watch close - watch lasted less than a second and no items received") + glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name) return errors.New("very short watch") } - glog.V(4).Infof("Watch close - %v total %v items received", r.expectedType, eventCount) + glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount) return nil }