mirror of https://github.com/hashicorp/consul
378 lines
11 KiB
Go
378 lines
11 KiB
Go
|
// Copyright 2018 Envoyproxy Authors
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
// Package server provides an implementation of a streaming xDS server.
|
||
|
package server
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"strconv"
|
||
|
"sync/atomic"
|
||
|
|
||
|
"github.com/gogo/protobuf/proto"
|
||
|
"github.com/gogo/protobuf/types"
|
||
|
"google.golang.org/grpc/codes"
|
||
|
"google.golang.org/grpc/status"
|
||
|
|
||
|
"github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||
|
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
|
||
|
"github.com/envoyproxy/go-control-plane/pkg/cache"
|
||
|
)
|
||
|
|
||
|
// Server is a collection of handlers for streaming discovery requests.
|
||
|
type Server interface {
|
||
|
v2.EndpointDiscoveryServiceServer
|
||
|
v2.ClusterDiscoveryServiceServer
|
||
|
v2.RouteDiscoveryServiceServer
|
||
|
v2.ListenerDiscoveryServiceServer
|
||
|
discovery.AggregatedDiscoveryServiceServer
|
||
|
|
||
|
// Fetch is the universal fetch method.
|
||
|
Fetch(context.Context, *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error)
|
||
|
}
|
||
|
|
||
|
// Callbacks is a collection of callbacks inserted into the server operation.
|
||
|
// The callbacks are invoked synchronously.
|
||
|
type Callbacks interface {
|
||
|
// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
|
||
|
OnStreamOpen(int64, string)
|
||
|
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
|
||
|
OnStreamClosed(int64)
|
||
|
// OnStreamRequest is called once a request is received on a stream.
|
||
|
OnStreamRequest(int64, *v2.DiscoveryRequest)
|
||
|
// OnStreamResponse is called immediately prior to sending a response on a stream.
|
||
|
OnStreamResponse(int64, *v2.DiscoveryRequest, *v2.DiscoveryResponse)
|
||
|
// OnFetchRequest is called for each Fetch request
|
||
|
OnFetchRequest(*v2.DiscoveryRequest)
|
||
|
// OnFetchResponse is called immediately prior to sending a response.
|
||
|
OnFetchResponse(*v2.DiscoveryRequest, *v2.DiscoveryResponse)
|
||
|
}
|
||
|
|
||
|
// NewServer creates handlers from a config watcher and an optional logger.
|
||
|
func NewServer(config cache.Cache, callbacks Callbacks) Server {
|
||
|
return &server{cache: config, callbacks: callbacks}
|
||
|
}
|
||
|
|
||
|
type server struct {
|
||
|
cache cache.Cache
|
||
|
callbacks Callbacks
|
||
|
|
||
|
// streamCount for counting bi-di streams
|
||
|
streamCount int64
|
||
|
}
|
||
|
|
||
|
type stream interface {
|
||
|
Send(*v2.DiscoveryResponse) error
|
||
|
Recv() (*v2.DiscoveryRequest, error)
|
||
|
}
|
||
|
|
||
|
// watches for all xDS resource types
|
||
|
type watches struct {
|
||
|
endpoints chan cache.Response
|
||
|
clusters chan cache.Response
|
||
|
routes chan cache.Response
|
||
|
listeners chan cache.Response
|
||
|
|
||
|
endpointCancel func()
|
||
|
clusterCancel func()
|
||
|
routeCancel func()
|
||
|
listenerCancel func()
|
||
|
|
||
|
endpointNonce string
|
||
|
clusterNonce string
|
||
|
routeNonce string
|
||
|
listenerNonce string
|
||
|
}
|
||
|
|
||
|
// Cancel all watches
|
||
|
func (values watches) Cancel() {
|
||
|
if values.endpointCancel != nil {
|
||
|
values.endpointCancel()
|
||
|
}
|
||
|
if values.clusterCancel != nil {
|
||
|
values.clusterCancel()
|
||
|
}
|
||
|
if values.routeCancel != nil {
|
||
|
values.routeCancel()
|
||
|
}
|
||
|
if values.listenerCancel != nil {
|
||
|
values.listenerCancel()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func createResponse(resp *cache.Response, typeURL string) (*v2.DiscoveryResponse, error) {
|
||
|
if resp == nil {
|
||
|
return nil, errors.New("missing response")
|
||
|
}
|
||
|
resources := make([]types.Any, len(resp.Resources))
|
||
|
for i := 0; i < len(resp.Resources); i++ {
|
||
|
data, err := proto.Marshal(resp.Resources[i])
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
resources[i] = types.Any{
|
||
|
TypeUrl: typeURL,
|
||
|
Value: data,
|
||
|
}
|
||
|
}
|
||
|
out := &v2.DiscoveryResponse{
|
||
|
VersionInfo: resp.Version,
|
||
|
Resources: resources,
|
||
|
TypeUrl: typeURL,
|
||
|
}
|
||
|
return out, nil
|
||
|
}
|
||
|
|
||
|
// process handles a bi-di stream request
|
||
|
func (s *server) process(stream stream, reqCh <-chan *v2.DiscoveryRequest, defaultTypeURL string) error {
|
||
|
// increment stream count
|
||
|
streamID := atomic.AddInt64(&s.streamCount, 1)
|
||
|
|
||
|
// unique nonce generator for req-resp pairs per xDS stream; the server
|
||
|
// ignores stale nonces. nonce is only modified within send() function.
|
||
|
var streamNonce int64
|
||
|
|
||
|
// a collection of watches per request type
|
||
|
var values watches
|
||
|
defer func() {
|
||
|
values.Cancel()
|
||
|
if s.callbacks != nil {
|
||
|
s.callbacks.OnStreamClosed(streamID)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// sends a response by serializing to protobuf Any
|
||
|
send := func(resp cache.Response, typeURL string) (string, error) {
|
||
|
out, err := createResponse(&resp, typeURL)
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
|
||
|
// increment nonce
|
||
|
streamNonce = streamNonce + 1
|
||
|
out.Nonce = strconv.FormatInt(streamNonce, 10)
|
||
|
if s.callbacks != nil {
|
||
|
s.callbacks.OnStreamResponse(streamID, &resp.Request, out)
|
||
|
}
|
||
|
return out.Nonce, stream.Send(out)
|
||
|
}
|
||
|
|
||
|
if s.callbacks != nil {
|
||
|
s.callbacks.OnStreamOpen(streamID, defaultTypeURL)
|
||
|
}
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
// config watcher can send the requested resources types in any order
|
||
|
case resp, more := <-values.endpoints:
|
||
|
if !more {
|
||
|
return status.Errorf(codes.Unavailable, "endpoints watch failed")
|
||
|
}
|
||
|
nonce, err := send(resp, cache.EndpointType)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
values.endpointNonce = nonce
|
||
|
|
||
|
case resp, more := <-values.clusters:
|
||
|
if !more {
|
||
|
return status.Errorf(codes.Unavailable, "clusters watch failed")
|
||
|
}
|
||
|
nonce, err := send(resp, cache.ClusterType)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
values.clusterNonce = nonce
|
||
|
|
||
|
case resp, more := <-values.routes:
|
||
|
if !more {
|
||
|
return status.Errorf(codes.Unavailable, "routes watch failed")
|
||
|
}
|
||
|
nonce, err := send(resp, cache.RouteType)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
values.routeNonce = nonce
|
||
|
|
||
|
case resp, more := <-values.listeners:
|
||
|
if !more {
|
||
|
return status.Errorf(codes.Unavailable, "listeners watch failed")
|
||
|
}
|
||
|
nonce, err := send(resp, cache.ListenerType)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
values.listenerNonce = nonce
|
||
|
|
||
|
case req, more := <-reqCh:
|
||
|
// input stream ended or errored out
|
||
|
if !more {
|
||
|
return nil
|
||
|
}
|
||
|
if req == nil {
|
||
|
return status.Errorf(codes.Unavailable, "empty request")
|
||
|
}
|
||
|
|
||
|
// nonces can be reused across streams; we verify nonce only if nonce is not initialized
|
||
|
nonce := req.GetResponseNonce()
|
||
|
|
||
|
// type URL is required for ADS but is implicit for xDS
|
||
|
if defaultTypeURL == cache.AnyType {
|
||
|
if req.TypeUrl == "" {
|
||
|
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
|
||
|
}
|
||
|
} else if req.TypeUrl == "" {
|
||
|
req.TypeUrl = defaultTypeURL
|
||
|
}
|
||
|
|
||
|
if s.callbacks != nil {
|
||
|
s.callbacks.OnStreamRequest(streamID, req)
|
||
|
}
|
||
|
|
||
|
// cancel existing watches to (re-)request a newer version
|
||
|
switch {
|
||
|
case req.TypeUrl == cache.EndpointType && (values.endpointNonce == "" || values.endpointNonce == nonce):
|
||
|
if values.endpointCancel != nil {
|
||
|
values.endpointCancel()
|
||
|
}
|
||
|
values.endpoints, values.endpointCancel = s.cache.CreateWatch(*req)
|
||
|
case req.TypeUrl == cache.ClusterType && (values.clusterNonce == "" || values.clusterNonce == nonce):
|
||
|
if values.clusterCancel != nil {
|
||
|
values.clusterCancel()
|
||
|
}
|
||
|
values.clusters, values.clusterCancel = s.cache.CreateWatch(*req)
|
||
|
case req.TypeUrl == cache.RouteType && (values.routeNonce == "" || values.routeNonce == nonce):
|
||
|
if values.routeCancel != nil {
|
||
|
values.routeCancel()
|
||
|
}
|
||
|
values.routes, values.routeCancel = s.cache.CreateWatch(*req)
|
||
|
case req.TypeUrl == cache.ListenerType && (values.listenerNonce == "" || values.listenerNonce == nonce):
|
||
|
if values.listenerCancel != nil {
|
||
|
values.listenerCancel()
|
||
|
}
|
||
|
values.listeners, values.listenerCancel = s.cache.CreateWatch(*req)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// handler converts a blocking read call to channels and initiates stream processing
|
||
|
func (s *server) handler(stream stream, typeURL string) error {
|
||
|
// a channel for receiving incoming requests
|
||
|
reqCh := make(chan *v2.DiscoveryRequest)
|
||
|
reqStop := int32(0)
|
||
|
go func() {
|
||
|
for {
|
||
|
req, err := stream.Recv()
|
||
|
if atomic.LoadInt32(&reqStop) != 0 {
|
||
|
return
|
||
|
}
|
||
|
if err != nil {
|
||
|
close(reqCh)
|
||
|
return
|
||
|
}
|
||
|
reqCh <- req
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
err := s.process(stream, reqCh, typeURL)
|
||
|
|
||
|
// prevents writing to a closed channel if send failed on blocked recv
|
||
|
// TODO(kuat) figure out how to unblock recv through gRPC API
|
||
|
atomic.StoreInt32(&reqStop, 1)
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (s *server) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
|
||
|
return s.handler(stream, cache.AnyType)
|
||
|
}
|
||
|
|
||
|
func (s *server) StreamEndpoints(stream v2.EndpointDiscoveryService_StreamEndpointsServer) error {
|
||
|
return s.handler(stream, cache.EndpointType)
|
||
|
}
|
||
|
|
||
|
func (s *server) StreamClusters(stream v2.ClusterDiscoveryService_StreamClustersServer) error {
|
||
|
return s.handler(stream, cache.ClusterType)
|
||
|
}
|
||
|
|
||
|
func (s *server) StreamRoutes(stream v2.RouteDiscoveryService_StreamRoutesServer) error {
|
||
|
return s.handler(stream, cache.RouteType)
|
||
|
}
|
||
|
|
||
|
func (s *server) StreamListeners(stream v2.ListenerDiscoveryService_StreamListenersServer) error {
|
||
|
return s.handler(stream, cache.ListenerType)
|
||
|
}
|
||
|
|
||
|
// Fetch is the universal fetch method.
|
||
|
func (s *server) Fetch(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
|
||
|
if s.callbacks != nil {
|
||
|
s.callbacks.OnFetchRequest(req)
|
||
|
}
|
||
|
resp, err := s.cache.Fetch(ctx, *req)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
out, err := createResponse(resp, req.TypeUrl)
|
||
|
if s.callbacks != nil {
|
||
|
s.callbacks.OnFetchResponse(req, out)
|
||
|
}
|
||
|
return out, err
|
||
|
}
|
||
|
|
||
|
func (s *server) FetchEndpoints(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
|
||
|
if req == nil {
|
||
|
return nil, status.Errorf(codes.Unavailable, "empty request")
|
||
|
}
|
||
|
req.TypeUrl = cache.EndpointType
|
||
|
return s.Fetch(ctx, req)
|
||
|
}
|
||
|
|
||
|
func (s *server) FetchClusters(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
|
||
|
if req == nil {
|
||
|
return nil, status.Errorf(codes.Unavailable, "empty request")
|
||
|
}
|
||
|
req.TypeUrl = cache.ClusterType
|
||
|
return s.Fetch(ctx, req)
|
||
|
}
|
||
|
|
||
|
func (s *server) FetchRoutes(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
|
||
|
if req == nil {
|
||
|
return nil, status.Errorf(codes.Unavailable, "empty request")
|
||
|
}
|
||
|
req.TypeUrl = cache.RouteType
|
||
|
return s.Fetch(ctx, req)
|
||
|
}
|
||
|
|
||
|
func (s *server) FetchListeners(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
|
||
|
if req == nil {
|
||
|
return nil, status.Errorf(codes.Unavailable, "empty request")
|
||
|
}
|
||
|
req.TypeUrl = cache.ListenerType
|
||
|
return s.Fetch(ctx, req)
|
||
|
}
|
||
|
|
||
|
func (s *server) IncrementalAggregatedResources(_ discovery.AggregatedDiscoveryService_IncrementalAggregatedResourcesServer) error {
|
||
|
return errors.New("not implemented")
|
||
|
}
|
||
|
|
||
|
func (s *server) IncrementalClusters(_ v2.ClusterDiscoveryService_IncrementalClustersServer) error {
|
||
|
return errors.New("not implemented")
|
||
|
}
|
||
|
|
||
|
func (s *server) IncrementalRoutes(_ v2.RouteDiscoveryService_IncrementalRoutesServer) error {
|
||
|
return errors.New("not implemented")
|
||
|
}
|