You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/xds/testing.go

222 lines
5.4 KiB

package xds
import (
"context"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/service/auth/v2"
envoytype "github.com/envoyproxy/go-control-plane/envoy/type"
"github.com/mitchellh/go-testing-interface"
"google.golang.org/grpc/metadata"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/xds/proxysupport"
)
// TestADSStream mocks
// discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer to allow
// testing ADS handler.
type TestADSStream struct {
ctx context.Context
sendCh chan *envoy.DiscoveryResponse
recvCh chan *envoy.DiscoveryRequest
}
// NewTestADSStream makes a new TestADSStream
func NewTestADSStream(t testing.T, ctx context.Context) *TestADSStream {
return &TestADSStream{
ctx: ctx,
sendCh: make(chan *envoy.DiscoveryResponse, 1),
recvCh: make(chan *envoy.DiscoveryRequest, 1),
}
}
// Send implements ADSStream
func (s *TestADSStream) Send(r *envoy.DiscoveryResponse) error {
s.sendCh <- r
return nil
}
// Recv implements ADSStream
func (s *TestADSStream) Recv() (*envoy.DiscoveryRequest, error) {
r := <-s.recvCh
if r == nil {
return nil, io.EOF
}
return r, nil
}
// SetHeader implements ADSStream
func (s *TestADSStream) SetHeader(metadata.MD) error {
return nil
}
// SendHeader implements ADSStream
func (s *TestADSStream) SendHeader(metadata.MD) error {
return nil
}
// SetTrailer implements ADSStream
func (s *TestADSStream) SetTrailer(metadata.MD) {
}
// Context implements ADSStream
func (s *TestADSStream) Context() context.Context {
return s.ctx
}
// SendMsg implements ADSStream
func (s *TestADSStream) SendMsg(m interface{}) error {
return nil
}
// RecvMsg implements ADSStream
func (s *TestADSStream) RecvMsg(m interface{}) error {
return nil
}
type configState struct {
lastNonce, lastVersion, acceptedVersion string
}
// TestEnvoy is a helper to simulate Envoy ADS requests.
type TestEnvoy struct {
sync.Mutex
stream *TestADSStream
proxyID string
token string
state map[string]configState
ctx context.Context
cancel func()
}
// NewTestEnvoy creates a TestEnvoy instance.
func NewTestEnvoy(t testing.T, proxyID, token string) *TestEnvoy {
ctx, cancel := context.WithCancel(context.Background())
// If a token is given, attach it to the context in the same way gRPC attaches
// metadata in calls and stream contexts.
if token != "" {
ctx = metadata.NewIncomingContext(ctx,
metadata.Pairs("x-consul-token", token))
}
return &TestEnvoy{
stream: NewTestADSStream(t, ctx),
state: make(map[string]configState),
ctx: ctx,
cancel: cancel,
proxyID: proxyID,
token: token,
}
}
func hexString(v uint64) string {
if v == 0 {
return ""
}
return fmt.Sprintf("%08x", v)
}
func stringToEnvoyVersion(vs string) (*envoytype.SemanticVersion, bool) {
parts := strings.Split(vs, ".")
if len(parts) != 3 {
return nil, false
}
major, err := strconv.Atoi(parts[0])
if err != nil {
return nil, false
}
minor, err := strconv.Atoi(parts[1])
if err != nil {
return nil, false
}
patch, err := strconv.Atoi(parts[2])
if err != nil {
return nil, false
}
return &envoytype.SemanticVersion{
MajorNumber: uint32(major),
MinorNumber: uint32(minor),
Patch: uint32(patch),
}, true
}
// SendReq sends a request from the test server.
func (e *TestEnvoy) SendReq(t testing.T, typeURL string, version, nonce uint64) {
e.Lock()
defer e.Unlock()
ev, valid := stringToEnvoyVersion(proxysupport.EnvoyVersions[0])
if !valid {
t.Fatal("envoy version is not valid: %s", proxysupport.EnvoyVersions[0])
}
req := &envoy.DiscoveryRequest{
VersionInfo: hexString(version),
Node: &envoycore.Node{
Id: e.proxyID,
Cluster: e.proxyID,
UserAgentName: "envoy",
UserAgentVersionType: &envoycore.Node_UserAgentBuildVersion{
UserAgentBuildVersion: &envoycore.BuildVersion{
Version: ev,
},
},
},
ResponseNonce: hexString(nonce),
TypeUrl: typeURL,
}
select {
case e.stream.recvCh <- req:
case <-time.After(50 * time.Millisecond):
t.Fatalf("send to stream blocked for too long")
}
}
// Close closes the client and cancels it's request context.
func (e *TestEnvoy) Close() error {
e.Lock()
defer e.Unlock()
// unblock the recv chan to simulate recv error when client disconnects
if e.stream != nil && e.stream.recvCh != nil {
close(e.stream.recvCh)
e.stream.recvCh = nil
}
if e.cancel != nil {
e.cancel()
}
return nil
}
// TestCheckRequest creates an envoyauth.CheckRequest with the source and
// destination service names.
func TestCheckRequest(t testing.T, source, dest string) *envoyauth.CheckRequest {
return &envoyauth.CheckRequest{
Attributes: &envoyauth.AttributeContext{
Source: makeAttributeContextPeer(t, source),
Destination: makeAttributeContextPeer(t, dest),
},
}
}
func makeAttributeContextPeer(t testing.T, svc string) *envoyauth.AttributeContext_Peer {
spiffeID := connect.TestSpiffeIDService(t, svc)
return &envoyauth.AttributeContext_Peer{
// We don't care about IP for now might later though
Address: makeAddress("10.0.0.1", 1234),
// Note we don't set Service since that is an advisory only mechanism in
// Envoy triggered by self-declared headers. We rely on the actual TLS Peer
// identity.
Principal: spiffeID.URI().String(),
}
}