k3s/pkg/spegel/spegel.go

261 lines
8.4 KiB
Go
Raw Normal View History

package spegel
import (
"context"
"fmt"
"log"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"time"
"github.com/containerd/containerd/remotes/docker"
"github.com/k3s-io/k3s/pkg/agent/https"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/dynamiclistener/cert"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/ptr"
"github.com/go-logr/logr"
"github.com/go-logr/stdr"
"github.com/gorilla/mux"
leveldb "github.com/ipfs/go-ds-leveldb"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spegel-org/spegel/pkg/metrics"
"github.com/spegel-org/spegel/pkg/oci"
"github.com/spegel-org/spegel/pkg/registry"
"github.com/spegel-org/spegel/pkg/routing"
"github.com/spegel-org/spegel/pkg/state"
"k8s.io/component-base/metrics/legacyregistry"
)
// DefaultRegistry is the default instance of a Spegel distributed registry
var DefaultRegistry = &Config{
Bootstrapper: NewSelfBootstrapper(),
Router: func(context.Context, *config.Node) (*mux.Router, error) {
return nil, errors.New("not implemented")
},
}
var (
P2pAddressAnnotation = "p2p." + version.Program + ".cattle.io/node-address"
P2pEnabledLabel = "p2p." + version.Program + ".cattle.io/enabled"
P2pPortEnv = version.ProgramUpper + "_P2P_PORT"
P2pEnableLatestEnv = version.ProgramUpper + "_P2P_ENABLE_LATEST"
resolveLatestTag = false
)
// Config holds fields for a distributed registry
type Config struct {
ClientCAFile string
ClientCertFile string
ClientKeyFile string
ServerCAFile string
ServerCertFile string
ServerKeyFile string
// ExternalAddress is the address for other nodes to connect to the registry API.
ExternalAddress string
// InternalAddress is the address for the local containerd instance to connect to the registry API.
InternalAddress string
// RegistryPort is the port for the registry API.
RegistryPort string
// PSK is the preshared key required to join the p2p network.
PSK []byte
// Bootstrapper is the bootstrapper that will be used to discover p2p peers.
Bootstrapper routing.Bootstrapper
// HandlerFunc will be called to add the registry API handler to an existing router.
Router https.RouterFunc
}
// These values are not currently configurable
const (
resolveRetries = 0
resolveTimeout = time.Second * 5
registryNamespace = "k8s.io"
defaultRouterPort = "5001"
)
func init() {
// ensure that spegel exposes metrics through the same registry used by Kubernetes components
metrics.DefaultRegisterer = legacyregistry.Registerer()
metrics.DefaultGatherer = legacyregistry.DefaultGatherer
}
// Start starts the embedded p2p router, and binds the registry API to an existing HTTP router.
func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
localAddr := net.JoinHostPort(c.InternalAddress, c.RegistryPort)
// distribute images for all configured mirrors. there doesn't need to be a
// configured endpoint, just having a key for the registry will do.
urls := []url.URL{}
registries := []string{}
for host := range nodeConfig.AgentConfig.Registry.Mirrors {
if host == localAddr {
continue
}
if u, err := url.Parse("https://" + host); err != nil || docker.IsLocalhost(host) {
logrus.Errorf("Distributed registry mirror skipping invalid registry: %s", host)
} else {
urls = append(urls, *u)
registries = append(registries, host)
}
}
if len(registries) == 0 {
logrus.Errorf("Not starting distributed registry mirror: no registries configured for distributed mirroring")
return nil
}
logrus.Infof("Starting distributed registry mirror at https://%s:%s/v2 for registries %v",
c.ExternalAddress, c.RegistryPort, registries)
// set up the various logging logging frameworks
level := ipfslog.LevelInfo
if logrus.IsLevelEnabled(logrus.DebugLevel) {
level = ipfslog.LevelDebug
stdlog := log.New(logrus.StandardLogger().Writer(), "spegel ", log.LstdFlags)
logger := stdr.NewWithOptions(stdlog, stdr.Options{Verbosity: ptr.To(10)})
ctx = logr.NewContext(ctx, logger)
}
ipfslog.SetAllLoggers(level)
// Get containerd client
ociOpts := []oci.Option{oci.WithContentPath(filepath.Join(nodeConfig.Containerd.Root, "io.containerd.content.v1.content"))}
ociClient, err := oci.NewContainerd(nodeConfig.Containerd.Address, registryNamespace, nodeConfig.Containerd.Registry, urls, ociOpts...)
if err != nil {
return errors.Wrap(err, "failed to create OCI client")
}
// create or load persistent private key
keyFile := filepath.Join(nodeConfig.Containerd.Opt, "peer.key")
keyBytes, _, err := cert.LoadOrGenerateKeyFile(keyFile, false)
if err != nil {
return errors.Wrap(err, "failed to load or generate p2p private key")
}
privKey, err := cert.ParsePrivateKeyPEM(keyBytes)
if err != nil {
return errors.Wrap(err, "failed to parse p2p private key")
}
p2pKey, _, err := crypto.KeyPairFromStdKey(privKey)
if err != nil {
return errors.Wrap(err, "failed to convert p2p private key")
}
// create a peerstore to allow persisting nodes across restarts
peerFile := filepath.Join(nodeConfig.Containerd.Opt, "peerstore.db")
ds, err := leveldb.NewDatastore(peerFile, nil)
if err != nil {
return errors.Wrap(err, "failed to create peerstore datastore")
}
ps, err := pstoreds.NewPeerstore(ctx, ds, pstoreds.DefaultOpts())
if err != nil {
return errors.Wrap(err, "failed to create peerstore")
}
// get latest tag configuration override
if env := os.Getenv(P2pEnableLatestEnv); env != "" {
if b, err := strconv.ParseBool(env); err != nil {
logrus.Warnf("Invalid %s value; using default %v", P2pEnableLatestEnv, resolveLatestTag)
} else {
resolveLatestTag = b
}
}
// get port and start p2p router
routerPort := defaultRouterPort
if env := os.Getenv(P2pPortEnv); env != "" {
if i, err := strconv.Atoi(env); i == 0 || err != nil {
logrus.Warnf("Invalid %s value; using default %v", P2pPortEnv, defaultRouterPort)
} else {
routerPort = env
}
}
routerAddr := net.JoinHostPort(c.ExternalAddress, routerPort)
logrus.Infof("Starting distributed registry P2P node at %s", routerAddr)
opts := []libp2p.Option{
libp2p.Identity(p2pKey),
libp2p.Peerstore(ps),
libp2p.PrivateNetwork(c.PSK),
}
router, err := routing.NewP2PRouter(ctx, routerAddr, c.Bootstrapper, c.RegistryPort, opts...)
if err != nil {
return errors.Wrap(err, "failed to create p2p router")
}
go router.Run(ctx)
caCert, err := os.ReadFile(c.ServerCAFile)
if err != nil {
return errors.Wrap(err, "failed to read server CA")
}
client := clientaccess.GetHTTPClient(caCert, c.ClientCertFile, c.ClientKeyFile)
metrics.Register()
registryOpts := []registry.Option{
registry.WithLocalAddress(localAddr),
registry.WithResolveLatestTag(resolveLatestTag),
registry.WithResolveRetries(resolveRetries),
registry.WithResolveTimeout(resolveTimeout),
registry.WithTransport(client.Transport),
registry.WithLogger(logr.FromContextOrDiscard(ctx)),
}
reg := registry.NewRegistry(ociClient, router, registryOpts...)
regSvr := reg.Server(":" + c.RegistryPort)
// Close router on shutdown
go func() {
<-ctx.Done()
router.Close()
}()
// Track images available in containerd and publish via p2p router
go state.Track(ctx, ociClient, router, resolveLatestTag)
mRouter, err := c.Router(ctx, nodeConfig)
if err != nil {
return err
}
mRouter.PathPrefix("/v2").Handler(regSvr.Handler)
mRouter.PathPrefix("/v1-" + version.Program + "/p2p").Handler(c.peerInfo())
// Wait up to 5 seconds for the p2p network to find peers. This will return
// immediately if the node is bootstrapping from itself.
[release-1.29] Backports for 2024-08 release cycle (#10665) * Use pagination when retrieving etcd snapshot list Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit c2216a62ad92b55feb835e92d55b95e952ecd596) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Update secretsencrypt pagination Make secretsencrypt page size and iteration consistent with other paginators Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit 891e72f90fa7735c64692212fb757b83588484d6) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Cap length of generated name used for servicelb daemonset Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit 21611c566561827eed45a0e81dcbee0699b88380) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Fix ipv6 sysctl required by non-ipv6 LoadBalancer service This is a partial revert of 095ecdb0346c038b0c16c39f6f66ad4f67ad10b9, with the workaround moved into klipper-lb. Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit d4c3422a85ccfe2f00218e88050d072df2e50577) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * remove deprecated use of wait functions Signed-off-by: Will <will7989@hotmail.com> (cherry picked from commit e4f3cc7b54ae2be481184c2312644c51f094cf79) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Update pkg/secretsencrypt/config.go Co-authored-by: Brad Davidson <brad@oatmail.org> Signed-off-by: Will Andrews <will7989@hotmail.com> (cherry picked from commit 3ec086f6f7f0d9a1aaa357b0a59dfd06f2650030) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Update pkg/cluster/managed.go Co-authored-by: Derek Nola <derek.nola@suse.com> Signed-off-by: Will Andrews <will7989@hotmail.com> (cherry picked from commit e2179aa957a02d4b357bef9aabb163f043471023) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Wire lasso metrics up to common gatherer Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit e168438d4439a27a89ca462cc8a62495b7473499) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> * Fix cloudprovider controller name Looking at metrics revealed the cloudprovider controller name was anempty string. Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit bffdf463e1e1380d13d95b0fdc1e8644a57ec0a3) Signed-off-by: Brad Davidson <brad.davidson@rancher.com> --------- Signed-off-by: Brad Davidson <brad.davidson@rancher.com> Signed-off-by: Will <will7989@hotmail.com> Signed-off-by: Will Andrews <will7989@hotmail.com> Co-authored-by: Will <will7989@hotmail.com> Co-authored-by: Derek Nola <derek.nola@suse.com>
2024-08-05 16:35:07 +00:00
_ = wait.PollUntilContextTimeout(ctx, time.Second, resolveTimeout, true, func(_ context.Context) (bool, error) {
return router.Ready()
})
return nil
}
// peerInfo sends a peer address retrieved from the bootstrapper via HTTP
func (c *Config) peerInfo() http.HandlerFunc {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
client, _, _ := net.SplitHostPort(req.RemoteAddr)
info, err := c.Bootstrapper.Get()
if err != nil {
http.Error(resp, "Internal Error", http.StatusInternalServerError)
return
}
logrus.Debugf("Serving p2p peer addr %s to client at %s", info, client)
resp.WriteHeader(http.StatusOK)
resp.Header().Set("Content-Type", "text/plain")
fmt.Fprintf(resp, "%s/p2p/%s", info.Addrs[0].String(), info.ID.String())
})
}