|
|
|
@ -190,8 +190,8 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
|
|
|
|
var nonce uint64
|
|
|
|
|
|
|
|
|
|
// xDS works with versions of configs. Internally we don't have a consistent
|
|
|
|
|
// version. We could just hash the config since versions don't have to be
|
|
|
|
|
// ordered as far as I can tell, but it's cheaper just to increment a counter
|
|
|
|
|
// version. We could hash the config since versions don't have to be
|
|
|
|
|
// ordered as far as I can tell, but it is cheaper to increment a counter
|
|
|
|
|
// every time we observe a new config since the upstream proxycfg package only
|
|
|
|
|
// delivers updates when there are actual changes.
|
|
|
|
|
var configVersion uint64
|
|
|
|
@ -209,12 +209,12 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
|
|
|
|
|
|
|
|
|
// Configure handlers for each type of request
|
|
|
|
|
handlers := map[string]*xDSType{
|
|
|
|
|
EndpointType: &xDSType{
|
|
|
|
|
EndpointType: {
|
|
|
|
|
typeURL: EndpointType,
|
|
|
|
|
resources: s.endpointsFromSnapshot,
|
|
|
|
|
stream: stream,
|
|
|
|
|
},
|
|
|
|
|
ClusterType: &xDSType{
|
|
|
|
|
ClusterType: {
|
|
|
|
|
typeURL: ClusterType,
|
|
|
|
|
resources: s.clustersFromSnapshot,
|
|
|
|
|
stream: stream,
|
|
|
|
@ -223,12 +223,12 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
|
|
|
|
return cfgSnap.Kind == structs.ServiceKindMeshGateway
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
RouteType: &xDSType{
|
|
|
|
|
RouteType: {
|
|
|
|
|
typeURL: RouteType,
|
|
|
|
|
resources: routesFromSnapshot,
|
|
|
|
|
stream: stream,
|
|
|
|
|
},
|
|
|
|
|
ListenerType: &xDSType{
|
|
|
|
|
ListenerType: {
|
|
|
|
|
typeURL: ListenerType,
|
|
|
|
|
resources: s.listenersFromSnapshot,
|
|
|
|
|
stream: stream,
|
|
|
|
@ -245,8 +245,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
|
|
|
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
token := tokenFromStream(stream)
|
|
|
|
|
rule, err := s.ResolveToken(token)
|
|
|
|
|
rule, err := s.ResolveToken(tokenFromContext(stream.Context()))
|
|
|
|
|
|
|
|
|
|
if acl.IsErrNotFound(err) {
|
|
|
|
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
|
|
|
|
@ -397,7 +396,7 @@ func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, no
|
|
|
|
|
// Already sent this version
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
resources, err := t.resources(cfgSnap, tokenFromStream(t.stream))
|
|
|
|
|
resources, err := t.resources(cfgSnap, tokenFromContext(t.stream.Context()))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -436,10 +435,6 @@ func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, no
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func tokenFromStream(stream ADSStream) string {
|
|
|
|
|
return tokenFromContext(stream.Context())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func tokenFromContext(ctx context.Context) string {
|
|
|
|
|
md, ok := metadata.FromIncomingContext(ctx)
|
|
|
|
|
if !ok {
|
|
|
|
|