Pass state store pointer into the blocking query work function.

Previously the blocking functions all closed over the state store from
their first query, with would not have worked properly when a restore
occurred. This makes sure they get a frest state store pointer each time,
and that pointer is synchronized with the abandon watch.
pull/2671/head
James Phillips 2017-01-24 17:23:48 -08:00
parent d97c3c6c18
commit 75f2aa8588
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
9 changed files with 38 additions and 65 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
@ -145,11 +146,9 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
return fmt.Errorf(aclDisabled) return fmt.Errorf(aclDisabled)
} }
// Get the local state
state := a.srv.fsm.State()
return a.srv.blockingQuery(&args.QueryOptions, return a.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, acl, err := state.ACLGet(ws, args.ACL) index, acl, err := state.ACLGet(ws, args.ACL)
if err != nil { if err != nil {
return err return err
@ -224,11 +223,9 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
return permissionDeniedErr return permissionDeniedErr
} }
// Get the local state
state := a.srv.fsm.State()
return a.srv.blockingQuery(&args.QueryOptions, return a.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, acls, err := state.ACLList(ws) index, acls, err := state.ACLList(ws)
if err != nil { if err != nil {
return err return err

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
@ -163,12 +164,10 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return err return err
} }
// Get the list of nodes.
state := c.srv.fsm.State()
return c.srv.blockingQuery( return c.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64 var index uint64
var nodes structs.Nodes var nodes structs.Nodes
var err error var err error
@ -195,12 +194,10 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err return err
} }
// Get the list of services and their tags.
state := c.srv.fsm.State()
return c.srv.blockingQuery( return c.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64 var index uint64
var services structs.Services var services structs.Services
var err error var err error
@ -229,12 +226,10 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
return fmt.Errorf("Must provide service name") return fmt.Errorf("Must provide service name")
} }
// Get the nodes
state := c.srv.fsm.State()
err := c.srv.blockingQuery( err := c.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64 var index uint64
var services structs.ServiceNodes var services structs.ServiceNodes
var err error var err error
@ -286,12 +281,10 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
return fmt.Errorf("Must provide node") return fmt.Errorf("Must provide node")
} }
// Get the node services
state := c.srv.fsm.State()
return c.srv.blockingQuery( return c.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, services, err := state.NodeServices(ws, args.Node) index, services, err := state.NodeServices(ws, args.Node)
if err != nil { if err != nil {
return err return err

View File

@ -7,6 +7,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
@ -174,10 +175,9 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
return err return err
} }
state := c.srv.fsm.State()
return c.srv.blockingQuery(&args.QueryOptions, return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, coords, err := state.Coordinates(ws) index, coords, err := state.Coordinates(ws)
if err != nil { if err != nil {
return err return err

View File

@ -3,6 +3,7 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
) )
@ -19,12 +20,10 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
return err return err
} }
// Get the state specific checks
state := h.srv.fsm.State()
return h.srv.blockingQuery( return h.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64 var index uint64
var checks structs.HealthChecks var checks structs.HealthChecks
var err error var err error
@ -51,12 +50,10 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
return err return err
} }
// Get the node checks
state := h.srv.fsm.State()
return h.srv.blockingQuery( return h.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, checks, err := state.NodeChecks(ws, args.Node) index, checks, err := state.NodeChecks(ws, args.Node)
if err != nil { if err != nil {
return err return err
@ -79,12 +76,10 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
return err return err
} }
// Get the service checks
state := h.srv.fsm.State()
return h.srv.blockingQuery( return h.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64 var index uint64
var checks structs.HealthChecks var checks structs.HealthChecks
var err error var err error
@ -115,12 +110,10 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
return fmt.Errorf("Must provide service name") return fmt.Errorf("Must provide service name")
} }
// Get the nodes
state := h.srv.fsm.State()
err := h.srv.blockingQuery( err := h.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64 var index uint64
var nodes structs.CheckServiceNodes var nodes structs.CheckServiceNodes
var err error var err error

View File

@ -3,6 +3,7 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -22,12 +23,10 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest,
return err return err
} }
// Get the node info
state := m.srv.fsm.State()
return m.srv.blockingQuery( return m.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, dump, err := state.NodeInfo(ws, args.Node) index, dump, err := state.NodeInfo(ws, args.Node)
if err != nil { if err != nil {
return err return err
@ -45,12 +44,10 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
return err return err
} }
// Get all the node info
state := m.srv.fsm.State()
return m.srv.blockingQuery( return m.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, dump, err := state.NodeDump(ws) index, dump, err := state.NodeDump(ws)
if err != nil { if err != nil {
return err return err

View File

@ -6,6 +6,7 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
) )
@ -118,12 +119,10 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
return err return err
} }
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingQuery( return k.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, ent, err := state.KVSGet(ws, args.Key) index, ent, err := state.KVSGet(ws, args.Key)
if err != nil { if err != nil {
return err return err
@ -159,12 +158,10 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
return err return err
} }
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingQuery( return k.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, ent, err := state.KVSList(ws, args.Key) index, ent, err := state.KVSList(ws, args.Key)
if err != nil { if err != nil {
return err return err
@ -201,12 +198,10 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
return err return err
} }
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingQuery( return k.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator) index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator)
if err != nil { if err != nil {
return err return err

View File

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
@ -217,12 +218,10 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
return err return err
} }
// Get the requested query.
state := p.srv.fsm.State()
return p.srv.blockingQuery( return p.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, query, err := state.PreparedQueryGet(ws, args.QueryID) index, query, err := state.PreparedQueryGet(ws, args.QueryID)
if err != nil { if err != nil {
return err return err
@ -263,12 +262,10 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind
return err return err
} }
// Get the list of queries.
state := p.srv.fsm.State()
return p.srv.blockingQuery( return p.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, queries, err := state.PreparedQueryList(ws) index, queries, err := state.PreparedQueryList(ws)
if err != nil { if err != nil {
return err return err

View File

@ -11,6 +11,7 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
@ -354,7 +355,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
// queryFn is used to perform a query operation. If a re-query is needed, the // queryFn is used to perform a query operation. If a re-query is needed, the
// passed-in watch set will be used to block for changes. // passed-in watch set will be used to block for changes.
type queryFn func(memdb.WatchSet) error type queryFn func(memdb.WatchSet, *state.StateStore) error
// blockingQuery is used to process a potentially blocking query operation. // blockingQuery is used to process a potentially blocking query operation.
func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta, func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
@ -394,6 +395,11 @@ RUN_QUERY:
// Run the query. // Run the query.
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1) metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
// Operate on a consistent set of state. This makes sure that the
// abandon channel goes with the state that the caller is using to
// build watches.
state := s.fsm.State()
// We can skip all watch tracking if this isn't a blocking query. // We can skip all watch tracking if this isn't a blocking query.
var ws memdb.WatchSet var ws memdb.WatchSet
if queryOpts.MinQueryIndex > 0 { if queryOpts.MinQueryIndex > 0 {
@ -401,11 +407,11 @@ RUN_QUERY:
// This channel will be closed if a snapshot is restored and the // This channel will be closed if a snapshot is restored and the
// whole state store is abandoned. // whole state store is abandoned.
ws.Add(s.fsm.State().AbandonCh()) ws.Add(state.AbandonCh())
} }
// Block up to the timeout if we didn't see anything fresh. // Block up to the timeout if we didn't see anything fresh.
err := fn(ws) err := fn(ws, state)
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex { if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
if expired := ws.Watch(timeout.C); !expired { if expired := ws.Watch(timeout.C); !expired {
goto RUN_QUERY goto RUN_QUERY

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
@ -140,12 +141,10 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
return err return err
} }
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingQuery( return s.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, session, err := state.SessionGet(ws, args.Session) index, session, err := state.SessionGet(ws, args.Session)
if err != nil { if err != nil {
return err return err
@ -171,12 +170,10 @@ func (s *Session) List(args *structs.DCSpecificRequest,
return err return err
} }
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingQuery( return s.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, sessions, err := state.SessionList(ws) index, sessions, err := state.SessionList(ws)
if err != nil { if err != nil {
return err return err
@ -197,12 +194,10 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
return err return err
} }
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingQuery( return s.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet) error { func(ws memdb.WatchSet, state *state.StateStore) error {
index, sessions, err := state.NodeSessions(ws, args.Node) index, sessions, err := state.NodeSessions(ws, args.Node)
if err != nil { if err != nil {
return err return err