diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index 2d064bac40..46a282969d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -10,6 +10,7 @@ go_test( name = "go_default_test", srcs = [ "compact_test.go", + "event_test.go", "lease_manager_test.go", "store_test.go", "watcher_test.go", @@ -36,7 +37,10 @@ go_test( "//vendor/github.com/coreos/etcd/clientv3:go_default_library", "//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library", "//vendor/github.com/coreos/etcd/integration:go_default_library", + "//vendor/github.com/coreos/etcd/mvcc/mvccpb:go_default_library", "//vendor/github.com/coreos/pkg/capnslog:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go index 7dc9175bcf..dbaf785b26 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go @@ -17,6 +17,7 @@ limitations under the License. package etcd3 import ( + "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" ) @@ -42,7 +43,12 @@ func parseKV(kv *mvccpb.KeyValue) *event { } } -func parseEvent(e *clientv3.Event) *event { +func parseEvent(e *clientv3.Event) (*event, error) { + if !e.IsCreate() && e.PrevKv == nil { + // If the previous value is nil, error. One example of how this is possible is if the previous value has been compacted already. + return nil, fmt.Errorf("etcd event received with PrevKv=nil (key=%q, modRevision=%d, type=%s)", string(e.Kv.Key), e.Kv.ModRevision, e.Type.String()) + + } ret := &event{ key: string(e.Kv.Key), value: e.Kv.Value, @@ -53,5 +59,5 @@ func parseEvent(e *clientv3.Event) *event { if e.PrevKv != nil { ret.prevValue = e.PrevKv.Value } - return ret + return ret, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event_test.go new file mode 100644 index 0000000000..0bbcac3296 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +func TestParseEvent(t *testing.T) { + for _, tc := range []struct { + name string + etcdEvent *clientv3.Event + expectedEvent *event + expectedErr string + }{ + { + name: "successful create", + etcdEvent: &clientv3.Event{ + Type: clientv3.EventTypePut, + PrevKv: nil, + Kv: &mvccpb.KeyValue{ + // key is the key in bytes. An empty key is not allowed. + Key: []byte("key"), + ModRevision: 1, + CreateRevision: 1, + Value: []byte("value"), + }, + }, + expectedEvent: &event{ + key: "key", + value: []byte("value"), + prevValue: nil, + rev: 1, + isDeleted: false, + isCreated: true, + }, + expectedErr: "", + }, + { + name: "unsuccessful delete", + etcdEvent: &clientv3.Event{ + Type: mvccpb.DELETE, + PrevKv: nil, + Kv: &mvccpb.KeyValue{ + Key: []byte("key"), + CreateRevision: 1, + ModRevision: 2, + Value: nil, + }, + }, + expectedErr: "etcd event received with PrevKv=nil", + }, + { + name: "successful delete", + etcdEvent: &clientv3.Event{ + Type: mvccpb.DELETE, + PrevKv: &mvccpb.KeyValue{ + Key: []byte("key"), + CreateRevision: 1, + ModRevision: 1, + Value: []byte("value"), + }, + Kv: &mvccpb.KeyValue{ + Key: []byte("key"), + CreateRevision: 1, + ModRevision: 2, + Value: nil, + }, + }, + expectedEvent: &event{ + key: "key", + value: nil, + prevValue: []byte("value"), + rev: 2, + isDeleted: true, + isCreated: false, + }, + expectedErr: "", + }, + } { + t.Run(tc.name, func(t *testing.T) { + actualEvent, err := parseEvent(tc.etcdEvent) + if tc.expectedErr != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.expectedErr) + } else { + require.NoError(t, err) + assert.Equal(t, tc.expectedEvent, actualEvent) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index d450038eff..15437f27e7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -210,7 +210,13 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { return } for _, e := range wres.Events { - wc.sendEvent(parseEvent(e)) + parsedEvent, err := parseEvent(e) + if err != nil { + klog.Errorf("watch chan error: %v", err) + wc.sendError(err) + return + } + wc.sendEvent(parsedEvent) } } // When we come to this point, it's only possible that client side ends the watch.