Error when etcd3 watch finds delete event with nil prevKV

pull/564/head
Ryan McNamara 2019-04-16 12:55:15 -07:00 committed by Jordan Liggitt
parent ebf834b434
commit c9731088e4
4 changed files with 129 additions and 3 deletions

View File

@ -10,6 +10,7 @@ go_test(
name = "go_default_test",
srcs = [
"compact_test.go",
"event_test.go",
"lease_manager_test.go",
"store_test.go",
"watcher_test.go",
@ -36,7 +37,10 @@ go_test(
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library",
"//vendor/github.com/coreos/etcd/integration:go_default_library",
"//vendor/github.com/coreos/etcd/mvcc/mvccpb:go_default_library",
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)

View File

@ -17,6 +17,7 @@ limitations under the License.
package etcd3
import (
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
)
@ -42,7 +43,12 @@ func parseKV(kv *mvccpb.KeyValue) *event {
}
}
func parseEvent(e *clientv3.Event) *event {
func parseEvent(e *clientv3.Event) (*event, error) {
if !e.IsCreate() && e.PrevKv == nil {
// If the previous value is nil, error. One example of how this is possible is if the previous value has been compacted already.
return nil, fmt.Errorf("etcd event received with PrevKv=nil (key=%q, modRevision=%d, type=%s)", string(e.Kv.Key), e.Kv.ModRevision, e.Type.String())
}
ret := &event{
key: string(e.Kv.Key),
value: e.Kv.Value,
@ -53,5 +59,5 @@ func parseEvent(e *clientv3.Event) *event {
if e.PrevKv != nil {
ret.prevValue = e.PrevKv.Value
}
return ret
return ret, nil
}

View File

@ -0,0 +1,110 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)
func TestParseEvent(t *testing.T) {
for _, tc := range []struct {
name string
etcdEvent *clientv3.Event
expectedEvent *event
expectedErr string
}{
{
name: "successful create",
etcdEvent: &clientv3.Event{
Type: clientv3.EventTypePut,
PrevKv: nil,
Kv: &mvccpb.KeyValue{
// key is the key in bytes. An empty key is not allowed.
Key: []byte("key"),
ModRevision: 1,
CreateRevision: 1,
Value: []byte("value"),
},
},
expectedEvent: &event{
key: "key",
value: []byte("value"),
prevValue: nil,
rev: 1,
isDeleted: false,
isCreated: true,
},
expectedErr: "",
},
{
name: "unsuccessful delete",
etcdEvent: &clientv3.Event{
Type: mvccpb.DELETE,
PrevKv: nil,
Kv: &mvccpb.KeyValue{
Key: []byte("key"),
CreateRevision: 1,
ModRevision: 2,
Value: nil,
},
},
expectedErr: "etcd event received with PrevKv=nil",
},
{
name: "successful delete",
etcdEvent: &clientv3.Event{
Type: mvccpb.DELETE,
PrevKv: &mvccpb.KeyValue{
Key: []byte("key"),
CreateRevision: 1,
ModRevision: 1,
Value: []byte("value"),
},
Kv: &mvccpb.KeyValue{
Key: []byte("key"),
CreateRevision: 1,
ModRevision: 2,
Value: nil,
},
},
expectedEvent: &event{
key: "key",
value: nil,
prevValue: []byte("value"),
rev: 2,
isDeleted: true,
isCreated: false,
},
expectedErr: "",
},
} {
t.Run(tc.name, func(t *testing.T) {
actualEvent, err := parseEvent(tc.etcdEvent)
if tc.expectedErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tc.expectedErr)
} else {
require.NoError(t, err)
assert.Equal(t, tc.expectedEvent, actualEvent)
}
})
}
}

View File

@ -210,7 +210,13 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
return
}
for _, e := range wres.Events {
wc.sendEvent(parseEvent(e))
parsedEvent, err := parseEvent(e)
if err != nil {
klog.Errorf("watch chan error: %v", err)
wc.sendError(err)
return
}
wc.sendEvent(parsedEvent)
}
}
// When we come to this point, it's only possible that client side ends the watch.