Merge pull request #9284 from hashicorp/dnephin/agent-service-register

local: mark service as InSync when added to local agent state
pull/9295/head
Daniel Nephin 2020-11-27 15:49:55 -05:00 committed by GitHub
commit 9f0f2bd589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 15 deletions

3
.changelog/9284.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
agent: prevent duplicate services and check registrations from being synced to servers.
```

View File

@ -3,12 +3,14 @@ package ae
import (
"fmt"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
"math"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
)
// scaleThreshold is the number of nodes after which regular sync runs are
@ -173,8 +175,7 @@ func (s *StateSyncer) nextFSMState(fs fsmState) fsmState {
return retryFullSyncState
}
err := s.State.SyncFull()
if err != nil {
if err := s.State.SyncFull(); err != nil {
s.Logger.Error("failed to sync remote state", "error", err)
return retryFullSyncState
}

View File

@ -11,13 +11,14 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
)
var StateCounters = []prometheus.CounterDefinition{
@ -287,7 +288,6 @@ func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*str
return err
}
}
return nil
}
@ -399,12 +399,14 @@ func (l *State) SetServiceState(s *ServiceState) {
}
func (l *State) setServiceStateLocked(s *ServiceState) {
s.WatchCh = make(chan struct{}, 1)
key := s.Service.CompoundServiceID()
old, hasOld := l.services[key]
if hasOld {
s.InSync = s.Service.IsSame(old.Service)
}
l.services[key] = s
s.WatchCh = make(chan struct{}, 1)
if hasOld && old.WatchCh != nil {
close(old.WatchCh)
}
@ -722,7 +724,13 @@ func (l *State) SetCheckState(c *CheckState) {
}
func (l *State) setCheckStateLocked(c *CheckState) {
l.checks[c.Check.CompoundCheckID()] = c
id := c.Check.CompoundCheckID()
existing := l.checks[id]
if existing != nil {
c.InSync = c.Check.IsSame(existing.Check)
}
l.checks[id] = c
// If this is a check for an aliased service, then notify the waiters.
l.notifyIfAliased(c.Check.CompoundServiceID())
@ -868,8 +876,8 @@ func (l *State) Stats() map[string]string {
}
}
// updateSyncState does a read of the server state, and updates
// the local sync status as appropriate
// updateSyncState queries the server for all the services and checks in the catalog
// registered to this node, and updates the local entries as InSync or Deleted.
func (l *State) updateSyncState() error {
// Get all checks and services from the master
req := structs.NodeSpecificRequest{

View File

@ -7,8 +7,9 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/config"
@ -17,9 +18,8 @@ import (
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func unNilMap(in map[string]string) map[string]string {
@ -28,6 +28,7 @@ func unNilMap(in map[string]string) map[string]string {
}
return in
}
func TestAgentAntiEntropy_Services(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent(t, "")
@ -2195,3 +2196,52 @@ func drainCh(ch chan struct{}) {
}
}
}
func TestState_SyncChanges_DuplicateAddServiceOnlySyncsOnce(t *testing.T) {
state := local.NewState(local.Config{}, hclog.New(nil), new(token.Store))
rpc := &fakeRPC{}
state.Delegate = rpc
state.TriggerSyncChanges = func() {}
srv := &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "the-service-id",
Service: "web",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
checks := []*structs.HealthCheck{
{Node: "this-node", CheckID: "the-id-1", Name: "check-healthy-1"},
{Node: "this-node", CheckID: "the-id-2", Name: "check-healthy-2"},
}
tok := "the-token"
err := state.AddServiceWithChecks(srv, checks, tok)
require.NoError(t, err)
require.NoError(t, state.SyncChanges())
// 4 rpc calls, one node register, one service register, two checks
require.Len(t, rpc.calls, 4)
// adding the service again should not catalog register
err = state.AddServiceWithChecks(srv, checks, tok)
require.NoError(t, err)
require.NoError(t, state.SyncChanges())
require.Len(t, rpc.calls, 4)
}
type fakeRPC struct {
calls []callRPC
}
type callRPC struct {
method string
args interface{}
reply interface{}
}
func (f *fakeRPC) RPC(method string, args interface{}, reply interface{}) error {
f.calls = append(f.calls, callRPC{method: method, args: args, reply: reply})
return nil
}
func (f *fakeRPC) ResolveTokenToIdentity(_ string) (structs.ACLIdentity, error) {
return nil, nil
}