mirror of https://github.com/hashicorp/consul
181 lines
4.4 KiB
Go
181 lines
4.4 KiB
Go
package xds
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
|
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
|
envoyauth "github.com/envoyproxy/go-control-plane/envoy/service/auth/v2"
|
|
"github.com/mitchellh/go-testing-interface"
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
"github.com/hashicorp/consul/agent/connect"
|
|
)
|
|
|
|
// TestADSStream mocks
|
|
// discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer to allow
|
|
// testing ADS handler.
|
|
type TestADSStream struct {
|
|
ctx context.Context
|
|
sendCh chan *envoy.DiscoveryResponse
|
|
recvCh chan *envoy.DiscoveryRequest
|
|
}
|
|
|
|
// NewTestADSStream makes a new TestADSStream
|
|
func NewTestADSStream(t testing.T, ctx context.Context) *TestADSStream {
|
|
return &TestADSStream{
|
|
ctx: ctx,
|
|
sendCh: make(chan *envoy.DiscoveryResponse, 1),
|
|
recvCh: make(chan *envoy.DiscoveryRequest, 1),
|
|
}
|
|
}
|
|
|
|
// Send implements ADSStream
|
|
func (s *TestADSStream) Send(r *envoy.DiscoveryResponse) error {
|
|
s.sendCh <- r
|
|
return nil
|
|
}
|
|
|
|
// Recv implements ADSStream
|
|
func (s *TestADSStream) Recv() (*envoy.DiscoveryRequest, error) {
|
|
r := <-s.recvCh
|
|
if r == nil {
|
|
return nil, io.EOF
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
// SetHeader implements ADSStream
|
|
func (s *TestADSStream) SetHeader(metadata.MD) error {
|
|
return nil
|
|
}
|
|
|
|
// SendHeader implements ADSStream
|
|
func (s *TestADSStream) SendHeader(metadata.MD) error {
|
|
return nil
|
|
}
|
|
|
|
// SetTrailer implements ADSStream
|
|
func (s *TestADSStream) SetTrailer(metadata.MD) {
|
|
}
|
|
|
|
// Context implements ADSStream
|
|
func (s *TestADSStream) Context() context.Context {
|
|
return s.ctx
|
|
}
|
|
|
|
// SendMsg implements ADSStream
|
|
func (s *TestADSStream) SendMsg(m interface{}) error {
|
|
return nil
|
|
}
|
|
|
|
// RecvMsg implements ADSStream
|
|
func (s *TestADSStream) RecvMsg(m interface{}) error {
|
|
return nil
|
|
}
|
|
|
|
type configState struct {
|
|
lastNonce, lastVersion, acceptedVersion string
|
|
}
|
|
|
|
// TestEnvoy is a helper to simulate Envoy ADS requests.
|
|
type TestEnvoy struct {
|
|
sync.Mutex
|
|
stream *TestADSStream
|
|
proxyID string
|
|
token string
|
|
state map[string]configState
|
|
ctx context.Context
|
|
cancel func()
|
|
}
|
|
|
|
// NewTestEnvoy creates a TestEnvoy instance.
|
|
func NewTestEnvoy(t testing.T, proxyID, token string) *TestEnvoy {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
// If a token is given, attach it to the context in the same way gRPC attaches
|
|
// metadata in calls and stream contexts.
|
|
if token != "" {
|
|
ctx = metadata.NewIncomingContext(ctx,
|
|
metadata.Pairs("x-consul-token", token))
|
|
}
|
|
return &TestEnvoy{
|
|
stream: NewTestADSStream(t, ctx),
|
|
state: make(map[string]configState),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
proxyID: proxyID,
|
|
token: token,
|
|
}
|
|
}
|
|
|
|
func hexString(v uint64) string {
|
|
if v == 0 {
|
|
return ""
|
|
}
|
|
return fmt.Sprintf("%08x", v)
|
|
}
|
|
|
|
// SendReq sends a request from the test server.
|
|
func (e *TestEnvoy) SendReq(t testing.T, typeURL string, version, nonce uint64) {
|
|
e.Lock()
|
|
defer e.Unlock()
|
|
|
|
req := &envoy.DiscoveryRequest{
|
|
VersionInfo: hexString(version),
|
|
Node: &envoycore.Node{
|
|
Id: e.proxyID,
|
|
Cluster: e.proxyID,
|
|
},
|
|
ResponseNonce: hexString(nonce),
|
|
TypeUrl: typeURL,
|
|
}
|
|
select {
|
|
case e.stream.recvCh <- req:
|
|
case <-time.After(50 * time.Millisecond):
|
|
t.Fatalf("send to stream blocked for too long")
|
|
}
|
|
}
|
|
|
|
// Close closes the client and cancels it's request context.
|
|
func (e *TestEnvoy) Close() error {
|
|
e.Lock()
|
|
defer e.Unlock()
|
|
|
|
// unblock the recv chan to simulate recv error when client disconnects
|
|
if e.stream != nil && e.stream.recvCh != nil {
|
|
close(e.stream.recvCh)
|
|
e.stream.recvCh = nil
|
|
}
|
|
if e.cancel != nil {
|
|
e.cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TestCheckRequest creates an envoyauth.CheckRequest with the source and
|
|
// destination service names.
|
|
func TestCheckRequest(t testing.T, source, dest string) *envoyauth.CheckRequest {
|
|
return &envoyauth.CheckRequest{
|
|
Attributes: &envoyauth.AttributeContext{
|
|
Source: makeAttributeContextPeer(t, source),
|
|
Destination: makeAttributeContextPeer(t, dest),
|
|
},
|
|
}
|
|
}
|
|
|
|
func makeAttributeContextPeer(t testing.T, svc string) *envoyauth.AttributeContext_Peer {
|
|
spiffeID := connect.TestSpiffeIDService(t, svc)
|
|
return &envoyauth.AttributeContext_Peer{
|
|
// We don't care about IP for now might later though
|
|
Address: makeAddress("10.0.0.1", 1234),
|
|
// Note we don't set Service since that is an advisory only mechanism in
|
|
// Envoy triggered by self-declared headers. We rely on the actual TLS Peer
|
|
// identity.
|
|
Principal: spiffeID.URI().String(),
|
|
}
|
|
}
|