|
|
|
@ -421,6 +421,13 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// StopWatch is used to unsubscribe a channel to a set of MDBTables
|
|
|
|
|
func (s *StateStore) StopWatch(tables MDBTables, notify chan struct{}) {
|
|
|
|
|
for _, t := range tables {
|
|
|
|
|
s.watch[t].Clear(notify)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WatchKV is used to subscribe a channel to changes in KV data
|
|
|
|
|
func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
|
|
|
|
|
s.kvWatchLock.Lock()
|
|
|
|
@ -439,6 +446,18 @@ func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
|
|
|
|
|
s.kvWatch.Insert(prefix, grp)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// StopWatchKV is used to unsubscribe a channel from changes in KV data
|
|
|
|
|
func (s *StateStore) StopWatchKV(prefix string, notify chan struct{}) {
|
|
|
|
|
s.kvWatchLock.Lock()
|
|
|
|
|
defer s.kvWatchLock.Unlock()
|
|
|
|
|
|
|
|
|
|
// Check for an existing notify group
|
|
|
|
|
if raw, ok := s.kvWatch.Get(prefix); ok {
|
|
|
|
|
grp := raw.(*NotifyGroup)
|
|
|
|
|
grp.Clear(notify)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyKV is used to notify any KV listeners of a change
|
|
|
|
|
// on a prefix
|
|
|
|
|
func (s *StateStore) notifyKV(path string, prefix bool) {
|
|
|
|
|