From e810697e06d08b79318281c507d3688ecca4d679 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 28 Nov 2017 17:26:16 -0800 Subject: [PATCH] Resolves an FSM snapshot TODO. This adds checks for sink write calls before we continue the refactor, which will resolve the other TODO comment we deleted as part of this change. --- agent/consul/fsm.go | 51 +++++++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/agent/consul/fsm.go b/agent/consul/fsm.go index 0d272546f7..4c67ddae4e 100644 --- a/agent/consul/fsm.go +++ b/agent/consul/fsm.go @@ -15,13 +15,6 @@ import ( "github.com/hashicorp/raft" ) -// TODO (slackpad) - There are two refactors we should do here: -// -// 1. Register the different types from the state store and make the FSM more -// generic, especially around snapshot/restore. Those should really just -// pass the encoder into a WriteSnapshot() kind of method. -// 2. Check all the error return values from all the Write() calls. - // msgpackHandle is a shared handle for encoding/decoding msgpack payloads var msgpackHandle = &codec.MsgpackHandle{} @@ -592,7 +585,9 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, } // Register the node itself - sink.Write([]byte{byte(structs.RegisterRequestType)}) + if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { + return err + } if err := encoder.Encode(&req); err != nil { return err } @@ -603,7 +598,9 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, return err } for service := services.Next(); service != nil; service = services.Next() { - sink.Write([]byte{byte(structs.RegisterRequestType)}) + if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { + return err + } req.Service = service.(*structs.ServiceNode).ToNodeService() if err := encoder.Encode(&req); err != nil { return err @@ -617,7 +614,9 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, return err } for check := checks.Next(); check != nil; check = checks.Next() { - sink.Write([]byte{byte(structs.RegisterRequestType)}) + if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { + return err + } req.Check = check.(*structs.HealthCheck) if err := encoder.Encode(&req); err != nil { return err @@ -633,7 +632,9 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, return err } for coord := coords.Next(); coord != nil; coord = coords.Next() { - sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}) + if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil { + return err + } updates := structs.Coordinates{coord.(*structs.Coordinate)} if err := encoder.Encode(&updates); err != nil { return err @@ -650,7 +651,9 @@ func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink, } for session := sessions.Next(); session != nil; session = sessions.Next() { - sink.Write([]byte{byte(structs.SessionRequestType)}) + if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil { + return err + } if err := encoder.Encode(session.(*structs.Session)); err != nil { return err } @@ -666,7 +669,9 @@ func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink, } for acl := acls.Next(); acl != nil; acl = acls.Next() { - sink.Write([]byte{byte(structs.ACLRequestType)}) + if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil { + return err + } if err := encoder.Encode(acl.(*structs.ACL)); err != nil { return err } @@ -677,7 +682,9 @@ func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink, return err } if bs != nil { - sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}) + if _, err := sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil { + return err + } if err := encoder.Encode(bs); err != nil { return err } @@ -694,7 +701,9 @@ func (s *consulSnapshot) persistKVs(sink raft.SnapshotSink, } for entry := entries.Next(); entry != nil; entry = entries.Next() { - sink.Write([]byte{byte(structs.KVSRequestType)}) + if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil { + return err + } if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil { return err } @@ -710,7 +719,9 @@ func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink, } for stone := stones.Next(); stone != nil; stone = stones.Next() { - sink.Write([]byte{byte(structs.TombstoneRequestType)}) + if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != nil { + return err + } // For historical reasons, these are serialized in the snapshots // as KV entries. We want to keep the snapshot format compatible @@ -737,7 +748,9 @@ func (s *consulSnapshot) persistPreparedQueries(sink raft.SnapshotSink, } for _, query := range queries { - sink.Write([]byte{byte(structs.PreparedQueryRequestType)}) + if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil { + return err + } if err := encoder.Encode(query); err != nil { return err } @@ -755,7 +768,9 @@ func (s *consulSnapshot) persistAutopilot(sink raft.SnapshotSink, return nil } - sink.Write([]byte{byte(structs.AutopilotRequestType)}) + if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil { + return err + } if err := encoder.Encode(autopilot); err != nil { return err }