mirror of https://github.com/hashicorp/consul
375 lines
12 KiB
Go
375 lines
12 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package controller
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
"google.golang.org/grpc"
|
|
|
|
mockpbresource "github.com/hashicorp/consul/grpcmocks/proto-public/pbresource"
|
|
"github.com/hashicorp/consul/internal/resource"
|
|
"github.com/hashicorp/consul/internal/resource/resourcetest"
|
|
"github.com/hashicorp/consul/internal/storage"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
"github.com/hashicorp/consul/proto/private/prototest"
|
|
"github.com/hashicorp/consul/sdk/testutil"
|
|
)
|
|
|
|
var (
|
|
fakeType = &pbresource.Type{
|
|
Group: "testing",
|
|
GroupVersion: "v1",
|
|
Kind: "Fake",
|
|
}
|
|
|
|
fakeV2Type = &pbresource.Type{
|
|
Group: "testing",
|
|
GroupVersion: "v2",
|
|
Kind: "Fake",
|
|
}
|
|
)
|
|
|
|
type memCheckResult struct {
|
|
clientGet *pbresource.Resource
|
|
clientGetError error
|
|
cacheGet *pbresource.Resource
|
|
cacheGetError error
|
|
}
|
|
|
|
type memCheckReconciler struct {
|
|
mu sync.Mutex
|
|
closed bool
|
|
reconcileCh chan memCheckResult
|
|
mapCh chan memCheckResult
|
|
}
|
|
|
|
func newMemCheckReconciler(t testutil.TestingTB) *memCheckReconciler {
|
|
t.Helper()
|
|
|
|
r := &memCheckReconciler{
|
|
reconcileCh: make(chan memCheckResult, 10),
|
|
mapCh: make(chan memCheckResult, 10),
|
|
}
|
|
|
|
t.Cleanup(r.Shutdown)
|
|
return r
|
|
}
|
|
|
|
func (r *memCheckReconciler) Shutdown() {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.closed = true
|
|
close(r.reconcileCh)
|
|
close(r.mapCh)
|
|
}
|
|
|
|
func (r *memCheckReconciler) requireNotClosed(t testutil.TestingTB) {
|
|
t.Helper()
|
|
if r.closed {
|
|
require.FailNow(t, "the memCheckReconciler has been closed")
|
|
}
|
|
}
|
|
|
|
func (r *memCheckReconciler) checkReconcileResult(t testutil.TestingTB, ctx context.Context, res *pbresource.Resource) {
|
|
t.Helper()
|
|
r.requireEqualNotSameMemCheckResult(t, ctx, r.reconcileCh, res)
|
|
}
|
|
|
|
func (r *memCheckReconciler) checkMapResult(t testutil.TestingTB, ctx context.Context, res *pbresource.Resource) {
|
|
t.Helper()
|
|
r.requireEqualNotSameMemCheckResult(t, ctx, r.mapCh, res)
|
|
}
|
|
|
|
func (r *memCheckReconciler) requireEqualNotSameMemCheckResult(t testutil.TestingTB, ctx context.Context, ch <-chan memCheckResult, res *pbresource.Resource) {
|
|
t.Helper()
|
|
|
|
select {
|
|
case result := <-ch:
|
|
require.NoError(t, result.clientGetError)
|
|
require.NoError(t, result.cacheGetError)
|
|
// Equal but NotSame means the values are all the same but
|
|
// the pointers are different. Note that this probably doesn't
|
|
// check that the values within the resource haven't been shallow
|
|
// copied but that probably should be checked elsewhere
|
|
prototest.AssertDeepEqual(t, res, result.clientGet)
|
|
require.NotSame(t, res, result.clientGet)
|
|
prototest.AssertDeepEqual(t, res, result.cacheGet)
|
|
require.NotSame(t, res, result.cacheGet)
|
|
case <-ctx.Done():
|
|
require.Fail(t, "didn't receive mem check result before context cancellation", ctx.Err())
|
|
}
|
|
}
|
|
|
|
func (r *memCheckReconciler) Reconcile(ctx context.Context, rt Runtime, req Request) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if !r.closed {
|
|
r.getAndSend(ctx, rt, req.ID, r.reconcileCh)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *memCheckReconciler) MapToNothing(
|
|
ctx context.Context,
|
|
rt Runtime,
|
|
res *pbresource.Resource,
|
|
) ([]Request, error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if !r.closed {
|
|
r.getAndSend(ctx, rt, res.Id, r.mapCh)
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (*memCheckReconciler) getAndSend(ctx context.Context, rt Runtime, id *pbresource.ID, ch chan<- memCheckResult) {
|
|
var res memCheckResult
|
|
response, err := rt.Client.Read(ctx, &pbresource.ReadRequest{
|
|
Id: id,
|
|
})
|
|
res.clientGetError = err
|
|
if response != nil {
|
|
res.clientGet = response.Resource
|
|
}
|
|
|
|
res.cacheGet, res.cacheGetError = rt.Cache.Get(id.Type, "id", id)
|
|
|
|
ch <- res
|
|
}
|
|
|
|
func watchListEvents(t testutil.TestingTB, events ...*pbresource.WatchEvent) pbresource.ResourceService_WatchListClient {
|
|
t.Helper()
|
|
ctx := testutil.TestContext(t)
|
|
|
|
watchListClient := mockpbresource.NewResourceService_WatchListClient(t)
|
|
|
|
// Return the events in the specified order as soon as they are requested
|
|
for _, event := range events {
|
|
evt := event
|
|
watchListClient.EXPECT().
|
|
Recv().
|
|
RunAndReturn(func() (*pbresource.WatchEvent, error) {
|
|
return evt, nil
|
|
}).
|
|
Once()
|
|
}
|
|
|
|
watchListClient.EXPECT().
|
|
Recv().
|
|
RunAndReturn(func() (*pbresource.WatchEvent, error) {
|
|
return &pbresource.WatchEvent{
|
|
Event: &pbresource.WatchEvent_EndOfSnapshot_{
|
|
EndOfSnapshot: &pbresource.WatchEvent_EndOfSnapshot{},
|
|
},
|
|
}, nil
|
|
}).
|
|
Once()
|
|
|
|
// Now that all specified events have been exhausted we loop until the test finishes
|
|
// and the context bound to the tests lifecycle has been cancelled. This prevents getting
|
|
// any weird errors from the controller manager/runner.
|
|
watchListClient.EXPECT().
|
|
Recv().
|
|
RunAndReturn(func() (*pbresource.WatchEvent, error) {
|
|
<-ctx.Done()
|
|
return nil, ctx.Err()
|
|
}).
|
|
Maybe()
|
|
|
|
return watchListClient
|
|
}
|
|
|
|
// TestControllerRuntimeMemoryCloning mainly is testing that the runtimes
|
|
// provided to reconcilers and dependency mappers will return data from
|
|
// the resource service client and the cache that have been cloned so that
|
|
// the controller should be free to modify the data as needed.
|
|
func TestControllerRuntimeMemoryCloning(t *testing.T) {
|
|
ctx := testutil.TestContext(t)
|
|
|
|
// create some resources to use during the test
|
|
res1 := resourcetest.Resource(fakeType, "foo").
|
|
WithTenancy(resource.DefaultNamespacedTenancy()).
|
|
Build()
|
|
res2 := resourcetest.Resource(fakeV2Type, "bar").
|
|
WithTenancy(resource.DefaultNamespacedTenancy()).
|
|
Build()
|
|
|
|
// create the reconciler that will read the desired resource
|
|
// from both the resource service client and the cache client.
|
|
reconciler := newMemCheckReconciler(t)
|
|
|
|
// Create the v1 watch list client to be returned when the controller runner
|
|
// calls WatchList on the v1 testing type.
|
|
v1WatchListClientCreate := func() pbresource.ResourceService_WatchListClient {
|
|
return watchListEvents(t, &pbresource.WatchEvent{
|
|
Event: &pbresource.WatchEvent_Upsert_{
|
|
Upsert: &pbresource.WatchEvent_Upsert{
|
|
Resource: res1,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Create the v2 watch list client to be returned when the controller runner
|
|
// calls WatchList on the v2 testing type.
|
|
v2WatchListClientCreate := func() pbresource.ResourceService_WatchListClient {
|
|
return watchListEvents(t, nil, &pbresource.WatchEvent{
|
|
Event: &pbresource.WatchEvent_Upsert_{
|
|
Upsert: &pbresource.WatchEvent_Upsert{
|
|
Resource: res2,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Create the mock resource service client
|
|
mres := mockpbresource.NewResourceServiceClient(t)
|
|
// Setup the expectation for the controller runner to issue a WatchList
|
|
// request for the managed type (fake v2 type)
|
|
mres.EXPECT().
|
|
WatchList(mock.Anything, &pbresource.WatchListRequest{
|
|
Type: fakeV2Type,
|
|
Tenancy: &pbresource.Tenancy{
|
|
Partition: storage.Wildcard,
|
|
Namespace: storage.Wildcard,
|
|
},
|
|
}).
|
|
RunAndReturn(func(_ context.Context, _ *pbresource.WatchListRequest, _ ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) {
|
|
return v2WatchListClientCreate(), nil
|
|
}).
|
|
Twice() // once for cache prime, once for the rest
|
|
|
|
// Setup the expectation for the controller runner to issue a WatchList
|
|
// request for the secondary Watch type (fake v1 type)
|
|
mres.EXPECT().
|
|
WatchList(mock.Anything, &pbresource.WatchListRequest{
|
|
Type: fakeType,
|
|
Tenancy: &pbresource.Tenancy{
|
|
Partition: storage.Wildcard,
|
|
Namespace: storage.Wildcard,
|
|
},
|
|
}).
|
|
RunAndReturn(func(_ context.Context, _ *pbresource.WatchListRequest, _ ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) {
|
|
return v1WatchListClientCreate(), nil
|
|
}).
|
|
Twice() // once for cache prime, once for the rest
|
|
|
|
// The cloning resource clients will forward actual calls onto the main resource service client.
|
|
// Here we are configuring the service mock to return either of the resources depending on the
|
|
// id present in the request.
|
|
mres.EXPECT().
|
|
Read(mock.Anything, mock.Anything).
|
|
RunAndReturn(func(_ context.Context, req *pbresource.ReadRequest, opts ...grpc.CallOption) (*pbresource.ReadResponse, error) {
|
|
res := res2
|
|
if resource.EqualID(res1.Id, req.Id) {
|
|
res = res1
|
|
}
|
|
return &pbresource.ReadResponse{Resource: res}, nil
|
|
}).
|
|
Times(0)
|
|
|
|
// create the test controller
|
|
ctl := NewController("test", fakeV2Type).
|
|
WithWatch(fakeType, reconciler.MapToNothing).
|
|
WithReconciler(reconciler)
|
|
|
|
// create the controller manager and register our test controller
|
|
manager := NewManager(mres, testutil.Logger(t))
|
|
manager.Register(ctl)
|
|
|
|
// run the controller manager
|
|
manager.SetRaftLeader(true)
|
|
go manager.Run(ctx)
|
|
|
|
// All future assertions should easily be able to run within 5s although they
|
|
// should typically run a couple orders of magnitude faster.
|
|
timeLimitedCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
t.Cleanup(cancel)
|
|
|
|
// validate that the v2 resource type event was seen and that the
|
|
// cache and the resource service client return cloned resources
|
|
reconciler.checkReconcileResult(t, timeLimitedCtx, res2)
|
|
|
|
// Validate that the dependency mapper's resource and cache clients return
|
|
// cloned resources.
|
|
reconciler.checkMapResult(t, timeLimitedCtx, res1)
|
|
}
|
|
|
|
// TestRunnerSharedMemoryCache is mainly testing to ensure that resources
|
|
// within the cache are shared with the resource service and have not been
|
|
// cloned.
|
|
func TestControllerRunnerSharedMemoryCache(t *testing.T) {
|
|
ctx := testutil.TestContext(t)
|
|
|
|
// create resource to use during the test
|
|
res := resourcetest.Resource(fakeV2Type, "bar").
|
|
WithTenancy(resource.DefaultNamespacedTenancy()).
|
|
Build()
|
|
|
|
// create the reconciler that will read the desired resource
|
|
// from both the resource service client and the cache client.
|
|
reconciler := newMemCheckReconciler(t)
|
|
|
|
// Create the v2 watch list client to be returned when the controller runner
|
|
// calls WatchList on the v2 testing type.
|
|
v2WatchListClientCreate := func() pbresource.ResourceService_WatchListClient {
|
|
return watchListEvents(t, nil, &pbresource.WatchEvent{
|
|
Event: &pbresource.WatchEvent_Upsert_{
|
|
Upsert: &pbresource.WatchEvent_Upsert{
|
|
Resource: res,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Create the mock resource service client
|
|
mres := mockpbresource.NewResourceServiceClient(t)
|
|
// Setup the expectation for the controller runner to issue a WatchList
|
|
// request for the managed type (fake v2 type)
|
|
mres.EXPECT().
|
|
WatchList(mock.Anything, &pbresource.WatchListRequest{
|
|
Type: fakeV2Type,
|
|
Tenancy: &pbresource.Tenancy{
|
|
Partition: storage.Wildcard,
|
|
Namespace: storage.Wildcard,
|
|
},
|
|
}).
|
|
RunAndReturn(func(_ context.Context, _ *pbresource.WatchListRequest, _ ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) {
|
|
return v2WatchListClientCreate(), nil
|
|
}).
|
|
Twice() // once for cache prime, once for the rest
|
|
|
|
// The cloning resource clients will forward actual calls onto the main resource service client.
|
|
// Here we are configuring the service mock to return our singular resource always.
|
|
mres.EXPECT().
|
|
Read(mock.Anything, mock.Anything).
|
|
Return(&pbresource.ReadResponse{Resource: res}, nil).
|
|
Times(0)
|
|
|
|
// create the test controller
|
|
ctl := NewController("test", fakeV2Type).
|
|
WithReconciler(reconciler)
|
|
|
|
runner := newControllerRunner(ctl, mres, testutil.Logger(t))
|
|
go runner.run(ctx)
|
|
|
|
// Wait for reconcile to be called before we check the values in
|
|
// the cache. This will also validate that the resource service client
|
|
// and cache client given to the reconciler cloned the resource but
|
|
// that is tested more thoroughly in another test and isn't of primary
|
|
// concern here.
|
|
reconciler.checkReconcileResult(t, ctx, res)
|
|
|
|
// Now validate that the cache hold the same resource pointer as the original data
|
|
actual, err := runner.cache.Get(fakeV2Type, "id", res.Id)
|
|
require.NoError(t, err)
|
|
require.Same(t, res, actual)
|
|
}
|