mirror of https://github.com/hashicorp/consul
Let MonitorHCPManager handle lifecycle instead of link controller
parent
c76de7ee04
commit
a276e68078
|
@ -21,6 +21,11 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/fullstorydev/grpchan/inprocgrpc"
|
||||
"go.etcd.io/bbolt"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||
"github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
@ -31,11 +36,6 @@ import (
|
|||
walmetrics "github.com/hashicorp/raft-wal/metrics"
|
||||
"github.com/hashicorp/raft-wal/verifier"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"go.etcd.io/bbolt"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/blockingquery"
|
||||
|
@ -53,6 +53,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/xdscapacity"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
logdrop "github.com/hashicorp/consul/agent/log-drop"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
|
@ -889,6 +890,33 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
|
|||
// to enable RPC forwarding.
|
||||
s.grpcLeaderForwarder = flat.LeaderForwarder
|
||||
|
||||
// Start watching HCP Link resource. This creates a channel that we can use to
|
||||
// start and stop HCP manager when appropriate. This needs to be created after
|
||||
// the GRPC services are set up in order for the resource service client to
|
||||
// function. This uses the insecure grpc channel so that it doesn't need to
|
||||
// present a valid ACL token.
|
||||
//
|
||||
// If this fails, HCP linking will not work, but to avoid crashing Consul, we log
|
||||
// the error and continue on.
|
||||
hcpLinkWatchCh, err := hcpctl.NewLinkWatch(
|
||||
&lib.StopChannelContext{StopCh: shutdownCh},
|
||||
logger.Named("hcp-link-watcher"),
|
||||
pbresource.NewResourceServiceClient(s.insecureSafeGRPCChan),
|
||||
)
|
||||
if err != nil {
|
||||
s.logger.Error("HCP Link watcher failed to start. HCP Link functionality is disabled", "error", err)
|
||||
} else {
|
||||
go hcp.MonitorHCPLink(
|
||||
&lib.StopChannelContext{StopCh: shutdownCh},
|
||||
logger.Named("hcp-link-watcher"),
|
||||
s.hcpManager,
|
||||
hcpLinkWatchCh,
|
||||
hcpclient.NewClient,
|
||||
bootstrap.LoadManagementToken,
|
||||
flat.HCP.DataDir,
|
||||
)
|
||||
}
|
||||
|
||||
s.controllerManager = controller.NewManager(
|
||||
// Usage of the insecure + unsafe grpc chan is required for the controller
|
||||
// manager. It must be unauthorized so that controllers do not need to
|
||||
|
|
|
@ -8,13 +8,13 @@ import (
|
|||
"crypto/tls"
|
||||
"strings"
|
||||
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
|
@ -54,16 +54,20 @@ func LinkController(
|
|||
// on the leader.
|
||||
// https://hashicorp.atlassian.net/browse/CC-7364
|
||||
WithPlacement(controller.PlacementEachServer).
|
||||
WithInitializer(&linkInitializer{
|
||||
cloudConfig: cfg,
|
||||
}).
|
||||
WithReconciler(&linkReconciler{
|
||||
resourceApisEnabled: resourceApisEnabled,
|
||||
hcpAllowV2ResourceApis: hcpAllowV2ResourceApis,
|
||||
hcpClientFn: hcpClientFn,
|
||||
dataDir: dataDir,
|
||||
hcpManager: hcpManager,
|
||||
})
|
||||
WithInitializer(
|
||||
&linkInitializer{
|
||||
cloudConfig: cfg,
|
||||
},
|
||||
).
|
||||
WithReconciler(
|
||||
&linkReconciler{
|
||||
resourceApisEnabled: resourceApisEnabled,
|
||||
hcpAllowV2ResourceApis: hcpAllowV2ResourceApis,
|
||||
hcpClientFn: hcpClientFn,
|
||||
dataDir: dataDir,
|
||||
hcpManager: hcpManager,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
type linkReconciler struct {
|
||||
|
@ -178,43 +182,22 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r
|
|||
rt.Logger.Error("error marshalling link data", "error", err)
|
||||
return err
|
||||
}
|
||||
_, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: types.LinkName,
|
||||
Type: pbhcp.LinkType,
|
||||
},
|
||||
Metadata: res.Metadata,
|
||||
Data: updatedData,
|
||||
}})
|
||||
_, err = rt.Client.Write(
|
||||
ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: types.LinkName,
|
||||
Type: pbhcp.LinkType,
|
||||
},
|
||||
Metadata: res.Metadata,
|
||||
Data: updatedData,
|
||||
}},
|
||||
)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error updating link", "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Load the management token if access is not set to read-only. Read-only clusters
|
||||
// will not have a management token provided by HCP.
|
||||
var token string
|
||||
if accessLevel != pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_ONLY {
|
||||
token, err = bootstrap.LoadManagementToken(ctx, rt.Logger, hcpClient, r.dataDir)
|
||||
if err != nil {
|
||||
linkingFailed(ctx, rt, res, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Update the HCP manager configuration with the link values
|
||||
cfg.ManagementToken = token
|
||||
r.hcpManager.UpdateConfig(hcpClient, cfg)
|
||||
|
||||
// Start the manager
|
||||
err = r.hcpManager.Start(ctx)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error starting HCP manager", "error", err)
|
||||
linkingFailed(ctx, rt, res, err)
|
||||
return err
|
||||
}
|
||||
|
||||
newStatus = &pbresource.Status{
|
||||
ObservedGeneration: res.Generation,
|
||||
Conditions: []*pbresource.Condition{ConditionLinked(link.ResourceId)},
|
||||
|
@ -233,17 +216,20 @@ func (i *linkInitializer) Initialize(ctx context.Context, rt controller.Runtime)
|
|||
}
|
||||
|
||||
// Construct a link resource to reflect the configuration
|
||||
data, err := anypb.New(&pbhcp.Link{
|
||||
ResourceId: i.cloudConfig.ResourceID,
|
||||
ClientId: i.cloudConfig.ClientID,
|
||||
ClientSecret: i.cloudConfig.ClientSecret,
|
||||
})
|
||||
data, err := anypb.New(
|
||||
&pbhcp.Link{
|
||||
ResourceId: i.cloudConfig.ResourceID,
|
||||
ClientId: i.cloudConfig.ClientID,
|
||||
ClientSecret: i.cloudConfig.ClientSecret,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the link resource for a configuration-based link
|
||||
_, err = rt.Client.Write(ctx,
|
||||
_, err = rt.Client.Write(
|
||||
ctx,
|
||||
&pbresource.WriteRequest{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
|
|
|
@ -18,9 +18,6 @@ import (
|
|||
func cleanup(rt controller.Runtime, hcpManager hcp.Manager, dataDir string) error {
|
||||
rt.Logger.Trace("cleaning up link resource")
|
||||
|
||||
rt.Logger.Debug("stopping HCP manager")
|
||||
hcpManager.Stop()
|
||||
|
||||
if dataDir != "" {
|
||||
hcpConfigDir := filepath.Join(dataDir, bootstrap.SubDir)
|
||||
rt.Logger.Debug("deleting hcp-config dir", "dir", hcpConfigDir)
|
||||
|
|
Loading…
Reference in New Issue