|
|
@ -24,6 +24,7 @@ import ( |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
const ControllerName = "consul.io/xds-controller" |
|
|
|
const ControllerName = "consul.io/xds-controller" |
|
|
|
|
|
|
|
|
|
|
|
const defaultTenancy = "default" |
|
|
|
const defaultTenancy = "default" |
|
|
|
|
|
|
|
|
|
|
|
func Controller(endpointsMapper *bimapper.Mapper, updater ProxyUpdater, fetcher TrustBundleFetcher, leafCertManager *leafcert.Manager, leafMapper *LeafMapper, leafCancels *LeafCancels, datacenter string) controller.Controller { |
|
|
|
func Controller(endpointsMapper *bimapper.Mapper, updater ProxyUpdater, fetcher TrustBundleFetcher, leafCertManager *leafcert.Manager, leafMapper *LeafMapper, leafCancels *LeafCancels, datacenter string) controller.Controller { |
|
|
@ -158,39 +159,52 @@ func (r *xdsReconciler) Reconcile(ctx context.Context, rt controller.Runtime, re |
|
|
|
|
|
|
|
|
|
|
|
// Step 1: Resolve the reference by looking up the ServiceEndpoints.
|
|
|
|
// Step 1: Resolve the reference by looking up the ServiceEndpoints.
|
|
|
|
// serviceEndpoints will not be nil unless there is an error.
|
|
|
|
// serviceEndpoints will not be nil unless there is an error.
|
|
|
|
serviceEndpoints, err := getServiceEndpoints(ctx, rt, endpointRef.Id) |
|
|
|
//
|
|
|
|
if err != nil { |
|
|
|
// TODO(rb/v2): note we should expose a flag on the endpointRef indicating if the user
|
|
|
|
rt.Logger.Error("error reading service endpoint", "id", endpointRef.Id, "error", err) |
|
|
|
// wants the absence of an Endpoints to imply returning a slice of no data, vs failing outright.
|
|
|
|
// Set the status.
|
|
|
|
// In xdsv1 we call this the "allowEmpty" semantic. Here we are assuming "allowEmpty=true"
|
|
|
|
statusCondition = status.ConditionRejectedErrorReadingEndpoints(status.KeyFromID(endpointRef.Id), err.Error()) |
|
|
|
var psEndpoints *pbproxystate.Endpoints |
|
|
|
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition) |
|
|
|
if endpointRef.Id != nil { |
|
|
|
|
|
|
|
serviceEndpoints, err := getServiceEndpoints(ctx, rt, endpointRef.Id) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
rt.Logger.Error("error reading service endpoint", "id", endpointRef.Id, "error", err) |
|
|
|
|
|
|
|
// Set the status.
|
|
|
|
|
|
|
|
statusCondition = status.ConditionRejectedErrorReadingEndpoints(status.KeyFromID(endpointRef.Id), err.Error()) |
|
|
|
|
|
|
|
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition) |
|
|
|
|
|
|
|
|
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Step 2: Translate it into pbproxystate.Endpoints.
|
|
|
|
// Step 2: Translate it into pbproxystate.Endpoints.
|
|
|
|
psEndpoints, err := generateProxyStateEndpoints(serviceEndpoints, endpointRef.Port) |
|
|
|
psEndpoints, err = generateProxyStateEndpoints(serviceEndpoints, endpointRef.Port) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
rt.Logger.Error("error translating service endpoints to proxy state endpoints", "endpoint", endpointRef.Id, "error", err) |
|
|
|
rt.Logger.Error("error translating service endpoints to proxy state endpoints", "endpoint", endpointRef.Id, "error", err) |
|
|
|
|
|
|
|
|
|
|
|
// Set the status.
|
|
|
|
// Set the status.
|
|
|
|
statusCondition = status.ConditionRejectedCreatingProxyStateEndpoints(status.KeyFromID(endpointRef.Id), err.Error()) |
|
|
|
statusCondition = status.ConditionRejectedCreatingProxyStateEndpoints(status.KeyFromID(endpointRef.Id), err.Error()) |
|
|
|
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition) |
|
|
|
status.WriteStatusIfChanged(ctx, rt, pstResource, statusCondition) |
|
|
|
|
|
|
|
|
|
|
|
return err |
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
psEndpoints = &pbproxystate.Endpoints{} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Step 3: Add the endpoints to ProxyState.
|
|
|
|
// Step 3: Add the endpoints to ProxyState.
|
|
|
|
proxyStateTemplate.Template.ProxyState.Endpoints[xdsClusterName] = psEndpoints |
|
|
|
proxyStateTemplate.Template.ProxyState.Endpoints[xdsClusterName] = psEndpoints |
|
|
|
|
|
|
|
|
|
|
|
// Track all the endpoints that are used by this ProxyStateTemplate, so we can use this for step 4.
|
|
|
|
if endpointRef.Id != nil { |
|
|
|
endpointResourceRef := resource.Reference(endpointRef.Id, "") |
|
|
|
// Track all the endpoints that are used by this ProxyStateTemplate, so we can use this for step 4.
|
|
|
|
endpointsInProxyStateTemplate = append(endpointsInProxyStateTemplate, endpointResourceRef) |
|
|
|
endpointResourceRef := resource.Reference(endpointRef.Id, "") |
|
|
|
|
|
|
|
endpointsInProxyStateTemplate = append(endpointsInProxyStateTemplate, endpointResourceRef) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Step 4: Track relationships between ProxyStateTemplates and ServiceEndpoints.
|
|
|
|
// Step 4: Track relationships between ProxyStateTemplates and ServiceEndpoints.
|
|
|
|
r.endpointsMapper.TrackItem(req.ID, endpointsInProxyStateTemplate) |
|
|
|
r.endpointsMapper.TrackItem(req.ID, endpointsInProxyStateTemplate) |
|
|
|
|
|
|
|
if len(endpointsInProxyStateTemplate) == 0 { |
|
|
|
|
|
|
|
r.endpointsMapper.UntrackItem(req.ID) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Iterate through leaf certificate references.
|
|
|
|
// Iterate through leaf certificate references.
|
|
|
|
// For each leaf certificate reference, the controller should:
|
|
|
|
// For each leaf certificate reference, the controller should:
|
|
|
|