You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
consul/agent/submatview/streaming_test.go

196 lines
4.9 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package submatview
import (
"context"
"fmt"
"sync"
"github.com/hashicorp/consul/proto/private/pbcommon"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/private/pbservice"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
"github.com/hashicorp/consul/types"
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
//
// Every queued event or error is recorded and fanned out to all current
// subscribers; subscribers that arrive later get the recorded history
// replayed to them by Subscribe.
type TestStreamingClient struct {
// expectedNamespace is the namespace a SubscribeRequest's NamedSubject must
// carry; Subscribe returns an error for any other value.
expectedNamespace string
// subClients holds every client handed out by Subscribe. Entries are only
// ever appended in this file.
subClients []*subscribeClient
// lock guards subClients and events.
lock sync.RWMutex
// events is the full, ordered history of queued events and errors.
events []eventOrErr
}
// eventOrErr is a single item delivered to a subscriber: either an event or
// an error. Recv checks Err first and only returns Event when Err is nil.
type eventOrErr struct {
// Err, when non-nil, is returned from Recv in place of an event.
Err error
// Event is the payload returned from Recv when Err is nil.
Event *pbsubscribe.Event
}
// NewTestStreamingClient returns a TestStreamingClient with no subscribers and
// no queued events that only accepts subscriptions for the namespace ns.
func NewTestStreamingClient(ns string) *TestStreamingClient {
	client := new(TestStreamingClient)
	client.expectedNamespace = ns
	return client
}
// Subscribe registers a new subscriber and returns the client it should read
// from.
//
// The request is rejected with an error when its NamedSubject namespace does
// not match the namespace the TestStreamingClient was created with. Otherwise
// every event and error queued so far is replayed, in order, to the new
// subscriber before it is returned, and the subscriber will also receive
// anything queued afterwards.
//
// ctx is retained by the returned client so that Recv unblocks when the
// caller cancels. Call options are ignored.
func (s *TestStreamingClient) Subscribe(
	ctx context.Context,
	req *pbsubscribe.SubscribeRequest,
	_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
	if ns := req.GetNamedSubject().GetNamespace(); ns != s.expectedNamespace {
		return nil, fmt.Errorf("wrong SubscribeRequest.NamedSubject.Namespace %v, expected %v",
			ns, s.expectedNamespace)
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	// Nobody can read from the new client until it has been returned, so the
	// replay below must never block: a fixed-size buffer would deadlock here
	// (while holding the lock) once the history outgrows it. Size the buffer
	// to hold the whole backlog plus the usual 32 slots of headroom for
	// events queued after subscription.
	c := &subscribeClient{
		events: make(chan eventOrErr, len(s.events)+32),
		ctx:    ctx,
	}
	for _, event := range s.events {
		c.events <- event
	}
	s.subClients = append(s.subClients, c)
	return c, nil
}
// subscribeClient is the per-subscription client returned by
// TestStreamingClient.Subscribe. Its Recv method yields items from events
// until ctx is done.
type subscribeClient struct {
// NOTE(review): the embedded ClientStream is never assigned in this file, so
// presumably only Recv is expected to be called on this type — confirm.
grpc.ClientStream
// events carries the items queued for this subscriber.
events chan eventOrErr
// ctx is the context passed to Subscribe; Recv returns its error once it is
// done.
ctx context.Context
}
// QueueEvents records each of the given events in the client's history and
// delivers it to every current subscriber, one event at a time and in the
// order given. The lock is held for the whole call, and a send blocks while a
// subscriber's channel is full.
func (s *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
	s.lock.Lock()
	defer s.lock.Unlock()

	for i := range events {
		item := eventOrErr{Event: events[i]}
		s.events = append(s.events, item)
		for _, sub := range s.subClients {
			sub.events <- item
		}
	}
}
// QueueErr records err in the client's history and delivers it to every
// current subscriber, whose next Recv for that item returns it as an error.
// The lock is held for the whole call, and a send blocks while a subscriber's
// channel is full.
func (s *TestStreamingClient) QueueErr(err error) {
	item := eventOrErr{Err: err}

	s.lock.Lock()
	defer s.lock.Unlock()

	s.events = append(s.events, item)
	for i := range s.subClients {
		s.subClients[i].events <- item
	}
}
// Recv blocks until the next queued item is available or the subscription's
// context is done. A queued error is returned as (nil, err); a queued event is
// returned as (event, nil). When the context finishes first, the context's
// error is returned.
func (c *subscribeClient) Recv() (*pbsubscribe.Event, error) {
	select {
	case <-c.ctx.Done():
		return nil, c.ctx.Err()
	case next := <-c.events:
		if next.Err == nil {
			return next.Event, nil
		}
		return nil, next.Err
	}
}
// newEndOfSnapshotEvent builds an event at the given index whose payload is
// the end-of-snapshot marker.
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
	ev := new(pbsubscribe.Event)
	ev.Index = index
	ev.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}
	return ev
}
// newNewSnapshotToFollowEvent builds an event, with no index set, whose
// payload is the new-snapshot-to-follow marker.
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
	ev := new(pbsubscribe.Event)
	ev.Payload = &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}
	return ev
}
// newEventServiceHealthRegister builds a service-health Register event at the
// given index for service svc (used as both ID and name, port 8080) running
// on a node derived from nodeNum: the node is named "node<nodeNum>" in "dc1",
// with an ID and a 10.10.x.y address computed from nodeNum. The create and
// modify indexes of both the node and the service are set to index.
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
	nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))

	node := &pbservice.Node{
		ID:         string(nodeID),
		Node:       fmt.Sprintf("node%d", nodeNum),
		Address:    fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256),
		Datacenter: "dc1",
		RaftIndex: &pbcommon.RaftIndex{
			CreateIndex: index,
			ModifyIndex: index,
		},
	}

	// The service gets its own RaftIndex value rather than sharing the
	// node's pointer.
	service := &pbservice.NodeService{
		ID:      svc,
		Service: svc,
		Port:    8080,
		RaftIndex: &pbcommon.RaftIndex{
			CreateIndex: index,
			ModifyIndex: index,
		},
	}

	update := &pbsubscribe.ServiceHealthUpdate{
		Op: pbsubscribe.CatalogOp_Register,
		CheckServiceNode: &pbservice.CheckServiceNode{
			Node:    node,
			Service: service,
		},
	}

	return &pbsubscribe.Event{
		Index:   index,
		Payload: &pbsubscribe.Event_ServiceHealth{ServiceHealth: update},
	}
}
// newEventServiceHealthDeregister builds a service-health Deregister event at
// the given index for service svc (used as both ID and name, port 8080, with
// passing and warning weights of 1) on the node named "node<nodeNum>". Only
// the node's name is populated.
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
	// A delete does not update the service's indexes, so they stay at the
	// original insertion index. The value 10 comes from state store tests
	// that set up at index 10 and then mutate at index 100. Callers may
	// overwrite it afterwards, which is simpler than taking yet another
	// argument in the common case.
	insertedAt := &pbcommon.RaftIndex{
		CreateIndex: 10,
		ModifyIndex: 10,
	}

	service := &pbservice.NodeService{
		ID:      svc,
		Service: svc,
		Port:    8080,
		Weights: &pbservice.Weights{
			Passing: 1,
			Warning: 1,
		},
		RaftIndex: insertedAt,
	}

	update := &pbsubscribe.ServiceHealthUpdate{
		Op: pbsubscribe.CatalogOp_Deregister,
		CheckServiceNode: &pbservice.CheckServiceNode{
			Node:    &pbservice.Node{Node: fmt.Sprintf("node%d", nodeNum)},
			Service: service,
		},
	}

	return &pbsubscribe.Event{
		Index:   index,
		Payload: &pbsubscribe.Event_ServiceHealth{ServiceHealth: update},
	}
}
// newEventBatchWithEvents wraps first followed by evs, in that order, into a
// single batch event. The batch event takes its index from first, so at least
// one event is required by the signature.
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
	batch := make([]*pbsubscribe.Event, 1+len(evs))
	batch[0] = first
	copy(batch[1:], evs)

	payload := &pbsubscribe.Event_EventBatch{
		EventBatch: &pbsubscribe.EventBatch{Events: batch},
	}
	return &pbsubscribe.Event{
		Index:   first.Index,
		Payload: payload,
	}
}