diff --git a/consul/state/graveyard.go b/consul/state/graveyard.go index 88181e83c7..b22b29e85b 100644 --- a/consul/state/graveyard.go +++ b/consul/state/graveyard.go @@ -99,11 +99,19 @@ func (g *Graveyard) ReapTxn(tx *memdb.Txn, idx uint64) error { return fmt.Errorf("failed querying tombstones: %s", err) } + // Find eligible tombstones. + var objs []interface{} for stone := stones.Next(); stone != nil; stone = stones.Next() { if stone.(*Tombstone).Index <= idx { - if err := tx.Delete("tombstones", stone); err != nil { - return fmt.Errorf("failed deleting tombstone: %s", err) - } + objs = append(objs, stone) + } + } + + // Delete the tombstones in a separate loop so we don't trash the + // iterator. + for _, obj := range objs { + if err := tx.Delete("tombstones", obj); err != nil { + return fmt.Errorf("failed deleting tombstone: %s", err) } } return nil diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 259f98b447..481abe2df1 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -481,9 +481,15 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err if err != nil { return fmt.Errorf("failed service lookup: %s", err) } + var sids []string for service := services.Next(); service != nil; service = services.Next() { svc := service.(*structs.ServiceNode) - if err := s.deleteServiceTxn(tx, idx, watches, nodeID, svc.ServiceID); err != nil { + sids = append(sids, svc.ServiceID) + } + + // Do the delete in a separate loop so we don't trash the iterator. + for _, sid := range sids { + if err := s.deleteServiceTxn(tx, idx, watches, nodeID, sid); err != nil { return err } } @@ -494,9 +500,15 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err if err != nil { return fmt.Errorf("failed check lookup: %s", err) } + var cids []string for check := checks.Next(); check != nil; check = checks.Next() { hc := check.(*structs.HealthCheck) - if err := s.deleteCheckTxn(tx, idx, watches, nodeID, hc.CheckID); err != nil { + cids = append(cids, hc.CheckID) + } + + // Do the delete in a separate loop so we don't trash the iterator. + for _, cid := range cids { + if err := s.deleteCheckTxn(tx, idx, watches, nodeID, cid); err != nil { return err } } @@ -514,9 +526,14 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err if err != nil { return fmt.Errorf("failed session lookup: %s", err) } + var ids []string for sess := sessions.Next(); sess != nil; sess = sessions.Next() { - session := sess.(*structs.Session).ID - if err := s.deleteSessionTxn(tx, idx, watches, session); err != nil { + ids = append(ids, sess.(*structs.Session).ID) + } + + // Do the delete in a separate loop so we don't trash the iterator. + for _, id := range ids { + if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil { return fmt.Errorf("failed session delete: %s", err) } } @@ -804,12 +821,20 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa if err != nil { return fmt.Errorf("failed service check lookup: %s", err) } + var cids []string for check := checks.Next(); check != nil; check = checks.Next() { hc := check.(*structs.HealthCheck) - if err := s.deleteCheckTxn(tx, idx, watches, nodeID, hc.CheckID); err != nil { + cids = append(cids, hc.CheckID) + } + + // Do the delete in a separate loop so we don't trash the iterator. + for _, cid := range cids { + if err := s.deleteCheckTxn(tx, idx, watches, nodeID, cid); err != nil { return err } } + + // Update the index. if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -895,10 +920,16 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.Healt return fmt.Errorf("failed session checks lookup: %s", err) } - watches := NewDumbWatchManager(s.tableWatches) + var ids []string for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() { - session := mapping.(*sessionCheck).Session - if err := s.deleteSessionTxn(tx, idx, watches, session); err != nil { + ids = append(ids, mapping.(*sessionCheck).Session) + } + + // Delete the session in a separate loop so we don't trash the + // iterator. + watches := NewDumbWatchManager(s.tableWatches) + for _, id := range ids { + if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil { return fmt.Errorf("failed deleting session: %s", err) } } @@ -1011,9 +1042,14 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc if err != nil { return fmt.Errorf("failed session checks lookup: %s", err) } + var ids []string for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() { - session := mapping.(*sessionCheck).Session - if err := s.deleteSessionTxn(tx, idx, watches, session); err != nil { + ids = append(ids, mapping.(*sessionCheck).Session) + } + + // Do the delete in a separate loop so we don't trash the iterator. + for _, id := range ids { + if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil { return fmt.Errorf("failed deleting session: %s", err) } } @@ -1528,16 +1564,22 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error { // directly so that we only update the index once. We also add // tombstones as we go. var modified bool + var objs []interface{} for entry := entries.Next(); entry != nil; entry = entries.Next() { e := entry.(*structs.DirEntry) if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil { return fmt.Errorf("failed adding to graveyard: %s", err) } + objs = append(objs, entry) + modified = true + } - if err := tx.Delete("kvs", e); err != nil { + // Do the actual deletes in a separate loop so we don't trash the + // iterator as we go. + for _, obj := range objs { + if err := tx.Delete("kvs", obj); err != nil { return fmt.Errorf("failed deleting kvs entry: %s", err) } - modified = true } // Update the index @@ -1912,6 +1954,9 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa switch session.Behavior { case structs.SessionKeysRelease: for entry := entries.Next(); entry != nil; entry = entries.Next() { + // Note that we clone here since we are modifying the + // returned object and want to make sure our set op + // respects the transaction we are in. e := entry.(*structs.DirEntry).Clone() e.Session = "" if err := s.kvsSetTxn(tx, idx, e, true); err != nil { @@ -1944,8 +1989,14 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa if err != nil { return fmt.Errorf("failed session checks lookup: %s", err) } + var objs []interface{} for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() { - if err := tx.Delete("session_checks", mapping); err != nil { + objs = append(objs, mapping) + } + + // Do the delete in a separate loop so we don't trash the iterator. + for _, obj := range objs { + if err := tx.Delete("session_checks", obj); err != nil { return fmt.Errorf("failed deleting session check: %s", err) } }