agent: ensure that most agent behavior correctly respects partition configuration (#10880)

pull/10882/head
R.B. Boyer 2021-08-19 15:09:42 -05:00 committed by GitHub
parent e62b1d05d8
commit 097e1645e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 500 additions and 199 deletions

View File

@ -87,6 +87,8 @@ func (a *Agent) vetServiceUpdateWithAuthorizer(authz acl.Authorizer, serviceID s
}
func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *structs.HealthCheck) error {
// TODO(partitions)
var authzContext acl.AuthorizerContext
check.FillAuthzContext(&authzContext)
// Vet the check itself.
@ -147,7 +149,7 @@ func (a *Agent) filterMembers(token string, members *[]serf.Member) error {
}
var authzContext acl.AuthorizerContext
structs.DefaultEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
a.agentEnterpriseMeta().FillAuthzContext(&authzContext)
// Filter out members based on the node policy.
m := *members
for i := 0; i < len(m); i++ {
@ -188,7 +190,8 @@ func (a *Agent) filterChecksWithAuthorizer(authz acl.Authorizer, checks *map[str
continue
}
} else {
structs.DefaultEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
// TODO(partition): should this be a Default or Node flavored entmeta?
check.NodeEnterpriseMetaForPartition().FillAuthzContext(&authzContext)
if authz.NodeRead(a.config.NodeName, &authzContext) == acl.Allow {
continue
}

View File

@ -434,6 +434,7 @@ func LocalConfig(cfg *config.RuntimeConfig) local.Config {
DiscardCheckOutput: cfg.DiscardCheckOutput,
NodeID: cfg.NodeID,
NodeName: cfg.NodeName,
Partition: cfg.PartitionOrDefault(),
TaggedAddresses: map[string]string{},
}
for k, v := range cfg.TaggedAddresses {
@ -561,8 +562,9 @@ func (a *Agent) Start(ctx context.Context) error {
State: a.State,
Tokens: a.baseDeps.Tokens,
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
@ -1529,11 +1531,13 @@ func (a *Agent) LocalMember() serf.Member {
// LANMembers is used to retrieve the LAN members
func (a *Agent) LANMembers() []serf.Member {
// TODO(partitions): filter this by the partition?
return a.delegate.LANMembers()
}
// WANMembers is used to retrieve the WAN members
func (a *Agent) WANMembers() []serf.Member {
// TODO(partitions): filter this by the partition by omitting wan results for now?
if srv, ok := a.delegate.(*consul.Server); ok {
return srv.WANMembers()
}
@ -1646,11 +1650,12 @@ OUTER:
for segment, coord := range cs {
agentToken := a.tokens.AgentToken()
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Segment: segment,
Coord: coord,
WriteRequest: structs.WriteRequest{Token: agentToken},
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Segment: segment,
Coord: coord,
EnterpriseMeta: *a.agentEnterpriseMeta(),
WriteRequest: structs.WriteRequest{Token: agentToken},
}
var reply struct{}
// todo(kit) port all of these logger calls to hclog w/ loglevel configuration
@ -1674,7 +1679,7 @@ OUTER:
// reapServicesInternal does a single pass, looking for services to reap.
func (a *Agent) reapServicesInternal() {
reaped := make(map[structs.ServiceID]bool)
for checkID, cs := range a.State.CriticalCheckStates(structs.WildcardEnterpriseMetaInDefaultPartition()) {
for checkID, cs := range a.State.AllCriticalCheckStates() {
serviceID := cs.Check.CompoundServiceID()
// There's nothing to do if there's no service.
@ -2004,7 +2009,7 @@ func (a *Agent) addServiceInternal(req addServiceInternalRequest) error {
// Agent.Start does not have a snapshot, and we don't want to query
// State.Checks each time.
if req.checkStateSnapshot == nil {
req.checkStateSnapshot = a.State.Checks(structs.WildcardEnterpriseMetaInDefaultPartition())
req.checkStateSnapshot = a.State.AllChecks()
}
// Create an associated health check
@ -2458,6 +2463,8 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
// Need its config to know whether we should reroute checks to it
var proxy *structs.NodeService
if service != nil {
// NOTE: Both services must live in the same namespace and
// partition so this will correctly scope the results.
for _, svc := range a.State.Services(&service.EnterpriseMeta) {
if svc.Proxy.DestinationServiceID == service.ID {
proxy = svc
@ -2719,6 +2726,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
var rpcReq structs.NodeSpecificRequest
rpcReq.Datacenter = a.config.Datacenter
rpcReq.EnterpriseMeta = *a.agentEnterpriseMeta()
// The token to set is really important. The behavior below follows
// the same behavior as anti-entropy: we use the user-specified token
@ -3297,7 +3305,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
// unloadServices will deregister all services.
func (a *Agent) unloadServices() error {
for id := range a.State.Services(structs.WildcardEnterpriseMetaInDefaultPartition()) {
for id := range a.State.AllServices() {
if err := a.removeServiceLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering service '%s': %v", id, err)
}
@ -3411,7 +3419,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[structs.CheckID]
// unloadChecks will deregister all checks known to the local agent.
func (a *Agent) unloadChecks() error {
for id := range a.State.Checks(structs.WildcardEnterpriseMetaInDefaultPartition()) {
for id := range a.State.AllChecks() {
if err := a.removeCheckLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering check '%s': %s", id, err)
}
@ -3423,7 +3431,7 @@ func (a *Agent) unloadChecks() error {
// checks. This is done before we reload our checks, so that we can properly
// restore into the same state.
func (a *Agent) snapshotCheckState() map[structs.CheckID]*structs.HealthCheck {
return a.State.Checks(structs.WildcardEnterpriseMetaInDefaultPartition())
return a.State.AllChecks()
}
// loadMetadata loads node metadata fields from the agent config and

View File

@ -82,6 +82,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i
PrimaryDatacenter string
NodeName string
NodeID string
Partition string `json:",omitempty"`
Revision string
Server bool
Version string
@ -90,6 +91,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i
PrimaryDatacenter: s.agent.config.PrimaryDatacenter,
NodeName: s.agent.config.NodeName,
NodeID: string(s.agent.config.NodeID),
Partition: s.agent.config.PartitionOrEmpty(),
Revision: s.agent.config.Revision,
Server: s.agent.config.ServerMode,
Version: s.agent.config.Version,
@ -305,6 +307,12 @@ func (s *HTTPHandlers) AgentServices(resp http.ResponseWriter, req *http.Request
return nil, err
}
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
// NOTE: we're explicitly fetching things in the requested partition and
// namespace here.
services := s.agent.State.Services(&entMeta)
if err := s.agent.filterServicesWithAuthorizer(authz, &services); err != nil {
return nil, err
@ -368,6 +376,10 @@ func (s *HTTPHandlers) AgentService(resp http.ResponseWriter, req *http.Request)
sid := structs.NewServiceID(id, &entMeta)
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
dc := s.agent.config.Datacenter
resultHash, service, err := s.agent.LocalBlockingQuery(false, hash, queryOpts.MaxQueryTime,
@ -400,6 +412,7 @@ func (s *HTTPHandlers) AgentService(resp http.ResponseWriter, req *http.Request)
aSvc := buildAgentService(svc, dc)
reply := &aSvc
// TODO(partitions): do we need to do anything here?
rawHash, err := hashstructure.Hash(reply, nil)
if err != nil {
return "", nil, err
@ -432,6 +445,10 @@ func (s *HTTPHandlers) AgentChecks(resp http.ResponseWriter, req *http.Request)
return nil, err
}
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
var filterExpression string
s.parseFilter(req, &filterExpression)
filter, err := bexpr.CreateFilter(filterExpression, nil, nil)
@ -439,6 +456,7 @@ func (s *HTTPHandlers) AgentChecks(resp http.ResponseWriter, req *http.Request)
return nil, err
}
// NOTE(partitions): this works because nodes exist in ONE partition
checks := s.agent.State.Checks(&entMeta)
if err := s.agent.filterChecksWithAuthorizer(authz, &checks); err != nil {
return nil, err
@ -485,6 +503,8 @@ func (s *HTTPHandlers) AgentMembers(resp http.ResponseWriter, req *http.Request)
}
}
// TODO(partitions): likely partitions+segment integration will take care of this
var members []serf.Member
if wan {
members = s.agent.WANMembers()
@ -521,6 +541,7 @@ func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (i
wan := false
if other := req.URL.Query().Get("wan"); other != "" {
wan = true
// TODO(partitions) : block wan join
}
// Get the address
@ -616,6 +637,10 @@ func (s *HTTPHandlers) AgentRegisterCheck(resp http.ResponseWriter, req *http.Re
return nil, err
}
if !s.validateRequestPartition(resp, &args.EnterpriseMeta) {
return nil, nil
}
// Construct the health check.
health := args.HealthCheck(s.agent.config.NodeName)
@ -674,6 +699,10 @@ func (s *HTTPHandlers) AgentDeregisterCheck(resp http.ResponseWriter, req *http.
return nil, err
}
if !s.validateRequestPartition(resp, &checkID.EnterpriseMeta) {
return nil, nil
}
if err := s.agent.RemoveCheck(checkID, true); err != nil {
return nil, err
}
@ -740,7 +769,7 @@ func (s *HTTPHandlers) AgentCheckUpdate(resp http.ResponseWriter, req *http.Requ
return s.agentCheckUpdate(resp, req, checkID, update.Status, update.Output)
}
func (s *HTTPHandlers) agentCheckUpdate(_resp http.ResponseWriter, req *http.Request, checkID types.CheckID, status string, output string) (interface{}, error) {
func (s *HTTPHandlers) agentCheckUpdate(resp http.ResponseWriter, req *http.Request, checkID types.CheckID, status string, output string) (interface{}, error) {
cid := structs.NewCheckID(checkID, nil)
// Get the provided token, if any, and vet against any ACL policies.
@ -762,6 +791,10 @@ func (s *HTTPHandlers) agentCheckUpdate(_resp http.ResponseWriter, req *http.Req
return nil, err
}
if !s.validateRequestPartition(resp, &cid.EnterpriseMeta) {
return nil, nil
}
if err := s.agent.updateTTLCheck(cid, status, output); err != nil {
return nil, err
}
@ -833,6 +866,10 @@ func (s *HTTPHandlers) AgentHealthServiceByID(resp http.ResponseWriter, req *htt
return nil, err
}
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
sid := structs.NewServiceID(serviceID, &entMeta)
dc := s.agent.config.Datacenter
@ -891,35 +928,38 @@ func (s *HTTPHandlers) AgentHealthServiceByName(resp http.ResponseWriter, req *h
return nil, acl.ErrPermissionDenied
}
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
dc := s.agent.config.Datacenter
code := http.StatusNotFound
status := fmt.Sprintf("ServiceName %s Not Found", serviceName)
services := s.agent.State.Services(&entMeta)
services := s.agent.State.ServicesByName(structs.NewServiceName(serviceName, &entMeta))
result := make([]api.AgentServiceChecksInfo, 0, 16)
for _, service := range services {
if service.Service == serviceName {
sid := structs.NewServiceID(service.ID, &entMeta)
sid := structs.NewServiceID(service.ID, &entMeta)
scode, sstatus, healthChecks := agentHealthService(sid, s)
serviceInfo := buildAgentService(service, dc)
res := api.AgentServiceChecksInfo{
AggregatedStatus: sstatus,
Checks: healthChecks,
Service: &serviceInfo,
}
result = append(result, res)
// When service is not found, we ignore it and keep existing HTTP status
if code == http.StatusNotFound {
code = scode
status = sstatus
}
// We take the worst of all statuses, so we keep iterating
// passing: 200 < warning: 429 < critical: 503
if code < scode {
code = scode
status = sstatus
}
scode, sstatus, healthChecks := agentHealthService(sid, s)
serviceInfo := buildAgentService(service, dc)
res := api.AgentServiceChecksInfo{
AggregatedStatus: sstatus,
Checks: healthChecks,
Service: &serviceInfo,
}
result = append(result, res)
// When service is not found, we ignore it and keep existing HTTP status
if code == http.StatusNotFound {
code = scode
status = sstatus
}
// We take the worst of all statuses, so we keep iterating
// passing: 200 < warning: 429 < critical: 503
if code < scode {
code = scode
status = sstatus
}
}
if returnTextPlain(req) {
@ -965,6 +1005,10 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
return nil, err
}
if !s.validateRequestPartition(resp, &args.EnterpriseMeta) {
return nil, nil
}
// Get the node service.
ns := args.NodeService()
if ns.Weights != nil {
@ -1104,6 +1148,10 @@ func (s *HTTPHandlers) AgentDeregisterService(resp http.ResponseWriter, req *htt
return nil, err
}
if !s.validateRequestPartition(resp, &sid.EnterpriseMeta) {
return nil, nil
}
if err := s.agent.RemoveService(sid); err != nil {
return nil, err
}
@ -1403,6 +1451,10 @@ func (s *HTTPHandlers) AgentConnectCALeafCert(resp http.ResponseWriter, req *htt
args.MaxQueryTime = qOpts.MaxQueryTime
args.Token = qOpts.Token
if !s.validateRequestPartition(resp, &args.EnterpriseMeta) {
return nil, nil
}
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCALeafName, &args)
if err != nil {
return nil, err
@ -1442,6 +1494,10 @@ func (s *HTTPHandlers) AgentConnectAuthorize(resp http.ResponseWriter, req *http
return nil, BadRequestError{fmt.Sprintf("Request decode failed: %v", err)}
}
if !s.validateRequestPartition(resp, &authReq.EnterpriseMeta) {
return nil, nil
}
authz, reason, cacheMeta, err := s.agent.ConnectAuthorize(token, &authReq)
if err != nil {
return nil, err

View File

@ -0,0 +1,13 @@
// +build !consulent
package agent
import (
"net/http"
"github.com/hashicorp/consul/agent/structs"
)
func (s *HTTPHandlers) validateRequestPartition(_ http.ResponseWriter, _ *structs.EnterpriseMeta) bool {
return true
}

View File

@ -4180,7 +4180,7 @@ func testAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T, extraHCL s
resp.Body.String())
// Sanity the target service registration
svcs := a.State.Services(nil)
svcs := a.State.AllServices()
// Parse the expected definition into a ServiceDefinition
var sd structs.ServiceDefinition
@ -4229,7 +4229,7 @@ func testAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T, extraHCL s
require.NoError(err)
require.Nil(obj)
svcs := a.State.Services(nil)
svcs := a.State.AllServices()
_, ok = svcs[structs.NewServiceID(tt.wantNS.ID, nil)]
if tt.wantSidecarIDLeftAfterDereg {
require.True(ok, "removed non-sidecar service at "+tt.wantNS.ID)

View File

@ -50,3 +50,7 @@ func (a *Agent) stopLicenseManager() {}
func (a *Agent) enterpriseStats() map[string]map[string]string {
return nil
}
func (a *Agent) agentEnterpriseMeta() *structs.EnterpriseMeta {
return structs.NodeEnterpriseMetaInDefaultPartition()
}

View File

@ -108,7 +108,7 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
}
updateStatus := func() {
checks := c.Notify.Checks(structs.WildcardEnterpriseMetaInDefaultPartition())
checks := c.Notify.Checks(c.WildcardEnterpriseMetaForPartition())
checksList := make([]*structs.HealthCheck, 0, len(checks))
for _, chk := range checks {
checksList = append(checksList, chk)

View File

@ -688,7 +688,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
}
autoEncryptAllowTLS := boolVal(c.AutoEncrypt.AllowTLS)
autoConfig := b.autoConfigVal(c.AutoConfig)
autoConfig := b.autoConfigVal(c.AutoConfig, stringVal(c.Partition))
if autoEncryptAllowTLS || autoConfig.Enabled {
connectEnabled = true
}
@ -2231,7 +2231,7 @@ func (b *builder) makeAddrs(pri []net.Addr, sec []*net.IPAddr, port int) []net.A
return x
}
func (b *builder) autoConfigVal(raw AutoConfigRaw) AutoConfig {
func (b *builder) autoConfigVal(raw AutoConfigRaw, agentPartition string) AutoConfig {
var val AutoConfig
val.Enabled = boolValWithDefault(raw.Enabled, false)
@ -2259,12 +2259,12 @@ func (b *builder) autoConfigVal(raw AutoConfigRaw) AutoConfig {
val.IPSANs = append(val.IPSANs, ip)
}
val.Authorizer = b.autoConfigAuthorizerVal(raw.Authorization)
val.Authorizer = b.autoConfigAuthorizerVal(raw.Authorization, agentPartition)
return val
}
func (b *builder) autoConfigAuthorizerVal(raw AutoConfigAuthorizationRaw) AutoConfigAuthorizer {
func (b *builder) autoConfigAuthorizerVal(raw AutoConfigAuthorizationRaw, agentPartition string) AutoConfigAuthorizer {
// Our config file syntax wraps the static authorizer configuration in a "static" stanza. However
// internally we do not support multiple configured authorization types so the RuntimeConfig just
// inlines the static one. While we can and probably should extend the authorization types in the
@ -2272,13 +2272,16 @@ func (b *builder) autoConfigAuthorizerVal(raw AutoConfigAuthorizationRaw) AutoCo
// needed right now so the configuration types will remain simplistic until they need to be otherwise.
var val AutoConfigAuthorizer
entMeta := structs.DefaultEnterpriseMetaInPartition(agentPartition)
entMeta.Normalize()
val.Enabled = boolValWithDefault(raw.Enabled, false)
val.ClaimAssertions = raw.Static.ClaimAssertions
val.AllowReuse = boolValWithDefault(raw.Static.AllowReuse, false)
val.AuthMethod = structs.ACLAuthMethod{
Name: "Auto Config Authorizer",
Type: "jwt",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
EnterpriseMeta: *entMeta,
Config: map[string]interface{}{
"JWTSupportedAlgs": raw.Static.JWTSupportedAlgs,
"BoundAudiences": raw.Static.BoundAudiences,

View File

@ -4,4 +4,5 @@ package config
type EnterpriseRuntimeConfig struct{}
func (c *RuntimeConfig) PartitionOrEmpty() string { return "" }
func (c *RuntimeConfig) PartitionOrEmpty() string { return "" }
func (c *RuntimeConfig) PartitionOrDefault() string { return "" }

View File

@ -1457,6 +1457,7 @@ func (f *aclFilter) filterNodeServices(services **structs.NodeServices) {
}
var authzContext acl.AuthorizerContext
// TODO(partitions): put partition into this wildcard?
structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
if !f.allowNode((*services).Node.Node, &authzContext) {
*services = nil
@ -1481,6 +1482,7 @@ func (f *aclFilter) filterNodeServiceList(services **structs.NodeServiceList) {
}
var authzContext acl.AuthorizerContext
// TODO(partitions): put partition into this wildcard?
structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
if !f.allowNode((*services).Node.Node, &authzContext) {
*services = nil
@ -1578,6 +1580,7 @@ func (f *aclFilter) filterSessions(sessions *structs.Sessions) {
func (f *aclFilter) filterCoordinates(coords *structs.Coordinates) {
c := *coords
var authzContext acl.AuthorizerContext
// TODO(partitions): put partition into this wildcard?
structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
for i := 0; i < len(c); i++ {
@ -1619,6 +1622,7 @@ func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) {
info := nd[i]
// Filter nodes
// TODO(partitions): put partition into this wildcard?
structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
if node := info.Node; !f.allowNode(node, &authzContext) {
f.logger.Debug("dropping node from result due to ACLs", "node", node)
@ -1687,6 +1691,7 @@ func (f *aclFilter) filterNodes(nodes *structs.Nodes) {
n := *nodes
var authzContext acl.AuthorizerContext
// TODO(partitions): put partition into this wildcard?
structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
for i := 0; i < len(n); i++ {

View File

@ -474,11 +474,10 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var err error
// TODO(partitions)
if len(args.NodeMetaFilters) > 0 {
reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters, nil)
reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta)
} else {
reply.Index, reply.Nodes, err = state.Nodes(ws, nil)
reply.Index, reply.Nodes, err = state.Nodes(ws, &args.EnterpriseMeta)
}
if err != nil {
return err

View File

@ -61,6 +61,7 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
segment: c.config.Segment,
server: false,
}
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
@ -68,7 +69,7 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
return nil, err
}
addEnterpriseSerfTags(conf.Tags)
addEnterpriseSerfTags(conf.Tags, c.config.agentEnterpriseMeta())
conf.ReconnectTimeoutOverride = libserf.NewReconnectOverride(c.logger)

View File

@ -0,0 +1,9 @@
// +build !consulent
package consul
import "github.com/hashicorp/consul/agent/structs"
func (c *Config) agentEnterpriseMeta() *structs.EnterpriseMeta {
return structs.NodeEnterpriseMetaInDefaultPartition()
}

View File

@ -86,10 +86,13 @@ func (c *Coordinate) batchApplyUpdates() error {
break
}
update.EnterpriseMeta.Normalize()
updates[i] = &structs.Coordinate{
Node: update.Node,
Segment: update.Segment,
Coord: update.Coord,
Node: update.Node,
Segment: update.Segment,
Coord: update.Coord,
Partition: update.PartitionOrEmpty(),
}
i++
}
@ -138,12 +141,17 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
}
// Fetch the ACL token, if any, and enforce the node policy if enabled.
authz, err := c.srv.ResolveToken(args.Token)
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
var authzContext acl.AuthorizerContext
structs.DefaultEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
args.DefaultEnterpriseMetaForPartition().FillAuthzContext(&authzContext)
if authz.NodeWrite(args.Node, &authzContext) != acl.Allow {
return acl.ErrPermissionDenied
}
@ -166,6 +174,8 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter
return err
}
// TODO(partitions):
var out []structs.DatacenterMap
// Strip the datacenter suffixes from all the node names.
@ -194,11 +204,19 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
return err
}
_, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
// TODO(partitions)
index, coords, err := state.Coordinates(ws, nil)
index, coords, err := state.Coordinates(ws, &args.EnterpriseMeta)
if err != nil {
return err
}
@ -220,21 +238,27 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde
// Fetch the ACL token, if any, and enforce the node policy if enabled.
authz, err := c.srv.ResolveToken(args.Token)
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
var authzContext acl.AuthorizerContext
structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
args.WildcardEnterpriseMetaForPartition().FillAuthzContext(&authzContext)
if authz.NodeRead(args.Node, &authzContext) != acl.Allow {
return acl.ErrPermissionDenied
}
// TODO(partitions): do we have to add EnterpriseMeta to the reply like in Catalog.ListServices?
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
// TODO(partitions)
index, nodeCoords, err := state.Coordinate(ws, args.Node, nil)
index, nodeCoords, err := state.Coordinate(ws, args.Node, &args.EnterpriseMeta)
if err != nil {
return err
}
@ -242,9 +266,10 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde
var coords structs.Coordinates
for segment, coord := range nodeCoords {
coords = append(coords, &structs.Coordinate{
Node: args.Node,
Segment: segment,
Coord: coord,
Node: args.Node,
Segment: segment,
Partition: args.PartitionOrEmpty(),
Coord: coord,
})
}
reply.Index, reply.Coordinates = index, coords

View File

@ -84,14 +84,12 @@ func TestCoordinate_Update(t *testing.T) {
// Make sure the updates did not yet apply because the update period
// hasn't expired.
state := s1.fsm.State()
// TODO(partitions)
_, c, err := state.Coordinate(nil, "node1", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(t, lib.CoordinateSet{}, c)
// TODO(partitions)
_, c, err = state.Coordinate(nil, "node2", nil)
if err != nil {
t.Fatalf("err: %v", err)
@ -107,7 +105,6 @@ func TestCoordinate_Update(t *testing.T) {
// Wait a while and the updates should get picked up.
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
// TODO(partitions)
_, c, err = state.Coordinate(nil, "node1", nil)
if err != nil {
t.Fatalf("err: %v", err)
@ -117,7 +114,6 @@ func TestCoordinate_Update(t *testing.T) {
}
require.Equal(t, expected, c)
// TODO(partitions)
_, c, err = state.Coordinate(nil, "node2", nil)
if err != nil {
t.Fatalf("err: %v", err)
@ -157,7 +153,6 @@ func TestCoordinate_Update(t *testing.T) {
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
numDropped := 0
for i := 0; i < spamLen; i++ {
// TODO(partitions)
_, c, err = state.Coordinate(nil, fmt.Sprintf("bogusnode%d", i), nil)
if err != nil {
t.Fatalf("err: %v", err)

View File

@ -71,7 +71,7 @@ func (s *Server) validateEnterpriseIntentionNamespace(ns string, _ bool) error {
return errors.New("Namespaces is a Consul Enterprise feature")
}
func addEnterpriseSerfTags(_ map[string]string) {
func addEnterpriseSerfTags(_ map[string]string, _ *structs.EnterpriseMeta) {
// do nothing
}

View File

@ -47,6 +47,7 @@ func (t *txnResultsFilter) Filter(i int) bool {
result.KV.EnterpriseMeta.FillAuthzContext(&authzContext)
return t.authorizer.KeyRead(result.KV.Key, &authzContext) != acl.Allow
case result.Node != nil:
// TODO(partitions): put partition into this wildcard?
structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext)
return t.authorizer.NodeRead(result.Node.Node, &authzContext) != acl.Allow
case result.Service != nil:

View File

@ -532,6 +532,8 @@ func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
s.logger.Info("initializing acls")
// TODO(partitions): initialize acls in all of the partitions?
// Create/Upgrade the builtin global-management policy
_, policy, err := s.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, structs.DefaultEnterpriseMetaInDefaultPartition())
if err != nil {
@ -1110,9 +1112,13 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
// We generate a "reap" event to cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error {
func (s *Server) reconcileReaped(known map[string]struct{}, nodeEntMeta *structs.EnterpriseMeta) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
state := s.fsm.State()
_, checks, err := state.ChecksInState(nil, api.HealthAny, structs.DefaultEnterpriseMetaInDefaultPartition())
_, checks, err := state.ChecksInState(nil, api.HealthAny, nodeEntMeta)
if err != nil {
return err
}
@ -1128,7 +1134,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
}
// Get the node services, look for ConsulServiceID
_, services, err := state.NodeServices(nil, check.Node, structs.DefaultEnterpriseMetaInDefaultPartition())
_, services, err := state.NodeServices(nil, check.Node, nodeEntMeta)
if err != nil {
return err
}
@ -1139,8 +1145,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
CHECKS:
for _, service := range services.Services {
if service.ID == structs.ConsulServiceID {
// TODO(partitions)
_, node, err := state.GetNode(check.Node, nil)
_, node, err := state.GetNode(check.Node, nodeEntMeta)
if err != nil {
s.logger.Error("Unable to look up node with name", "name", check.Node, "error", err)
continue CHECKS
@ -1165,6 +1170,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
"role": "node",
},
}
addEnterpriseSerfTags(member.Tags, nodeEntMeta)
// Create the appropriate tags if this was a server node
if serverPort > 0 {
@ -1175,7 +1181,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
}
// Attempt to reap this member
if err := s.handleReapMember(member); err != nil {
if err := s.handleReapMember(member, nodeEntMeta); err != nil {
return err
}
}
@ -1187,23 +1193,28 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
func (s *Server) reconcileMember(member serf.Member) error {
// Check if this is a member we should handle
if !s.shouldHandleMember(member) {
// TODO(partition): log the partition name
s.logger.Warn("skipping reconcile of node", "member", member)
return nil
}
defer metrics.MeasureSince([]string{"leader", "reconcileMember"}, time.Now())
nodeEntMeta := getSerfMemberEnterpriseMeta(member)
var err error
switch member.Status {
case serf.StatusAlive:
err = s.handleAliveMember(member)
err = s.handleAliveMember(member, nodeEntMeta)
case serf.StatusFailed:
err = s.handleFailedMember(member)
err = s.handleFailedMember(member, nodeEntMeta)
case serf.StatusLeft:
err = s.handleLeftMember(member)
err = s.handleLeftMember(member, nodeEntMeta)
case StatusReap:
err = s.handleReapMember(member)
err = s.handleReapMember(member, nodeEntMeta)
}
if err != nil {
s.logger.Error("failed to reconcile member",
// TODO(partition): log the partition name
"member", member,
"error", err,
)
@ -1231,7 +1242,11 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func (s *Server) handleAliveMember(member serf.Member) error {
func (s *Server) handleAliveMember(member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Register consul service if a server
var service *structs.NodeService
if valid, parts := metadata.IsConsulServer(member); valid {
@ -1243,6 +1258,7 @@ func (s *Server) handleAliveMember(member serf.Member) error {
Passing: 1,
Warning: 1,
},
EnterpriseMeta: *nodeEntMeta,
Meta: map[string]string{
// DEPRECATED - remove nonvoter in favor of read_replica in a future version of consul
"non_voter": strconv.FormatBool(member.Tags["nonvoter"] == "1"),
@ -1263,8 +1279,7 @@ func (s *Server) handleAliveMember(member serf.Member) error {
// Check if the node exists
state := s.fsm.State()
// TODO(partitions)
_, node, err := state.GetNode(member.Name, nil)
_, node, err := state.GetNode(member.Name, nodeEntMeta)
if err != nil {
return err
}
@ -1272,7 +1287,7 @@ func (s *Server) handleAliveMember(member serf.Member) error {
// Check if the associated service is available
if service != nil {
match := false
_, services, err := state.NodeServices(nil, member.Name, structs.DefaultEnterpriseMetaInDefaultPartition())
_, services, err := state.NodeServices(nil, member.Name, nodeEntMeta)
if err != nil {
return err
}
@ -1290,7 +1305,7 @@ func (s *Server) handleAliveMember(member serf.Member) error {
}
// Check if the serfCheck is in the passing state
_, checks, err := state.NodeChecks(nil, member.Name, structs.DefaultEnterpriseMetaInDefaultPartition())
_, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta)
if err != nil {
return err
}
@ -1317,6 +1332,7 @@ AFTER_CHECK:
Status: api.HealthPassing,
Output: structs.SerfCheckAliveOutput,
},
EnterpriseMeta: *nodeEntMeta,
}
if node != nil {
req.TaggedAddresses = node.TaggedAddresses
@ -1329,11 +1345,14 @@ AFTER_CHECK:
// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func (s *Server) handleFailedMember(member serf.Member) error {
func (s *Server) handleFailedMember(member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Check if the node exists
state := s.fsm.State()
// TODO(partitions)
_, node, err := state.GetNode(member.Name, nil)
_, node, err := state.GetNode(member.Name, nodeEntMeta)
if err != nil {
return err
}
@ -1343,9 +1362,11 @@ func (s *Server) handleFailedMember(member serf.Member) error {
return nil
}
// TODO(partitions): get the ent meta by parsing serf tags
if node.Address == member.Addr.String() {
// Check if the serfCheck is in the critical state
_, checks, err := state.NodeChecks(nil, member.Name, structs.DefaultEnterpriseMetaInDefaultPartition())
_, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta)
if err != nil {
return err
}
@ -1359,10 +1380,11 @@ func (s *Server) handleFailedMember(member serf.Member) error {
// Register with the catalog
req := structs.RegisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Datacenter: s.config.Datacenter,
Node: member.Name,
EnterpriseMeta: *nodeEntMeta,
ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Check: &structs.HealthCheck{
Node: member.Name,
CheckID: structs.SerfCheckID,
@ -1381,18 +1403,22 @@ func (s *Server) handleFailedMember(member serf.Member) error {
// handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func (s *Server) handleLeftMember(member serf.Member) error {
return s.handleDeregisterMember("left", member)
func (s *Server) handleLeftMember(member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error {
return s.handleDeregisterMember("left", member, nodeEntMeta)
}
// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func (s *Server) handleReapMember(member serf.Member) error {
return s.handleDeregisterMember("reaped", member)
func (s *Server) handleReapMember(member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error {
return s.handleDeregisterMember("reaped", member, nodeEntMeta)
}
// handleDeregisterMember is used to deregister a member of a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
func (s *Server) handleDeregisterMember(reason string, member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
@ -1410,8 +1436,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
// Check if the node does not exist
state := s.fsm.State()
// TODO(partitions)
_, node, err := state.GetNode(member.Name, nil)
_, node, err := state.GetNode(member.Name, nodeEntMeta)
if err != nil {
return err
}
@ -1422,8 +1447,9 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
// Deregister the node
s.logger.Info("deregistering member", "member", member.Name, "reason", reason)
req := structs.DeregisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
Datacenter: s.config.Datacenter,
Node: member.Name,
EnterpriseMeta: *nodeEntMeta,
}
_, err = s.raftApply(structs.DeregisterRequestType, &req)
return err

View File

@ -49,7 +49,6 @@ func TestLeader_RegisterMember(t *testing.T) {
// Client should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -79,7 +78,6 @@ func TestLeader_RegisterMember(t *testing.T) {
// Server should be registered
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(s1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -129,7 +127,6 @@ func TestLeader_FailedMember(t *testing.T) {
// Should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -191,7 +188,6 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be registered
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -207,7 +203,6 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be deregistered
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -243,7 +238,6 @@ func TestLeader_ReapMember(t *testing.T) {
// Should be registered
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -269,7 +263,6 @@ func TestLeader_ReapMember(t *testing.T) {
// anti-entropy will put it back.
reaped := false
for start := time.Now(); time.Since(start) < 5*time.Second; {
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
@ -367,7 +360,7 @@ func TestLeader_CheckServersMeta(t *testing.T) {
member.Tags["nonvoter"] = "1"
member.Tags["read_replica"] = "1"
member.Tags["build"] = versionToExpect
err := s1.handleAliveMember(member)
err := s1.handleAliveMember(member, nil)
if err != nil {
r.Fatalf("Unexpected error :%v", err)
}
@ -439,7 +432,6 @@ func TestLeader_ReapServer(t *testing.T) {
// s3 should be registered
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(s3.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -454,14 +446,13 @@ func TestLeader_ReapServer(t *testing.T) {
knownMembers[s1.config.NodeName] = struct{}{}
knownMembers[s2.config.NodeName] = struct{}{}
err := s1.reconcileReaped(knownMembers)
err := s1.reconcileReaped(knownMembers, nil)
if err != nil {
t.Fatalf("Unexpected error :%v", err)
}
// s3 should be deregistered
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(s3.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -517,7 +508,6 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
// Node should be gone
state := s1.fsm.State()
// TODO(partitions)
_, node, err := state.GetNode("no-longer-around", nil)
if err != nil {
t.Fatalf("err: %v", err)
@ -551,7 +541,6 @@ func TestLeader_Reconcile(t *testing.T) {
// Should not be registered
state := s1.fsm.State()
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
@ -562,7 +551,6 @@ func TestLeader_Reconcile(t *testing.T) {
// Should be registered
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -595,7 +583,6 @@ func TestLeader_Reconcile_Races(t *testing.T) {
state := s1.fsm.State()
var nodeAddr string
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
@ -632,7 +619,6 @@ func TestLeader_Reconcile_Races(t *testing.T) {
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
@ -657,7 +643,6 @@ func TestLeader_Reconcile_Races(t *testing.T) {
})
// Make sure the metadata didn't get clobbered.
// TODO(partitions)
_, node, err = state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
@ -773,7 +758,6 @@ func TestLeader_LeftLeader(t *testing.T) {
// Verify the old leader is deregistered
state := remain.fsm.State()
retry.Run(t, func(r *retry.R) {
// TODO(partitions)
_, node, err := state.GetNode(leader.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)

View File

@ -3,10 +3,11 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-version"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/types"
)
// lanMergeDelegate is used to handle a cluster merge on the LAN gossip
@ -17,6 +18,14 @@ type lanMergeDelegate struct {
nodeID types.NodeID
nodeName string
segment string
// TODO(partitions): use server and partition to reject gossip messages
// from nodes in the wrong partition depending upon the role the node is
// playing. For example servers will always be in the default partition,
// but all clients in all partitions should be aware of the servers so that
// general RPC routing works.
server bool
partition string
}
// uniqueIDMinVersion is the lowest version where we insist that nodes

View File

@ -22,8 +22,7 @@ func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
// TODO(partitions)
_, other, err := state.Coordinate(nil, node.Node, nil)
_, other, err := state.Coordinate(nil, node.Node, node.GetEnterpriseMeta())
if err != nil {
return nil, err
}
@ -63,8 +62,7 @@ func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.Servic
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
// TODO(partitions)
_, other, err := state.Coordinate(nil, node.Node, nil)
_, other, err := state.Coordinate(nil, node.Node, &node.EnterpriseMeta)
if err != nil {
return nil, err
}
@ -104,8 +102,7 @@ func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.Healt
state := s.fsm.State()
vec := make([]float64, len(checks))
for i, check := range checks {
// TODO(partitions)
_, other, err := state.Coordinate(nil, check.Node, nil)
_, other, err := state.Coordinate(nil, check.Node, &check.EnterpriseMeta)
if err != nil {
return nil, err
}
@ -145,8 +142,7 @@ func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.C
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
// TODO(partitions)
_, other, err := state.Coordinate(nil, node.Node.Node, nil)
_, other, err := state.Coordinate(nil, node.Node.Node, node.Node.GetEnterpriseMeta())
if err != nil {
return nil, err
}

View File

@ -8,8 +8,9 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/structs"
)
var SegmentOSSSummaries = []prometheus.SummaryDefinition{
@ -62,12 +63,17 @@ func (s *Server) setupSegments(config *Config, port int, rpcListeners map[string
func (s *Server) floodSegments(config *Config) {
}
func getSerfMemberEnterpriseMeta(member serf.Member) *structs.EnterpriseMeta {
return structs.NodeEnterpriseMetaInDefaultPartition()
}
// reconcile is used to reconcile the differences between Serf membership and
// what is reflected in our strongly consistent store. Mainly we need to ensure
// all live nodes are registered, all failed nodes are marked as such, and all
// left nodes are deregistered.
func (s *Server) reconcile() (err error) {
defer metrics.MeasureSince([]string{"leader", "reconcile"}, time.Now())
members := s.serfLAN.Members()
knownMembers := make(map[string]struct{})
for _, member := range members {
@ -79,5 +85,5 @@ func (s *Server) reconcile() (err error) {
// Reconcile any members that have been reaped while we were not the
// leader.
return s.reconcileReaped(knownMembers)
return s.reconcileReaped(knownMembers, nil)
}

View File

@ -117,6 +117,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
nodeID: s.config.NodeID,
nodeName: s.config.NodeName,
segment: segment,
server: true,
}
}
@ -175,7 +176,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.ReconnectTimeoutOverride = libserf.NewReconnectOverride(s.logger)
addEnterpriseSerfTags(conf.Tags)
addEnterpriseSerfTags(conf.Tags, s.config.agentEnterpriseMeta())
if s.config.OverrideInitialSerfTags != nil {
s.config.OverrideInitialSerfTags(conf.Tags)

View File

@ -6,6 +6,7 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/structs"
)
@ -46,6 +47,7 @@ func (s *Server) initializeSessionTimers() error {
// Scan all sessions and reset their timer
state := s.fsm.State()
// TODO(partitions): track all session timers in all partitions
_, sessions, err := state.SessionList(nil, structs.WildcardEnterpriseMetaInDefaultPartition())
if err != nil {
return err

View File

@ -234,7 +234,6 @@ func TestTxn_Apply(t *testing.T) {
t.Fatalf("bad: %v", d)
}
// TODO(partitions)
_, n, err := state.GetNode("foo", nil)
if err != nil {
t.Fatalf("err: %v", err)

View File

@ -82,6 +82,9 @@ func (s *HTTPHandlers) CoordinateNodes(resp http.ResponseWriter, req *http.Reque
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
if err := parseEntMetaPartition(req, &args.EnterpriseMeta); err != nil {
return nil, err
}
var out structs.IndexedCoordinates
defer setMeta(resp, &out.QueryMeta)
@ -105,6 +108,9 @@ func (s *HTTPHandlers) CoordinateNode(resp http.ResponseWriter, req *http.Reques
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
if err := parseEntMetaPartition(req, &args.EnterpriseMeta); err != nil {
return nil, err
}
var out structs.IndexedCoordinates
defer setMeta(resp, &out.QueryMeta)
@ -158,6 +164,10 @@ func (s *HTTPHandlers) CoordinateUpdate(resp http.ResponseWriter, req *http.Requ
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil {
return nil, err
}
var reply struct{}
if err := s.agent.RPC("Coordinate.Update", &args, &reply); err != nil {
return nil, err

View File

@ -122,6 +122,8 @@ type DNSServer struct {
// recursorEnabled stores whever the recursor handler is enabled as an atomic flag.
// the recursor handler is only enabled if recursors are configured. This flag is used during config hot-reloading
recursorEnabled uint32
defaultEnterpriseMeta structs.EnterpriseMeta
}
func NewDNSServer(a *Agent) (*DNSServer, error) {
@ -130,10 +132,11 @@ func NewDNSServer(a *Agent) (*DNSServer, error) {
altDomain := dns.Fqdn(strings.ToLower(a.config.DNSAltDomain))
srv := &DNSServer{
agent: a,
domain: domain,
altDomain: altDomain,
logger: a.logger.Named(logging.DNS),
agent: a,
domain: domain,
altDomain: altDomain,
logger: a.logger.Named(logging.DNS),
defaultEnterpriseMeta: *a.agentEnterpriseMeta(),
}
cfg, err := GetDNSConfig(a.config)
if err != nil {
@ -414,7 +417,7 @@ func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) {
AllowStale: cfg.AllowStale,
},
ServiceAddress: serviceAddress,
EnterpriseMeta: *structs.WildcardEnterpriseMetaInDefaultPartition(),
EnterpriseMeta: *d.defaultEnterpriseMeta.WildcardEnterpriseMetaForPartition(),
}
var sout structs.IndexedServiceNodes
@ -548,7 +551,7 @@ func (d *DNSServer) nameservers(cfg *dnsConfig, maxRecursionLevel int) (ns []dns
Service: structs.ConsulServiceName,
Connect: false,
Ingress: false,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
EnterpriseMeta: d.defaultEnterpriseMeta,
})
if err != nil {
d.logger.Warn("Unable to get list of servers", "error", err)
@ -645,8 +648,8 @@ func (d *DNSServer) dispatch(remoteAddr net.Addr, req, resp *dns.Msg, maxRecursi
// By default the query is in the default datacenter
datacenter := d.agent.config.Datacenter
// have to deref to clone it so we don't modify
var entMeta structs.EnterpriseMeta
// have to deref to clone it so we don't modify (start from the agent's defaults)
var entMeta = d.defaultEnterpriseMeta
// Get the QName without the domain suffix
qName := strings.ToLower(dns.Fqdn(req.Question[0].Name))
@ -1316,9 +1319,10 @@ func (d *DNSServer) preparedQueryLookup(cfg *dnsConfig, datacenter, query string
// send the local agent's data through to allow distance sorting
// relative to ourself on the server side.
Agent: structs.QuerySource{
Datacenter: d.agent.config.Datacenter,
Segment: d.agent.config.SegmentName,
Node: d.agent.config.NodeName,
Datacenter: d.agent.config.Datacenter,
Segment: d.agent.config.SegmentName,
Node: d.agent.config.NodeName,
NodePartition: d.agent.config.PartitionOrEmpty(),
},
}

View File

@ -333,6 +333,11 @@ func (s *HTTPHandlers) nodeName() string {
return s.agent.config.NodeName
}
// nodePartition returns the node partition of the agent
func (s *HTTPHandlers) nodePartition() string {
return s.agent.config.PartitionOrEmpty()
}
// aclEndpointRE is used to find old ACL endpoints that take tokens in the URL
// so that we can redact them. The ACL endpoints that take the token in the URL
// are all of the form /v1/acl/<verb>/<token>, and can optionally include query
@ -1032,6 +1037,7 @@ func (s *HTTPHandlers) parseSource(req *http.Request, source *structs.QuerySourc
} else {
source.Node = node
}
source.NodePartition = s.agent.config.PartitionOrEmpty()
}
}

View File

@ -17,7 +17,8 @@ func (s *HTTPHandlers) parseEntMeta(req *http.Request, entMeta *structs.Enterpri
if queryNS := req.URL.Query().Get("ns"); queryNS != "" {
return BadRequestError{Reason: "Invalid query parameter: \"ns\" - Namespaces are a Consul Enterprise feature"}
}
return nil
return parseEntMetaPartition(req, entMeta)
}
func (s *HTTPHandlers) validateEnterpriseIntentionNamespace(logName, ns string, _ bool) error {
@ -74,7 +75,13 @@ func (s *HTTPHandlers) uiTemplateDataTransform(data map[string]interface{}) erro
return nil
}
// parseEntMetaPartition is a noop for the enterprise implementation.
func parseEntMetaPartition(req *http.Request, meta *structs.EnterpriseMeta) error {
if headerAP := req.Header.Get("X-Consul-Partition"); headerAP != "" {
return BadRequestError{Reason: "Invalid header: \"X-Consul-Partition\" - Partitions are a Consul Enterprise feature"}
}
if queryAP := req.URL.Query().Get("partition"); queryAP != "" {
return BadRequestError{Reason: "Invalid query parameter: \"partition\" - Partitions are a Consul Enterprise feature"}
}
return nil
}

View File

@ -58,6 +58,7 @@ type Config struct {
DiscardCheckOutput bool
NodeID types.NodeID
NodeName string
Partition string // this defaults if empty
TaggedAddresses map[string]string
}
@ -176,6 +177,8 @@ type State struct {
// Config is the agent config
config Config
agentEnterpriseMeta structs.EnterpriseMeta
// nodeInfoInSync tracks whether the server has our correct top-level
// node information in sync
nodeInfoInSync bool
@ -208,14 +211,15 @@ type State struct {
// NewState creates a new local state for the agent.
func NewState(c Config, logger hclog.Logger, tokens *token.Store) *State {
l := &State{
config: c,
logger: logger,
services: make(map[structs.ServiceID]*ServiceState),
checks: make(map[structs.CheckID]*CheckState),
checkAliases: make(map[structs.ServiceID]map[structs.CheckID]chan<- struct{}),
metadata: make(map[string]string),
tokens: tokens,
notifyHandlers: make(map[chan<- struct{}]struct{}),
config: c,
logger: logger,
services: make(map[structs.ServiceID]*ServiceState),
checks: make(map[structs.CheckID]*CheckState),
checkAliases: make(map[structs.ServiceID]map[structs.CheckID]chan<- struct{}),
metadata: make(map[string]string),
tokens: tokens,
notifyHandlers: make(map[chan<- struct{}]struct{}),
agentEnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(c.Partition),
}
l.SetDiscardCheckOutput(c.DiscardCheckOutput)
return l
@ -267,6 +271,10 @@ func (l *State) addServiceLocked(service *structs.NodeService, token string) err
service.ID = service.Service
}
if l.agentEnterpriseMeta.PartitionOrDefault() != service.PartitionOrDefault() {
return fmt.Errorf("cannot add service %q to node in partition %q", service.CompoundServiceID(), l.config.Partition)
}
l.setServiceStateLocked(&ServiceState{
Service: service,
Token: token,
@ -340,8 +348,8 @@ func (l *State) removeServiceLocked(id structs.ServiceID) error {
return nil
}
// Service returns the locally registered service that the
// agent is aware of and are being kept in sync with the server
// Service returns the locally registered service that the agent is aware of
// with this ID and are being kept in sync with the server.
func (l *State) Service(id structs.ServiceID) *structs.NodeService {
l.RLock()
defer l.RUnlock()
@ -353,9 +361,43 @@ func (l *State) Service(id structs.ServiceID) *structs.NodeService {
return s.Service
}
// Services returns the locally registered services that the
// ServicesByName returns all the locally registered service instances that the
// agent is aware of with this name and are being kept in sync with the server
func (l *State) ServicesByName(sn structs.ServiceName) []*structs.NodeService {
l.RLock()
defer l.RUnlock()
var found []*structs.NodeService
for id, s := range l.services {
if s.Deleted {
continue
}
if !sn.EnterpriseMeta.Matches(&id.EnterpriseMeta) {
continue
}
if s.Service.Service == sn.Name {
found = append(found, s.Service)
}
}
return found
}
// AllServices returns the locally registered services that the
// agent is aware of and are being kept in sync with the server
func (l *State) AllServices() map[structs.ServiceID]*structs.NodeService {
return l.listServices(false, nil)
}
// Services returns the locally registered services that the agent is aware of
// and are being kept in sync with the server
//
// Results are scoped to the provided namespace and partition.
func (l *State) Services(entMeta *structs.EnterpriseMeta) map[structs.ServiceID]*structs.NodeService {
return l.listServices(true, entMeta)
}
func (l *State) listServices(filtered bool, entMeta *structs.EnterpriseMeta) map[structs.ServiceID]*structs.NodeService {
l.RLock()
defer l.RUnlock()
@ -365,7 +407,7 @@ func (l *State) Services(entMeta *structs.EnterpriseMeta) map[structs.ServiceID]
continue
}
if !entMeta.Matches(&id.EnterpriseMeta) {
if filtered && !entMeta.Matches(&id.EnterpriseMeta) {
continue
}
m[id] = s.Service
@ -395,6 +437,10 @@ func (l *State) SetServiceState(s *ServiceState) {
l.Lock()
defer l.Unlock()
if l.agentEnterpriseMeta.PartitionOrDefault() != s.Service.PartitionOrDefault() {
return
}
l.setServiceStateLocked(s)
}
@ -483,15 +529,19 @@ func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
check.Output = ""
}
// hard-set the node name and partition
check.Node = l.config.NodeName
check.EnterpriseMeta = structs.NewEnterpriseMetaWithPartition(
l.agentEnterpriseMeta.PartitionOrEmpty(),
check.NamespaceOrEmpty(),
)
// if there is a serviceID associated with the check, make sure it exists before adding it
// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
if _, ok := l.services[check.CompoundServiceID()]; check.ServiceID != "" && !ok {
return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID)
}
// hard-set the node name
check.Node = l.config.NodeName
l.setCheckStateLocked(&CheckState{
Check: check,
Token: token,
@ -510,6 +560,13 @@ func (l *State) AddAliasCheck(checkID structs.CheckID, srcServiceID structs.Serv
l.Lock()
defer l.Unlock()
if l.agentEnterpriseMeta.PartitionOrDefault() != checkID.PartitionOrDefault() {
return fmt.Errorf("cannot add alias check %q to node in partition %q", checkID.String(), l.config.Partition)
}
if l.agentEnterpriseMeta.PartitionOrDefault() != srcServiceID.PartitionOrDefault() {
return fmt.Errorf("cannot add alias check for %q to node in partition %q", srcServiceID.String(), l.config.Partition)
}
m, ok := l.checkAliases[srcServiceID]
if !ok {
m = make(map[structs.CheckID]chan<- struct{})
@ -663,11 +720,23 @@ func (l *State) Check(id structs.CheckID) *structs.HealthCheck {
return c.Check
}
// AllChecks returns the locally registered checks that the
// agent is aware of and are being kept in sync with the server
func (l *State) AllChecks() map[structs.CheckID]*structs.HealthCheck {
return l.listChecks(false, nil)
}
// Checks returns the locally registered checks that the
// agent is aware of and are being kept in sync with the server
//
// Results are scoped to the provided namespace and partition.
func (l *State) Checks(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*structs.HealthCheck {
return l.listChecks(true, entMeta)
}
func (l *State) listChecks(filtered bool, entMeta *structs.EnterpriseMeta) map[structs.CheckID]*structs.HealthCheck {
m := make(map[structs.CheckID]*structs.HealthCheck)
for id, c := range l.CheckStates(entMeta) {
for id, c := range l.listCheckStates(filtered, entMeta) {
m[id] = c.Check
}
return m
@ -719,6 +788,10 @@ func (l *State) SetCheckState(c *CheckState) {
l.Lock()
defer l.Unlock()
if l.agentEnterpriseMeta.PartitionOrDefault() != c.Check.PartitionOrDefault() {
return
}
l.setCheckStateLocked(c)
}
@ -737,11 +810,25 @@ func (l *State) setCheckStateLocked(c *CheckState) {
l.TriggerSyncChanges()
}
// AllCheckStates returns a shallow copy of all health check state records.
// The map contains a shallow copy of the current check states.
//
// The defer timers still point to the original values and must not be modified.
func (l *State) AllCheckStates() map[structs.CheckID]*CheckState {
return l.listCheckStates(false, nil)
}
// CheckStates returns a shallow copy of all health check state records.
// The map contains a shallow copy of the current check states.
//
// The defer timers still point to the original values and must not be modified.
//
// Results are scoped to the provided namespace and partition.
func (l *State) CheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState {
return l.listCheckStates(true, entMeta)
}
func (l *State) listCheckStates(filtered bool, entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState {
l.RLock()
defer l.RUnlock()
@ -750,7 +837,7 @@ func (l *State) CheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID
if c.Deleted {
continue
}
if !entMeta.Matches(&id.EnterpriseMeta) {
if filtered && !entMeta.Matches(&id.EnterpriseMeta) {
continue
}
m[id] = c.Clone()
@ -758,12 +845,27 @@ func (l *State) CheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID
return m
}
// AllCriticalCheckStates returns the locally registered checks that the
// agent is aware of and are being kept in sync with the server.
// The map contains a shallow copy of the current check states.
//
// The defer timers still point to the original values and must not be modified.
func (l *State) AllCriticalCheckStates() map[structs.CheckID]*CheckState {
return l.listCriticalCheckStates(false, nil)
}
// CriticalCheckStates returns the locally registered checks that the
// agent is aware of and are being kept in sync with the server.
// The map contains a shallow copy of the current check states.
//
// The defer timers still point to the original values and must not be modified.
//
// Results are scoped to the provided namespace and partition.
func (l *State) CriticalCheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState {
return l.listCriticalCheckStates(true, entMeta)
}
func (l *State) listCriticalCheckStates(filtered bool, entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState {
l.RLock()
defer l.RUnlock()
@ -772,7 +874,7 @@ func (l *State) CriticalCheckStates(entMeta *structs.EnterpriseMeta) map[structs
if c.Deleted || !c.Critical() {
continue
}
if !entMeta.Matches(&id.EnterpriseMeta) {
if filtered && !entMeta.Matches(&id.EnterpriseMeta) {
continue
}
m[id] = c.Clone()
@ -887,7 +989,7 @@ func (l *State) updateSyncState() error {
AllowStale: true,
MaxStaleDuration: fullSyncReadMaxStale,
},
EnterpriseMeta: *structs.WildcardEnterpriseMetaInDefaultPartition(),
EnterpriseMeta: *l.agentEnterpriseMeta.WildcardEnterpriseMetaForPartition(),
}
var out1 structs.IndexedNodeServiceList
@ -958,7 +1060,7 @@ func (l *State) updateSyncState() error {
if ls == nil {
// The consul service is managed automatically and does
// not need to be deregistered
if id == structs.ConsulCompoundServiceID {
if structs.IsConsulServiceID(id) {
continue
}
@ -1002,7 +1104,7 @@ func (l *State) updateSyncState() error {
if lc == nil {
// The Serf check is created automatically and does not
// need to be deregistered.
if id == structs.SerfCompoundCheckID {
if structs.IsSerfCheckID(id) {
l.logger.Debug("Skipping remote check since it is managed automatically", "check", structs.SerfCheckID)
continue
}
@ -1366,6 +1468,7 @@ func (l *State) syncNodeInfo() error {
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
EnterpriseMeta: l.agentEnterpriseMeta,
WriteRequest: structs.WriteRequest{Token: at},
}
var out struct{}

View File

@ -86,7 +86,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: s.proxyCfg.DestinationServiceName,
EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(s.proxyID.NamespaceOrEmpty()),
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, intentionUpstreamsID, s.ch)
if err != nil {
return snap, err
@ -97,7 +97,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
Name: structs.MeshConfigMesh,
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
EnterpriseMeta: *s.proxyID.DefaultEnterpriseMetaForPartition(),
}, meshConfigEntryID, s.ch)
if err != nil {
return snap, err
@ -228,7 +228,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
// by the ResolveServiceConfig endpoint.
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition())
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WildcardEnterpriseMetaForPartition())
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()]
if ok {
u = defaults

View File

@ -145,7 +145,7 @@ func (m *Manager) syncState() {
defer m.mu.Unlock()
// Traverse the local state and ensure all proxy services are registered
services := m.State.Services(structs.WildcardEnterpriseMetaInDefaultPartition())
services := m.State.AllServices()
for sid, svc := range services {
if svc.Kind != structs.ServiceKindConnectProxy &&
svc.Kind != structs.ServiceKindTerminatingGateway &&

View File

@ -330,6 +330,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
rootsCacheKey, leafCacheKey,
roots,
webProxyCopy.(*structs.NodeService),
local.Config{},
expectSnapCopy.(*ConfigSnapshot),
)
})
@ -349,13 +350,14 @@ func testManager_BasicLifecycle(
rootsCacheKey, leafCacheKey string,
roots *structs.IndexedCARoots,
webProxy *structs.NodeService,
agentConfig local.Config,
expectSnap *ConfigSnapshot,
) {
c := TestCacheWithTypes(t, types)
require := require.New(t)
logger := testutil.Logger(t)
state := local.NewState(local.Config{}, logger, &token.Store{})
state := local.NewState(agentConfig, logger, &token.Store{})
source := &structs.QuerySource{Datacenter: "dc1"}
// Stub state syncing

View File

@ -29,12 +29,14 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
return snap, err
}
wildcardEntMeta := s.proxyID.WildcardEnterpriseMetaForPartition()
// Watch for all services
err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
EnterpriseMeta: *structs.WildcardEnterpriseMetaInDefaultPartition(),
EnterpriseMeta: *wildcardEntMeta,
}, serviceListWatchID, s.ch)
if err != nil {
@ -85,7 +87,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Kind: structs.ServiceResolver,
EnterpriseMeta: *structs.WildcardEnterpriseMetaInDefaultPartition(),
EnterpriseMeta: *wildcardEntMeta,
}, serviceResolversWatchID, s.ch)
if err != nil {
s.logger.Named(logging.MeshGateway).

View File

@ -127,6 +127,7 @@ func TestUpstreamNodes(t testing.T) structs.CheckServiceNodes {
Node: "test1",
Address: "10.10.1.1",
Datacenter: "dc1",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
},
Service: structs.TestNodeService(t),
},
@ -136,6 +137,7 @@ func TestUpstreamNodes(t testing.T) structs.CheckServiceNodes {
Node: "test2",
Address: "10.10.1.2",
Datacenter: "dc1",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
},
Service: structs.TestNodeService(t),
},

View File

@ -229,7 +229,8 @@ func (s *handlerUpstreams) resetWatchesFromChain(
// Outside of transparent mode we only watch the chain target, B,
// since A is a virtual service and traffic will not be sent to it.
if !watchedChainEndpoints && s.proxyCfg.Mode == structs.ProxyModeTransparent {
chainEntMeta := structs.NewEnterpriseMetaInDefaultPartition(chain.Namespace)
// TODO(partitions): add partition to the disco chain
chainEntMeta := structs.NewEnterpriseMetaWithPartition("" /*TODO*/, chain.Namespace)
opts := targetWatchOpts{
upstreamID: id,

View File

@ -51,7 +51,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
logger.Trace("new subscription")
defer logger.Trace("subscription closed")
entMeta := structs.NewEnterpriseMetaInDefaultPartition(req.Namespace)
entMeta := structs.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace)
authz, err := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil)
if err != nil {
return err
@ -94,6 +94,7 @@ func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs
Token: req.Token,
Index: req.Index,
Namespace: entMeta.NamespaceOrEmpty(),
Partition: entMeta.PartitionOrEmpty(),
}
}

View File

@ -29,6 +29,7 @@ func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index ui
Datacenter: srvReq.Datacenter,
Index: index,
Namespace: srvReq.EnterpriseMeta.NamespaceOrEmpty(),
Partition: srvReq.EnterpriseMeta.PartitionOrEmpty(),
}
if srvReq.Connect {
req.Topic = pbsubscribe.Topic_ServiceHealthConnect

View File

@ -126,7 +126,7 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str
// it doesn't seem to be necessary - even with thousands of services this is
// not expensive to compute.
usedPorts := make(map[int]struct{})
for _, otherNS := range a.State.Services(structs.WildcardEnterpriseMetaInDefaultPartition()) {
for _, otherNS := range a.State.AllServices() {
// Check if other port is in auto-assign range
if otherNS.Port >= a.config.ConnectSidecarMinPort &&
otherNS.Port <= a.config.ConnectSidecarMaxPort {

View File

@ -19,8 +19,3 @@ const (
ConsulServiceID = "consul"
ConsulServiceName = "consul"
)
var (
ConsulCompoundServiceID = NewServiceID(ConsulServiceID, nil) // TODO(partitions): delete this in favor of IsConsulServiceID(ServiceID)
SerfCompoundCheckID = NewCheckID(SerfCheckID, nil) // TODO(partitions): delete this in favor of IsSerfCheckID(CheckID)
)

View File

@ -152,11 +152,14 @@ func (s *HTTPHandlers) convertOps(resp http.ResponseWriter, req *http.Request) (
KV: &structs.TxnKVOp{
Verb: verb,
DirEnt: structs.DirEntry{
Key: in.KV.Key,
Value: in.KV.Value,
Flags: in.KV.Flags,
Session: in.KV.Session,
EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(in.KV.Namespace),
Key: in.KV.Key,
Value: in.KV.Value,
Flags: in.KV.Flags,
Session: in.KV.Session,
EnterpriseMeta: structs.NewEnterpriseMetaWithPartition(
in.KV.Partition,
in.KV.Namespace,
),
RaftIndex: structs.RaftIndex{
ModifyIndex: in.KV.Index,
},
@ -182,6 +185,7 @@ func (s *HTTPHandlers) convertOps(resp http.ResponseWriter, req *http.Request) (
Node: structs.Node{
ID: types.NodeID(node.ID),
Node: node.Node,
Partition: node.Partition,
Address: node.Address,
Datacenter: node.Datacenter,
TaggedAddresses: node.TaggedAddresses,
@ -216,7 +220,10 @@ func (s *HTTPHandlers) convertOps(resp http.ResponseWriter, req *http.Request) (
Warning: svc.Weights.Warning,
},
EnableTagOverride: svc.EnableTagOverride,
EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(svc.Namespace),
EnterpriseMeta: structs.NewEnterpriseMetaWithPartition(
svc.Partition,
svc.Namespace,
),
RaftIndex: structs.RaftIndex{
ModifyIndex: svc.ModifyIndex,
},
@ -274,7 +281,10 @@ func (s *HTTPHandlers) convertOps(resp http.ResponseWriter, req *http.Request) (
Timeout: timeout,
DeregisterCriticalServiceAfter: deregisterCriticalServiceAfter,
},
EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(check.Namespace),
EnterpriseMeta: structs.NewEnterpriseMetaWithPartition(
check.Partition,
check.Namespace,
),
RaftIndex: structs.RaftIndex{
ModifyIndex: check.ModifyIndex,
},

View File

@ -603,6 +603,9 @@ func (s *HTTPHandlers) UIMetricsProxy(resp http.ResponseWriter, req *http.Reques
s.clearTokenFromHeaders(req)
var entMeta structs.EnterpriseMeta
if err := parseEntMetaPartition(req, &entMeta); err != nil {
return nil, err
}
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, nil)
if err != nil {
return nil, err
@ -611,9 +614,8 @@ func (s *HTTPHandlers) UIMetricsProxy(resp http.ResponseWriter, req *http.Reques
// This endpoint requires wildcard read on all services and all nodes.
//
// In enterprise it requires this _in all namespaces_ too.
wildMeta := structs.WildcardEnterpriseMetaInDefaultPartition()
var authzContext acl.AuthorizerContext
wildMeta.FillAuthzContext(&authzContext)
entMeta.WildcardEnterpriseMetaForPartition().FillAuthzContext(&authzContext)
if authz.NodeReadAll(&authzContext) != acl.Allow || authz.ServiceReadAll(&authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied

View File

@ -187,6 +187,7 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool {
}
// Scan for a match
// NOTE: this only works in the default partition and default namespace
services := a.State.Services(structs.DefaultEnterpriseMetaInDefaultPartition())
found := false
OUTER:

View File

@ -15,10 +15,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
func TestAPI_AgentSelf(t *testing.T) {
@ -793,6 +794,7 @@ func TestAPI_AgentService(t *testing.T) {
},
Meta: map[string]string{},
Namespace: defaultNamespace,
Partition: defaultPartition,
Datacenter: "dc1",
}
require.Equal(expect, got)

View File

@ -5,9 +5,10 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
func TestAPI_CoordinateDatacenters(t *testing.T) {
@ -85,8 +86,9 @@ func TestAPI_CoordinateUpdate(t *testing.T) {
newCoord := coordinate.NewCoordinate(coordinate.DefaultConfig())
newCoord.Height = 0.5
entry := &CoordinateEntry{
Node: node,
Coord: newCoord,
Node: node,
Partition: defaultPartition,
Coord: newCoord,
}
_, err = coord.Update(entry, nil)
if err != nil {

View File

@ -5,12 +5,16 @@ import (
"fmt"
"strings"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/mitchellh/cli"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/consul/lib"
)
// TODO(partitions): how will this command work when asking for RTT between a
// partitioned client and a server in the default partition?
func New(ui cli.Ui) *cmd {
c := &cmd{UI: ui}
c.init()