From 75f2aa85885a9114e63da85e7127e6572ea0f23d Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 24 Jan 2017 17:23:48 -0800 Subject: [PATCH] Pass state store pointer into the blocking query work function. Previously the blocking functions all closed over the state store from their first query, with would not have worked properly when a restore occurred. This makes sure they get a frest state store pointer each time, and that pointer is synchronized with the abandon watch. --- consul/acl_endpoint.go | 9 +++------ consul/catalog_endpoint.go | 17 +++++------------ consul/coordinate_endpoint.go | 4 ++-- consul/health_endpoint.go | 17 +++++------------ consul/internal_endpoint.go | 9 +++------ consul/kvs_endpoint.go | 13 ++++--------- consul/prepared_query_endpoint.go | 9 +++------ consul/rpc.go | 12 +++++++++--- consul/session_endpoint.go | 13 ++++--------- 9 files changed, 38 insertions(+), 65 deletions(-) diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index 990e23b88f..97607efa34 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -6,6 +6,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" @@ -145,11 +146,9 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, return fmt.Errorf(aclDisabled) } - // Get the local state - state := a.srv.fsm.State() return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, acl, err := state.ACLGet(ws, args.ACL) if err != nil { return err @@ -224,11 +223,9 @@ func (a *ACL) List(args *structs.DCSpecificRequest, return permissionDeniedErr } - // Get the local state - state := a.srv.fsm.State() return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, acls, err := state.ACLList(ws) if err != nil { return err diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index dfa71f2608..1108f5dfee 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -5,6 +5,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-memdb" @@ -163,12 +164,10 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return err } - // Get the list of nodes. - state := c.srv.fsm.State() return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var nodes structs.Nodes var err error @@ -195,12 +194,10 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I return err } - // Get the list of services and their tags. - state := c.srv.fsm.State() return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var services structs.Services var err error @@ -229,12 +226,10 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru return fmt.Errorf("Must provide service name") } - // Get the nodes - state := c.srv.fsm.State() err := c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var services structs.ServiceNodes var err error @@ -286,12 +281,10 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs return fmt.Errorf("Must provide node") } - // Get the node services - state := c.srv.fsm.State() return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, services, err := state.NodeServices(ws, args.Node) if err != nil { return err diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 1b5980cf1d..b818f904cd 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/serf/coordinate" @@ -174,10 +175,9 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I return err } - state := c.srv.fsm.State() return c.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, coords, err := state.Coordinates(ws) if err != nil { return err diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index 0e33151abf..aa225fb830 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -3,6 +3,7 @@ package consul import ( "fmt" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" ) @@ -19,12 +20,10 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, return err } - // Get the state specific checks - state := h.srv.fsm.State() return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var checks structs.HealthChecks var err error @@ -51,12 +50,10 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, return err } - // Get the node checks - state := h.srv.fsm.State() return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, checks, err := state.NodeChecks(ws, args.Node) if err != nil { return err @@ -79,12 +76,10 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, return err } - // Get the service checks - state := h.srv.fsm.State() return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var checks structs.HealthChecks var err error @@ -115,12 +110,10 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc return fmt.Errorf("Must provide service name") } - // Get the nodes - state := h.srv.fsm.State() err := h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var nodes structs.CheckServiceNodes var err error diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index ff23fb562e..2d0c059619 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -3,6 +3,7 @@ package consul import ( "fmt" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/serf/serf" @@ -22,12 +23,10 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, return err } - // Get the node info - state := m.srv.fsm.State() return m.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, dump, err := state.NodeInfo(ws, args.Node) if err != nil { return err @@ -45,12 +44,10 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, return err } - // Get all the node info - state := m.srv.fsm.State() return m.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, dump, err := state.NodeDump(ws) if err != nil { return err diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 114a764d2d..9f0d4cd0c5 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -6,6 +6,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" ) @@ -118,12 +119,10 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er return err } - // Get the local state - state := k.srv.fsm.State() return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, ent, err := state.KVSGet(ws, args.Key) if err != nil { return err @@ -159,12 +158,10 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e return err } - // Get the local state - state := k.srv.fsm.State() return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, ent, err := state.KVSList(ws, args.Key) if err != nil { return err @@ -201,12 +198,10 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi return err } - // Get the local state - state := k.srv.fsm.State() return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator) if err != nil { return err diff --git a/consul/prepared_query_endpoint.go b/consul/prepared_query_endpoint.go index baa14776cb..84ad808148 100644 --- a/consul/prepared_query_endpoint.go +++ b/consul/prepared_query_endpoint.go @@ -8,6 +8,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" @@ -217,12 +218,10 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, return err } - // Get the requested query. - state := p.srv.fsm.State() return p.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, query, err := state.PreparedQueryGet(ws, args.QueryID) if err != nil { return err @@ -263,12 +262,10 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind return err } - // Get the list of queries. - state := p.srv.fsm.State() return p.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, queries, err := state.PreparedQueryList(ws) if err != nil { return err diff --git a/consul/rpc.go b/consul/rpc.go index b71bcaa3d7..0c17ea5ec6 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -11,6 +11,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-memdb" @@ -354,7 +355,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // queryFn is used to perform a query operation. If a re-query is needed, the // passed-in watch set will be used to block for changes. -type queryFn func(memdb.WatchSet) error +type queryFn func(memdb.WatchSet, *state.StateStore) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta, @@ -394,6 +395,11 @@ RUN_QUERY: // Run the query. metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1) + // Operate on a consistent set of state. This makes sure that the + // abandon channel goes with the state that the caller is using to + // build watches. + state := s.fsm.State() + // We can skip all watch tracking if this isn't a blocking query. var ws memdb.WatchSet if queryOpts.MinQueryIndex > 0 { @@ -401,11 +407,11 @@ RUN_QUERY: // This channel will be closed if a snapshot is restored and the // whole state store is abandoned. - ws.Add(s.fsm.State().AbandonCh()) + ws.Add(state.AbandonCh()) } // Block up to the timeout if we didn't see anything fresh. - err := fn(ws) + err := fn(ws, state) if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex { if expired := ws.Watch(timeout.C); !expired { goto RUN_QUERY diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go index cf3679c9cf..557535c56f 100644 --- a/consul/session_endpoint.go +++ b/consul/session_endpoint.go @@ -5,6 +5,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" @@ -140,12 +141,10 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, return err } - // Get the local state - state := s.srv.fsm.State() return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, session, err := state.SessionGet(ws, args.Session) if err != nil { return err @@ -171,12 +170,10 @@ func (s *Session) List(args *structs.DCSpecificRequest, return err } - // Get the local state - state := s.srv.fsm.State() return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, sessions, err := state.SessionList(ws) if err != nil { return err @@ -197,12 +194,10 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, return err } - // Get the local state - state := s.srv.fsm.State() return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, sessions, err := state.NodeSessions(ws, args.Node) if err != nil { return err