|
|
|
@ -29,9 +29,10 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if msg.GetCreateRequest() != nil { |
|
|
|
|
w.Start(msg.GetCreateRequest()) |
|
|
|
|
w.Start(ws.Context(), msg.GetCreateRequest()) |
|
|
|
|
} else if msg.GetCancelRequest() != nil { |
|
|
|
|
w.Cancel(msg.GetCancelRequest().WatchId) |
|
|
|
|
logrus.Debugf("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId()) |
|
|
|
|
w.Cancel(msg.GetCancelRequest().WatchId, nil) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -45,11 +46,11 @@ type watcher struct {
|
|
|
|
|
watches map[int64]func() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *watcher) Start(r *etcdserverpb.WatchCreateRequest) { |
|
|
|
|
func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) { |
|
|
|
|
w.Lock() |
|
|
|
|
defer w.Unlock() |
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
|
|
|
ctx, cancel := context.WithCancel(ctx) |
|
|
|
|
|
|
|
|
|
id := atomic.AddInt64(&watchID, 1) |
|
|
|
|
w.watches[id] = cancel |
|
|
|
@ -57,7 +58,7 @@ func (w *watcher) Start(r *etcdserverpb.WatchCreateRequest) {
|
|
|
|
|
|
|
|
|
|
key := string(r.Key) |
|
|
|
|
|
|
|
|
|
logrus.Debugf("WATCH START id=%d, key=%s, revision=%d", id, key, r.StartRevision) |
|
|
|
|
logrus.Debugf("WATCH START id=%d, count=%d, key=%s, revision=%d", id, len(w.watches), key, r.StartRevision) |
|
|
|
|
|
|
|
|
|
go func() { |
|
|
|
|
defer w.wg.Done() |
|
|
|
@ -66,7 +67,7 @@ func (w *watcher) Start(r *etcdserverpb.WatchCreateRequest) {
|
|
|
|
|
Created: true, |
|
|
|
|
WatchId: id, |
|
|
|
|
}); err != nil { |
|
|
|
|
w.Cancel(id) |
|
|
|
|
w.Cancel(id, err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -86,10 +87,11 @@ func (w *watcher) Start(r *etcdserverpb.WatchCreateRequest) {
|
|
|
|
|
WatchId: id, |
|
|
|
|
Events: toEvents(events...), |
|
|
|
|
}); err != nil { |
|
|
|
|
w.Cancel(id) |
|
|
|
|
w.Cancel(id, err) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
w.Cancel(id, nil) |
|
|
|
|
logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
@ -116,28 +118,35 @@ func toEvent(event *Event) *mvccpb.Event {
|
|
|
|
|
return e |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *watcher) Cancel(watchID int64) { |
|
|
|
|
func (w *watcher) Cancel(watchID int64, err error) { |
|
|
|
|
w.Lock() |
|
|
|
|
defer w.Unlock() |
|
|
|
|
if cancel, ok := w.watches[watchID]; ok { |
|
|
|
|
cancel() |
|
|
|
|
delete(w.watches, watchID) |
|
|
|
|
} |
|
|
|
|
err := w.server.Send(&etcdserverpb.WatchResponse{ |
|
|
|
|
Header: &etcdserverpb.ResponseHeader{}, |
|
|
|
|
Canceled: true, |
|
|
|
|
WatchId: watchID, |
|
|
|
|
w.Unlock() |
|
|
|
|
|
|
|
|
|
reason := "" |
|
|
|
|
if err != nil { |
|
|
|
|
reason = err.Error() |
|
|
|
|
} |
|
|
|
|
logrus.Debugf("WATCH CANCEL id=%d reason=%s", watchID, reason) |
|
|
|
|
err = w.server.Send(&etcdserverpb.WatchResponse{ |
|
|
|
|
Header: &etcdserverpb.ResponseHeader{}, |
|
|
|
|
Canceled: true, |
|
|
|
|
CancelReason: "watch closed", |
|
|
|
|
WatchId: watchID, |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
logrus.Errorf("Failed to send cancel response for watchID %d: %v", watchID, err) |
|
|
|
|
logrus.Errorf("WATCH Failed to send cancel response for watchID %d: %v", watchID, err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *watcher) Close() { |
|
|
|
|
w.Lock() |
|
|
|
|
defer w.Unlock() |
|
|
|
|
for _, v := range w.watches { |
|
|
|
|
v() |
|
|
|
|
} |
|
|
|
|
w.Unlock() |
|
|
|
|
w.wg.Wait() |
|
|
|
|
} |
|
|
|
|