Guts all the old blocking query code.

pull/2671/head
James Phillips 2017-01-24 11:53:02 -08:00
parent 7da2f513dc
commit d97c3c6c18
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
18 changed files with 37 additions and 1438 deletions

View File

@ -11,7 +11,6 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
@ -353,76 +352,6 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
return future.Response(), nil
}
// blockingRPC is used for queries that need to wait for a minimum index. This
// is used to block and wait for changes.
func (s *Server) blockingRPC(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
watch state.Watch, run func() error) error {
var timeout *time.Timer
var notifyCh chan struct{}
// Fast path right to the non-blocking query.
if queryOpts.MinQueryIndex == 0 {
goto RUN_QUERY
}
// Make sure a watch was given if we were asked to block.
if watch == nil {
panic("no watch given for blocking query")
}
// Restrict the max query time, and ensure there is always one.
if queryOpts.MaxQueryTime > maxQueryTime {
queryOpts.MaxQueryTime = maxQueryTime
} else if queryOpts.MaxQueryTime <= 0 {
queryOpts.MaxQueryTime = defaultQueryTime
}
// Apply a small amount of jitter to the request.
queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction)
// Setup a query timeout.
timeout = time.NewTimer(queryOpts.MaxQueryTime)
// Setup the notify channel.
notifyCh = make(chan struct{}, 1)
// Ensure we tear down any watches on return.
defer func() {
timeout.Stop()
watch.Clear(notifyCh)
}()
REGISTER_NOTIFY:
// Register the notification channel. This may be done multiple times if
// we haven't reached the target wait index.
watch.Wait(notifyCh)
RUN_QUERY:
// Update the query metadata.
s.setQueryMeta(queryMeta)
// If the read must be consistent we verify that we are still the leader.
if queryOpts.RequireConsistent {
if err := s.consistentRead(); err != nil {
return err
}
}
// Run the query.
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
err := run()
// Check for minimum query time.
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto REGISTER_NOTIFY
case <-timeout.C:
}
}
return err
}
// queryFn is used to perform a query operation. If a re-query is needed, the
// passed-in watch set will be used to block for changes.
type queryFn func(memdb.WatchSet) error

View File

@ -26,7 +26,6 @@ func (s *StateRestore) ACL(acl *structs.ACL) error {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("acls")
return nil
}
@ -75,7 +74,6 @@ func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) erro
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["acls"].Notify() })
return nil
}
@ -170,6 +168,5 @@ func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["acls"].Notify() })
return nil
}

View File

@ -288,27 +288,3 @@ func TestStateStore_ACL_Snapshot_Restore(t *testing.T) {
}
}()
}
func TestStateStore_ACL_Watches(t *testing.T) {
s := testStateStore(t)
// Call functions that update the acls table and make sure a watch fires
// each time.
verifyWatch(t, s.getTableWatch("acls"), func() {
if err := s.ACLSet(1, &structs.ACL{ID: "acl1"}); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("acls"), func() {
if err := s.ACLDelete(2, "acl1"); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("acls"), func() {
restore := s.Restore()
if err := restore.ACL(&structs.ACL{ID: "acl1"}); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
}

View File

@ -42,7 +42,7 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
// performed within a single transaction to avoid race conditions on state
// updates.
func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) error {
if err := s.store.ensureRegistrationTxn(s.tx, idx, s.watches, req); err != nil {
if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil {
return err
}
return nil
@ -55,12 +55,10 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest
tx := s.db.Txn(true)
defer tx.Abort()
watches := NewDumbWatchManager(s.tableWatches)
if err := s.ensureRegistrationTxn(tx, idx, watches, req); err != nil {
if err := s.ensureRegistrationTxn(tx, idx, req); err != nil {
return err
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
@ -68,8 +66,7 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest
// ensureRegistrationTxn is used to make sure a node, service, and check
// registration is performed within a single transaction to avoid race
// conditions on state updates.
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
req *structs.RegisterRequest) error {
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error {
// Create a node structure.
node := &structs.Node{
ID: req.ID,
@ -90,7 +87,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
return fmt.Errorf("node lookup failed: %s", err)
}
if existing == nil || req.ChangesNode(existing.(*structs.Node)) {
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
if err := s.ensureNodeTxn(tx, idx, node); err != nil {
return fmt.Errorf("failed inserting node: %s", err)
}
}
@ -105,7 +102,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
return fmt.Errorf("failed service lookup: %s", err)
}
if existing == nil || !(existing.(*structs.ServiceNode).ToNodeService()).IsSame(req.Service) {
if err := s.ensureServiceTxn(tx, idx, watches, req.Node, req.Service); err != nil {
if err := s.ensureServiceTxn(tx, idx, req.Node, req.Service); err != nil {
return fmt.Errorf("failed inserting service: %s", err)
}
@ -120,12 +117,12 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
// Add the checks, if any.
if req.Check != nil {
if err := s.ensureCheckTxn(tx, idx, watches, req.Check); err != nil {
if err := s.ensureCheckTxn(tx, idx, req.Check); err != nil {
return fmt.Errorf("failed inserting check: %s", err)
}
}
for _, check := range req.Checks {
if err := s.ensureCheckTxn(tx, idx, watches, check); err != nil {
if err := s.ensureCheckTxn(tx, idx, check); err != nil {
return fmt.Errorf("failed inserting check: %s", err)
}
}
@ -139,12 +136,10 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
defer tx.Abort()
// Call the node upsert
watches := NewDumbWatchManager(s.tableWatches)
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
if err := s.ensureNodeTxn(tx, idx, node); err != nil {
return err
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
@ -152,8 +147,7 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
// ensureNodeTxn is the inner function called to actually create a node
// registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn.
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
node *structs.Node) error {
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error {
// Check for an existing node
existing, err := tx.First("nodes", "id", node.Node)
if err != nil {
@ -177,7 +171,6 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, watches *DumbWatch
return fmt.Errorf("failed updating index: %s", err)
}
watches.Arm("nodes")
return nil
}
@ -187,7 +180,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("GetNode")...)
idx := maxIndexTxn(tx, "nodes")
// Retrieve the node from the state store
node, err := tx.First("nodes", "id", id)
@ -280,10 +273,6 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
return nil
}
// Use a watch manager since the inner functions can perform multiple
// ops per table.
watches := NewDumbWatchManager(s.tableWatches)
// Delete all services associated with the node and update the service index.
services, err := tx.Get("services", "node", nodeName)
if err != nil {
@ -296,7 +285,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
// Do the delete in a separate loop so we don't trash the iterator.
for _, sid := range sids {
if err := s.deleteServiceTxn(tx, idx, watches, nodeName, sid); err != nil {
if err := s.deleteServiceTxn(tx, idx, nodeName, sid); err != nil {
return err
}
}
@ -314,7 +303,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
// Do the delete in a separate loop so we don't trash the iterator.
for _, cid := range cids {
if err := s.deleteCheckTxn(tx, idx, watches, nodeName, cid); err != nil {
if err := s.deleteCheckTxn(tx, idx, nodeName, cid); err != nil {
return err
}
}
@ -331,7 +320,6 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
watches.Arm("coordinates")
}
// Delete the node and update the index.
@ -354,13 +342,11 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
// Do the delete in a separate loop so we don't trash the iterator.
for _, id := range ids {
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
if err := s.deleteSessionTxn(tx, idx, id); err != nil {
return fmt.Errorf("failed session delete: %s", err)
}
}
watches.Arm("nodes")
tx.Defer(func() { watches.Notify() })
return nil
}
@ -370,20 +356,17 @@ func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeSer
defer tx.Abort()
// Call the service registration upsert
watches := NewDumbWatchManager(s.tableWatches)
if err := s.ensureServiceTxn(tx, idx, watches, node, svc); err != nil {
if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil {
return err
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
// ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction.
func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
node string, svc *structs.NodeService) error {
func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error {
// Check for existing service
existing, err := tx.First("services", "id", node, svc.ID)
if err != nil {
@ -419,7 +402,6 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
return fmt.Errorf("failed updating index: %s", err)
}
watches.Arm("services")
return nil
}
@ -657,7 +639,7 @@ func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *st
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeService")...)
idx := maxIndexTxn(tx, "services")
// Query the service
service, err := tx.First("services", "id", nodeName, serviceID)
@ -719,19 +701,17 @@ func (s *StateStore) DeleteService(idx uint64, nodeName, serviceID string) error
defer tx.Abort()
// Call the service deletion
watches := NewDumbWatchManager(s.tableWatches)
if err := s.deleteServiceTxn(tx, idx, watches, nodeName, serviceID); err != nil {
if err := s.deleteServiceTxn(tx, idx, nodeName, serviceID); err != nil {
return err
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
// deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction.
func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, nodeName, serviceID string) error {
func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
// Look up the service.
service, err := tx.First("services", "id", nodeName, serviceID)
if err != nil {
@ -754,7 +734,7 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
// Do the delete in a separate loop so we don't trash the iterator.
for _, cid := range cids {
if err := s.deleteCheckTxn(tx, idx, watches, nodeName, cid); err != nil {
if err := s.deleteCheckTxn(tx, idx, nodeName, cid); err != nil {
return err
}
}
@ -772,7 +752,6 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
return fmt.Errorf("failed updating index: %s", err)
}
watches.Arm("services")
return nil
}
@ -782,12 +761,10 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
defer tx.Abort()
// Call the check registration
watches := NewDumbWatchManager(s.tableWatches)
if err := s.ensureCheckTxn(tx, idx, watches, hc); err != nil {
if err := s.ensureCheckTxn(tx, idx, hc); err != nil {
return err
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
@ -795,8 +772,7 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
// ensureCheckTransaction is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting
// checks with no matching node or service.
func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
hc *structs.HealthCheck) error {
func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error {
// Check if we have an existing health check
existing, err := tx.First("checks", "id", hc.Node, string(hc.CheckID))
if err != nil {
@ -855,13 +831,11 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
// Delete the session in a separate loop so we don't trash the
// iterator.
watches := NewDumbWatchManager(s.tableWatches)
for _, id := range ids {
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
if err := s.deleteSessionTxn(tx, idx, id); err != nil {
return fmt.Errorf("failed deleting session: %s", err)
}
}
tx.Defer(func() { watches.Notify() })
}
// Persist the check registration in the db.
@ -872,7 +846,6 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
return fmt.Errorf("failed updating index: %s", err)
}
watches.Arm("checks")
return nil
}
@ -1068,19 +1041,17 @@ func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID)
defer tx.Abort()
// Call the check deletion
watches := NewDumbWatchManager(s.tableWatches)
if err := s.deleteCheckTxn(tx, idx, watches, node, checkID); err != nil {
if err := s.deleteCheckTxn(tx, idx, node, checkID); err != nil {
return err
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
// deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction.
func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, node string, checkID types.CheckID) error {
func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error {
// Try to retrieve the existing health check.
hc, err := tx.First("checks", "id", node, string(checkID))
if err != nil {
@ -1110,12 +1081,11 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
// Do the delete in a separate loop so we don't trash the iterator.
for _, id := range ids {
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
if err := s.deleteSessionTxn(tx, idx, id); err != nil {
return fmt.Errorf("failed deleting session: %s", err)
}
}
watches.Arm("checks")
return nil
}
@ -1276,7 +1246,7 @@ func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, erro
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeDump")...)
idx := maxIndexTxn(tx, "nodes", "services", "checks")
// Fetch all of the registered nodes
nodes, err := tx.Get("nodes", "id")

View File

@ -314,97 +314,6 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
}()
}
func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
s := testStateStore(t)
// With the new diffing logic for the node and service structures, we
// need to twiddle the request to get the expected watch to fire for
// the restore cases below.
req := &structs.RegisterRequest{
Node: "node1",
Address: "1.2.3.4",
}
// The nodes watch should fire for this one.
verifyWatch(t, s.getTableWatch("nodes"), func() {
verifyNoWatch(t, s.getTableWatch("services"), func() {
verifyNoWatch(t, s.getTableWatch("checks"), func() {
if err := s.EnsureRegistration(1, req); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
// The nodes watch should fire for this one.
verifyWatch(t, s.getTableWatch("nodes"), func() {
verifyNoWatch(t, s.getTableWatch("services"), func() {
verifyNoWatch(t, s.getTableWatch("checks"), func() {
req.Address = "1.2.3.5"
restore := s.Restore()
if err := restore.Registration(1, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
})
})
// With a service definition added it should fire just services.
req.Service = &structs.NodeService{
ID: "redis1",
Service: "redis",
Address: "1.1.1.1",
Port: 8080,
}
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
verifyWatch(t, s.getTableWatch("services"), func() {
verifyNoWatch(t, s.getTableWatch("checks"), func() {
if err := s.EnsureRegistration(2, req); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
verifyWatch(t, s.getTableWatch("services"), func() {
verifyNoWatch(t, s.getTableWatch("checks"), func() {
req.Service.Address = "1.1.1.2"
restore := s.Restore()
if err := restore.Registration(2, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
})
})
// Adding a check should just affect checks.
req.Check = &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "check",
}
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
verifyNoWatch(t, s.getTableWatch("services"), func() {
verifyWatch(t, s.getTableWatch("checks"), func() {
if err := s.EnsureRegistration(3, req); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
verifyNoWatch(t, s.getTableWatch("services"), func() {
verifyWatch(t, s.getTableWatch("checks"), func() {
restore := s.Restore()
if err := restore.Registration(3, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
})
})
}
func TestStateStore_EnsureNode(t *testing.T) {
s := testStateStore(t)
@ -734,58 +643,6 @@ func TestStateStore_Node_Snapshot(t *testing.T) {
}
}
func TestStateStore_Node_Watches(t *testing.T) {
s := testStateStore(t)
// Call functions that update the nodes table and make sure a watch fires
// each time.
verifyWatch(t, s.getTableWatch("nodes"), func() {
req := &structs.RegisterRequest{
Node: "node1",
}
if err := s.EnsureRegistration(1, req); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("nodes"), func() {
node := &structs.Node{Node: "node2"}
if err := s.EnsureNode(2, node); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("nodes"), func() {
if err := s.DeleteNode(3, "node2"); err != nil {
t.Fatalf("err: %s", err)
}
})
// Check that a delete of a node + service + check + coordinate triggers
// all tables in one shot.
testRegisterNode(t, s, 4, "node1")
testRegisterService(t, s, 5, "node1", "service1")
testRegisterCheck(t, s, 6, "node1", "service1", "check3", structs.HealthPassing)
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(7, updates); err != nil {
t.Fatalf("err: %s", err)
}
verifyWatch(t, s.getTableWatch("nodes"), func() {
verifyWatch(t, s.getTableWatch("services"), func() {
verifyWatch(t, s.getTableWatch("checks"), func() {
verifyWatch(t, s.getTableWatch("coordinates"), func() {
if err := s.DeleteNode(7, "node1"); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
})
}
func TestStateStore_EnsureService(t *testing.T) {
s := testStateStore(t)
@ -1546,43 +1403,6 @@ func TestStateStore_Service_Snapshot(t *testing.T) {
}
}
func TestStateStore_Service_Watches(t *testing.T) {
s := testStateStore(t)
testRegisterNode(t, s, 0, "node1")
ns := &structs.NodeService{
ID: "service2",
Service: "nomad",
Address: "1.1.1.2",
Port: 8000,
}
// Call functions that update the services table and make sure a watch
// fires each time.
verifyWatch(t, s.getTableWatch("services"), func() {
if err := s.EnsureService(2, "node1", ns); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("services"), func() {
if err := s.DeleteService(3, "node1", "service2"); err != nil {
t.Fatalf("err: %s", err)
}
})
// Check that a delete of a service + check triggers both tables in one
// shot.
testRegisterService(t, s, 4, "node1", "service1")
testRegisterCheck(t, s, 5, "node1", "service1", "check3", structs.HealthPassing)
verifyWatch(t, s.getTableWatch("services"), func() {
verifyWatch(t, s.getTableWatch("checks"), func() {
if err := s.DeleteService(6, "node1", "service1"); err != nil {
t.Fatalf("err: %s", err)
}
})
})
}
func TestStateStore_EnsureCheck(t *testing.T) {
s := testStateStore(t)
@ -2465,36 +2285,6 @@ func TestStateStore_Check_Snapshot(t *testing.T) {
}
}
func TestStateStore_Check_Watches(t *testing.T) {
s := testStateStore(t)
testRegisterNode(t, s, 0, "node1")
hc := &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Status: structs.HealthPassing,
}
// Call functions that update the checks table and make sure a watch fires
// each time.
verifyWatch(t, s.getTableWatch("checks"), func() {
if err := s.EnsureCheck(1, hc); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("checks"), func() {
hc.Status = structs.HealthCritical
if err := s.EnsureCheck(2, hc); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("checks"), func() {
if err := s.DeleteCheck(3, "node1", "check1"); err != nil {
t.Fatalf("err: %s", err)
}
})
}
func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
s := testStateStore(t)

View File

@ -31,7 +31,6 @@ func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) erro
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("coordinates")
return nil
}
@ -113,7 +112,6 @@ func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinat
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["coordinates"].Notify() })
tx.Commit()
return nil
}

View File

@ -284,28 +284,3 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
}()
}
func TestStateStore_Coordinate_Watches(t *testing.T) {
s := testStateStore(t)
testRegisterNode(t, s, 1, "node1")
// Call functions that update the coordinates table and make sure a watch fires
// each time.
verifyWatch(t, s.getTableWatch("coordinates"), func() {
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(2, updates); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("coordinates"), func() {
if err := s.DeleteNode(3, "node1"); err != nil {
t.Fatalf("err: %s", err)
}
})
}

View File

@ -32,9 +32,6 @@ func (s *StateRestore) KVS(entry *structs.DirEntry) error {
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
// We have a single top-level KVS watch trigger instead of doing
// tons of prefix watches.
return nil
}
@ -114,7 +111,6 @@ func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntr
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) })
return nil
}
@ -316,7 +312,6 @@ func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.kvsWatch.Notify(key, false) })
return nil
}
@ -455,7 +450,6 @@ func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string)
// Update the index
if modified {
tx.Defer(func() { s.kvsWatch.Notify(prefix, true) })
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}

View File

@ -1374,142 +1374,6 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
}()
}
func TestStateStore_KVS_Watches(t *testing.T) {
s := testStateStore(t)
// This is used when locking down below.
testRegisterNode(t, s, 1, "node1")
session := testUUID()
if err := s.SessionCreate(2, &structs.Session{ID: session, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
// An empty prefix watch should hit on all KVS ops, and some other
// prefix should not be affected ever. We also add a positive prefix
// match.
verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("a"), func() {
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
if err := s.KVSSet(1, &structs.DirEntry{Key: "aaa"}); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("a"), func() {
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
if err := s.KVSSet(2, &structs.DirEntry{Key: "aaa"}); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
// Restore just fires off a top-level watch, so we should get hits on
// any prefix, including ones for keys that aren't in there.
verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("b"), func() {
verifyWatch(t, s.GetKVSWatch("/nope"), func() {
restore := s.Restore()
if err := restore.KVS(&structs.DirEntry{Key: "bbb"}); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
})
})
verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("a"), func() {
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
if err := s.KVSDelete(3, "aaa"); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("a"), func() {
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
if ok, err := s.KVSSetCAS(4, &structs.DirEntry{Key: "aaa"}); !ok || err != nil {
t.Fatalf("ok: %v err: %s", ok, err)
}
})
})
})
verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("a"), func() {
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
if ok, err := s.KVSLock(5, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil {
t.Fatalf("ok: %v err: %s", ok, err)
}
})
})
})
verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("a"), func() {
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
if ok, err := s.KVSUnlock(6, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil {
t.Fatalf("ok: %v err: %s", ok, err)
}
})
})
})
verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("a"), func() {
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
if err := s.KVSDeleteTree(7, "aaa"); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
// A delete tree operation at the top level will notify all the watches.
verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("a"), func() {
verifyWatch(t, s.GetKVSWatch("/nope"), func() {
if err := s.KVSDeleteTree(8, ""); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
// Create a more interesting tree.
testSetKey(t, s, 9, "foo/bar", "bar")
testSetKey(t, s, 10, "foo/bar/baz", "baz")
testSetKey(t, s, 11, "foo/bar/zip", "zip")
testSetKey(t, s, 12, "foo/zorp", "zorp")
// Deleting just the foo/bar key should not trigger watches on the
// children.
verifyWatch(t, s.GetKVSWatch("foo/bar"), func() {
verifyNoWatch(t, s.GetKVSWatch("foo/bar/baz"), func() {
verifyNoWatch(t, s.GetKVSWatch("foo/bar/zip"), func() {
if err := s.KVSDelete(13, "foo/bar"); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
// But a delete tree from that point should notify the whole subtree,
// even for keys that don't exist.
verifyWatch(t, s.GetKVSWatch("foo/bar"), func() {
verifyWatch(t, s.GetKVSWatch("foo/bar/baz"), func() {
verifyWatch(t, s.GetKVSWatch("foo/bar/zip"), func() {
verifyWatch(t, s.GetKVSWatch("foo/bar/uh/nope"), func() {
if err := s.KVSDeleteTree(14, "foo/bar"); err != nil {
t.Fatalf("err: %s", err)
}
})
})
})
})
}
func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
s := testStateStore(t)

View File

@ -75,7 +75,6 @@ func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("prepared-queries")
return nil
}
@ -193,7 +192,6 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["prepared-queries"].Notify() })
return nil
}
@ -202,20 +200,17 @@ func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
watches := NewDumbWatchManager(s.tableWatches)
if err := s.preparedQueryDeleteTxn(tx, idx, watches, queryID); err != nil {
if err := s.preparedQueryDeleteTxn(tx, idx, queryID); err != nil {
return fmt.Errorf("failed prepared query delete: %s", err)
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
// preparedQueryDeleteTxn is the inner method used to delete a prepared query
// with the proper indexes into the state store.
func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
queryID string) error {
func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID string) error {
// Pull the query.
wrapped, err := tx.First("prepared-queries", "id", queryID)
if err != nil {
@ -233,7 +228,6 @@ func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches *
return fmt.Errorf("failed updating index: %s", err)
}
watches.Arm("prepared-queries")
return nil
}
@ -262,7 +256,7 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryResolve")...)
idx := maxIndexTxn(tx, "prepared-queries")
// Explicitly ban an empty query. This will never match an ID and the
// schema is set up so it will never match a query with an empty name,
@ -337,7 +331,7 @@ func (s *StateStore) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.Prepa
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryList")...)
idx := maxIndexTxn(tx, "prepared-queries")
// Query all of the prepared queries in the state store.
queries, err := tx.Get("prepared-queries", "id")

View File

@ -972,38 +972,3 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
}
}()
}
func TestStateStore_PreparedQuery_Watches(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
query := &structs.PreparedQuery{
ID: testUUID(),
Service: structs.ServiceQuery{
Service: "redis",
},
}
// Call functions that update the queries table and make sure a watch
// fires each time.
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
if err := s.PreparedQuerySet(3, query); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
if err := s.PreparedQueryDelete(4, query.ID); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
restore := s.Restore()
if err := restore.PreparedQuery(query); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
}

View File

@ -42,7 +42,6 @@ func (s *StateRestore) Session(sess *structs.Session) error {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("sessions")
return nil
}
@ -140,7 +139,6 @@ func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.S
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["sessions"].Notify() })
return nil
}
@ -220,19 +218,17 @@ func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error {
defer tx.Abort()
// Call the session deletion.
watches := NewDumbWatchManager(s.tableWatches)
if err := s.deleteSessionTxn(tx, idx, watches, sessionID); err != nil {
if err := s.deleteSessionTxn(tx, idx, sessionID); err != nil {
return err
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
// deleteSessionTxn is the inner method, which is used to do the actual
// session deletion and handle session invalidation, watch triggers, etc.
func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, sessionID string) error {
// session deletion and handle session invalidation, etc.
func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error {
// Look up the session.
sess, err := tx.First("sessions", "id", sessionID)
if err != nil {
@ -337,12 +333,11 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
// Do the delete in a separate loop so we don't trash the iterator.
for _, id := range ids {
if err := s.preparedQueryDeleteTxn(tx, idx, watches, id); err != nil {
if err := s.preparedQueryDeleteTxn(tx, idx, id); err != nil {
return fmt.Errorf("failed prepared query delete: %s", err)
}
}
}
watches.Arm("sessions")
return nil
}

View File

@ -504,44 +504,6 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) {
}()
}
func TestStateStore_Session_Watches(t *testing.T) {
s := testStateStore(t)
// Register a test node.
testRegisterNode(t, s, 1, "node1")
// This just covers the basics. The session invalidation tests above
// cover the more nuanced multiple table watches.
session := testUUID()
verifyWatch(t, s.getTableWatch("sessions"), func() {
sess := &structs.Session{
ID: session,
Node: "node1",
Behavior: structs.SessionKeysDelete,
}
if err := s.SessionCreate(2, sess); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("sessions"), func() {
if err := s.SessionDestroy(3, session); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("sessions"), func() {
restore := s.Restore()
sess := &structs.Session{
ID: session,
Node: "node1",
Behavior: structs.SessionKeysDelete,
}
if err := restore.Session(sess); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
}
func TestStateStore_Session_Invalidate_DeleteNode(t *testing.T) {
s := testStateStore(t)

View File

@ -54,12 +54,6 @@ type StateStore struct {
// abandoned (usually during a restore). This is only ever closed.
abandonCh chan struct{}
// tableWatches holds all the full table watches, indexed by table name.
tableWatches map[string]*FullTableWatch
// kvsWatch holds the special prefix watch for the key value store.
kvsWatch *PrefixWatchManager
// kvsGraveyard manages tombstones for the key value store.
kvsGraveyard *Graveyard
@ -80,7 +74,6 @@ type StateSnapshot struct {
type StateRestore struct {
store *StateStore
tx *memdb.Txn
watches *DumbWatchManager
}
// IndexEntry keeps a record of the last index per-table.
@ -108,23 +101,11 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
return nil, fmt.Errorf("Failed setting up state store: %s", err)
}
// Build up the all-table watches.
tableWatches := make(map[string]*FullTableWatch)
for table, _ := range schema.Tables {
if table == "kvs" || table == "tombstones" {
continue
}
tableWatches[table] = NewFullTableWatch()
}
// Create and return the state store.
s := &StateStore{
schema: schema,
db: db,
abandonCh: make(chan struct{}),
tableWatches: tableWatches,
kvsWatch: NewPrefixWatchManager(),
kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(),
}
@ -159,8 +140,7 @@ func (s *StateSnapshot) Close() {
// transaction.
func (s *StateStore) Restore() *StateRestore {
tx := s.db.Txn(true)
watches := NewDumbWatchManager(s.tableWatches)
return &StateRestore{s, tx, watches}
return &StateRestore{s, tx}
}
// Abort abandons the changes made by a restore. This or Commit should always be
@ -172,11 +152,6 @@ func (s *StateRestore) Abort() {
// Commit commits the changes made by a restore. This or Abort should always be
// called.
func (s *StateRestore) Commit() {
// Fire off a single KVS watch instead of a zillion prefix ones, and use
// a dumb watch manager to single-fire all the full table watches.
s.tx.Defer(func() { s.store.kvsWatch.Notify("", true) })
s.tx.Defer(func() { s.watches.Notify() })
s.tx.Commit()
}
@ -237,64 +212,3 @@ func indexUpdateMaxTxn(tx *memdb.Txn, idx uint64, table string) error {
return nil
}
// getWatchTables returns the list of tables that should be watched and used for
// max index calculations for the given query method. This is used for all
// methods except for KVS. This will panic if the method is unknown.
func (s *StateStore) getWatchTables(method string) []string {
switch method {
case "GetNode", "Nodes":
return []string{"nodes"}
case "Services":
return []string{"services"}
case "NodeService", "NodeServices", "ServiceNodes":
return []string{"nodes", "services"}
case "NodeCheck", "NodeChecks", "ServiceChecks", "ChecksInState":
return []string{"checks"}
case "ChecksInStateByNodeMeta", "ServiceChecksByNodeMeta":
return []string{"nodes", "checks"}
case "CheckServiceNodes", "NodeInfo", "NodeDump":
return []string{"nodes", "services", "checks"}
case "SessionGet", "SessionList", "NodeSessions":
return []string{"sessions"}
case "ACLGet", "ACLList":
return []string{"acls"}
case "Coordinates":
return []string{"coordinates"}
case "PreparedQueryGet", "PreparedQueryResolve", "PreparedQueryList":
return []string{"prepared-queries"}
}
panic(fmt.Sprintf("Unknown method %s", method))
}
// getTableWatch returns a full table watch for the given table. This will panic
// if the table doesn't have a full table watch.
func (s *StateStore) getTableWatch(table string) Watch {
if watch, ok := s.tableWatches[table]; ok {
return watch
}
panic(fmt.Sprintf("Unknown watch for table %s", table))
}
// GetQueryWatch returns a watch for the given query method. This is
// used for all methods except for KV; you should call GetKVSWatch instead.
// This will panic if the method is unknown.
func (s *StateStore) GetQueryWatch(method string) Watch {
tables := s.getWatchTables(method)
if len(tables) == 1 {
return s.getTableWatch(tables[0])
}
var watches []Watch
for _, table := range tables {
watches = append(watches, s.getTableWatch(table))
}
return NewMultiWatch(watches...)
}
// GetKVSWatch returns a watch for the given prefix in the key value store.
func (s *StateStore) GetKVSWatch(prefix string) Watch {
return s.kvsWatch.NewPrefixWatch(prefix)
}

View File

@ -203,50 +203,3 @@ func TestStateStore_indexUpdateMaxTxn(t *testing.T) {
t.Fatalf("bad max: %d", max)
}
}
func TestStateStore_GetWatches(t *testing.T) {
s := testStateStore(t)
// This test does two things - it makes sure there's no full table
// watch for KVS, and it makes sure that asking for a watch that
// doesn't exist causes a panic.
func() {
defer func() {
if r := recover(); r == nil {
t.Fatalf("didn't get expected panic")
}
}()
s.getTableWatch("kvs")
}()
// Similar for tombstones; those don't support watches at all.
func() {
defer func() {
if r := recover(); r == nil {
t.Fatalf("didn't get expected panic")
}
}()
s.getTableWatch("tombstones")
}()
// Make sure requesting a bogus method causes a panic.
func() {
defer func() {
if r := recover(); r == nil {
t.Fatalf("didn't get expected panic")
}
}()
s.GetQueryWatch("dogs")
}()
// Request valid watches.
if w := s.GetQueryWatch("Nodes"); w == nil {
t.Fatalf("didn't get a watch")
}
if w := s.GetQueryWatch("NodeDump"); w == nil {
t.Fatalf("didn't get a watch")
}
if w := s.GetKVSWatch("/dogs"); w == nil {
t.Fatalf("didn't get a watch")
}
}

View File

@ -711,84 +711,3 @@ func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) {
}
}
}
func TestStateStore_Txn_Watches(t *testing.T) {
s := testStateStore(t)
// Verify that a basic transaction triggers multiple watches. We call
// the same underlying methods that are called above so this is more
// of a sanity check.
verifyWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two"),
},
},
},
}
results, errors := s.TxnRW(15, ops)
if len(results) != len(ops) {
t.Fatalf("bad len: %d != %d", len(results), len(ops))
}
if len(errors) != 0 {
t.Fatalf("bad len: %d != 0", len(errors))
}
})
})
// Verify that a rolled back transaction doesn't trigger any watches.
verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one-updated"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two-updated"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "multi/nope",
Value: []byte("nope"),
},
},
},
}
results, errors := s.TxnRW(16, ops)
if len(errors) != 1 {
t.Fatalf("bad len: %d != 1", len(errors))
}
if len(results) != 0 {
t.Fatalf("bad len: %d != 0", len(results))
}
})
})
}

View File

@ -1,219 +0,0 @@
package state
import (
"fmt"
"sync"
"github.com/armon/go-radix"
)
// Watch is the external interface that's common to all the different flavors.
type Watch interface {
// Wait registers the given channel and calls it back when the watch
// fires.
Wait(notifyCh chan struct{})
// Clear deregisters the given channel.
Clear(notifyCh chan struct{})
}
// FullTableWatch implements a single notify group for a table.
type FullTableWatch struct {
group NotifyGroup
}
// NewFullTableWatch returns a new full table watch.
func NewFullTableWatch() *FullTableWatch {
return &FullTableWatch{}
}
// See Watch.
func (w *FullTableWatch) Wait(notifyCh chan struct{}) {
w.group.Wait(notifyCh)
}
// See Watch.
func (w *FullTableWatch) Clear(notifyCh chan struct{}) {
w.group.Clear(notifyCh)
}
// Notify wakes up all the watchers registered for this table.
func (w *FullTableWatch) Notify() {
w.group.Notify()
}
// DumbWatchManager is a wrapper that allows nested code to arm full table
// watches multiple times but fire them only once. This doesn't have any
// way to clear the state, and it's not thread-safe, so it should be used once
// and thrown away inside the context of a single thread.
type DumbWatchManager struct {
// tableWatches holds the full table watches.
tableWatches map[string]*FullTableWatch
// armed tracks whether the table should be notified.
armed map[string]bool
}
// NewDumbWatchManager returns a new dumb watch manager.
func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchManager {
return &DumbWatchManager{
tableWatches: tableWatches,
armed: make(map[string]bool),
}
}
// Arm arms the given table's watch.
func (d *DumbWatchManager) Arm(table string) {
if _, ok := d.tableWatches[table]; !ok {
panic(fmt.Sprintf("unknown table: %s", table))
}
if _, ok := d.armed[table]; !ok {
d.armed[table] = true
}
}
// Notify fires watches for all the armed tables.
func (d *DumbWatchManager) Notify() {
for table, _ := range d.armed {
d.tableWatches[table].Notify()
}
}
// PrefixWatch provides a Watch-compatible interface for a PrefixWatchManager,
// bound to a specific prefix.
type PrefixWatch struct {
// manager is the underlying watch manager.
manager *PrefixWatchManager
// prefix is the prefix we are watching.
prefix string
}
// Wait registers the given channel with the notify group for our prefix.
func (w *PrefixWatch) Wait(notifyCh chan struct{}) {
w.manager.Wait(w.prefix, notifyCh)
}
// Clear deregisters the given channel from the the notify group for our prefix.
func (w *PrefixWatch) Clear(notifyCh chan struct{}) {
w.manager.Clear(w.prefix, notifyCh)
}
// PrefixWatchManager maintains a notify group for each prefix, allowing for
// much more fine-grained watches.
type PrefixWatchManager struct {
// watches has the set of notify groups, organized by prefix.
watches *radix.Tree
// lock protects the watches tree.
lock sync.Mutex
}
// NewPrefixWatchManager returns a new prefix watch manager.
func NewPrefixWatchManager() *PrefixWatchManager {
return &PrefixWatchManager{
watches: radix.New(),
}
}
// NewPrefixWatch returns a Watch-compatible interface for watching the given
// prefix.
func (w *PrefixWatchManager) NewPrefixWatch(prefix string) Watch {
return &PrefixWatch{
manager: w,
prefix: prefix,
}
}
// Wait registers the given channel on a prefix.
func (w *PrefixWatchManager) Wait(prefix string, notifyCh chan struct{}) {
w.lock.Lock()
defer w.lock.Unlock()
var group *NotifyGroup
if raw, ok := w.watches.Get(prefix); ok {
group = raw.(*NotifyGroup)
} else {
group = &NotifyGroup{}
w.watches.Insert(prefix, group)
}
group.Wait(notifyCh)
}
// Clear deregisters the given channel from the notify group for a prefix (if
// one exists).
func (w *PrefixWatchManager) Clear(prefix string, notifyCh chan struct{}) {
w.lock.Lock()
defer w.lock.Unlock()
if raw, ok := w.watches.Get(prefix); ok {
group := raw.(*NotifyGroup)
group.Clear(notifyCh)
}
}
// Notify wakes up all the watchers associated with the given prefix. If subtree
// is true then we will also notify all the tree under the prefix, such as when
// a key is being deleted.
func (w *PrefixWatchManager) Notify(prefix string, subtree bool) {
w.lock.Lock()
defer w.lock.Unlock()
var cleanup []string
fn := func(k string, raw interface{}) bool {
group := raw.(*NotifyGroup)
group.Notify()
if k != "" {
cleanup = append(cleanup, k)
}
return false
}
// Invoke any watcher on the path downward to the key.
w.watches.WalkPath(prefix, fn)
// If the entire prefix may be affected (e.g. delete tree),
// invoke the entire prefix.
if subtree {
w.watches.WalkPrefix(prefix, fn)
}
// Delete the old notify groups.
for i := len(cleanup) - 1; i >= 0; i-- {
w.watches.Delete(cleanup[i])
}
// TODO (slackpad) If a watch never fires then we will never clear it
// out of the tree. The old state store had the same behavior, so this
// has been around for a while. We should probably add a prefix scan
// with a function that clears out any notify groups that are empty.
}
// MultiWatch wraps several watches and allows any of them to trigger the
// caller.
type MultiWatch struct {
// watches holds the list of subordinate watches to forward events to.
watches []Watch
}
// NewMultiWatch returns a new new multi watch over the given set of watches.
func NewMultiWatch(watches ...Watch) *MultiWatch {
return &MultiWatch{
watches: watches,
}
}
// See Watch.
func (w *MultiWatch) Wait(notifyCh chan struct{}) {
for _, watch := range w.watches {
watch.Wait(notifyCh)
}
}
// See Watch.
func (w *MultiWatch) Clear(notifyCh chan struct{}) {
for _, watch := range w.watches {
watch.Clear(notifyCh)
}
}

View File

@ -1,377 +0,0 @@
package state
import (
"sort"
"strings"
"testing"
)
// verifyWatch will set up a watch channel, call the given function, and then
// make sure the watch fires.
func verifyWatch(t *testing.T, watch Watch, fn func()) {
ch := make(chan struct{}, 1)
watch.Wait(ch)
fn()
select {
case <-ch:
default:
t.Fatalf("watch should have been notified")
}
}
// verifyNoWatch will set up a watch channel, call the given function, and then
// make sure the watch never fires.
func verifyNoWatch(t *testing.T, watch Watch, fn func()) {
ch := make(chan struct{}, 1)
watch.Wait(ch)
fn()
select {
case <-ch:
t.Fatalf("watch should not been notified")
default:
}
}
func TestWatch_FullTableWatch(t *testing.T) {
w := NewFullTableWatch()
// Test the basic trigger with a single watcher.
verifyWatch(t, w, func() {
w.Notify()
})
// Run multiple watchers and make sure they both fire.
verifyWatch(t, w, func() {
verifyWatch(t, w, func() {
w.Notify()
})
})
// Make sure clear works.
ch := make(chan struct{}, 1)
w.Wait(ch)
w.Clear(ch)
w.Notify()
select {
case <-ch:
t.Fatalf("watch should not have been notified")
default:
}
// Make sure notify is a one shot.
w.Wait(ch)
w.Notify()
select {
case <-ch:
default:
t.Fatalf("watch should have been notified")
}
w.Notify()
select {
case <-ch:
t.Fatalf("watch should not have been notified")
default:
}
}
func TestWatch_DumbWatchManager(t *testing.T) {
watches := map[string]*FullTableWatch{
"alice": NewFullTableWatch(),
"bob": NewFullTableWatch(),
"carol": NewFullTableWatch(),
}
// Notify with nothing armed and make sure nothing triggers.
func() {
w := NewDumbWatchManager(watches)
verifyNoWatch(t, watches["alice"], func() {
verifyNoWatch(t, watches["bob"], func() {
verifyNoWatch(t, watches["carol"], func() {
w.Notify()
})
})
})
}()
// Trigger one watch.
func() {
w := NewDumbWatchManager(watches)
verifyWatch(t, watches["alice"], func() {
verifyNoWatch(t, watches["bob"], func() {
verifyNoWatch(t, watches["carol"], func() {
w.Arm("alice")
w.Notify()
})
})
})
}()
// Trigger two watches.
func() {
w := NewDumbWatchManager(watches)
verifyWatch(t, watches["alice"], func() {
verifyNoWatch(t, watches["bob"], func() {
verifyWatch(t, watches["carol"], func() {
w.Arm("alice")
w.Arm("carol")
w.Notify()
})
})
})
}()
// Trigger all three watches.
func() {
w := NewDumbWatchManager(watches)
verifyWatch(t, watches["alice"], func() {
verifyWatch(t, watches["bob"], func() {
verifyWatch(t, watches["carol"], func() {
w.Arm("alice")
w.Arm("bob")
w.Arm("carol")
w.Notify()
})
})
})
}()
// Trigger multiple times.
func() {
w := NewDumbWatchManager(watches)
verifyWatch(t, watches["alice"], func() {
verifyNoWatch(t, watches["bob"], func() {
verifyNoWatch(t, watches["carol"], func() {
w.Arm("alice")
w.Arm("alice")
w.Notify()
})
})
})
}()
// Make sure it panics when asked to arm an unknown table.
func() {
defer func() {
if r := recover(); r == nil {
t.Fatalf("didn't get expected panic")
}
}()
w := NewDumbWatchManager(watches)
w.Arm("nope")
}()
}
func verifyWatches(t *testing.T, w *PrefixWatchManager, expected string) {
var found []string
fn := func(k string, v interface{}) bool {
if k == "" {
k = "(full)"
}
found = append(found, k)
return false
}
w.watches.WalkPrefix("", fn)
sort.Strings(found)
actual := strings.Join(found, "|")
if expected != actual {
t.Fatalf("bad: %s != %s", expected, actual)
}
}
func TestWatch_PrefixWatchManager(t *testing.T) {
w := NewPrefixWatchManager()
verifyWatches(t, w, "")
// This will create the watch group.
ch1 := make(chan struct{}, 1)
w.Wait("hello", ch1)
verifyWatches(t, w, "hello")
// This will add to the existing one.
ch2 := make(chan struct{}, 1)
w.Wait("hello", ch2)
verifyWatches(t, w, "hello")
// This will add to the existing as well.
ch3 := make(chan struct{}, 1)
w.Wait("hello", ch3)
verifyWatches(t, w, "hello")
// Remove one of the watches.
w.Clear("hello", ch2)
verifyWatches(t, w, "hello")
// Do "clear" for one that was never added.
ch4 := make(chan struct{}, 1)
w.Clear("hello", ch4)
verifyWatches(t, w, "hello")
// Add a full table watch.
full := make(chan struct{}, 1)
w.Wait("", full)
verifyWatches(t, w, "(full)|hello")
// Add another channel for a different prefix.
nope := make(chan struct{}, 1)
w.Wait("nope", nope)
verifyWatches(t, w, "(full)|hello|nope")
// Fire off the notification and make sure channels were pinged (or not)
// as expected.
w.Notify("hello", false)
verifyWatches(t, w, "(full)|nope")
select {
case <-ch1:
default:
t.Fatalf("ch1 should have been notified")
}
select {
case <-ch2:
t.Fatalf("ch2 should not have been notified")
default:
}
select {
case <-ch3:
default:
t.Fatalf("ch3 should have been notified")
}
select {
case <-ch4:
t.Fatalf("ch4 should not have been notified")
default:
}
select {
case <-nope:
t.Fatalf("nope should not have been notified")
default:
}
select {
case <-full:
default:
t.Fatalf("full should have been notified")
}
}
func TestWatch_PrefixWatch(t *testing.T) {
w := NewPrefixWatchManager()
// Hit a specific key.
verifyWatch(t, w.NewPrefixWatch(""), func() {
verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() {
verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.NewPrefixWatch("nope"), func() {
w.Notify("foo/bar/baz", false)
})
})
})
})
// Make sure cleanup is happening. All that should be left is the
// full-table watch and the un-fired watches.
verifyWatches(t, w, "(full)|foo/bar/zoo|nope")
// Delete a subtree.
verifyWatch(t, w.NewPrefixWatch(""), func() {
verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() {
verifyWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.NewPrefixWatch("nope"), func() {
w.Notify("foo/", true)
})
})
})
})
verifyWatches(t, w, "(full)|nope")
// Hit an unknown key.
verifyWatch(t, w.NewPrefixWatch(""), func() {
verifyNoWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() {
verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.NewPrefixWatch("nope"), func() {
w.Notify("not/in/there", false)
})
})
})
})
verifyWatches(t, w, "(full)|foo/bar/baz|foo/bar/zoo|nope")
// Make sure a watch can be reused.
watch := w.NewPrefixWatch("over/and/over")
for i := 0; i < 10; i++ {
verifyWatch(t, watch, func() {
w.Notify("over/and/over", false)
})
}
}
type MockWatch struct {
Waits map[chan struct{}]int
Clears map[chan struct{}]int
}
func NewMockWatch() *MockWatch {
return &MockWatch{
Waits: make(map[chan struct{}]int),
Clears: make(map[chan struct{}]int),
}
}
func (m *MockWatch) Wait(notifyCh chan struct{}) {
if _, ok := m.Waits[notifyCh]; ok {
m.Waits[notifyCh]++
} else {
m.Waits[notifyCh] = 1
}
}
func (m *MockWatch) Clear(notifyCh chan struct{}) {
if _, ok := m.Clears[notifyCh]; ok {
m.Clears[notifyCh]++
} else {
m.Clears[notifyCh] = 1
}
}
func TestWatch_MultiWatch(t *testing.T) {
w1, w2 := NewMockWatch(), NewMockWatch()
w := NewMultiWatch(w1, w2)
// Do some activity.
c1, c2 := make(chan struct{}), make(chan struct{})
w.Wait(c1)
w.Clear(c1)
w.Wait(c1)
w.Wait(c2)
w.Clear(c1)
w.Clear(c2)
// Make sure all the events were forwarded.
if cnt, ok := w1.Waits[c1]; !ok || cnt != 2 {
t.Fatalf("bad: %d", w1.Waits[c1])
}
if cnt, ok := w1.Clears[c1]; !ok || cnt != 2 {
t.Fatalf("bad: %d", w1.Clears[c1])
}
if cnt, ok := w1.Waits[c2]; !ok || cnt != 1 {
t.Fatalf("bad: %d", w1.Waits[c2])
}
if cnt, ok := w1.Clears[c2]; !ok || cnt != 1 {
t.Fatalf("bad: %d", w1.Clears[c2])
}
if cnt, ok := w2.Waits[c1]; !ok || cnt != 2 {
t.Fatalf("bad: %d", w2.Waits[c1])
}
if cnt, ok := w2.Clears[c1]; !ok || cnt != 2 {
t.Fatalf("bad: %d", w2.Clears[c1])
}
if cnt, ok := w2.Waits[c2]; !ok || cnt != 1 {
t.Fatalf("bad: %d", w2.Waits[c2])
}
if cnt, ok := w2.Clears[c2]; !ok || cnt != 1 {
t.Fatalf("bad: %d", w2.Clears[c2])
}
}