From ffe531c55fe3d6f25e1de7795d9e5b006eeab106 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 19 Oct 2015 14:56:22 -0700 Subject: [PATCH] Converts sessions and ACLs over to iterators. --- consul/fsm.go | 21 ++++++++++--------- consul/fsm_test.go | 20 ++++++++++++------ consul/leader_test.go | 13 +++++++----- consul/state/graveyard.go | 12 ++++------- consul/state/graveyard_test.go | 12 +++++++++-- consul/state/state_store.go | 35 ++++++++++++++++---------------- consul/state/state_store_test.go | 30 +++++++++++++++++++-------- 7 files changed, 85 insertions(+), 58 deletions(-) diff --git a/consul/fsm.go b/consul/fsm.go index 0982fd5c63..e75c7fec50 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -445,14 +445,14 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink, encoder *codec.Encoder) error { - sessions, err := s.state.SessionDump() + iter, err := s.state.Sessions() if err != nil { return err } - for _, s := range sessions { + for si := iter.Next(); si != nil; si = iter.Next() { sink.Write([]byte{byte(structs.SessionRequestType)}) - if err := encoder.Encode(s); err != nil { + if err := encoder.Encode(si.(*structs.Session)); err != nil { return err } } @@ -461,14 +461,14 @@ func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink, func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink, encoder *codec.Encoder) error { - acls, err := s.state.ACLDump() + iter, err := s.state.ACLs() if err != nil { return err } - for _, s := range acls { + for ai := iter.Next(); ai != nil; ai = iter.Next() { sink.Write([]byte{byte(structs.ACLRequestType)}) - if err := encoder.Encode(s); err != nil { + if err := encoder.Encode(ai.(*structs.ACL)); err != nil { return err } } @@ -493,21 +493,22 @@ func (s *consulSnapshot) persistKVs(sink raft.SnapshotSink, func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink, encoder *codec.Encoder) error { - stones, err := s.state.TombstoneDump() + iter, err := s.state.Tombstones() if err != nil { return err } - for _, s := range stones { + for ti := iter.Next(); ti != nil; ti = iter.Next() { sink.Write([]byte{byte(structs.TombstoneRequestType)}) // For historical reasons, these are serialized in the snapshots // as KV entries. We want to keep the snapshot format compatible // with pre-0.6 versions for now. + stone := ti.(*state.Tombstone) fake := &structs.DirEntry{ - Key: s.Key, + Key: stone.Key, RaftIndex: structs.RaftIndex{ - ModifyIndex: s.Index, + ModifyIndex: stone.Index, }, } if err := encoder.Encode(fake); err != nil { diff --git a/consul/fsm_test.go b/consul/fsm_test.go index f3c0c7aeca..ff027fff30 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/raft" ) @@ -474,12 +475,19 @@ func TestFSM_SnapshotRestore(t *testing.T) { func() { snap := fsm2.state.Snapshot() defer snap.Close() - dump, err := snap.TombstoneDump() + iter, err := snap.Tombstones() if err != nil { t.Fatalf("err: %s", err) } - if len(dump) != 1 { - t.Fatalf("bad: %#v", dump) + stone := iter.Next().(*state.Tombstone) + if stone == nil { + t.Fatalf("missing tombstone") + } + if stone.Key != "/remove" || stone.Index != 12 { + t.Fatalf("bad: %v", stone) + } + if iter.Next() != nil { + t.Fatalf("unexpected extra tombstones") } }() } @@ -1015,12 +1023,12 @@ func TestFSM_TombstoneReap(t *testing.T) { // Verify the tombstones are gone snap := fsm.state.Snapshot() defer snap.Close() - dump, err := snap.TombstoneDump() + iter, err := snap.Tombstones() if err != nil { t.Fatalf("err: %s", err) } - if len(dump) != 0 { - t.Fatalf("bad: %#v", dump) + if iter.Next() != nil { + t.Fatalf("unexpected extra tombstones") } } diff --git a/consul/leader_test.go b/consul/leader_test.go index d48d002046..0adbf7f614 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -590,12 +590,15 @@ func TestLeader_ReapTombstones(t *testing.T) { func() { snap := state.Snapshot() defer snap.Close() - dump, err := snap.TombstoneDump() + iter, err := snap.Tombstones() if err != nil { t.Fatalf("err: %s", err) } - if len(dump) != 1 { - t.Fatalf("bad: %#v", dump) + if iter.Next() == nil { + t.Fatalf("missing tombstones") + } + if iter.Next() != nil { + t.Fatalf("unexpected extra tombstones") } }() @@ -604,11 +607,11 @@ func TestLeader_ReapTombstones(t *testing.T) { testutil.WaitForResult(func() (bool, error) { snap := state.Snapshot() defer snap.Close() - dump, err := snap.TombstoneDump() + iter, err := snap.Tombstones() if err != nil { return false, err } - return len(dump) == 0, nil + return iter.Next() == nil, nil }, func(err error) { t.Fatalf("err: %v", err) }) diff --git a/consul/state/graveyard.go b/consul/state/graveyard.go index b22b29e85b..0ecd0974b1 100644 --- a/consul/state/graveyard.go +++ b/consul/state/graveyard.go @@ -62,17 +62,13 @@ func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, prefix string) (uint64, error) } // DumpTxn returns all the tombstones. -func (g *Graveyard) DumpTxn(tx *memdb.Txn) ([]*Tombstone, error) { - stones, err := tx.Get("tombstones", "id") +func (g *Graveyard) DumpTxn(tx *memdb.Txn) (memdb.ResultIterator, error) { + iter, err := tx.Get("tombstones", "id") if err != nil { - return nil, fmt.Errorf("failed querying tombstones: %s", err) + return nil, err } - var dump []*Tombstone - for stone := stones.Next(); stone != nil; stone = stones.Next() { - dump = append(dump, stone.(*Tombstone)) - } - return dump, nil + return iter, nil } // RestoreTxn is used when restoring from a snapshot. For general inserts, use diff --git a/consul/state/graveyard_test.go b/consul/state/graveyard_test.go index e796ce8557..4b7f46e27f 100644 --- a/consul/state/graveyard_test.go +++ b/consul/state/graveyard_test.go @@ -199,10 +199,14 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) { tx := s.db.Txn(false) defer tx.Abort() - dump, err := g.DumpTxn(tx) + iter, err := g.DumpTxn(tx) if err != nil { t.Fatalf("err: %s", err) } + var dump []*Tombstone + for ti := iter.Next(); ti != nil; ti = iter.Next() { + dump = append(dump, ti.(*Tombstone)) + } return dump }() @@ -241,10 +245,14 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) { tx := s.db.Txn(false) defer tx.Abort() - dump, err := g.DumpTxn(tx) + iter, err := g.DumpTxn(tx) if err != nil { t.Fatalf("err: %s", err) } + var dump []*Tombstone + for ti := iter.Next(); ti != nil; ti = iter.Next() { + dump = append(dump, ti.(*Tombstone)) + } return dump }() if !reflect.DeepEqual(dump, expected) { diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 4176ab556d..a26cd42a60 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -156,7 +156,7 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) { return iter, nil } -// KVSDump is used to pull the full list of KVS entries for use during snapshots. +// KVs is used to pull the full list of KVS entries for use during snapshots. func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) { iter, err := s.tx.Get("kvs", "id_prefix") if err != nil { @@ -165,28 +165,27 @@ func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) { return iter, nil } -// TombstoneDump is used to pull all the tombstones from the graveyard. -func (s *StateSnapshot) TombstoneDump() ([]*Tombstone, error) { +// Tombstones is used to pull all the tombstones from the graveyard. +func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) { return s.store.kvsGraveyard.DumpTxn(s.tx) } -// SessionDump is used to pull the full list of sessions for use during snapshots. -func (s *StateSnapshot) SessionDump() (structs.Sessions, error) { - sessions, err := s.tx.Get("sessions", "id") +// Sessions is used to pull the full list of sessions for use during snapshots. +func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) { + iter, err := s.tx.Get("sessions", "id") if err != nil { - return nil, fmt.Errorf("failed session lookup: %s", err) - } - - var dump structs.Sessions - for session := sessions.Next(); session != nil; session = sessions.Next() { - dump = append(dump, session.(*structs.Session)) + return nil, err } - return dump, nil + return iter, nil } -// ACLDump is used to pull all the ACLs from the snapshot. -func (s *StateSnapshot) ACLDump() (structs.ACLs, error) { - return aclListTxn(s.tx) +// ACLs is used to pull all the ACLs from the snapshot. +func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) { + iter, err := s.tx.Get("acls", "id") + if err != nil { + return nil, err + } + return iter, nil } // maxIndex is a helper used to retrieve the highest known index @@ -2094,7 +2093,7 @@ func (s *StateStore) ACLList() (uint64, structs.ACLs, error) { idx := maxIndexTxn(tx, s.getWatchTables("ACLList")...) // Return the ACLs. - acls, err := aclListTxn(tx) + acls, err := s.aclListTxn(tx) if err != nil { return 0, nil, fmt.Errorf("failed acl lookup: %s", err) } @@ -2103,7 +2102,7 @@ func (s *StateStore) ACLList() (uint64, structs.ACLs, error) { // aclListTxn is used to list out all of the ACLs in the state store. This is a // function vs. a method so it can be called from the snapshotter. -func aclListTxn(tx *memdb.Txn) (structs.ACLs, error) { +func (s *StateStore) aclListTxn(tx *memdb.Txn) (structs.ACLs, error) { // Query all of the ACLs in the state store acls, err := tx.Get("acls", "id") if err != nil { diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 3137023b54..df69e7f319 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -286,12 +286,12 @@ func TestStateStore_ReapTombstones(t *testing.T) { // Make sure the tombstones are actually gone. snap := s.Snapshot() defer snap.Close() - dump, err := snap.TombstoneDump() + iter, err := snap.Tombstones() if err != nil { t.Fatalf("err: %s", err) } - if len(dump) != 0 { - t.Fatalf("bad: %#v", dump) + if iter.Next() != nil { + t.Fatalf("unexpected extra tombstones") } } @@ -3264,10 +3264,14 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { } // Verify the snapshot. - dump, err := snap.TombstoneDump() + iter, err := snap.Tombstones() if err != nil { t.Fatalf("err: %s", err) } + var dump []*Tombstone + for ti := iter.Next(); ti != nil; ti = iter.Next() { + dump = append(dump, ti.(*Tombstone)) + } if len(dump) != 1 { t.Fatalf("bad %#v", dump) } @@ -3312,12 +3316,12 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { // But make sure the tombstone is actually gone. snap := s.Snapshot() defer snap.Close() - dump, err := snap.TombstoneDump() + iter, err := snap.Tombstones() if err != nil { t.Fatalf("err: %s", err) } - if len(dump) != 0 { - t.Fatalf("bad %#v", dump) + if iter.Next() != nil { + t.Fatalf("unexpected extra tombstones") } }() } @@ -3656,10 +3660,14 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) { if idx := snap.LastIndex(); idx != 7 { t.Fatalf("bad index: %d", idx) } - dump, err := snap.SessionDump() + iter, err := snap.Sessions() if err != nil { t.Fatalf("err: %s", err) } + var dump structs.Sessions + for si := iter.Next(); si != nil; si = iter.Next() { + dump = append(dump, si.(*structs.Session)) + } if !reflect.DeepEqual(dump, sessions) { t.Fatalf("bad: %#v", dump) } @@ -4319,10 +4327,14 @@ func TestStateStore_ACL_Snapshot_Restore(t *testing.T) { if idx := snap.LastIndex(); idx != 2 { t.Fatalf("bad index: %d", idx) } - dump, err := snap.ACLDump() + iter, err := snap.ACLs() if err != nil { t.Fatalf("err: %s", err) } + var dump structs.ACLs + for ai := iter.Next(); ai != nil; ai = iter.Next() { + dump = append(dump, ai.(*structs.ACL)) + } if !reflect.DeepEqual(dump, acls) { t.Fatalf("bad: %#v", dump) }