From 097e1645e32502e988463b73a4f218b1057ce2d0 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Thu, 19 Aug 2021 15:09:42 -0500 Subject: [PATCH] agent: ensure that most agent behavior correctly respects partition configuration (#10880) --- agent/acl.go | 7 +- agent/agent.go | 32 +++-- agent/agent_endpoint.go | 102 ++++++++++++---- agent/agent_endpoint_oss.go | 13 ++ agent/agent_endpoint_test.go | 4 +- agent/agent_oss.go | 4 + agent/checks/alias.go | 2 +- agent/config/builder.go | 13 +- agent/config/runtime_oss.go | 3 +- agent/consul/acl.go | 5 + agent/consul/catalog_endpoint.go | 5 +- agent/consul/client_serf.go | 3 +- agent/consul/config_oss.go | 9 ++ agent/consul/coordinate_endpoint.go | 53 ++++++--- agent/consul/coordinate_endpoint_test.go | 5 - agent/consul/enterprise_server_oss.go | 2 +- agent/consul/filter.go | 1 + agent/consul/leader.go | 90 +++++++++----- agent/consul/leader_test.go | 20 +--- agent/consul/merge.go | 13 +- agent/consul/rtt.go | 12 +- agent/consul/segment_oss.go | 10 +- agent/consul/server_serf.go | 3 +- agent/consul/session_ttl.go | 2 + agent/consul/txn_endpoint_test.go | 1 - agent/coordinate_endpoint.go | 10 ++ agent/dns.go | 26 ++-- agent/http.go | 6 + agent/http_oss.go | 11 +- agent/local/state.go | 145 +++++++++++++++++++---- agent/proxycfg/connect_proxy.go | 6 +- agent/proxycfg/manager.go | 2 +- agent/proxycfg/manager_test.go | 4 +- agent/proxycfg/mesh_gateway.go | 6 +- agent/proxycfg/testing.go | 2 + agent/proxycfg/upstreams.go | 3 +- agent/rpc/subscribe/subscribe.go | 3 +- agent/rpcclient/health/view.go | 1 + agent/sidecar_service.go | 2 +- agent/structs/catalog.go | 5 - agent/txn_endpoint.go | 24 ++-- agent/ui_endpoint.go | 6 +- agent/user_event.go | 1 + api/agent_test.go | 6 +- api/coordinate_test.go | 8 +- command/rtt/rtt.go | 8 +- 46 files changed, 500 insertions(+), 199 deletions(-) create mode 100644 agent/agent_endpoint_oss.go create mode 100644 agent/consul/config_oss.go diff --git a/agent/acl.go b/agent/acl.go index 7e6a1455f7..f3600ba0b0 100644 --- a/agent/acl.go +++ b/agent/acl.go @@ -87,6 +87,8 @@ func (a *Agent) vetServiceUpdateWithAuthorizer(authz acl.Authorizer, serviceID s } func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *structs.HealthCheck) error { + // TODO(partitions) + var authzContext acl.AuthorizerContext check.FillAuthzContext(&authzContext) // Vet the check itself. @@ -147,7 +149,7 @@ func (a *Agent) filterMembers(token string, members *[]serf.Member) error { } var authzContext acl.AuthorizerContext - structs.DefaultEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) + a.agentEnterpriseMeta().FillAuthzContext(&authzContext) // Filter out members based on the node policy. m := *members for i := 0; i < len(m); i++ { @@ -188,7 +190,8 @@ func (a *Agent) filterChecksWithAuthorizer(authz acl.Authorizer, checks *map[str continue } } else { - structs.DefaultEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) + // TODO(partition): should this be a Default or Node flavored entmeta? 
+ check.NodeEnterpriseMetaForPartition().FillAuthzContext(&authzContext) if authz.NodeRead(a.config.NodeName, &authzContext) == acl.Allow { continue } diff --git a/agent/agent.go b/agent/agent.go index b54af8b55f..5b9e4a8b89 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -434,6 +434,7 @@ func LocalConfig(cfg *config.RuntimeConfig) local.Config { DiscardCheckOutput: cfg.DiscardCheckOutput, NodeID: cfg.NodeID, NodeName: cfg.NodeName, + Partition: cfg.PartitionOrDefault(), TaggedAddresses: map[string]string{}, } for k, v := range cfg.TaggedAddresses { @@ -561,8 +562,9 @@ func (a *Agent) Start(ctx context.Context) error { State: a.State, Tokens: a.baseDeps.Tokens, Source: &structs.QuerySource{ - Datacenter: a.config.Datacenter, - Segment: a.config.SegmentName, + Datacenter: a.config.Datacenter, + Segment: a.config.SegmentName, + NodePartition: a.config.PartitionOrEmpty(), }, DNSConfig: proxycfg.DNSConfig{ Domain: a.config.DNSDomain, @@ -1529,11 +1531,13 @@ func (a *Agent) LocalMember() serf.Member { // LANMembers is used to retrieve the LAN members func (a *Agent) LANMembers() []serf.Member { + // TODO(partitions): filter this by the partition? return a.delegate.LANMembers() } // WANMembers is used to retrieve the WAN members func (a *Agent) WANMembers() []serf.Member { + // TODO(partitions): filter this by the partition by omitting wan results for now? if srv, ok := a.delegate.(*consul.Server); ok { return srv.WANMembers() } @@ -1646,11 +1650,12 @@ OUTER: for segment, coord := range cs { agentToken := a.tokens.AgentToken() req := structs.CoordinateUpdateRequest{ - Datacenter: a.config.Datacenter, - Node: a.config.NodeName, - Segment: segment, - Coord: coord, - WriteRequest: structs.WriteRequest{Token: agentToken}, + Datacenter: a.config.Datacenter, + Node: a.config.NodeName, + Segment: segment, + Coord: coord, + EnterpriseMeta: *a.agentEnterpriseMeta(), + WriteRequest: structs.WriteRequest{Token: agentToken}, } var reply struct{} // todo(kit) port all of these logger calls to hclog w/ loglevel configuration @@ -1674,7 +1679,7 @@ OUTER: // reapServicesInternal does a single pass, looking for services to reap. func (a *Agent) reapServicesInternal() { reaped := make(map[structs.ServiceID]bool) - for checkID, cs := range a.State.CriticalCheckStates(structs.WildcardEnterpriseMetaInDefaultPartition()) { + for checkID, cs := range a.State.AllCriticalCheckStates() { serviceID := cs.Check.CompoundServiceID() // There's nothing to do if there's no service. @@ -2004,7 +2009,7 @@ func (a *Agent) addServiceInternal(req addServiceInternalRequest) error { // Agent.Start does not have a snapshot, and we don't want to query // State.Checks each time. if req.checkStateSnapshot == nil { - req.checkStateSnapshot = a.State.Checks(structs.WildcardEnterpriseMetaInDefaultPartition()) + req.checkStateSnapshot = a.State.AllChecks() } // Create an associated health check @@ -2458,6 +2463,8 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType, // Need its config to know whether we should reroute checks to it var proxy *structs.NodeService if service != nil { + // NOTE: Both services must live in the same namespace and + // partition so this will correctly scope the results. 
for _, svc := range a.State.Services(&service.EnterpriseMeta) { if svc.Proxy.DestinationServiceID == service.ID { proxy = svc @@ -2719,6 +2726,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType, var rpcReq structs.NodeSpecificRequest rpcReq.Datacenter = a.config.Datacenter + rpcReq.EnterpriseMeta = *a.agentEnterpriseMeta() // The token to set is really important. The behavior below follows // the same behavior as anti-entropy: we use the user-specified token @@ -3297,7 +3305,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI // unloadServices will deregister all services. func (a *Agent) unloadServices() error { - for id := range a.State.Services(structs.WildcardEnterpriseMetaInDefaultPartition()) { + for id := range a.State.AllServices() { if err := a.removeServiceLocked(id, false); err != nil { return fmt.Errorf("Failed deregistering service '%s': %v", id, err) } @@ -3411,7 +3419,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[structs.CheckID] // unloadChecks will deregister all checks known to the local agent. func (a *Agent) unloadChecks() error { - for id := range a.State.Checks(structs.WildcardEnterpriseMetaInDefaultPartition()) { + for id := range a.State.AllChecks() { if err := a.removeCheckLocked(id, false); err != nil { return fmt.Errorf("Failed deregistering check '%s': %s", id, err) } @@ -3423,7 +3431,7 @@ func (a *Agent) unloadChecks() error { // checks. This is done before we reload our checks, so that we can properly // restore into the same state. func (a *Agent) snapshotCheckState() map[structs.CheckID]*structs.HealthCheck { - return a.State.Checks(structs.WildcardEnterpriseMetaInDefaultPartition()) + return a.State.AllChecks() } // loadMetadata loads node metadata fields from the agent config and diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 216b8c8593..9f846484b9 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -82,6 +82,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i PrimaryDatacenter string NodeName string NodeID string + Partition string `json:",omitempty"` Revision string Server bool Version string @@ -90,6 +91,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i PrimaryDatacenter: s.agent.config.PrimaryDatacenter, NodeName: s.agent.config.NodeName, NodeID: string(s.agent.config.NodeID), + Partition: s.agent.config.PartitionOrEmpty(), Revision: s.agent.config.Revision, Server: s.agent.config.ServerMode, Version: s.agent.config.Version, @@ -305,6 +307,12 @@ func (s *HTTPHandlers) AgentServices(resp http.ResponseWriter, req *http.Request return nil, err } + if !s.validateRequestPartition(resp, &entMeta) { + return nil, nil + } + + // NOTE: we're explicitly fetching things in the requested partition and + // namespace here. 
services := s.agent.State.Services(&entMeta) if err := s.agent.filterServicesWithAuthorizer(authz, &services); err != nil { return nil, err @@ -368,6 +376,10 @@ func (s *HTTPHandlers) AgentService(resp http.ResponseWriter, req *http.Request) sid := structs.NewServiceID(id, &entMeta) + if !s.validateRequestPartition(resp, &entMeta) { + return nil, nil + } + dc := s.agent.config.Datacenter resultHash, service, err := s.agent.LocalBlockingQuery(false, hash, queryOpts.MaxQueryTime, @@ -400,6 +412,7 @@ func (s *HTTPHandlers) AgentService(resp http.ResponseWriter, req *http.Request) aSvc := buildAgentService(svc, dc) reply := &aSvc + // TODO(partitions): do we need to do anything here? rawHash, err := hashstructure.Hash(reply, nil) if err != nil { return "", nil, err @@ -432,6 +445,10 @@ func (s *HTTPHandlers) AgentChecks(resp http.ResponseWriter, req *http.Request) return nil, err } + if !s.validateRequestPartition(resp, &entMeta) { + return nil, nil + } + var filterExpression string s.parseFilter(req, &filterExpression) filter, err := bexpr.CreateFilter(filterExpression, nil, nil) @@ -439,6 +456,7 @@ func (s *HTTPHandlers) AgentChecks(resp http.ResponseWriter, req *http.Request) return nil, err } + // NOTE(partitions): this works because nodes exist in ONE partition checks := s.agent.State.Checks(&entMeta) if err := s.agent.filterChecksWithAuthorizer(authz, &checks); err != nil { return nil, err @@ -485,6 +503,8 @@ func (s *HTTPHandlers) AgentMembers(resp http.ResponseWriter, req *http.Request) } } + // TODO(partitions): likely partitions+segment integration will take care of this + var members []serf.Member if wan { members = s.agent.WANMembers() @@ -521,6 +541,7 @@ func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (i wan := false if other := req.URL.Query().Get("wan"); other != "" { wan = true + // TODO(partitions) : block wan join } // Get the address @@ -616,6 +637,10 @@ func (s *HTTPHandlers) AgentRegisterCheck(resp http.ResponseWriter, req *http.Re return nil, err } + if !s.validateRequestPartition(resp, &args.EnterpriseMeta) { + return nil, nil + } + // Construct the health check. health := args.HealthCheck(s.agent.config.NodeName) @@ -674,6 +699,10 @@ func (s *HTTPHandlers) AgentDeregisterCheck(resp http.ResponseWriter, req *http. return nil, err } + if !s.validateRequestPartition(resp, &checkID.EnterpriseMeta) { + return nil, nil + } + if err := s.agent.RemoveCheck(checkID, true); err != nil { return nil, err } @@ -740,7 +769,7 @@ func (s *HTTPHandlers) AgentCheckUpdate(resp http.ResponseWriter, req *http.Requ return s.agentCheckUpdate(resp, req, checkID, update.Status, update.Output) } -func (s *HTTPHandlers) agentCheckUpdate(_resp http.ResponseWriter, req *http.Request, checkID types.CheckID, status string, output string) (interface{}, error) { +func (s *HTTPHandlers) agentCheckUpdate(resp http.ResponseWriter, req *http.Request, checkID types.CheckID, status string, output string) (interface{}, error) { cid := structs.NewCheckID(checkID, nil) // Get the provided token, if any, and vet against any ACL policies. 
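// The partition guard added above repeats across the agent HTTP endpoints in
// this file. A minimal sketch of the pattern as a handler might use it; the
// endpoint name is made up, only the guard and the parse helper come from
// this change. In the OSS build validateRequestPartition always returns true,
// while an enterprise build is expected to write the error response itself
// and return false when the requested partition cannot be served locally.

func (s *HTTPHandlers) examplePartitionedEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	var entMeta structs.EnterpriseMeta
	if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
		return nil, err
	}
	if !s.validateRequestPartition(resp, &entMeta) {
		// The guard has already written the HTTP response; nothing more to do.
		return nil, nil
	}
	// ... proceed with work scoped to entMeta ...
	return nil, nil
}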
@@ -762,6 +791,10 @@ func (s *HTTPHandlers) agentCheckUpdate(_resp http.ResponseWriter, req *http.Req return nil, err } + if !s.validateRequestPartition(resp, &cid.EnterpriseMeta) { + return nil, nil + } + if err := s.agent.updateTTLCheck(cid, status, output); err != nil { return nil, err } @@ -833,6 +866,10 @@ func (s *HTTPHandlers) AgentHealthServiceByID(resp http.ResponseWriter, req *htt return nil, err } + if !s.validateRequestPartition(resp, &entMeta) { + return nil, nil + } + sid := structs.NewServiceID(serviceID, &entMeta) dc := s.agent.config.Datacenter @@ -891,35 +928,38 @@ func (s *HTTPHandlers) AgentHealthServiceByName(resp http.ResponseWriter, req *h return nil, acl.ErrPermissionDenied } + if !s.validateRequestPartition(resp, &entMeta) { + return nil, nil + } + dc := s.agent.config.Datacenter code := http.StatusNotFound status := fmt.Sprintf("ServiceName %s Not Found", serviceName) - services := s.agent.State.Services(&entMeta) + + services := s.agent.State.ServicesByName(structs.NewServiceName(serviceName, &entMeta)) result := make([]api.AgentServiceChecksInfo, 0, 16) for _, service := range services { - if service.Service == serviceName { - sid := structs.NewServiceID(service.ID, &entMeta) + sid := structs.NewServiceID(service.ID, &entMeta) - scode, sstatus, healthChecks := agentHealthService(sid, s) - serviceInfo := buildAgentService(service, dc) - res := api.AgentServiceChecksInfo{ - AggregatedStatus: sstatus, - Checks: healthChecks, - Service: &serviceInfo, - } - result = append(result, res) - // When service is not found, we ignore it and keep existing HTTP status - if code == http.StatusNotFound { - code = scode - status = sstatus - } - // We take the worst of all statuses, so we keep iterating - // passing: 200 < warning: 429 < critical: 503 - if code < scode { - code = scode - status = sstatus - } + scode, sstatus, healthChecks := agentHealthService(sid, s) + serviceInfo := buildAgentService(service, dc) + res := api.AgentServiceChecksInfo{ + AggregatedStatus: sstatus, + Checks: healthChecks, + Service: &serviceInfo, + } + result = append(result, res) + // When service is not found, we ignore it and keep existing HTTP status + if code == http.StatusNotFound { + code = scode + status = sstatus + } + // We take the worst of all statuses, so we keep iterating + // passing: 200 < warning: 429 < critical: 503 + if code < scode { + code = scode + status = sstatus } } if returnTextPlain(req) { @@ -965,6 +1005,10 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http. return nil, err } + if !s.validateRequestPartition(resp, &args.EnterpriseMeta) { + return nil, nil + } + // Get the node service. 
ns := args.NodeService() if ns.Weights != nil { @@ -1104,6 +1148,10 @@ func (s *HTTPHandlers) AgentDeregisterService(resp http.ResponseWriter, req *htt return nil, err } + if !s.validateRequestPartition(resp, &sid.EnterpriseMeta) { + return nil, nil + } + if err := s.agent.RemoveService(sid); err != nil { return nil, err } @@ -1403,6 +1451,10 @@ func (s *HTTPHandlers) AgentConnectCALeafCert(resp http.ResponseWriter, req *htt args.MaxQueryTime = qOpts.MaxQueryTime args.Token = qOpts.Token + if !s.validateRequestPartition(resp, &args.EnterpriseMeta) { + return nil, nil + } + raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCALeafName, &args) if err != nil { return nil, err @@ -1442,6 +1494,10 @@ func (s *HTTPHandlers) AgentConnectAuthorize(resp http.ResponseWriter, req *http return nil, BadRequestError{fmt.Sprintf("Request decode failed: %v", err)} } + if !s.validateRequestPartition(resp, &authReq.EnterpriseMeta) { + return nil, nil + } + authz, reason, cacheMeta, err := s.agent.ConnectAuthorize(token, &authReq) if err != nil { return nil, err diff --git a/agent/agent_endpoint_oss.go b/agent/agent_endpoint_oss.go new file mode 100644 index 0000000000..1c4dd44285 --- /dev/null +++ b/agent/agent_endpoint_oss.go @@ -0,0 +1,13 @@ +// +build !consulent + +package agent + +import ( + "net/http" + + "github.com/hashicorp/consul/agent/structs" +) + +func (s *HTTPHandlers) validateRequestPartition(_ http.ResponseWriter, _ *structs.EnterpriseMeta) bool { + return true +} diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 0febf1405f..95a829de29 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -4180,7 +4180,7 @@ func testAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T, extraHCL s resp.Body.String()) // Sanity the target service registration - svcs := a.State.Services(nil) + svcs := a.State.AllServices() // Parse the expected definition into a ServiceDefinition var sd structs.ServiceDefinition @@ -4229,7 +4229,7 @@ func testAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T, extraHCL s require.NoError(err) require.Nil(obj) - svcs := a.State.Services(nil) + svcs := a.State.AllServices() _, ok = svcs[structs.NewServiceID(tt.wantNS.ID, nil)] if tt.wantSidecarIDLeftAfterDereg { require.True(ok, "removed non-sidecar service at "+tt.wantNS.ID) diff --git a/agent/agent_oss.go b/agent/agent_oss.go index 4be00ed806..88496e28be 100644 --- a/agent/agent_oss.go +++ b/agent/agent_oss.go @@ -50,3 +50,7 @@ func (a *Agent) stopLicenseManager() {} func (a *Agent) enterpriseStats() map[string]map[string]string { return nil } + +func (a *Agent) agentEnterpriseMeta() *structs.EnterpriseMeta { + return structs.NodeEnterpriseMetaInDefaultPartition() +} diff --git a/agent/checks/alias.go b/agent/checks/alias.go index b2fb9c0495..78c4a32b4c 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -108,7 +108,7 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) { } updateStatus := func() { - checks := c.Notify.Checks(structs.WildcardEnterpriseMetaInDefaultPartition()) + checks := c.Notify.Checks(c.WildcardEnterpriseMetaForPartition()) checksList := make([]*structs.HealthCheck, 0, len(checks)) for _, chk := range checks { checksList = append(checksList, chk) diff --git a/agent/config/builder.go b/agent/config/builder.go index 849c54b0dc..daae2c2f06 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -688,7 +688,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) { } autoEncryptAllowTLS := 
boolVal(c.AutoEncrypt.AllowTLS) - autoConfig := b.autoConfigVal(c.AutoConfig) + autoConfig := b.autoConfigVal(c.AutoConfig, stringVal(c.Partition)) if autoEncryptAllowTLS || autoConfig.Enabled { connectEnabled = true } @@ -2231,7 +2231,7 @@ func (b *builder) makeAddrs(pri []net.Addr, sec []*net.IPAddr, port int) []net.A return x } -func (b *builder) autoConfigVal(raw AutoConfigRaw) AutoConfig { +func (b *builder) autoConfigVal(raw AutoConfigRaw, agentPartition string) AutoConfig { var val AutoConfig val.Enabled = boolValWithDefault(raw.Enabled, false) @@ -2259,12 +2259,12 @@ func (b *builder) autoConfigVal(raw AutoConfigRaw) AutoConfig { val.IPSANs = append(val.IPSANs, ip) } - val.Authorizer = b.autoConfigAuthorizerVal(raw.Authorization) + val.Authorizer = b.autoConfigAuthorizerVal(raw.Authorization, agentPartition) return val } -func (b *builder) autoConfigAuthorizerVal(raw AutoConfigAuthorizationRaw) AutoConfigAuthorizer { +func (b *builder) autoConfigAuthorizerVal(raw AutoConfigAuthorizationRaw, agentPartition string) AutoConfigAuthorizer { // Our config file syntax wraps the static authorizer configuration in a "static" stanza. However // internally we do not support multiple configured authorization types so the RuntimeConfig just // inlines the static one. While we can and probably should extend the authorization types in the @@ -2272,13 +2272,16 @@ func (b *builder) autoConfigAuthorizerVal(raw AutoConfigAuthorizationRaw) AutoCo // needed right now so the configuration types will remain simplistic until they need to be otherwise. var val AutoConfigAuthorizer + entMeta := structs.DefaultEnterpriseMetaInPartition(agentPartition) + entMeta.Normalize() + val.Enabled = boolValWithDefault(raw.Enabled, false) val.ClaimAssertions = raw.Static.ClaimAssertions val.AllowReuse = boolValWithDefault(raw.Static.AllowReuse, false) val.AuthMethod = structs.ACLAuthMethod{ Name: "Auto Config Authorizer", Type: "jwt", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + EnterpriseMeta: *entMeta, Config: map[string]interface{}{ "JWTSupportedAlgs": raw.Static.JWTSupportedAlgs, "BoundAudiences": raw.Static.BoundAudiences, diff --git a/agent/config/runtime_oss.go b/agent/config/runtime_oss.go index a83946f106..fcc9135dc2 100644 --- a/agent/config/runtime_oss.go +++ b/agent/config/runtime_oss.go @@ -4,4 +4,5 @@ package config type EnterpriseRuntimeConfig struct{} -func (c *RuntimeConfig) PartitionOrEmpty() string { return "" } +func (c *RuntimeConfig) PartitionOrEmpty() string { return "" } +func (c *RuntimeConfig) PartitionOrDefault() string { return "" } diff --git a/agent/consul/acl.go b/agent/consul/acl.go index 461c05b82e..910e711a07 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -1457,6 +1457,7 @@ func (f *aclFilter) filterNodeServices(services **structs.NodeServices) { } var authzContext acl.AuthorizerContext + // TODO(partitions): put partition into this wildcard? structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) if !f.allowNode((*services).Node.Node, &authzContext) { *services = nil @@ -1481,6 +1482,7 @@ func (f *aclFilter) filterNodeServiceList(services **structs.NodeServiceList) { } var authzContext acl.AuthorizerContext + // TODO(partitions): put partition into this wildcard? 
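// Illustrative only: one way this recurring TODO could eventually be
// addressed, following the same approach used for the DNS and coordinate
// endpoints in this change (derive the wildcard from the node's own
// partition rather than hard-coding the default partition). The filter code
// below intentionally stays on the default-partition wildcard for now.
//
//	(*services).Node.GetEnterpriseMeta().WildcardEnterpriseMetaForPartition().FillAuthzContext(&authzContext)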
structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) if !f.allowNode((*services).Node.Node, &authzContext) { *services = nil @@ -1578,6 +1580,7 @@ func (f *aclFilter) filterSessions(sessions *structs.Sessions) { func (f *aclFilter) filterCoordinates(coords *structs.Coordinates) { c := *coords var authzContext acl.AuthorizerContext + // TODO(partitions): put partition into this wildcard? structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) for i := 0; i < len(c); i++ { @@ -1619,6 +1622,7 @@ func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) { info := nd[i] // Filter nodes + // TODO(partitions): put partition into this wildcard? structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) if node := info.Node; !f.allowNode(node, &authzContext) { f.logger.Debug("dropping node from result due to ACLs", "node", node) @@ -1687,6 +1691,7 @@ func (f *aclFilter) filterNodes(nodes *structs.Nodes) { n := *nodes var authzContext acl.AuthorizerContext + // TODO(partitions): put partition into this wildcard? structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) for i := 0; i < len(n); i++ { diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index c3f9b9daa8..a48a8133aa 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -474,11 +474,10 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { var err error - // TODO(partitions) if len(args.NodeMetaFilters) > 0 { - reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters, nil) + reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta) } else { - reply.Index, reply.Nodes, err = state.Nodes(ws, nil) + reply.Index, reply.Nodes, err = state.Nodes(ws, &args.EnterpriseMeta) } if err != nil { return err diff --git a/agent/consul/client_serf.go b/agent/consul/client_serf.go index eb10498b95..f4692ef528 100644 --- a/agent/consul/client_serf.go +++ b/agent/consul/client_serf.go @@ -61,6 +61,7 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( nodeID: c.config.NodeID, nodeName: c.config.NodeName, segment: c.config.Segment, + server: false, } conf.SnapshotPath = filepath.Join(c.config.DataDir, path) @@ -68,7 +69,7 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( return nil, err } - addEnterpriseSerfTags(conf.Tags) + addEnterpriseSerfTags(conf.Tags, c.config.agentEnterpriseMeta()) conf.ReconnectTimeoutOverride = libserf.NewReconnectOverride(c.logger) diff --git a/agent/consul/config_oss.go b/agent/consul/config_oss.go new file mode 100644 index 0000000000..b2e295724b --- /dev/null +++ b/agent/consul/config_oss.go @@ -0,0 +1,9 @@ +// +build !consulent + +package consul + +import "github.com/hashicorp/consul/agent/structs" + +func (c *Config) agentEnterpriseMeta() *structs.EnterpriseMeta { + return structs.NodeEnterpriseMetaInDefaultPartition() +} diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index 503a8a6f87..ae3e169245 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -86,10 +86,13 @@ func (c *Coordinate) batchApplyUpdates() error { break } + update.EnterpriseMeta.Normalize() + updates[i] = &structs.Coordinate{ - Node: update.Node, - Segment: update.Segment, - Coord: update.Coord, + 
Node: update.Node, + Segment: update.Segment, + Coord: update.Coord, + Partition: update.PartitionOrEmpty(), } i++ } @@ -138,12 +141,17 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct } // Fetch the ACL token, if any, and enforce the node policy if enabled. - authz, err := c.srv.ResolveToken(args.Token) + authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) if err != nil { return err } + + if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + var authzContext acl.AuthorizerContext - structs.DefaultEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) + args.DefaultEnterpriseMetaForPartition().FillAuthzContext(&authzContext) if authz.NodeWrite(args.Node, &authzContext) != acl.Allow { return acl.ErrPermissionDenied } @@ -166,6 +174,8 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter return err } + // TODO(partitions): + var out []structs.DatacenterMap // Strip the datacenter suffixes from all the node names. @@ -194,11 +204,19 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I return err } + _, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) + if err != nil { + return err + } + + if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + return c.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - // TODO(partitions) - index, coords, err := state.Coordinates(ws, nil) + index, coords, err := state.Coordinates(ws, &args.EnterpriseMeta) if err != nil { return err } @@ -220,21 +238,27 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde // Fetch the ACL token, if any, and enforce the node policy if enabled. - authz, err := c.srv.ResolveToken(args.Token) + authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) if err != nil { return err } + + if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + var authzContext acl.AuthorizerContext - structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) + args.WildcardEnterpriseMetaForPartition().FillAuthzContext(&authzContext) if authz.NodeRead(args.Node, &authzContext) != acl.Allow { return acl.ErrPermissionDenied } + // TODO(partitions): do we have to add EnterpriseMeta to the reply like in Catalog.ListServices? 
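// The partition-aware RPC handlers in this change share the same preamble; a
// condensed sketch of the shape (the handler name is made up, the calls
// mirror the code in this file):

func (c *Coordinate) exampleNodeRead(args *structs.NodeSpecificRequest) error {
	// Resolve the ACL token and fill defaults into the request's
	// enterprise meta (namespace/partition).
	authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
	if err != nil {
		return err
	}
	// Reject partitions/namespaces that this build cannot serve.
	if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
		return err
	}
	// Authorize against a context scoped to the request's partition instead
	// of the hard-coded default partition.
	var authzContext acl.AuthorizerContext
	args.WildcardEnterpriseMetaForPartition().FillAuthzContext(&authzContext)
	if authz.NodeRead(args.Node, &authzContext) != acl.Allow {
		return acl.ErrPermissionDenied
	}
	return nil
}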
+ return c.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - // TODO(partitions) - index, nodeCoords, err := state.Coordinate(ws, args.Node, nil) + index, nodeCoords, err := state.Coordinate(ws, args.Node, &args.EnterpriseMeta) if err != nil { return err } @@ -242,9 +266,10 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde var coords structs.Coordinates for segment, coord := range nodeCoords { coords = append(coords, &structs.Coordinate{ - Node: args.Node, - Segment: segment, - Coord: coord, + Node: args.Node, + Segment: segment, + Partition: args.PartitionOrEmpty(), + Coord: coord, }) } reply.Index, reply.Coordinates = index, coords diff --git a/agent/consul/coordinate_endpoint_test.go b/agent/consul/coordinate_endpoint_test.go index 5741450f7c..646c85b19e 100644 --- a/agent/consul/coordinate_endpoint_test.go +++ b/agent/consul/coordinate_endpoint_test.go @@ -84,14 +84,12 @@ func TestCoordinate_Update(t *testing.T) { // Make sure the updates did not yet apply because the update period // hasn't expired. state := s1.fsm.State() - // TODO(partitions) _, c, err := state.Coordinate(nil, "node1", nil) if err != nil { t.Fatalf("err: %v", err) } require.Equal(t, lib.CoordinateSet{}, c) - // TODO(partitions) _, c, err = state.Coordinate(nil, "node2", nil) if err != nil { t.Fatalf("err: %v", err) @@ -107,7 +105,6 @@ func TestCoordinate_Update(t *testing.T) { // Wait a while and the updates should get picked up. time.Sleep(3 * s1.config.CoordinateUpdatePeriod) - // TODO(partitions) _, c, err = state.Coordinate(nil, "node1", nil) if err != nil { t.Fatalf("err: %v", err) @@ -117,7 +114,6 @@ func TestCoordinate_Update(t *testing.T) { } require.Equal(t, expected, c) - // TODO(partitions) _, c, err = state.Coordinate(nil, "node2", nil) if err != nil { t.Fatalf("err: %v", err) @@ -157,7 +153,6 @@ func TestCoordinate_Update(t *testing.T) { time.Sleep(3 * s1.config.CoordinateUpdatePeriod) numDropped := 0 for i := 0; i < spamLen; i++ { - // TODO(partitions) _, c, err = state.Coordinate(nil, fmt.Sprintf("bogusnode%d", i), nil) if err != nil { t.Fatalf("err: %v", err) diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index 90bf131303..12ede7bffd 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -71,7 +71,7 @@ func (s *Server) validateEnterpriseIntentionNamespace(ns string, _ bool) error { return errors.New("Namespaces is a Consul Enterprise feature") } -func addEnterpriseSerfTags(_ map[string]string) { +func addEnterpriseSerfTags(_ map[string]string, _ *structs.EnterpriseMeta) { // do nothing } diff --git a/agent/consul/filter.go b/agent/consul/filter.go index f650059e9a..a85ce65eed 100644 --- a/agent/consul/filter.go +++ b/agent/consul/filter.go @@ -47,6 +47,7 @@ func (t *txnResultsFilter) Filter(i int) bool { result.KV.EnterpriseMeta.FillAuthzContext(&authzContext) return t.authorizer.KeyRead(result.KV.Key, &authzContext) != acl.Allow case result.Node != nil: + // TODO(partitions): put partition into this wildcard? 
structs.WildcardEnterpriseMetaInDefaultPartition().FillAuthzContext(&authzContext) return t.authorizer.NodeRead(result.Node.Node, &authzContext) != acl.Allow case result.Service != nil: diff --git a/agent/consul/leader.go b/agent/consul/leader.go index d5239008e9..996abea621 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -532,6 +532,8 @@ func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error { s.logger.Info("initializing acls") + // TODO(partitions): initialize acls in all of the partitions? + // Create/Upgrade the builtin global-management policy _, policy, err := s.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, structs.DefaultEnterpriseMetaInDefaultPartition()) if err != nil { @@ -1110,9 +1112,13 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error { // reconcileReaped is used to reconcile nodes that have failed and been reaped // from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered. // We generate a "reap" event to cause the node to be cleaned up. -func (s *Server) reconcileReaped(known map[string]struct{}) error { +func (s *Server) reconcileReaped(known map[string]struct{}, nodeEntMeta *structs.EnterpriseMeta) error { + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + state := s.fsm.State() - _, checks, err := state.ChecksInState(nil, api.HealthAny, structs.DefaultEnterpriseMetaInDefaultPartition()) + _, checks, err := state.ChecksInState(nil, api.HealthAny, nodeEntMeta) if err != nil { return err } @@ -1128,7 +1134,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error { } // Get the node services, look for ConsulServiceID - _, services, err := state.NodeServices(nil, check.Node, structs.DefaultEnterpriseMetaInDefaultPartition()) + _, services, err := state.NodeServices(nil, check.Node, nodeEntMeta) if err != nil { return err } @@ -1139,8 +1145,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error { CHECKS: for _, service := range services.Services { if service.ID == structs.ConsulServiceID { - // TODO(partitions) - _, node, err := state.GetNode(check.Node, nil) + _, node, err := state.GetNode(check.Node, nodeEntMeta) if err != nil { s.logger.Error("Unable to look up node with name", "name", check.Node, "error", err) continue CHECKS @@ -1165,6 +1170,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error { "role": "node", }, } + addEnterpriseSerfTags(member.Tags, nodeEntMeta) // Create the appropriate tags if this was a server node if serverPort > 0 { @@ -1175,7 +1181,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error { } // Attempt to reap this member - if err := s.handleReapMember(member); err != nil { + if err := s.handleReapMember(member, nodeEntMeta); err != nil { return err } } @@ -1187,23 +1193,28 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error { func (s *Server) reconcileMember(member serf.Member) error { // Check if this is a member we should handle if !s.shouldHandleMember(member) { + // TODO(partition): log the partition name s.logger.Warn("skipping reconcile of node", "member", member) return nil } defer metrics.MeasureSince([]string{"leader", "reconcileMember"}, time.Now()) + + nodeEntMeta := getSerfMemberEnterpriseMeta(member) + var err error switch member.Status { case serf.StatusAlive: - err = s.handleAliveMember(member) + err = s.handleAliveMember(member, nodeEntMeta) case 
serf.StatusFailed: - err = s.handleFailedMember(member) + err = s.handleFailedMember(member, nodeEntMeta) case serf.StatusLeft: - err = s.handleLeftMember(member) + err = s.handleLeftMember(member, nodeEntMeta) case StatusReap: - err = s.handleReapMember(member) + err = s.handleReapMember(member, nodeEntMeta) } if err != nil { s.logger.Error("failed to reconcile member", + // TODO(partition): log the partition name "member", member, "error", err, ) @@ -1231,7 +1242,11 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { // handleAliveMember is used to ensure the node // is registered, with a passing health check. -func (s *Server) handleAliveMember(member serf.Member) error { +func (s *Server) handleAliveMember(member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error { + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + // Register consul service if a server var service *structs.NodeService if valid, parts := metadata.IsConsulServer(member); valid { @@ -1243,6 +1258,7 @@ func (s *Server) handleAliveMember(member serf.Member) error { Passing: 1, Warning: 1, }, + EnterpriseMeta: *nodeEntMeta, Meta: map[string]string{ // DEPRECATED - remove nonvoter in favor of read_replica in a future version of consul "non_voter": strconv.FormatBool(member.Tags["nonvoter"] == "1"), @@ -1263,8 +1279,7 @@ func (s *Server) handleAliveMember(member serf.Member) error { // Check if the node exists state := s.fsm.State() - // TODO(partitions) - _, node, err := state.GetNode(member.Name, nil) + _, node, err := state.GetNode(member.Name, nodeEntMeta) if err != nil { return err } @@ -1272,7 +1287,7 @@ func (s *Server) handleAliveMember(member serf.Member) error { // Check if the associated service is available if service != nil { match := false - _, services, err := state.NodeServices(nil, member.Name, structs.DefaultEnterpriseMetaInDefaultPartition()) + _, services, err := state.NodeServices(nil, member.Name, nodeEntMeta) if err != nil { return err } @@ -1290,7 +1305,7 @@ func (s *Server) handleAliveMember(member serf.Member) error { } // Check if the serfCheck is in the passing state - _, checks, err := state.NodeChecks(nil, member.Name, structs.DefaultEnterpriseMetaInDefaultPartition()) + _, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta) if err != nil { return err } @@ -1317,6 +1332,7 @@ AFTER_CHECK: Status: api.HealthPassing, Output: structs.SerfCheckAliveOutput, }, + EnterpriseMeta: *nodeEntMeta, } if node != nil { req.TaggedAddresses = node.TaggedAddresses @@ -1329,11 +1345,14 @@ AFTER_CHECK: // handleFailedMember is used to mark the node's status // as being critical, along with all checks as unknown. 
-func (s *Server) handleFailedMember(member serf.Member) error { +func (s *Server) handleFailedMember(member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error { + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + // Check if the node exists state := s.fsm.State() - // TODO(partitions) - _, node, err := state.GetNode(member.Name, nil) + _, node, err := state.GetNode(member.Name, nodeEntMeta) if err != nil { return err } @@ -1343,9 +1362,11 @@ func (s *Server) handleFailedMember(member serf.Member) error { return nil } + // TODO(partitions): get the ent meta by parsing serf tags + if node.Address == member.Addr.String() { // Check if the serfCheck is in the critical state - _, checks, err := state.NodeChecks(nil, member.Name, structs.DefaultEnterpriseMetaInDefaultPartition()) + _, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta) if err != nil { return err } @@ -1359,10 +1380,11 @@ func (s *Server) handleFailedMember(member serf.Member) error { // Register with the catalog req := structs.RegisterRequest{ - Datacenter: s.config.Datacenter, - Node: member.Name, - ID: types.NodeID(member.Tags["id"]), - Address: member.Addr.String(), + Datacenter: s.config.Datacenter, + Node: member.Name, + EnterpriseMeta: *nodeEntMeta, + ID: types.NodeID(member.Tags["id"]), + Address: member.Addr.String(), Check: &structs.HealthCheck{ Node: member.Name, CheckID: structs.SerfCheckID, @@ -1381,18 +1403,22 @@ func (s *Server) handleFailedMember(member serf.Member) error { // handleLeftMember is used to handle members that gracefully // left. They are deregistered if necessary. -func (s *Server) handleLeftMember(member serf.Member) error { - return s.handleDeregisterMember("left", member) +func (s *Server) handleLeftMember(member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error { + return s.handleDeregisterMember("left", member, nodeEntMeta) } // handleReapMember is used to handle members that have been // reaped after a prolonged failure. They are deregistered. -func (s *Server) handleReapMember(member serf.Member) error { - return s.handleDeregisterMember("reaped", member) +func (s *Server) handleReapMember(member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error { + return s.handleDeregisterMember("reaped", member, nodeEntMeta) } // handleDeregisterMember is used to deregister a member of a given reason -func (s *Server) handleDeregisterMember(reason string, member serf.Member) error { +func (s *Server) handleDeregisterMember(reason string, member serf.Member, nodeEntMeta *structs.EnterpriseMeta) error { + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + // Do not deregister ourself. This can only happen if the current leader // is leaving. Instead, we should allow a follower to take-over and // deregister us later. 
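// Each serf member handler now accepts the node's enterprise meta and falls
// back to the default partition when given nil, which keeps existing callers
// (for example the tests that pass nil) working unchanged. A condensed sketch
// of the calling convention; the function name is made up:

func exampleReconcileAlive(s *Server, member serf.Member) error {
	// getSerfMemberEnterpriseMeta always reports the default partition in the
	// OSS build; an enterprise build would be expected to derive it from the
	// member's serf tags.
	nodeEntMeta := getSerfMemberEnterpriseMeta(member)
	return s.handleAliveMember(member, nodeEntMeta)
}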
@@ -1410,8 +1436,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error // Check if the node does not exist state := s.fsm.State() - // TODO(partitions) - _, node, err := state.GetNode(member.Name, nil) + _, node, err := state.GetNode(member.Name, nodeEntMeta) if err != nil { return err } @@ -1422,8 +1447,9 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error // Deregister the node s.logger.Info("deregistering member", "member", member.Name, "reason", reason) req := structs.DeregisterRequest{ - Datacenter: s.config.Datacenter, - Node: member.Name, + Datacenter: s.config.Datacenter, + Node: member.Name, + EnterpriseMeta: *nodeEntMeta, } _, err = s.raftApply(structs.DeregisterRequestType, &req) return err diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 7463b794df..9089815712 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -49,7 +49,6 @@ func TestLeader_RegisterMember(t *testing.T) { // Client should be registered state := s1.fsm.State() retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -79,7 +78,6 @@ func TestLeader_RegisterMember(t *testing.T) { // Server should be registered retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(s1.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -129,7 +127,6 @@ func TestLeader_FailedMember(t *testing.T) { // Should be registered state := s1.fsm.State() retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -191,7 +188,6 @@ func TestLeader_LeftMember(t *testing.T) { // Should be registered retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -207,7 +203,6 @@ func TestLeader_LeftMember(t *testing.T) { // Should be deregistered retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -243,7 +238,6 @@ func TestLeader_ReapMember(t *testing.T) { // Should be registered retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -269,7 +263,6 @@ func TestLeader_ReapMember(t *testing.T) { // anti-entropy will put it back. 
reaped := false for start := time.Now(); time.Since(start) < 5*time.Second; { - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { t.Fatalf("err: %v", err) @@ -367,7 +360,7 @@ func TestLeader_CheckServersMeta(t *testing.T) { member.Tags["nonvoter"] = "1" member.Tags["read_replica"] = "1" member.Tags["build"] = versionToExpect - err := s1.handleAliveMember(member) + err := s1.handleAliveMember(member, nil) if err != nil { r.Fatalf("Unexpected error :%v", err) } @@ -439,7 +432,6 @@ func TestLeader_ReapServer(t *testing.T) { // s3 should be registered retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(s3.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -454,14 +446,13 @@ func TestLeader_ReapServer(t *testing.T) { knownMembers[s1.config.NodeName] = struct{}{} knownMembers[s2.config.NodeName] = struct{}{} - err := s1.reconcileReaped(knownMembers) + err := s1.reconcileReaped(knownMembers, nil) if err != nil { t.Fatalf("Unexpected error :%v", err) } // s3 should be deregistered retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(s3.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -517,7 +508,6 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) { // Node should be gone state := s1.fsm.State() - // TODO(partitions) _, node, err := state.GetNode("no-longer-around", nil) if err != nil { t.Fatalf("err: %v", err) @@ -551,7 +541,6 @@ func TestLeader_Reconcile(t *testing.T) { // Should not be registered state := s1.fsm.State() - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { t.Fatalf("err: %v", err) @@ -562,7 +551,6 @@ func TestLeader_Reconcile(t *testing.T) { // Should be registered retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -595,7 +583,6 @@ func TestLeader_Reconcile_Races(t *testing.T) { state := s1.fsm.State() var nodeAddr string retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) @@ -632,7 +619,6 @@ func TestLeader_Reconcile_Races(t *testing.T) { if err := s1.reconcile(); err != nil { t.Fatalf("err: %v", err) } - // TODO(partitions) _, node, err := state.GetNode(c1.config.NodeName, nil) if err != nil { t.Fatalf("err: %v", err) @@ -657,7 +643,6 @@ func TestLeader_Reconcile_Races(t *testing.T) { }) // Make sure the metadata didn't get clobbered. 
- // TODO(partitions) _, node, err = state.GetNode(c1.config.NodeName, nil) if err != nil { t.Fatalf("err: %v", err) @@ -773,7 +758,6 @@ func TestLeader_LeftLeader(t *testing.T) { // Verify the old leader is deregistered state := remain.fsm.State() retry.Run(t, func(r *retry.R) { - // TODO(partitions) _, node, err := state.GetNode(leader.config.NodeName, nil) if err != nil { r.Fatalf("err: %v", err) diff --git a/agent/consul/merge.go b/agent/consul/merge.go index 006e41355a..16bc55ddc8 100644 --- a/agent/consul/merge.go +++ b/agent/consul/merge.go @@ -3,10 +3,11 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/agent/metadata" - "github.com/hashicorp/consul/types" "github.com/hashicorp/go-version" "github.com/hashicorp/serf/serf" + + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/types" ) // lanMergeDelegate is used to handle a cluster merge on the LAN gossip @@ -17,6 +18,14 @@ type lanMergeDelegate struct { nodeID types.NodeID nodeName string segment string + + // TODO(partitions): use server and partition to reject gossip messages + // from nodes in the wrong partition depending upon the role the node is + // playing. For example servers will always be in the default partition, + // but all clients in all partitions should be aware of the servers so that + // general RPC routing works. + server bool + partition string } // uniqueIDMinVersion is the lowest version where we insist that nodes diff --git a/agent/consul/rtt.go b/agent/consul/rtt.go index 9a77802513..caf8116ed0 100644 --- a/agent/consul/rtt.go +++ b/agent/consul/rtt.go @@ -22,8 +22,7 @@ func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort. state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - // TODO(partitions) - _, other, err := state.Coordinate(nil, node.Node, nil) + _, other, err := state.Coordinate(nil, node.Node, node.GetEnterpriseMeta()) if err != nil { return nil, err } @@ -63,8 +62,7 @@ func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.Servic state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - // TODO(partitions) - _, other, err := state.Coordinate(nil, node.Node, nil) + _, other, err := state.Coordinate(nil, node.Node, &node.EnterpriseMeta) if err != nil { return nil, err } @@ -104,8 +102,7 @@ func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.Healt state := s.fsm.State() vec := make([]float64, len(checks)) for i, check := range checks { - // TODO(partitions) - _, other, err := state.Coordinate(nil, check.Node, nil) + _, other, err := state.Coordinate(nil, check.Node, &check.EnterpriseMeta) if err != nil { return nil, err } @@ -145,8 +142,7 @@ func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.C state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - // TODO(partitions) - _, other, err := state.Coordinate(nil, node.Node.Node, nil) + _, other, err := state.Coordinate(nil, node.Node.Node, node.Node.GetEnterpriseMeta()) if err != nil { return nil, err } diff --git a/agent/consul/segment_oss.go b/agent/consul/segment_oss.go index 690132c347..8e8936a3c5 100644 --- a/agent/consul/segment_oss.go +++ b/agent/consul/segment_oss.go @@ -8,8 +8,9 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/serf/serf" + + "github.com/hashicorp/consul/agent/structs" ) var SegmentOSSSummaries = 
[]prometheus.SummaryDefinition{ @@ -62,12 +63,17 @@ func (s *Server) setupSegments(config *Config, port int, rpcListeners map[string func (s *Server) floodSegments(config *Config) { } +func getSerfMemberEnterpriseMeta(member serf.Member) *structs.EnterpriseMeta { + return structs.NodeEnterpriseMetaInDefaultPartition() +} + // reconcile is used to reconcile the differences between Serf membership and // what is reflected in our strongly consistent store. Mainly we need to ensure // all live nodes are registered, all failed nodes are marked as such, and all // left nodes are deregistered. func (s *Server) reconcile() (err error) { defer metrics.MeasureSince([]string{"leader", "reconcile"}, time.Now()) + members := s.serfLAN.Members() knownMembers := make(map[string]struct{}) for _, member := range members { @@ -79,5 +85,5 @@ func (s *Server) reconcile() (err error) { // Reconcile any members that have been reaped while we were not the // leader. - return s.reconcileReaped(knownMembers) + return s.reconcileReaped(knownMembers, nil) } diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index f26c843028..d0c698daa5 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -117,6 +117,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w nodeID: s.config.NodeID, nodeName: s.config.NodeName, segment: segment, + server: true, } } @@ -175,7 +176,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.ReconnectTimeoutOverride = libserf.NewReconnectOverride(s.logger) - addEnterpriseSerfTags(conf.Tags) + addEnterpriseSerfTags(conf.Tags, s.config.agentEnterpriseMeta()) if s.config.OverrideInitialSerfTags != nil { s.config.OverrideInitialSerfTags(conf.Tags) diff --git a/agent/consul/session_ttl.go b/agent/consul/session_ttl.go index 497fc7b540..426179d96a 100644 --- a/agent/consul/session_ttl.go +++ b/agent/consul/session_ttl.go @@ -6,6 +6,7 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/consul/agent/structs" ) @@ -46,6 +47,7 @@ func (s *Server) initializeSessionTimers() error { // Scan all sessions and reset their timer state := s.fsm.State() + // TODO(partitions): track all session timers in all partitions _, sessions, err := state.SessionList(nil, structs.WildcardEnterpriseMetaInDefaultPartition()) if err != nil { return err diff --git a/agent/consul/txn_endpoint_test.go b/agent/consul/txn_endpoint_test.go index 53f9b36ded..4abb691402 100644 --- a/agent/consul/txn_endpoint_test.go +++ b/agent/consul/txn_endpoint_test.go @@ -234,7 +234,6 @@ func TestTxn_Apply(t *testing.T) { t.Fatalf("bad: %v", d) } - // TODO(partitions) _, n, err := state.GetNode("foo", nil) if err != nil { t.Fatalf("err: %v", err) diff --git a/agent/coordinate_endpoint.go b/agent/coordinate_endpoint.go index 596f63ae06..08f92f7d9a 100644 --- a/agent/coordinate_endpoint.go +++ b/agent/coordinate_endpoint.go @@ -82,6 +82,9 @@ func (s *HTTPHandlers) CoordinateNodes(resp http.ResponseWriter, req *http.Reque if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } + if err := parseEntMetaPartition(req, &args.EnterpriseMeta); err != nil { + return nil, err + } var out structs.IndexedCoordinates defer setMeta(resp, &out.QueryMeta) @@ -105,6 +108,9 @@ func (s *HTTPHandlers) CoordinateNode(resp http.ResponseWriter, req *http.Reques if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } + if err := 
parseEntMetaPartition(req, &args.EnterpriseMeta); err != nil { + return nil, err + } var out structs.IndexedCoordinates defer setMeta(resp, &out.QueryMeta) @@ -158,6 +164,10 @@ func (s *HTTPHandlers) CoordinateUpdate(resp http.ResponseWriter, req *http.Requ s.parseDC(req, &args.Datacenter) s.parseToken(req, &args.Token) + if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil { + return nil, err + } + var reply struct{} if err := s.agent.RPC("Coordinate.Update", &args, &reply); err != nil { return nil, err diff --git a/agent/dns.go b/agent/dns.go index 8224b27306..5693178c28 100644 --- a/agent/dns.go +++ b/agent/dns.go @@ -122,6 +122,8 @@ type DNSServer struct { // recursorEnabled stores whever the recursor handler is enabled as an atomic flag. // the recursor handler is only enabled if recursors are configured. This flag is used during config hot-reloading recursorEnabled uint32 + + defaultEnterpriseMeta structs.EnterpriseMeta } func NewDNSServer(a *Agent) (*DNSServer, error) { @@ -130,10 +132,11 @@ func NewDNSServer(a *Agent) (*DNSServer, error) { altDomain := dns.Fqdn(strings.ToLower(a.config.DNSAltDomain)) srv := &DNSServer{ - agent: a, - domain: domain, - altDomain: altDomain, - logger: a.logger.Named(logging.DNS), + agent: a, + domain: domain, + altDomain: altDomain, + logger: a.logger.Named(logging.DNS), + defaultEnterpriseMeta: *a.agentEnterpriseMeta(), } cfg, err := GetDNSConfig(a.config) if err != nil { @@ -414,7 +417,7 @@ func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) { AllowStale: cfg.AllowStale, }, ServiceAddress: serviceAddress, - EnterpriseMeta: *structs.WildcardEnterpriseMetaInDefaultPartition(), + EnterpriseMeta: *d.defaultEnterpriseMeta.WildcardEnterpriseMetaForPartition(), } var sout structs.IndexedServiceNodes @@ -548,7 +551,7 @@ func (d *DNSServer) nameservers(cfg *dnsConfig, maxRecursionLevel int) (ns []dns Service: structs.ConsulServiceName, Connect: false, Ingress: false, - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + EnterpriseMeta: d.defaultEnterpriseMeta, }) if err != nil { d.logger.Warn("Unable to get list of servers", "error", err) @@ -645,8 +648,8 @@ func (d *DNSServer) dispatch(remoteAddr net.Addr, req, resp *dns.Msg, maxRecursi // By default the query is in the default datacenter datacenter := d.agent.config.Datacenter - // have to deref to clone it so we don't modify - var entMeta structs.EnterpriseMeta + // have to deref to clone it so we don't modify (start from the agent's defaults) + var entMeta = d.defaultEnterpriseMeta // Get the QName without the domain suffix qName := strings.ToLower(dns.Fqdn(req.Question[0].Name)) @@ -1316,9 +1319,10 @@ func (d *DNSServer) preparedQueryLookup(cfg *dnsConfig, datacenter, query string // send the local agent's data through to allow distance sorting // relative to ourself on the server side. 
Agent: structs.QuerySource{ - Datacenter: d.agent.config.Datacenter, - Segment: d.agent.config.SegmentName, - Node: d.agent.config.NodeName, + Datacenter: d.agent.config.Datacenter, + Segment: d.agent.config.SegmentName, + Node: d.agent.config.NodeName, + NodePartition: d.agent.config.PartitionOrEmpty(), }, } diff --git a/agent/http.go b/agent/http.go index 3c854287a8..af26cdc2c6 100644 --- a/agent/http.go +++ b/agent/http.go @@ -333,6 +333,11 @@ func (s *HTTPHandlers) nodeName() string { return s.agent.config.NodeName } +// nodePartition returns the node partition of the agent +func (s *HTTPHandlers) nodePartition() string { + return s.agent.config.PartitionOrEmpty() +} + // aclEndpointRE is used to find old ACL endpoints that take tokens in the URL // so that we can redact them. The ACL endpoints that take the token in the URL // are all of the form /v1/acl//, and can optionally include query @@ -1032,6 +1037,7 @@ func (s *HTTPHandlers) parseSource(req *http.Request, source *structs.QuerySourc } else { source.Node = node } + source.NodePartition = s.agent.config.PartitionOrEmpty() } } diff --git a/agent/http_oss.go b/agent/http_oss.go index 6622404726..4b7808ab25 100644 --- a/agent/http_oss.go +++ b/agent/http_oss.go @@ -17,7 +17,8 @@ func (s *HTTPHandlers) parseEntMeta(req *http.Request, entMeta *structs.Enterpri if queryNS := req.URL.Query().Get("ns"); queryNS != "" { return BadRequestError{Reason: "Invalid query parameter: \"ns\" - Namespaces are a Consul Enterprise feature"} } - return nil + + return parseEntMetaPartition(req, entMeta) } func (s *HTTPHandlers) validateEnterpriseIntentionNamespace(logName, ns string, _ bool) error { @@ -74,7 +75,13 @@ func (s *HTTPHandlers) uiTemplateDataTransform(data map[string]interface{}) erro return nil } -// parseEntMetaPartition is a noop for the enterprise implementation. func parseEntMetaPartition(req *http.Request, meta *structs.EnterpriseMeta) error { + if headerAP := req.Header.Get("X-Consul-Partition"); headerAP != "" { + return BadRequestError{Reason: "Invalid header: \"X-Consul-Partition\" - Partitions are a Consul Enterprise feature"} + } + if queryAP := req.URL.Query().Get("partition"); queryAP != "" { + return BadRequestError{Reason: "Invalid query parameter: \"partition\" - Partitions are a Consul Enterprise feature"} + } + return nil } diff --git a/agent/local/state.go b/agent/local/state.go index 5908c4af27..862f046fe2 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -58,6 +58,7 @@ type Config struct { DiscardCheckOutput bool NodeID types.NodeID NodeName string + Partition string // this defaults if empty TaggedAddresses map[string]string } @@ -176,6 +177,8 @@ type State struct { // Config is the agent config config Config + agentEnterpriseMeta structs.EnterpriseMeta + // nodeInfoInSync tracks whether the server has our correct top-level // node information in sync nodeInfoInSync bool @@ -208,14 +211,15 @@ type State struct { // NewState creates a new local state for the agent. 
func NewState(c Config, logger hclog.Logger, tokens *token.Store) *State { l := &State{ - config: c, - logger: logger, - services: make(map[structs.ServiceID]*ServiceState), - checks: make(map[structs.CheckID]*CheckState), - checkAliases: make(map[structs.ServiceID]map[structs.CheckID]chan<- struct{}), - metadata: make(map[string]string), - tokens: tokens, - notifyHandlers: make(map[chan<- struct{}]struct{}), + config: c, + logger: logger, + services: make(map[structs.ServiceID]*ServiceState), + checks: make(map[structs.CheckID]*CheckState), + checkAliases: make(map[structs.ServiceID]map[structs.CheckID]chan<- struct{}), + metadata: make(map[string]string), + tokens: tokens, + notifyHandlers: make(map[chan<- struct{}]struct{}), + agentEnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(c.Partition), } l.SetDiscardCheckOutput(c.DiscardCheckOutput) return l @@ -267,6 +271,10 @@ func (l *State) addServiceLocked(service *structs.NodeService, token string) err service.ID = service.Service } + if l.agentEnterpriseMeta.PartitionOrDefault() != service.PartitionOrDefault() { + return fmt.Errorf("cannot add service %q to node in partition %q", service.CompoundServiceID(), l.config.Partition) + } + l.setServiceStateLocked(&ServiceState{ Service: service, Token: token, @@ -340,8 +348,8 @@ func (l *State) removeServiceLocked(id structs.ServiceID) error { return nil } -// Service returns the locally registered service that the -// agent is aware of and are being kept in sync with the server +// Service returns the locally registered service that the agent is aware of +// with this ID and are being kept in sync with the server. func (l *State) Service(id structs.ServiceID) *structs.NodeService { l.RLock() defer l.RUnlock() @@ -353,9 +361,43 @@ func (l *State) Service(id structs.ServiceID) *structs.NodeService { return s.Service } -// Services returns the locally registered services that the +// ServicesByName returns all the locally registered service instances that the +// agent is aware of with this name and are being kept in sync with the server +func (l *State) ServicesByName(sn structs.ServiceName) []*structs.NodeService { + l.RLock() + defer l.RUnlock() + + var found []*structs.NodeService + for id, s := range l.services { + if s.Deleted { + continue + } + + if !sn.EnterpriseMeta.Matches(&id.EnterpriseMeta) { + continue + } + if s.Service.Service == sn.Name { + found = append(found, s.Service) + } + } + return found +} + +// AllServices returns the locally registered services that the // agent is aware of and are being kept in sync with the server +func (l *State) AllServices() map[structs.ServiceID]*structs.NodeService { + return l.listServices(false, nil) +} + +// Services returns the locally registered services that the agent is aware of +// and are being kept in sync with the server +// +// Results are scoped to the provided namespace and partition. 
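// A usage sketch, given an existing *State l (hypothetical caller): the
// entMeta argument picks which partition/namespace to list, while AllServices
// skips the filter entirely.
//
//	// services in the default partition and default namespace only
//	scoped := l.Services(structs.DefaultEnterpriseMetaInDefaultPartition())
//
//	// every service instance this agent knows about, regardless of entMeta
//	all := l.AllServices()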
func (l *State) Services(entMeta *structs.EnterpriseMeta) map[structs.ServiceID]*structs.NodeService { + return l.listServices(true, entMeta) +} + +func (l *State) listServices(filtered bool, entMeta *structs.EnterpriseMeta) map[structs.ServiceID]*structs.NodeService { l.RLock() defer l.RUnlock() @@ -365,7 +407,7 @@ func (l *State) Services(entMeta *structs.EnterpriseMeta) map[structs.ServiceID] continue } - if !entMeta.Matches(&id.EnterpriseMeta) { + if filtered && !entMeta.Matches(&id.EnterpriseMeta) { continue } m[id] = s.Service @@ -395,6 +437,10 @@ func (l *State) SetServiceState(s *ServiceState) { l.Lock() defer l.Unlock() + if l.agentEnterpriseMeta.PartitionOrDefault() != s.Service.PartitionOrDefault() { + return + } + l.setServiceStateLocked(s) } @@ -483,15 +529,19 @@ func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error { check.Output = "" } + // hard-set the node name and partition + check.Node = l.config.NodeName + check.EnterpriseMeta = structs.NewEnterpriseMetaWithPartition( + l.agentEnterpriseMeta.PartitionOrEmpty(), + check.NamespaceOrEmpty(), + ) + // if there is a serviceID associated with the check, make sure it exists before adding it // NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor if _, ok := l.services[check.CompoundServiceID()]; check.ServiceID != "" && !ok { return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID) } - // hard-set the node name - check.Node = l.config.NodeName - l.setCheckStateLocked(&CheckState{ Check: check, Token: token, @@ -510,6 +560,13 @@ func (l *State) AddAliasCheck(checkID structs.CheckID, srcServiceID structs.Serv l.Lock() defer l.Unlock() + if l.agentEnterpriseMeta.PartitionOrDefault() != checkID.PartitionOrDefault() { + return fmt.Errorf("cannot add alias check %q to node in partition %q", checkID.String(), l.config.Partition) + } + if l.agentEnterpriseMeta.PartitionOrDefault() != srcServiceID.PartitionOrDefault() { + return fmt.Errorf("cannot add alias check for %q to node in partition %q", srcServiceID.String(), l.config.Partition) + } + m, ok := l.checkAliases[srcServiceID] if !ok { m = make(map[structs.CheckID]chan<- struct{}) @@ -663,11 +720,23 @@ func (l *State) Check(id structs.CheckID) *structs.HealthCheck { return c.Check } +// AllChecks returns the locally registered checks that the +// agent is aware of and are being kept in sync with the server +func (l *State) AllChecks() map[structs.CheckID]*structs.HealthCheck { + return l.listChecks(false, nil) +} + // Checks returns the locally registered checks that the // agent is aware of and are being kept in sync with the server +// +// Results are scoped to the provided namespace and partition. 
func (l *State) Checks(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*structs.HealthCheck { + return l.listChecks(true, entMeta) +} + +func (l *State) listChecks(filtered bool, entMeta *structs.EnterpriseMeta) map[structs.CheckID]*structs.HealthCheck { m := make(map[structs.CheckID]*structs.HealthCheck) - for id, c := range l.CheckStates(entMeta) { + for id, c := range l.listCheckStates(filtered, entMeta) { m[id] = c.Check } return m @@ -719,6 +788,10 @@ func (l *State) SetCheckState(c *CheckState) { l.Lock() defer l.Unlock() + if l.agentEnterpriseMeta.PartitionOrDefault() != c.Check.PartitionOrDefault() { + return + } + l.setCheckStateLocked(c) } @@ -737,11 +810,25 @@ func (l *State) setCheckStateLocked(c *CheckState) { l.TriggerSyncChanges() } +// AllCheckStates returns a shallow copy of all health check state records. +// The map contains a shallow copy of the current check states. +// +// The defer timers still point to the original values and must not be modified. +func (l *State) AllCheckStates() map[structs.CheckID]*CheckState { + return l.listCheckStates(false, nil) +} + // CheckStates returns a shallow copy of all health check state records. // The map contains a shallow copy of the current check states. // // The defer timers still point to the original values and must not be modified. +// +// Results are scoped to the provided namespace and partition. func (l *State) CheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState { + return l.listCheckStates(true, entMeta) +} + +func (l *State) listCheckStates(filtered bool, entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState { l.RLock() defer l.RUnlock() @@ -750,7 +837,7 @@ func (l *State) CheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID if c.Deleted { continue } - if !entMeta.Matches(&id.EnterpriseMeta) { + if filtered && !entMeta.Matches(&id.EnterpriseMeta) { continue } m[id] = c.Clone() @@ -758,12 +845,27 @@ func (l *State) CheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID return m } +// AllCriticalCheckStates returns the locally registered checks that the +// agent is aware of and are being kept in sync with the server. +// The map contains a shallow copy of the current check states. +// +// The defer timers still point to the original values and must not be modified. +func (l *State) AllCriticalCheckStates() map[structs.CheckID]*CheckState { + return l.listCriticalCheckStates(false, nil) +} + // CriticalCheckStates returns the locally registered checks that the // agent is aware of and are being kept in sync with the server. // The map contains a shallow copy of the current check states. // // The defer timers still point to the original values and must not be modified. +// +// Results are scoped to the provided namespace and partition. 
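// For example, from inside this package the agent can scope to its own
// partition across all namespaces (sketch only; the logging call is
// illustrative):
//
//	wild := l.agentEnterpriseMeta.WildcardEnterpriseMetaForPartition()
//	for id, cs := range l.CriticalCheckStates(wild) {
//		l.logger.Warn("critical check", "check", id.String(), "status", cs.Check.Status)
//	}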
func (l *State) CriticalCheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState { + return l.listCriticalCheckStates(true, entMeta) +} + +func (l *State) listCriticalCheckStates(filtered bool, entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState { l.RLock() defer l.RUnlock() @@ -772,7 +874,7 @@ func (l *State) CriticalCheckStates(entMeta *structs.EnterpriseMeta) map[structs if c.Deleted || !c.Critical() { continue } - if !entMeta.Matches(&id.EnterpriseMeta) { + if filtered && !entMeta.Matches(&id.EnterpriseMeta) { continue } m[id] = c.Clone() @@ -887,7 +989,7 @@ func (l *State) updateSyncState() error { AllowStale: true, MaxStaleDuration: fullSyncReadMaxStale, }, - EnterpriseMeta: *structs.WildcardEnterpriseMetaInDefaultPartition(), + EnterpriseMeta: *l.agentEnterpriseMeta.WildcardEnterpriseMetaForPartition(), } var out1 structs.IndexedNodeServiceList @@ -958,7 +1060,7 @@ func (l *State) updateSyncState() error { if ls == nil { // The consul service is managed automatically and does // not need to be deregistered - if id == structs.ConsulCompoundServiceID { + if structs.IsConsulServiceID(id) { continue } @@ -1002,7 +1104,7 @@ func (l *State) updateSyncState() error { if lc == nil { // The Serf check is created automatically and does not // need to be deregistered. - if id == structs.SerfCompoundCheckID { + if structs.IsSerfCheckID(id) { l.logger.Debug("Skipping remote check since it is managed automatically", "check", structs.SerfCheckID) continue } @@ -1366,6 +1468,7 @@ func (l *State) syncNodeInfo() error { Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, NodeMeta: l.metadata, + EnterpriseMeta: l.agentEnterpriseMeta, WriteRequest: structs.WriteRequest{Token: at}, } var out struct{} diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index 2a2e1e6842..b69a4f720f 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -86,7 +86,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: s.proxyCfg.DestinationServiceName, - EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(s.proxyID.NamespaceOrEmpty()), + EnterpriseMeta: s.proxyID.EnterpriseMeta, }, intentionUpstreamsID, s.ch) if err != nil { return snap, err @@ -97,7 +97,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e Name: structs.MeshConfigMesh, Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + EnterpriseMeta: *s.proxyID.DefaultEnterpriseMetaForPartition(), }, meshConfigEntryID, s.ch) if err != nil { return snap, err @@ -228,7 +228,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv // Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream // This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled // by the ResolveServiceConfig endpoint. 
- wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition()) + wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WildcardEnterpriseMetaForPartition()) defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()] if ok { u = defaults diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index aca122b0c5..083291c137 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -145,7 +145,7 @@ func (m *Manager) syncState() { defer m.mu.Unlock() // Traverse the local state and ensure all proxy services are registered - services := m.State.Services(structs.WildcardEnterpriseMetaInDefaultPartition()) + services := m.State.AllServices() for sid, svc := range services { if svc.Kind != structs.ServiceKindConnectProxy && svc.Kind != structs.ServiceKindTerminatingGateway && diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 8cd19a5139..324098f704 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -330,6 +330,7 @@ func TestManager_BasicLifecycle(t *testing.T) { rootsCacheKey, leafCacheKey, roots, webProxyCopy.(*structs.NodeService), + local.Config{}, expectSnapCopy.(*ConfigSnapshot), ) }) @@ -349,13 +350,14 @@ func testManager_BasicLifecycle( rootsCacheKey, leafCacheKey string, roots *structs.IndexedCARoots, webProxy *structs.NodeService, + agentConfig local.Config, expectSnap *ConfigSnapshot, ) { c := TestCacheWithTypes(t, types) require := require.New(t) logger := testutil.Logger(t) - state := local.NewState(local.Config{}, logger, &token.Store{}) + state := local.NewState(agentConfig, logger, &token.Store{}) source := &structs.QuerySource{Datacenter: "dc1"} // Stub state syncing diff --git a/agent/proxycfg/mesh_gateway.go b/agent/proxycfg/mesh_gateway.go index 7a402441b6..5ffc590b4f 100644 --- a/agent/proxycfg/mesh_gateway.go +++ b/agent/proxycfg/mesh_gateway.go @@ -29,12 +29,14 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er return snap, err } + wildcardEntMeta := s.proxyID.WildcardEnterpriseMetaForPartition() + // Watch for all services err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Source: *s.source, - EnterpriseMeta: *structs.WildcardEnterpriseMetaInDefaultPartition(), + EnterpriseMeta: *wildcardEntMeta, }, serviceListWatchID, s.ch) if err != nil { @@ -85,7 +87,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Kind: structs.ServiceResolver, - EnterpriseMeta: *structs.WildcardEnterpriseMetaInDefaultPartition(), + EnterpriseMeta: *wildcardEntMeta, }, serviceResolversWatchID, s.ch) if err != nil { s.logger.Named(logging.MeshGateway). 
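The mesh gateway hunk above derives its wildcard enterprise meta from the gateway's own proxy ID instead of hard-coding the default partition, so the service-list and resolver watches stay inside the gateway's partition. A minimal sketch of how such a request could be shaped (the helper function, gateway name, and "team-a" partition are illustrative; under OSS builds the partition collapses to the default):

func exampleMeshGatewayWatchRequest(token string) *structs.DCSpecificRequest {
	proxyID := structs.NewServiceID("my-mesh-gateway", structs.NodeEnterpriseMetaInPartition("team-a"))
	wild := proxyID.WildcardEnterpriseMetaForPartition()

	return &structs.DCSpecificRequest{
		Datacenter:     "dc1",
		QueryOptions:   structs.QueryOptions{Token: token},
		EnterpriseMeta: *wild, // wildcard namespace, pinned to the gateway's partition
	}
}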
diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index 7292cb44f5..dd5465d3d8 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -127,6 +127,7 @@ func TestUpstreamNodes(t testing.T) structs.CheckServiceNodes { Node: "test1", Address: "10.10.1.1", Datacenter: "dc1", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), }, Service: structs.TestNodeService(t), }, @@ -136,6 +137,7 @@ func TestUpstreamNodes(t testing.T) structs.CheckServiceNodes { Node: "test2", Address: "10.10.1.2", Datacenter: "dc1", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), }, Service: structs.TestNodeService(t), }, diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index ff0f1f2cdb..310290c25e 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -229,7 +229,8 @@ func (s *handlerUpstreams) resetWatchesFromChain( // Outside of transparent mode we only watch the chain target, B, // since A is a virtual service and traffic will not be sent to it. if !watchedChainEndpoints && s.proxyCfg.Mode == structs.ProxyModeTransparent { - chainEntMeta := structs.NewEnterpriseMetaInDefaultPartition(chain.Namespace) + // TODO(partitions): add partition to the disco chain + chainEntMeta := structs.NewEnterpriseMetaWithPartition("" /*TODO*/, chain.Namespace) opts := targetWatchOpts{ upstreamID: id, diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index be19e9fa9e..0e7bfb24d2 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -51,7 +51,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub logger.Trace("new subscription") defer logger.Trace("subscription closed") - entMeta := structs.NewEnterpriseMetaInDefaultPartition(req.Namespace) + entMeta := structs.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace) authz, err := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil) if err != nil { return err @@ -94,6 +94,7 @@ func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs Token: req.Token, Index: req.Index, Namespace: entMeta.NamespaceOrEmpty(), + Partition: entMeta.PartitionOrEmpty(), } } diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go index 2f31dde210..60af617f00 100644 --- a/agent/rpcclient/health/view.go +++ b/agent/rpcclient/health/view.go @@ -29,6 +29,7 @@ func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index ui Datacenter: srvReq.Datacenter, Index: index, Namespace: srvReq.EnterpriseMeta.NamespaceOrEmpty(), + Partition: srvReq.EnterpriseMeta.PartitionOrEmpty(), } if srvReq.Connect { req.Topic = pbsubscribe.Topic_ServiceHealthConnect diff --git a/agent/sidecar_service.go b/agent/sidecar_service.go index da87b6fccf..673a02252e 100644 --- a/agent/sidecar_service.go +++ b/agent/sidecar_service.go @@ -126,7 +126,7 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str // it doesn't seem to be necessary - even with thousands of services this is // not expensive to compute. 
usedPorts := make(map[int]struct{}) - for _, otherNS := range a.State.Services(structs.WildcardEnterpriseMetaInDefaultPartition()) { + for _, otherNS := range a.State.AllServices() { // Check if other port is in auto-assign range if otherNS.Port >= a.config.ConnectSidecarMinPort && otherNS.Port <= a.config.ConnectSidecarMaxPort { diff --git a/agent/structs/catalog.go b/agent/structs/catalog.go index 7f3fa4a157..b118b99352 100644 --- a/agent/structs/catalog.go +++ b/agent/structs/catalog.go @@ -19,8 +19,3 @@ const ( ConsulServiceID = "consul" ConsulServiceName = "consul" ) - -var ( - ConsulCompoundServiceID = NewServiceID(ConsulServiceID, nil) // TODO(partitions): delete this in favor of IsConsulServiceID(ServiceID) - SerfCompoundCheckID = NewCheckID(SerfCheckID, nil) // TODO(partitions): delete this in favor of IsSerfCheckID(CheckID) -) diff --git a/agent/txn_endpoint.go b/agent/txn_endpoint.go index 42c2251c6d..afe298d629 100644 --- a/agent/txn_endpoint.go +++ b/agent/txn_endpoint.go @@ -152,11 +152,14 @@ func (s *HTTPHandlers) convertOps(resp http.ResponseWriter, req *http.Request) ( KV: &structs.TxnKVOp{ Verb: verb, DirEnt: structs.DirEntry{ - Key: in.KV.Key, - Value: in.KV.Value, - Flags: in.KV.Flags, - Session: in.KV.Session, - EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(in.KV.Namespace), + Key: in.KV.Key, + Value: in.KV.Value, + Flags: in.KV.Flags, + Session: in.KV.Session, + EnterpriseMeta: structs.NewEnterpriseMetaWithPartition( + in.KV.Partition, + in.KV.Namespace, + ), RaftIndex: structs.RaftIndex{ ModifyIndex: in.KV.Index, }, @@ -182,6 +185,7 @@ func (s *HTTPHandlers) convertOps(resp http.ResponseWriter, req *http.Request) ( Node: structs.Node{ ID: types.NodeID(node.ID), Node: node.Node, + Partition: node.Partition, Address: node.Address, Datacenter: node.Datacenter, TaggedAddresses: node.TaggedAddresses, @@ -216,7 +220,10 @@ func (s *HTTPHandlers) convertOps(resp http.ResponseWriter, req *http.Request) ( Warning: svc.Weights.Warning, }, EnableTagOverride: svc.EnableTagOverride, - EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(svc.Namespace), + EnterpriseMeta: structs.NewEnterpriseMetaWithPartition( + svc.Partition, + svc.Namespace, + ), RaftIndex: structs.RaftIndex{ ModifyIndex: svc.ModifyIndex, }, @@ -274,7 +281,10 @@ func (s *HTTPHandlers) convertOps(resp http.ResponseWriter, req *http.Request) ( Timeout: timeout, DeregisterCriticalServiceAfter: deregisterCriticalServiceAfter, }, - EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(check.Namespace), + EnterpriseMeta: structs.NewEnterpriseMetaWithPartition( + check.Partition, + check.Namespace, + ), RaftIndex: structs.RaftIndex{ ModifyIndex: check.ModifyIndex, }, diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go index df2505759b..92fb70d6e7 100644 --- a/agent/ui_endpoint.go +++ b/agent/ui_endpoint.go @@ -603,6 +603,9 @@ func (s *HTTPHandlers) UIMetricsProxy(resp http.ResponseWriter, req *http.Reques s.clearTokenFromHeaders(req) var entMeta structs.EnterpriseMeta + if err := parseEntMetaPartition(req, &entMeta); err != nil { + return nil, err + } authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, nil) if err != nil { return nil, err @@ -611,9 +614,8 @@ func (s *HTTPHandlers) UIMetricsProxy(resp http.ResponseWriter, req *http.Reques // This endpoint requires wildcard read on all services and all nodes. // // In enterprise it requires this _in all namespaces_ too. 
- wildMeta := structs.WildcardEnterpriseMetaInDefaultPartition() var authzContext acl.AuthorizerContext - wildMeta.FillAuthzContext(&authzContext) + entMeta.WildcardEnterpriseMetaForPartition().FillAuthzContext(&authzContext) if authz.NodeReadAll(&authzContext) != acl.Allow || authz.ServiceReadAll(&authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied diff --git a/agent/user_event.go b/agent/user_event.go index d3b6224b03..3df2b6bd82 100644 --- a/agent/user_event.go +++ b/agent/user_event.go @@ -187,6 +187,7 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool { } // Scan for a match + // NOTE: this only works in the default partition and default namespace services := a.State.Services(structs.DefaultEnterpriseMetaInDefaultPartition()) found := false OUTER: diff --git a/api/agent_test.go b/api/agent_test.go index 082857a484..f875092517 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -15,10 +15,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/sdk/testutil/retry" ) func TestAPI_AgentSelf(t *testing.T) { @@ -793,6 +794,7 @@ func TestAPI_AgentService(t *testing.T) { }, Meta: map[string]string{}, Namespace: defaultNamespace, + Partition: defaultPartition, Datacenter: "dc1", } require.Equal(expect, got) diff --git a/api/coordinate_test.go b/api/coordinate_test.go index 8c8e986145..071b1f99e4 100644 --- a/api/coordinate_test.go +++ b/api/coordinate_test.go @@ -5,9 +5,10 @@ import ( "testing" "time" - "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/serf/coordinate" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/sdk/testutil/retry" ) func TestAPI_CoordinateDatacenters(t *testing.T) { @@ -85,8 +86,9 @@ func TestAPI_CoordinateUpdate(t *testing.T) { newCoord := coordinate.NewCoordinate(coordinate.DefaultConfig()) newCoord.Height = 0.5 entry := &CoordinateEntry{ - Node: node, - Coord: newCoord, + Node: node, + Partition: defaultPartition, + Coord: newCoord, } _, err = coord.Update(entry, nil) if err != nil { diff --git a/command/rtt/rtt.go b/command/rtt/rtt.go index 21e174157e..0710a4ef83 100644 --- a/command/rtt/rtt.go +++ b/command/rtt/rtt.go @@ -5,12 +5,16 @@ import ( "fmt" "strings" - "github.com/hashicorp/consul/command/flags" - "github.com/hashicorp/consul/lib" "github.com/hashicorp/serf/coordinate" "github.com/mitchellh/cli" + + "github.com/hashicorp/consul/command/flags" + "github.com/hashicorp/consul/lib" ) +// TODO(partitions): how will this command work when asking for RTT between a +// partitioned client and a server in the default partition? + func New(ui cli.Ui) *cmd { c := &cmd{UI: ui} c.init()
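While that TODO is open, the math the rtt command relies on is unchanged: coordinates come back from the HTTP API and are turned into an estimate with lib.ComputeDistance. A caller-side sketch (node names illustrative; ComputeDistance returns +Inf when either coordinate is missing):

func estimateRTT(client *api.Client, nodeA, nodeB string) (float64, error) {
	entries, _, err := client.Coordinate().Nodes(nil)
	if err != nil {
		return 0, err
	}
	var a, b *coordinate.Coordinate
	for _, e := range entries {
		switch e.Node {
		case nodeA:
			a = e.Coord
		case nodeB:
			b = e.Coord
		}
	}
	// result is in seconds; multiply by 1000 for the "ms" formatting the command prints
	return lib.ComputeDistance(a, b), nil
}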