/*
Copyright 2014 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net"
	"net/url"
	"reflect"
	goruntime "runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/golang/glog"
	"k8s.io/kubernetes/pkg/api"
	apierrs "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/meta"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util"
	"k8s.io/kubernetes/pkg/watch"
)

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List() (runtime.Object, error)
	// Watch should begin a watch at the specified version.
	Watch(options api.ListOptions) (watch.Interface, error)
}
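
// A minimal sketch of a ListerWatcher built from two plain functions. This is
// illustrative only - the struct and its wiring below are an assumption for the
// example, not an API defined in this package:
//
//	type funcListerWatcher struct {
//		list  func() (runtime.Object, error)
//		watch func(options api.ListOptions) (watch.Interface, error)
//	}
//
//	func (lw funcListerWatcher) List() (runtime.Object, error) { return lw.list() }
//
//	func (lw funcListerWatcher) Watch(options api.ListOptions) (watch.Interface, error) {
//		return lw.watch(options)
//	}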

// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	name string

	// The type of object we expect to place in the store.
	expectedType reflect.Type
	// The destination to sync up with the watch source.
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher
	// period controls timing between one watch ending and
	// the beginning of the next one.
	period       time.Duration
	resyncPeriod time.Duration
	// nextResync is the approximate time of the next resync (zero if none is scheduled).
	nextResync time.Time
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store;
	// it is thread safe, but not synchronized with the underlying store.
	lastSyncResourceVersion string
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion.
	lastSyncResourceVersionMutex sync.RWMutex
}

var (
	// We try to spread the load on the apiserver by setting timeouts for
	// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
	// However, it can be modified to prevent a periodic resync from breaking
	// the TCP connection.
	minWatchTimeout = 5 * time.Minute

	now func() time.Time = time.Now

	// If we are within 'forceResyncThreshold' of the next planned resync
	// and are just about to issue Watch(), the resync will be forced now.
	forceResyncThreshold = 3 * time.Second

	// We try to set timeouts for Watch() so that we will finish about
	// 'timeoutThreshold' before the next planned periodic resync.
	timeoutThreshold = 1 * time.Second
)

// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector.
// The indexer is configured to key on namespace.
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
	indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
	reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
	return indexer, reflector
}

// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	r := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		expectedType:  reflect.TypeOf(expectedType),
		period:        time.Second,
		resyncPeriod:  resyncPeriod,
	}
	return r
}

// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors.
var internalPackages = []string{"kubernetes/pkg/client/cache/", "kubernetes/pkg/controller/framework/"}

// getDefaultReflectorName walks back through the call stack until we find a caller from outside of the ignoredPackages.
// It returns a shortpath/filename:line to aid in identification of this reflector when it starts logging.
func getDefaultReflectorName(ignoredPackages ...string) string {
	name := "????"
outer:
	for i := 1; i < 10; i++ {
		_, file, line, ok := goruntime.Caller(i)
		if !ok {
			break
		}
		for _, ignoredPackage := range ignoredPackages {
			if strings.Contains(file, ignoredPackage) {
				continue outer
			}
		}

		pkgLocation := strings.LastIndex(file, "/pkg/")
		if pkgLocation >= 0 {
			file = file[pkgLocation+1:]
		}
		name = fmt.Sprintf("%s:%d", file, line)
		break
	}

	return name
}
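
// For example, a reflector created from caller code living under a repository's
// pkg/ tree ends up with a name like "pkg/somecontroller/controller.go:123"
// (illustrative path and line number).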

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately.
func (r *Reflector) Run() {
	go util.Until(func() { r.ListAndWatch(util.NeverStop) }, r.period, util.NeverStop)
}

// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
	go util.Until(func() { r.ListAndWatch(stopCh) }, r.period, stopCh)
}
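
// A hedged usage sketch (the lw ListerWatcher value, &api.Pod{} as the expected
// type, and the 30-second resync period are illustrative assumptions):
//
//	store := NewStore(MetaNamespaceKeyFunc)
//	r := NewReflector(lw, &api.Pod{}, store, 30*time.Second)
//	r.Run() // runs forever, restarting the watch whenever it is closed
//
//	// Or, when shutdown is needed:
//	stopCh := make(chan struct{})
//	r.RunUntil(stopCh)
//	// ... later ...
//	close(stopCh)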

var (
	// nothing will ever be sent down this channel
	neverExitWatch <-chan time.Time = make(chan time.Time)

	// Used to indicate that watching stopped so that a resync could happen.
	errorResyncRequested = errors.New("resync channel fired")

	// Used to indicate that watching stopped because of a signal from the stop
	// channel passed in from a client of the reflector.
	errorStopRequested = errors.New("Stop requested")
)

// resyncChan returns a channel which will receive something when a resync is
// required, and a cleanup function.
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
	if r.resyncPeriod == 0 {
		r.nextResync = time.Time{}
		return neverExitWatch, func() bool { return false }
	}
	// The cleanup function is required: imagine the scenario where watches
	// always fail so we end up listing frequently. Then, if we don't
	// manually stop the timer, we could end up with many timers active
	// concurrently.
	r.nextResync = now().Add(r.resyncPeriod)
	t := time.NewTimer(r.resyncPeriod)
	return t.C, t.Stop
}

// We want to avoid situations where periodic resyncing breaks the TCP
// connection.
// If a response's body is not read to completion before calling body.Close(),
// that TCP connection will not be reused in the future - see issue #15664
// for more details.
// Thus, we set the timeout for watch requests to be smaller than the remaining
// time until the next periodic resync and force the resync ourselves to avoid
// breaking the TCP connection.
//
// TODO: This should be parametrizable based on server load.
func (r *Reflector) timeoutForWatch() *int64 {
	randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0))
	timeout := r.nextResync.Sub(now()) - timeoutThreshold
	if timeout < 0 || randTimeout < timeout {
		timeout = randTimeout
	}
	timeoutSeconds := int64(timeout.Seconds())
	return &timeoutSeconds
}
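
// For example, with minWatchTimeout of 5 minutes the random timeout falls in
// [5m, 10m). If the next resync is only 3 minutes away, the resync-derived bound
// (3m minus timeoutThreshold) is smaller, so the watch is asked to time out just
// before the planned resync; if no resync is scheduled, nextResync is the zero
// time, the computed bound is negative, and the random timeout is used instead.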

// Returns true if we are close enough to the next planned periodic resync
// and we can force the resync now.
func (r *Reflector) canForceResyncNow() bool {
	if r.nextResync.IsZero() {
		return false
	}
	return now().Add(forceResyncThreshold).After(r.nextResync)
}
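
// For example, with forceResyncThreshold of 3 seconds, a watch that returns when
// the planned resync is only 2 seconds away forces the resync immediately rather
// than starting a new watch that would be torn down almost at once.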

// Returns an error if ListAndWatch didn't even try to initialize the watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	var resourceVersion string
	resyncCh, cleanup := r.resyncChan()
	defer cleanup()

	list, err := r.listerWatcher.List()
	if err != nil {
		return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
	}
	metaInterface, err := meta.Accessor(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v", r.name, list)
	}
	resourceVersion = metaInterface.ResourceVersion()
	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
	}
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
	}
	r.setLastSyncResourceVersion(resourceVersion)

	for {
		options := api.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations where resyncing breaks the TCP connection
			// - see the comment for 'timeoutForWatch()' for more details.
			TimeoutSeconds: r.timeoutForWatch(),
		}
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
			default:
				util.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
			}
			// If this is a "connection refused" error, it means that most likely the apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// the watch where we ended.
			// If that's the case, wait and resend the watch request.
			if urlError, ok := err.(*url.Error); ok {
				if opError, ok := urlError.Err.(*net.OpError); ok {
					if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
						time.Sleep(time.Second)
						continue
					}
				}
			}
			return nil
		}
		if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
			if err != errorResyncRequested && err != errorStopRequested {
				glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
			}
			return nil
		}
		if r.canForceResyncNow() {
			glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
			return nil
		}
	}
}

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
	start := time.Now()
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case <-resyncCh:
			return errorResyncRequested
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrs.FromObject(event.Object)
			}
			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
				util.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
				continue
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.ResourceVersion()
			switch event.Type {
			case watch.Added:
				r.store.Add(event.Object)
			case watch.Modified:
				r.store.Update(event.Object)
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				r.store.Delete(event.Object)
			default:
				util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := time.Now().Sub(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
		return errors.New("very short watch")
	}
	glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
	return nil
}

// LastSyncResourceVersion is the resource version observed during the last sync with the underlying store.
// The value returned is not synchronized with access to the underlying store and is not thread-safe.
func (r *Reflector) LastSyncResourceVersion() string {
	r.lastSyncResourceVersionMutex.RLock()
	defer r.lastSyncResourceVersionMutex.RUnlock()
	return r.lastSyncResourceVersion
}

func (r *Reflector) setLastSyncResourceVersion(v string) {
	r.lastSyncResourceVersionMutex.Lock()
	defer r.lastSyncResourceVersionMutex.Unlock()
	r.lastSyncResourceVersion = v
}