You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/internal/controller/mem_consistency_test.go

341 lines
10 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package controller
import (
"context"
"sync"
"testing"
"time"
mockpbresource "github.com/hashicorp/consul/grpcmocks/proto-public/pbresource"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
var (
fakeType = &pbresource.Type{
Group: "testing",
GroupVersion: "v1",
Kind: "Fake",
}
fakeV2Type = &pbresource.Type{
Group: "testing",
GroupVersion: "v2",
Kind: "Fake",
}
)
type memCheckResult struct {
clientGet *pbresource.Resource
clientGetError error
cacheGet *pbresource.Resource
cacheGetError error
}
type memCheckReconciler struct {
mu sync.Mutex
closed bool
reconcileCh chan memCheckResult
mapCh chan memCheckResult
}
func newMemCheckReconciler(t testutil.TestingTB) *memCheckReconciler {
t.Helper()
r := &memCheckReconciler{
reconcileCh: make(chan memCheckResult, 10),
mapCh: make(chan memCheckResult, 10),
}
t.Cleanup(r.Shutdown)
return r
}
func (r *memCheckReconciler) Shutdown() {
r.mu.Lock()
defer r.mu.Unlock()
r.closed = true
close(r.reconcileCh)
close(r.mapCh)
}
func (r *memCheckReconciler) requireNotClosed(t testutil.TestingTB) {
t.Helper()
if r.closed {
require.FailNow(t, "the memCheckReconciler has been closed")
}
}
func (r *memCheckReconciler) checkReconcileResult(t testutil.TestingTB, ctx context.Context, res *pbresource.Resource) {
t.Helper()
r.requireEqualNotSameMemCheckResult(t, ctx, r.reconcileCh, res)
}
func (r *memCheckReconciler) checkMapResult(t testutil.TestingTB, ctx context.Context, res *pbresource.Resource) {
t.Helper()
r.requireEqualNotSameMemCheckResult(t, ctx, r.mapCh, res)
}
func (r *memCheckReconciler) requireEqualNotSameMemCheckResult(t testutil.TestingTB, ctx context.Context, ch <-chan memCheckResult, res *pbresource.Resource) {
t.Helper()
select {
case result := <-ch:
require.NoError(t, result.clientGetError)
require.NoError(t, result.cacheGetError)
// Equal but NotSame means the values are all the same but
// the pointers are different. Note that this probably doesn't
// check that the values within the resource haven't been shallow
// copied but that probably should be checked elsewhere
prototest.AssertDeepEqual(t, res, result.clientGet)
require.NotSame(t, res, result.clientGet)
prototest.AssertDeepEqual(t, res, result.cacheGet)
require.NotSame(t, res, result.cacheGet)
case <-ctx.Done():
require.Fail(t, "didn't receive mem check result before context cancellation", ctx.Err())
}
}
func (r *memCheckReconciler) Reconcile(ctx context.Context, rt Runtime, req Request) error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.closed {
r.getAndSend(ctx, rt, req.ID, r.reconcileCh)
}
return nil
}
func (r *memCheckReconciler) MapToNothing(
ctx context.Context,
rt Runtime,
res *pbresource.Resource,
) ([]Request, error) {
r.mu.Lock()
defer r.mu.Unlock()
if !r.closed {
r.getAndSend(ctx, rt, res.Id, r.mapCh)
}
return nil, nil
}
func (*memCheckReconciler) getAndSend(ctx context.Context, rt Runtime, id *pbresource.ID, ch chan<- memCheckResult) {
var res memCheckResult
response, err := rt.Client.Read(ctx, &pbresource.ReadRequest{
Id: id,
})
res.clientGetError = err
if response != nil {
res.clientGet = response.Resource
}
res.cacheGet, res.cacheGetError = rt.Cache.Get(id.Type, "id", id)
ch <- res
}
func watchListEvents(t testutil.TestingTB, events ...*pbresource.WatchEvent) pbresource.ResourceService_WatchListClient {
t.Helper()
ctx := testutil.TestContext(t)
watchListClient := mockpbresource.NewResourceService_WatchListClient(t)
// Return the events in the specified order as soon as they are requested
for _, event := range events {
watchListClient.EXPECT().
Recv().
RunAndReturn(func() (*pbresource.WatchEvent, error) {
return event, nil
}).
Once()
}
// Now that all specified events have been exhausted we loop until the test finishes
// and the context bound to the tests lifecycle has been cancelled. This prevents getting
// any weird errors from the controller manager/runner.
watchListClient.EXPECT().
Recv().
RunAndReturn(func() (*pbresource.WatchEvent, error) {
<-ctx.Done()
return nil, ctx.Err()
}).
Maybe()
return watchListClient
}
// TestControllerRuntimeMemoryCloning mainly is testing that the runtimes
// provided to reconcilers and dependency mappers will return data from
// the resource service client and the cache that have been cloned so that
// the controller should be free to modify the data as needed.
func TestControllerRuntimeMemoryCloning(t *testing.T) {
ctx := testutil.TestContext(t)
// create some resources to use during the test
res1 := resourcetest.Resource(fakeType, "foo").
WithTenancy(resource.DefaultNamespacedTenancy()).
Build()
res2 := resourcetest.Resource(fakeV2Type, "bar").
WithTenancy(resource.DefaultNamespacedTenancy()).
Build()
// create the reconciler that will read the desired resource
// from both the resource service client and the cache client.
reconciler := newMemCheckReconciler(t)
// Create the v1 watch list client to be returned when the controller runner
// calls WatchList on the v1 testing type.
v1WatchListClient := watchListEvents(t, &pbresource.WatchEvent{
Operation: pbresource.WatchEvent_OPERATION_UPSERT,
Resource: res1,
})
// Create the v2 watch list client to be returned when the controller runner
// calls WatchList on the v2 testing type.
v2WatchListClient := watchListEvents(t, nil, &pbresource.WatchEvent{
Operation: pbresource.WatchEvent_OPERATION_UPSERT,
Resource: res2,
})
// Create the mock resource service client
mres := mockpbresource.NewResourceServiceClient(t)
// Setup the expectation for the controller runner to issue a WatchList
// request for the managed type (fake v2 type)
mres.EXPECT().
WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: fakeV2Type,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
Namespace: storage.Wildcard,
},
}).
Return(v2WatchListClient, nil).
Once()
// Setup the expectation for the controller runner to issue a WatchList
// request for the secondary Watch type (fake v1 type)
mres.EXPECT().
WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: fakeType,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
Namespace: storage.Wildcard,
},
}).
Return(v1WatchListClient, nil).
Once()
// The cloning resource clients will forward actual calls onto the main resource service client.
// Here we are configuring the service mock to return either of the resources depending on the
// id present in the request.
mres.EXPECT().
Read(mock.Anything, mock.Anything).
RunAndReturn(func(_ context.Context, req *pbresource.ReadRequest, opts ...grpc.CallOption) (*pbresource.ReadResponse, error) {
res := res2
if resource.EqualID(res1.Id, req.Id) {
res = res1
}
return &pbresource.ReadResponse{Resource: res}, nil
}).
Times(0)
// create the test controller
ctl := NewController("test", fakeV2Type).
WithWatch(fakeType, reconciler.MapToNothing).
WithReconciler(reconciler)
// create the controller manager and register our test controller
manager := NewManager(mres, testutil.Logger(t))
manager.Register(ctl)
// run the controller manager
manager.SetRaftLeader(true)
go manager.Run(ctx)
// All future assertions should easily be able to run within 5s although they
// should typically run a couple orders of magnitude faster.
timeLimitedCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
t.Cleanup(cancel)
// validate that the v2 resource type event was seen and that the
// cache and the resource service client return cloned resources
reconciler.checkReconcileResult(t, timeLimitedCtx, res2)
// Validate that the dependency mapper's resource and cache clients return
// cloned resources.
reconciler.checkMapResult(t, timeLimitedCtx, res1)
}
// TestRunnerSharedMemoryCache is mainly testing to ensure that resources
// within the cache are shared with the resource service and have not been
// cloned.
func TestControllerRunnerSharedMemoryCache(t *testing.T) {
ctx := testutil.TestContext(t)
// create resource to use during the test
res := resourcetest.Resource(fakeV2Type, "bar").
WithTenancy(resource.DefaultNamespacedTenancy()).
Build()
// create the reconciler that will read the desired resource
// from both the resource service client and the cache client.
reconciler := newMemCheckReconciler(t)
// Create the v2 watch list client to be returned when the controller runner
// calls WatchList on the v2 testing type.
v2WatchListClient := watchListEvents(t, nil, &pbresource.WatchEvent{
Operation: pbresource.WatchEvent_OPERATION_UPSERT,
Resource: res,
})
// Create the mock resource service client
mres := mockpbresource.NewResourceServiceClient(t)
// Setup the expectation for the controller runner to issue a WatchList
// request for the managed type (fake v2 type)
mres.EXPECT().
WatchList(mock.Anything, &pbresource.WatchListRequest{
Type: fakeV2Type,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
Namespace: storage.Wildcard,
},
}).
Return(v2WatchListClient, nil).
Once()
// The cloning resource clients will forward actual calls onto the main resource service client.
// Here we are configuring the service mock to return our singular resource always.
mres.EXPECT().
Read(mock.Anything, mock.Anything).
Return(&pbresource.ReadResponse{Resource: res}, nil).
Times(0)
// create the test controller
ctl := NewController("test", fakeV2Type).
WithReconciler(reconciler)
runner := newControllerRunner(ctl, mres, testutil.Logger(t))
go runner.run(ctx)
// Wait for reconcile to be called before we check the values in
// the cache. This will also validate that the resource service client
// and cache client given to the reconciler cloned the resource but
// that is tested more thoroughly in another test and isn't of primary
// concern here.
reconciler.checkReconcileResult(t, ctx, res)
// Now validate that the cache hold the same resource pointer as the original data
actual, err := runner.cache.Get(fakeV2Type, "id", res.Id)
require.NoError(t, err)
require.Same(t, res, actual)
}