mirror of https://github.com/k3s-io/k3s
Merge pull request #10024 from deads2k/name-reflectors
add originator to reflector loggingpull/6/head
commit
5c6f4f5906
|
@ -18,10 +18,13 @@ package cache
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
@ -45,6 +48,9 @@ type ListerWatcher interface {
|
|||
|
||||
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
||||
type Reflector struct {
|
||||
// name identifies this reflector. By default it will be a file:line if possible.
|
||||
name string
|
||||
|
||||
// The type of object we expect to place in the store.
|
||||
expectedType reflect.Type
|
||||
// The destination to sync up with the watch source
|
||||
|
@ -78,7 +84,13 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
|
|||
// so that you can use reflectors to periodically process everything as well as
|
||||
// incrementally processing the things that change.
|
||||
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
|
||||
return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
|
||||
}
|
||||
|
||||
// NewNamedReflector same as NewReflector, but with a specified name for logging
|
||||
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
|
||||
r := &Reflector{
|
||||
name: name,
|
||||
listerWatcher: lw,
|
||||
store: store,
|
||||
expectedType: reflect.TypeOf(expectedType),
|
||||
|
@ -88,6 +100,38 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
|
|||
return r
|
||||
}
|
||||
|
||||
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
|
||||
// call chains to NewReflector, so they'd be low entropy names for reflectors
|
||||
var internalPackages = []string{"kubernetes/pkg/client/cache/", "kubernetes/pkg/controller/framework/"}
|
||||
|
||||
// getDefaultReflectorName walks back through the call stack until we find a caller from outside of the ignoredPackages
|
||||
// it returns back a shortpath/filename:line to aid in identification of this reflector when it starts logging
|
||||
func getDefaultReflectorName(ignoredPackages ...string) string {
|
||||
name := "????"
|
||||
outer:
|
||||
for i := 1; i < 10; i++ {
|
||||
_, file, line, ok := goruntime.Caller(i)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
for _, ignoredPackage := range ignoredPackages {
|
||||
if strings.Contains(file, ignoredPackage) {
|
||||
continue outer
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pkgLocation := strings.LastIndex(file, "/pkg/")
|
||||
if pkgLocation >= 0 {
|
||||
file = file[pkgLocation+1:]
|
||||
}
|
||||
name = fmt.Sprintf("%s:%d", file, line)
|
||||
break
|
||||
}
|
||||
|
||||
return name
|
||||
}
|
||||
|
||||
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||
// Run starts a goroutine and returns immediately.
|
||||
func (r *Reflector) Run() {
|
||||
|
@ -133,22 +177,22 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
|
|||
|
||||
list, err := r.listerWatcher.List()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to list %v: %v", r.expectedType, err)
|
||||
util.HandleError(fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err))
|
||||
return
|
||||
}
|
||||
meta, err := meta.Accessor(list)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to understand list result %#v", list)
|
||||
util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v", r.name, list))
|
||||
return
|
||||
}
|
||||
resourceVersion = meta.ResourceVersion()
|
||||
items, err := runtime.ExtractList(list)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to understand list result %#v (%v)", list, err)
|
||||
util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err))
|
||||
return
|
||||
}
|
||||
if err := r.syncWith(items); err != nil {
|
||||
glog.Errorf("Unable to sync list result: %v", err)
|
||||
util.HandleError(fmt.Errorf("%s: Unable to sync list result: %v", r.name, err))
|
||||
return
|
||||
}
|
||||
r.setLastSyncResourceVersion(resourceVersion)
|
||||
|
@ -160,9 +204,9 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
|
|||
case io.EOF:
|
||||
// watch closed normally
|
||||
case io.ErrUnexpectedEOF:
|
||||
glog.V(1).Infof("Watch for %v closed with unexpected EOF: %v", r.expectedType, err)
|
||||
glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
|
||||
default:
|
||||
glog.Errorf("Failed to watch %v: %v", r.expectedType, err)
|
||||
util.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
|
||||
}
|
||||
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
||||
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
||||
|
@ -180,7 +224,7 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
|
|||
}
|
||||
if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
|
||||
if err != errorResyncRequested && err != errorStopRequested {
|
||||
glog.Errorf("watch of %v ended with: %v", r.expectedType, err)
|
||||
util.HandleError(fmt.Errorf("%s: watch of %v ended with: %v", r.name, r.expectedType, err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -221,12 +265,12 @@ loop:
|
|||
return apierrs.FromObject(event.Object)
|
||||
}
|
||||
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
||||
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
||||
util.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
|
||||
continue
|
||||
}
|
||||
meta, err := meta.Accessor(event.Object)
|
||||
if err != nil {
|
||||
glog.Errorf("unable to understand watch event %#v", event)
|
||||
util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
|
||||
continue
|
||||
}
|
||||
switch event.Type {
|
||||
|
@ -240,7 +284,7 @@ loop:
|
|||
// to change this.
|
||||
r.store.Delete(event.Object)
|
||||
default:
|
||||
glog.Errorf("unable to understand watch event %#v", event)
|
||||
util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
|
||||
}
|
||||
*resourceVersion = meta.ResourceVersion()
|
||||
r.setLastSyncResourceVersion(*resourceVersion)
|
||||
|
@ -250,10 +294,10 @@ loop:
|
|||
|
||||
watchDuration := time.Now().Sub(start)
|
||||
if watchDuration < 1*time.Second && eventCount == 0 {
|
||||
glog.V(4).Infof("Unexpected watch close - watch lasted less than a second and no items received")
|
||||
glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
|
||||
return errors.New("very short watch")
|
||||
}
|
||||
glog.V(4).Infof("Watch close - %v total %v items received", r.expectedType, eventCount)
|
||||
glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue