mirror of https://github.com/hashicorp/consul
Makes all delete loops use a separate slice to protect the iterator.
parent
682b0113b8
commit
834c6c1cb4
|
@ -99,11 +99,19 @@ func (g *Graveyard) ReapTxn(tx *memdb.Txn, idx uint64) error {
|
|||
return fmt.Errorf("failed querying tombstones: %s", err)
|
||||
}
|
||||
|
||||
// Find eligible tombstones.
|
||||
var objs []interface{}
|
||||
for stone := stones.Next(); stone != nil; stone = stones.Next() {
|
||||
if stone.(*Tombstone).Index <= idx {
|
||||
if err := tx.Delete("tombstones", stone); err != nil {
|
||||
return fmt.Errorf("failed deleting tombstone: %s", err)
|
||||
}
|
||||
objs = append(objs, stone)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete the tombstones in a separate loop so we don't trash the
|
||||
// iterator.
|
||||
for _, obj := range objs {
|
||||
if err := tx.Delete("tombstones", obj); err != nil {
|
||||
return fmt.Errorf("failed deleting tombstone: %s", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -481,9 +481,15 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
var sids []string
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
svc := service.(*structs.ServiceNode)
|
||||
if err := s.deleteServiceTxn(tx, idx, watches, nodeID, svc.ServiceID); err != nil {
|
||||
sids = append(sids, svc.ServiceID)
|
||||
}
|
||||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, sid := range sids {
|
||||
if err := s.deleteServiceTxn(tx, idx, watches, nodeID, sid); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -494,9 +500,15 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed check lookup: %s", err)
|
||||
}
|
||||
var cids []string
|
||||
for check := checks.Next(); check != nil; check = checks.Next() {
|
||||
hc := check.(*structs.HealthCheck)
|
||||
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, hc.CheckID); err != nil {
|
||||
cids = append(cids, hc.CheckID)
|
||||
}
|
||||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, cid := range cids {
|
||||
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, cid); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -514,9 +526,14 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed session lookup: %s", err)
|
||||
}
|
||||
var ids []string
|
||||
for sess := sessions.Next(); sess != nil; sess = sessions.Next() {
|
||||
session := sess.(*structs.Session).ID
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, session); err != nil {
|
||||
ids = append(ids, sess.(*structs.Session).ID)
|
||||
}
|
||||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, id := range ids {
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
||||
return fmt.Errorf("failed session delete: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -804,12 +821,20 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed service check lookup: %s", err)
|
||||
}
|
||||
var cids []string
|
||||
for check := checks.Next(); check != nil; check = checks.Next() {
|
||||
hc := check.(*structs.HealthCheck)
|
||||
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, hc.CheckID); err != nil {
|
||||
cids = append(cids, hc.CheckID)
|
||||
}
|
||||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, cid := range cids {
|
||||
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, cid); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Update the index.
|
||||
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
@ -895,10 +920,16 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.Healt
|
|||
return fmt.Errorf("failed session checks lookup: %s", err)
|
||||
}
|
||||
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
var ids []string
|
||||
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
||||
session := mapping.(*sessionCheck).Session
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, session); err != nil {
|
||||
ids = append(ids, mapping.(*sessionCheck).Session)
|
||||
}
|
||||
|
||||
// Delete the session in a separate loop so we don't trash the
|
||||
// iterator.
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
for _, id := range ids {
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
||||
return fmt.Errorf("failed deleting session: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -1011,9 +1042,14 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed session checks lookup: %s", err)
|
||||
}
|
||||
var ids []string
|
||||
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
||||
session := mapping.(*sessionCheck).Session
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, session); err != nil {
|
||||
ids = append(ids, mapping.(*sessionCheck).Session)
|
||||
}
|
||||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, id := range ids {
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
||||
return fmt.Errorf("failed deleting session: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -1528,16 +1564,22 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
|
|||
// directly so that we only update the index once. We also add
|
||||
// tombstones as we go.
|
||||
var modified bool
|
||||
var objs []interface{}
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
e := entry.(*structs.DirEntry)
|
||||
if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil {
|
||||
return fmt.Errorf("failed adding to graveyard: %s", err)
|
||||
}
|
||||
objs = append(objs, entry)
|
||||
modified = true
|
||||
}
|
||||
|
||||
if err := tx.Delete("kvs", e); err != nil {
|
||||
// Do the actual deletes in a separate loop so we don't trash the
|
||||
// iterator as we go.
|
||||
for _, obj := range objs {
|
||||
if err := tx.Delete("kvs", obj); err != nil {
|
||||
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
||||
}
|
||||
modified = true
|
||||
}
|
||||
|
||||
// Update the index
|
||||
|
@ -1912,6 +1954,9 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
switch session.Behavior {
|
||||
case structs.SessionKeysRelease:
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
// Note that we clone here since we are modifying the
|
||||
// returned object and want to make sure our set op
|
||||
// respects the transaction we are in.
|
||||
e := entry.(*structs.DirEntry).Clone()
|
||||
e.Session = ""
|
||||
if err := s.kvsSetTxn(tx, idx, e, true); err != nil {
|
||||
|
@ -1944,8 +1989,14 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed session checks lookup: %s", err)
|
||||
}
|
||||
var objs []interface{}
|
||||
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
||||
if err := tx.Delete("session_checks", mapping); err != nil {
|
||||
objs = append(objs, mapping)
|
||||
}
|
||||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, obj := range objs {
|
||||
if err := tx.Delete("session_checks", obj); err != nil {
|
||||
return fmt.Errorf("failed deleting session check: %s", err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue