mirror of https://github.com/k3s-io/k3s
Merge pull request #34435 from wojtek-t/avoid_unnecessary_decoding
Automatic merge from submit-queue Avoid unnecessary decoding in etcd3 client Ref https://github.com/kubernetes/kubernetes/issues/33653 With the "Cacher" layer in Kubernetes, most of the watches processed by "pkg/storage/etcd3/watcher.go" have "filter = Everything()". That said, we generally don't need to decode previous value of the object (which is used only to get the value of filter of it), because we already know it will be true. This PR is basically fixing this problem. Should be merged after https://github.com/kubernetes/kubernetes/pull/34246pull/6/head
commit
b65a07b204
|
@ -328,7 +328,7 @@ func (s *store) watch(ctx context.Context, key string, rv string, pred storage.S
|
|||
return nil, err
|
||||
}
|
||||
key = keyWithPrefix(s.pathPrefix, key)
|
||||
return s.watcher.Watch(ctx, key, int64(rev), recursive, storage.SimpleFilter(pred))
|
||||
return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
|
||||
}
|
||||
|
||||
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
|
||||
|
|
|
@ -51,7 +51,7 @@ type watchChan struct {
|
|||
key string
|
||||
initialRev int64
|
||||
recursive bool
|
||||
filter storage.FilterFunc
|
||||
internalFilter storage.FilterFunc
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
incomingEventChan chan *event
|
||||
|
@ -73,27 +73,31 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.
|
|||
// If rev is non-zero, it will watch events happened after given revision.
|
||||
// If recursive is false, it watches on given key.
|
||||
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
|
||||
// filter must be non-nil. Only if filter returns true will the changes be returned.
|
||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) (watch.Interface, error) {
|
||||
// pred must be non-nil. Only if pred matches the change, it will be returned.
|
||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
|
||||
if recursive && !strings.HasSuffix(key, "/") {
|
||||
key += "/"
|
||||
}
|
||||
wc := w.createWatchChan(ctx, key, rev, recursive, filter)
|
||||
wc := w.createWatchChan(ctx, key, rev, recursive, pred)
|
||||
go wc.run()
|
||||
return wc, nil
|
||||
}
|
||||
|
||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) *watchChan {
|
||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
|
||||
wc := &watchChan{
|
||||
watcher: w,
|
||||
key: key,
|
||||
initialRev: rev,
|
||||
recursive: recursive,
|
||||
filter: filter,
|
||||
internalFilter: storage.SimpleFilter(pred),
|
||||
incomingEventChan: make(chan *event, incomingBufSize),
|
||||
resultChan: make(chan watch.Event, outgoingBufSize),
|
||||
errChan: make(chan error, 1),
|
||||
}
|
||||
if pred.Label.Empty() && pred.Field.Empty() {
|
||||
// The filter doesn't filter out any object.
|
||||
wc.internalFilter = nil
|
||||
}
|
||||
wc.ctx, wc.cancel = context.WithCancel(ctx)
|
||||
return wc
|
||||
}
|
||||
|
@ -231,16 +235,20 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
|||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) filter(obj runtime.Object) bool {
|
||||
if wc.internalFilter == nil {
|
||||
return true
|
||||
}
|
||||
return wc.internalFilter(obj)
|
||||
}
|
||||
|
||||
func (wc *watchChan) acceptAll() bool {
|
||||
return wc.internalFilter == nil
|
||||
}
|
||||
|
||||
// transform transforms an event into a result for user if not filtered.
|
||||
// TODO (Optimization):
|
||||
// - Save remote round-trip.
|
||||
// Currently, DELETE and PUT event don't contain the previous value.
|
||||
// We need to do another Get() in order to get previous object and have logic upon it.
|
||||
// We could potentially do some optimizations:
|
||||
// - For PUT, we can save current and previous objects into the value.
|
||||
// - For DELETE, See https://github.com/coreos/etcd/issues/4620
|
||||
func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
||||
curObj, oldObj, err := prepareObjs(wc.ctx, e, wc.watcher.client, wc.watcher.codec, wc.watcher.versioner)
|
||||
curObj, oldObj, err := wc.prepareObjs(e)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to prepare current and previous objects: %v", err)
|
||||
wc.sendError(err)
|
||||
|
@ -265,6 +273,13 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
|||
Object: curObj,
|
||||
}
|
||||
default:
|
||||
if wc.acceptAll() {
|
||||
res = &watch.Event{
|
||||
Type: watch.Modified,
|
||||
Object: curObj,
|
||||
}
|
||||
return res
|
||||
}
|
||||
curObjPasses := wc.filter(curObj)
|
||||
oldObjPasses := wc.filter(oldObj)
|
||||
switch {
|
||||
|
@ -332,19 +347,22 @@ func (wc *watchChan) sendEvent(e *event) {
|
|||
}
|
||||
}
|
||||
|
||||
func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) (curObj runtime.Object, oldObj runtime.Object, err error) {
|
||||
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
|
||||
if !e.isDeleted {
|
||||
curObj, err = decodeObj(codec, versioner, e.value, e.rev)
|
||||
curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.value, e.rev)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
if len(e.prevValue) > 0 {
|
||||
// We need to decode prevValue, only if this is deletion event or
|
||||
// the underlying filter doesn't accept all objects (otherwise we
|
||||
// know that the filter for previous object will return true and
|
||||
// we need the object only to compute whether it was filtered out
|
||||
// before).
|
||||
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
||||
// Note that this sends the *old* object with the etcd revision for the time at
|
||||
// which it gets deleted.
|
||||
// We assume old object is returned only in Deleted event. Users (e.g. cacher) need
|
||||
// to have larger than previous rev to tell the ordering.
|
||||
oldObj, err = decodeObj(codec, versioner, e.prevValue, e.rev)
|
||||
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.prevValue, e.rev)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -197,7 +197,7 @@ func TestWatchContextCancel(t *testing.T) {
|
|||
cancel()
|
||||
// When we watch with a canceled context, we should detect that it's context canceled.
|
||||
// We won't take it as error and also close the watcher.
|
||||
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.SimpleFilter(storage.Everything))
|
||||
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -216,7 +216,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
|||
origCtx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, cancel := context.WithCancel(origCtx)
|
||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.SimpleFilter(storage.Everything))
|
||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything)
|
||||
// make resutlChan and errChan blocking to ensure ordering.
|
||||
w.resultChan = make(chan watch.Event)
|
||||
w.errChan = make(chan error)
|
||||
|
|
Loading…
Reference in New Issue