k3s/pkg/spegel/bootstrap.go

202 lines
5.5 KiB
Go
Raw Normal View History

package spegel
import (
"context"
"math/rand"
"os"
"path/filepath"
"strings"
"time"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/rancher/wrangler/v3/pkg/merr"
"github.com/sirupsen/logrus"
"github.com/spegel-org/spegel/pkg/routing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
)
// Compile-time assertions that every bootstrapper type in this file
// implements routing.Bootstrapper.
var (
	_ routing.Bootstrapper = (*selfBootstrapper)(nil)
	_ routing.Bootstrapper = (*agentBootstrapper)(nil)
	_ routing.Bootstrapper = (*serverBootstrapper)(nil)
	_ routing.Bootstrapper = (*chainingBootstrapper)(nil)
)
// selfBootstrapper is a bootstrapper that hands back the ID it was given
// in Run.
type selfBootstrapper struct {
	// id is the value passed to Run; Get parses it as a peer address.
	id string
}
// NewSelfBootstrapper returns a stub p2p bootstrapper whose Get simply
// reports the ID that was handed to Run.
func NewSelfBootstrapper() routing.Bootstrapper {
	return new(selfBootstrapper)
}
// Run records id so that a later call to Get can return it. The context is
// unused and the call never fails.
func (s *selfBootstrapper) Run(_ context.Context, id string) error {
	s.id = id
	return nil
}
// Get parses the ID stored by Run into a peer.AddrInfo and returns it,
// together with any parse error.
func (s *selfBootstrapper) Get() (*peer.AddrInfo, error) {
	self, err := peer.AddrInfoFromString(s.id)
	return self, err
}
// agentBootstrapper holds what an agent needs to ask its server for a p2p
// peer address.
type agentBootstrapper struct {
	// server and token are passed to clientaccess when validating the token.
	server, token string
	// clientCert and clientKey are file paths to the client certificate and
	// key presented to the server.
	clientCert, clientKey string
}
// NewAgentBootstrapper returns a p2p bootstrapper that retrieves a peer
// address from its server. The client certificate and key are expected at
// <dataDir>/agent/client-kubelet.crt and <dataDir>/agent/client-kubelet.key.
func NewAgentBootstrapper(server, token, dataDir string) routing.Bootstrapper {
	agentDir := filepath.Join(dataDir, "agent")
	b := &agentBootstrapper{
		server: server,
		token:  token,
	}
	b.clientCert = filepath.Join(agentDir, "client-kubelet.crt")
	b.clientKey = filepath.Join(agentDir, "client-kubelet.key")
	return b
}
// Run is a no-op for the agent bootstrapper: both arguments are ignored and
// nil is always returned.
func (c *agentBootstrapper) Run(_ context.Context, _ string) error {
	return nil
}
// Get asks the server for a p2p peer address and parses the response.
//
// It fails immediately when either the server or the token is empty.
// Otherwise it validates the token against the server (presenting the client
// certificate), requests "/v1-<program>/p2p", and parses the response body as
// a peer address string.
func (c *agentBootstrapper) Get() (*peer.AddrInfo, error) {
	if haveCreds := c.server != "" && c.token != ""; !haveCreds {
		return nil, errors.New("cannot get addresses without server and token")
	}

	info, err := clientaccess.ParseAndValidateToken(
		c.server,
		c.token,
		clientaccess.WithClientCertificate(c.clientCert, c.clientKey),
	)
	if err != nil {
		return nil, err
	}

	raw, err := info.Get("/v1-" + version.Program + "/p2p")
	if err != nil {
		return nil, err
	}

	return peer.AddrInfoFromString(string(raw))
}
// serverBootstrapper is a bootstrapper backed by the server's control
// configuration.
type serverBootstrapper struct {
	// controlConfig supplies the runtime used to register the controller
	// start hook and to reach the Node client.
	controlConfig *config.Control
}
// NewServerBootstrapper returns a p2p bootstrapper that returns an address
// from a random other cluster member.
func NewServerBootstrapper(controlConfig *config.Control) routing.Bootstrapper {
	b := new(serverBootstrapper)
	b.controlConfig = controlConfig
	return b
}
func (s *serverBootstrapper) Run(_ context.Context, id string) error {
s.controlConfig.Runtime.ClusterControllerStarts["spegel-p2p"] = func(ctx context.Context) {
nodes := s.controlConfig.Runtime.Core.Core().V1().Node()
[release-1.30] Backports for 2024-08 release cycle (#10664) * Use pagination when retrieving etcd snapshot list Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit c2216a62ad92b55feb835e92d55b95e952ecd596) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Update secretsencrypt pagination Make secretsencrypt page size and iteration consistent with other paginators Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit 891e72f90fa7735c64692212fb757b83588484d6) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Cap length of generated name used for servicelb daemonset Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit 21611c566561827eed45a0e81dcbee0699b88380) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Fix ipv6 sysctl required by non-ipv6 LoadBalancer service This is a partial revert of 095ecdb0346c038b0c16c39f6f66ad4f67ad10b9, with the workaround moved into klipper-lb. 
Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit d4c3422a85ccfe2f00218e88050d072df2e50577) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * remove deprecated use of wait functions Signed-off-by: Will <will7989@hotmail.com> (cherry picked from commit e4f3cc7b54ae2be481184c2312644c51f094cf79) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Update pkg/secretsencrypt/config.go Co-authored-by: Brad Davidson <brad@oatmail.org> Signed-off-by: Will Andrews <will7989@hotmail.com> (cherry picked from commit 3ec086f6f7f0d9a1aaa357b0a59dfd06f2650030) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Update pkg/cluster/managed.go Co-authored-by: Derek Nola <derek.nola@suse.com> Signed-off-by: Will Andrews <will7989@hotmail.com> (cherry picked from commit e2179aa957a02d4b357bef9aabb163f043471023) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Wire lasso metrics up to common gatherer Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit e168438d4439a27a89ca462cc8a62495b7473499) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Fix cloudprovider controller name Looking at metrics revealed the cloudprovider controller name was anempty string. Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit bffdf463e1e1380d13d95b0fdc1e8644a57ec0a3) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> --------- Signed-off-by: Brad Davidson <brad.davidson@rancher.com> Signed-off-by: Will <will7989@hotmail.com> Signed-off-by: Will Andrews <will7989@hotmail.com> Co-authored-by: Will <will7989@hotmail.com> Co-authored-by: Derek Nola <derek.nola@suse.com>
2024-08-05 16:35:00 +00:00
_ = wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return false, nil
}
node, err := nodes.Get(nodeName, metav1.GetOptions{})
if err != nil {
return false, nil
}
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[P2pAddressAnnotation] = id
if node.Labels == nil {
node.Labels = map[string]string{}
}
node.Labels[P2pEnabledLabel] = "true"
if _, err = nodes.Update(node); err != nil {
return false, nil
}
logrus.Infof("Node P2P address annotations and labels added: %s", id)
return true, nil
})
}
return nil
}
// Get returns the p2p address of another ready cluster member.
//
// It returns util.ErrCoreNotReady while the runtime's Core is nil, and an
// error when the NODE_NAME environment variable is empty. Otherwise it lists
// nodes labelled P2pEnabledLabel=true and walks them in random order,
// skipping this node and any node whose Ready condition is missing or not
// true. The first entry of a node's comma-separated P2pAddressAnnotation
// value that parses as a peer address is returned. If no node yields an
// address, an error is returned.
func (s *serverBootstrapper) Get() (addrInfo *peer.AddrInfo, err error) {
	if s.controlConfig.Runtime.Core == nil {
		return nil, util.ErrCoreNotReady
	}

	self := os.Getenv("NODE_NAME")
	if self == "" {
		return nil, errors.New("node name not set")
	}

	selector := labels.Set{P2pEnabledLabel: "true"}.String()
	nodeClient := s.controlConfig.Runtime.Core.Core().V1().Node()
	peers, err := nodeClient.List(metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, err
	}

	for _, idx := range rand.Perm(len(peers.Items)) {
		candidate := &peers.Items[idx]

		// don't return our own address
		if candidate.Name == self {
			continue
		}

		// don't return the address of a not-ready node
		pos, ready := nodeutil.GetNodeCondition(&candidate.Status, v1.NodeReady)
		if pos == -1 || ready.Status != v1.ConditionTrue {
			continue
		}

		addrs, found := candidate.Annotations[P2pAddressAnnotation]
		if !found {
			continue
		}
		for _, addr := range strings.Split(addrs, ",") {
			info, parseErr := peer.AddrInfoFromString(addr)
			if parseErr == nil {
				return info, nil
			}
		}
	}

	return nil, errors.New("no ready p2p peers found")
}
// chainingBootstrapper wraps an ordered list of bootstrappers and delegates
// to them.
type chainingBootstrapper struct {
	// bootstrappers are consulted in slice order.
	bootstrappers []routing.Bootstrapper
}
// NewChainingBootstrapper returns a p2p bootstrapper that passes through to
// a list of bootstrappers, in the order given.
func NewChainingBootstrapper(bootstrappers ...routing.Bootstrapper) routing.Bootstrapper {
	chain := new(chainingBootstrapper)
	chain.bootstrappers = bootstrappers
	return chain
}
// Run calls Run on every wrapped bootstrapper with the same context and id.
// A failure does not stop the remaining bootstrappers from running; all
// collected errors are combined via merr.NewErrors and returned.
func (c *chainingBootstrapper) Run(ctx context.Context, id string) error {
	var failures merr.Errors
	for i := range c.bootstrappers {
		err := c.bootstrappers[i].Run(ctx, id)
		if err == nil {
			continue
		}
		failures = append(failures, err)
	}
	return merr.NewErrors(failures...)
}
// Get asks each wrapped bootstrapper in order and returns the first address
// obtained without error. If every bootstrapper fails, the collected errors
// are combined via merr.NewErrors and returned.
//
// An empty chain is reported as an explicit error. Without this guard the
// loop would not run and merr.NewErrors would be called with no errors.
// NOTE(review): that call presumably yields nil, which would hand callers a
// nil address together with a nil error — confirm against wrangler's merr.
func (c *chainingBootstrapper) Get() (*peer.AddrInfo, error) {
	if len(c.bootstrappers) == 0 {
		return nil, errors.New("no p2p bootstrappers configured")
	}
	errs := merr.Errors{}
	for _, b := range c.bootstrappers {
		addr, err := b.Get()
		if err == nil {
			return addr, nil
		}
		errs = append(errs, err)
	}
	return nil, merr.NewErrors(errs...)
}