mirror of https://github.com/k3s-io/k3s
Refactor supervisor listener startup and add metrics
* Refactor agent supervisor listener startup and authn/authz to use upstream
auth delegators to perform for SubjectAccessReview for access to
metrics.
* Convert spegel and pprof handlers over to new structure.
* Promote bind-address to agent flag to allow setting supervisor bind
address for both agent and server.
* Promote enable-pprof to agent flag to allow profiling agents. Access
to the pprof endpoint now requires client cert auth, similar to the
spegel registry api endpoint.
* Add prometheus metrics handler.
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit ff679fb3ab
)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/10288/head
parent
c9f3efbe11
commit
7ef30a2c60
|
@ -524,12 +524,14 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
|
|||
SELinux: envInfo.EnableSELinux,
|
||||
ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint,
|
||||
ImageServiceEndpoint: envInfo.ImageServiceEndpoint,
|
||||
EnablePProf: envInfo.EnablePProf,
|
||||
EmbeddedRegistry: controlConfig.EmbeddedRegistry,
|
||||
FlannelBackend: controlConfig.FlannelBackend,
|
||||
FlannelIPv6Masq: controlConfig.FlannelIPv6Masq,
|
||||
FlannelExternalIP: controlConfig.FlannelExternalIP,
|
||||
EgressSelectorMode: controlConfig.EgressSelectorMode,
|
||||
ServerHTTPSPort: controlConfig.HTTPSPort,
|
||||
SupervisorMetrics: controlConfig.SupervisorMetrics,
|
||||
Token: info.String(),
|
||||
}
|
||||
nodeConfig.FlannelIface = flannelIface
|
||||
|
@ -592,13 +594,18 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
|
|||
nodeConfig.Containerd.Template = filepath.Join(envInfo.DataDir, "agent", "etc", "containerd", "config.toml.tmpl")
|
||||
nodeConfig.Certificate = servingCert
|
||||
|
||||
nodeConfig.AgentConfig.NodeIPs = nodeIPs
|
||||
listenAddress, _, _, err := util.GetDefaultAddresses(nodeIPs[0])
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot configure IPv4/IPv6 node-ip")
|
||||
if envInfo.BindAddress != "" {
|
||||
nodeConfig.AgentConfig.ListenAddress = envInfo.BindAddress
|
||||
} else {
|
||||
listenAddress, _, _, err := util.GetDefaultAddresses(nodeIPs[0])
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot configure IPv4/IPv6 node-ip")
|
||||
}
|
||||
nodeConfig.AgentConfig.ListenAddress = listenAddress
|
||||
}
|
||||
|
||||
nodeConfig.AgentConfig.NodeIP = nodeIPs[0].String()
|
||||
nodeConfig.AgentConfig.ListenAddress = listenAddress
|
||||
nodeConfig.AgentConfig.NodeIPs = nodeIPs
|
||||
nodeConfig.AgentConfig.NodeExternalIPs = nodeExternalIPs
|
||||
|
||||
// if configured, set NodeExternalIP to the first IPv4 address, for legacy clients
|
||||
|
@ -689,6 +696,8 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
|
|||
nodeConfig.AgentConfig.ImageCredProvConfig = envInfo.ImageCredProvConfig
|
||||
nodeConfig.AgentConfig.DisableCCM = controlConfig.DisableCCM
|
||||
nodeConfig.AgentConfig.DisableNPC = controlConfig.DisableNPC
|
||||
nodeConfig.AgentConfig.MinTLSVersion = controlConfig.MinTLSVersion
|
||||
nodeConfig.AgentConfig.CipherSuites = controlConfig.CipherSuites
|
||||
nodeConfig.AgentConfig.Rootless = envInfo.Rootless
|
||||
nodeConfig.AgentConfig.PodManifests = filepath.Join(envInfo.DataDir, "agent", DefaultPodManifestPath)
|
||||
nodeConfig.AgentConfig.ProtectKernelDefaults = envInfo.ProtectKernelDefaults
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
package https
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/generated/clientset/versioned/scheme"
|
||||
"github.com/k3s-io/k3s/pkg/util"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/options"
|
||||
)
|
||||
|
||||
// RouterFunc provides a hook for components to register additional routes to a request router
|
||||
type RouterFunc func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error)
|
||||
|
||||
var once sync.Once
|
||||
var router *mux.Router
|
||||
var err error
|
||||
|
||||
// Start returns a router with authn/authz filters applied.
|
||||
// The first time it is called, the router is created and a new HTTPS listener is started if the handler is nil.
|
||||
// Subsequent calls will return the same router.
|
||||
func Start(ctx context.Context, nodeConfig *config.Node, runtime *config.ControlRuntime) (*mux.Router, error) {
|
||||
once.Do(func() {
|
||||
router = mux.NewRouter().SkipClean(true)
|
||||
config := server.Config{}
|
||||
|
||||
if runtime == nil {
|
||||
// If we do not have an existing handler, set up a new listener
|
||||
tcp, lerr := util.ListenWithLoopback(ctx, nodeConfig.AgentConfig.ListenAddress, strconv.Itoa(nodeConfig.ServerHTTPSPort))
|
||||
if lerr != nil {
|
||||
err = lerr
|
||||
return
|
||||
}
|
||||
|
||||
serving := options.NewSecureServingOptions()
|
||||
serving.Listener = tcp
|
||||
serving.CipherSuites = nodeConfig.AgentConfig.CipherSuites
|
||||
serving.MinTLSVersion = nodeConfig.AgentConfig.MinTLSVersion
|
||||
serving.ServerCert = options.GeneratableKeyCert{
|
||||
CertKey: options.CertKey{
|
||||
CertFile: nodeConfig.AgentConfig.ServingKubeletCert,
|
||||
KeyFile: nodeConfig.AgentConfig.ServingKubeletKey,
|
||||
},
|
||||
}
|
||||
if aerr := serving.ApplyTo(&config.SecureServing); aerr != nil {
|
||||
err = aerr
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// If we have an existing handler, wrap it
|
||||
router.NotFoundHandler = runtime.Handler
|
||||
runtime.Handler = router
|
||||
}
|
||||
|
||||
authn := options.NewDelegatingAuthenticationOptions()
|
||||
authn.DisableAnonymous = true
|
||||
authn.SkipInClusterLookup = true
|
||||
authn.ClientCert = options.ClientCertAuthenticationOptions{
|
||||
ClientCA: nodeConfig.AgentConfig.ClientCA,
|
||||
}
|
||||
authn.RemoteKubeConfigFile = nodeConfig.AgentConfig.KubeConfigKubelet
|
||||
if applyErr := authn.ApplyTo(&config.Authentication, config.SecureServing, nil); applyErr != nil {
|
||||
err = applyErr
|
||||
return
|
||||
}
|
||||
|
||||
authz := options.NewDelegatingAuthorizationOptions()
|
||||
authz.AlwaysAllowPaths = []string{"/v2", "/debug/pprof", "/v1-" + version.Program + "/p2p"}
|
||||
authz.RemoteKubeConfigFile = nodeConfig.AgentConfig.KubeConfigKubelet
|
||||
if applyErr := authz.ApplyTo(&config.Authorization); applyErr != nil {
|
||||
err = applyErr
|
||||
return
|
||||
}
|
||||
|
||||
router.Use(filterChain(config.Authentication.Authenticator, config.Authorization.Authorizer))
|
||||
|
||||
if config.SecureServing != nil {
|
||||
_, _, err = config.SecureServing.Serve(router, 0, ctx.Done())
|
||||
}
|
||||
})
|
||||
|
||||
return router, err
|
||||
}
|
||||
|
||||
// filterChain runs the kubernetes authn/authz filter chain using the mux middleware API
|
||||
func filterChain(authn authenticator.Request, authz authorizer.Authorizer) mux.MiddlewareFunc {
|
||||
return func(handler http.Handler) http.Handler {
|
||||
requestInfoResolver := &apirequest.RequestInfoFactory{}
|
||||
failedHandler := genericapifilters.Unauthorized(scheme.Codecs)
|
||||
handler = genericapifilters.WithAuthorization(handler, authz, scheme.Codecs)
|
||||
handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil, nil)
|
||||
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
|
||||
handler = genericapifilters.WithCacheControl(handler)
|
||||
return handler
|
||||
}
|
||||
}
|
|
@ -19,25 +19,25 @@ import (
|
|||
|
||||
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/netpol"
|
||||
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
|
||||
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
|
||||
krmetrics "github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
|
||||
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
|
||||
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
|
||||
"github.com/cloudnativelabs/kube-router/v2/pkg/version"
|
||||
"github.com/coreos/go-iptables/iptables"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/metrics"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
v1core "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// ensure that kube-router exposes metrics through the same registry used by Kubernetes components
|
||||
metrics.DefaultRegisterer = legacyregistry.Registerer()
|
||||
metrics.DefaultGatherer = legacyregistry.DefaultGatherer
|
||||
krmetrics.DefaultRegisterer = metrics.DefaultRegisterer
|
||||
krmetrics.DefaultGatherer = metrics.DefaultGatherer
|
||||
}
|
||||
|
||||
// Run creates and starts a new instance of the kube-router network policy controller
|
||||
|
@ -156,7 +156,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
|
|||
}
|
||||
|
||||
// Start kube-router metrics controller to avoid complaints about metrics heartbeat missing
|
||||
mc, err := metrics.NewMetricsController(krConfig)
|
||||
mc, err := krmetrics.NewMetricsController(krConfig)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -188,13 +188,13 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
|
|||
}
|
||||
|
||||
// metricsRunCheck is a stub version of mc.Run() that doesn't start up a dedicated http server.
|
||||
func metricsRunCheck(mc *metrics.Controller, healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
||||
func metricsRunCheck(mc *krmetrics.Controller, healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
||||
t := time.NewTicker(3 * time.Second)
|
||||
defer wg.Done()
|
||||
|
||||
// register metrics for this controller
|
||||
metrics.BuildInfo.WithLabelValues(runtime.Version(), version.Version).Set(1)
|
||||
metrics.DefaultRegisterer.MustRegister(metrics.BuildInfo)
|
||||
krmetrics.BuildInfo.WithLabelValues(runtime.Version(), version.Version).Set(1)
|
||||
krmetrics.DefaultRegisterer.MustRegister(krmetrics.BuildInfo)
|
||||
|
||||
for {
|
||||
healthcheck.SendHeartBeat(healthChan, "MC")
|
||||
|
|
|
@ -27,7 +27,9 @@ import (
|
|||
"github.com/k3s-io/k3s/pkg/daemons/agent"
|
||||
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/executor"
|
||||
"github.com/k3s-io/k3s/pkg/metrics"
|
||||
"github.com/k3s-io/k3s/pkg/nodeconfig"
|
||||
"github.com/k3s-io/k3s/pkg/profile"
|
||||
"github.com/k3s-io/k3s/pkg/rootless"
|
||||
"github.com/k3s-io/k3s/pkg/spegel"
|
||||
"github.com/k3s-io/k3s/pkg/util"
|
||||
|
@ -113,6 +115,18 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
|
|||
}
|
||||
}
|
||||
|
||||
if nodeConfig.SupervisorMetrics {
|
||||
if err := metrics.DefaultMetrics.Start(ctx, nodeConfig); err != nil {
|
||||
return errors.Wrap(err, "failed to serve metrics")
|
||||
}
|
||||
}
|
||||
|
||||
if nodeConfig.EnablePProf {
|
||||
if err := profile.DefaultProfiler.Start(ctx, nodeConfig); err != nil {
|
||||
return errors.Wrap(err, "failed to serve pprof")
|
||||
}
|
||||
}
|
||||
|
||||
if err := setupCriCtlConfig(cfg, nodeConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/control/deps"
|
||||
"github.com/k3s-io/k3s/pkg/metrics"
|
||||
"github.com/k3s-io/k3s/pkg/util"
|
||||
"github.com/k3s-io/k3s/pkg/util/services"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
|
@ -22,18 +23,9 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultRegisterer and DefaultGatherer are the implementations of the
|
||||
// prometheus Registerer and Gatherer interfaces that all metrics operations
|
||||
// will use. They are variables so that packages that embed this library can
|
||||
// replace them at runtime, instead of having to pass around specific
|
||||
// registries.
|
||||
DefaultRegisterer = legacyregistry.Registerer()
|
||||
DefaultGatherer = legacyregistry.DefaultGatherer
|
||||
|
||||
// Check certificates twice an hour. Kubernetes events have a TTL of 1 hour by default,
|
||||
// so similar events should be aggregated and refreshed by the event recorder as long
|
||||
// as they are created within the TTL period.
|
||||
|
@ -50,7 +42,7 @@ var (
|
|||
// Setup starts the certificate expiration monitor
|
||||
func Setup(ctx context.Context, nodeConfig *daemonconfig.Node, dataDir string) error {
|
||||
logrus.Debugf("Starting %s with monitoring period %s", controllerName, certCheckInterval)
|
||||
DefaultRegisterer.MustRegister(certificateExpirationSeconds)
|
||||
metrics.DefaultRegisterer.MustRegister(certificateExpirationSeconds)
|
||||
|
||||
client, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigKubelet)
|
||||
if err != nil {
|
||||
|
|
|
@ -1,20 +1,22 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/k3s-io/k3s/pkg/agent"
|
||||
"github.com/k3s-io/k3s/pkg/authenticator"
|
||||
"github.com/k3s-io/k3s/pkg/agent/https"
|
||||
"github.com/k3s-io/k3s/pkg/cli/cmds"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/datadir"
|
||||
k3smetrics "github.com/k3s-io/k3s/pkg/metrics"
|
||||
"github.com/k3s-io/k3s/pkg/proctitle"
|
||||
"github.com/k3s-io/k3s/pkg/profile"
|
||||
"github.com/k3s-io/k3s/pkg/spegel"
|
||||
"github.com/k3s-io/k3s/pkg/util"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
|
@ -22,7 +24,6 @@ import (
|
|||
"github.com/rancher/wrangler/pkg/signals"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli"
|
||||
apiauth "k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
)
|
||||
|
||||
func Run(ctx *cli.Context) error {
|
||||
|
@ -108,33 +109,22 @@ func Run(ctx *cli.Context) error {
|
|||
// Until the agent is run and retrieves config from the server, we won't know
|
||||
// if the embedded registry is enabled. If it is not enabled, these are not
|
||||
// used as the registry is never started.
|
||||
conf := spegel.DefaultRegistry
|
||||
conf.Bootstrapper = spegel.NewAgentBootstrapper(cfg.ServerURL, cfg.Token, cfg.DataDir)
|
||||
conf.HandlerFunc = func(conf *spegel.Config, router *mux.Router) error {
|
||||
// Create and bind a new authenticator using the configured client CA
|
||||
authArgs := []string{"--client-ca-file=" + conf.ClientCAFile}
|
||||
auth, err := authenticator.FromArgs(authArgs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conf.AuthFunc = func() apiauth.Request {
|
||||
return auth
|
||||
}
|
||||
registry := spegel.DefaultRegistry
|
||||
registry.Bootstrapper = spegel.NewAgentBootstrapper(cfg.ServerURL, cfg.Token, cfg.DataDir)
|
||||
registry.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
|
||||
return https.Start(ctx, nodeConfig, nil)
|
||||
}
|
||||
|
||||
// Create a new server and listen on the configured port
|
||||
server := &http.Server{
|
||||
Handler: router,
|
||||
Addr: ":" + conf.RegistryPort,
|
||||
TLSConfig: &tls.Config{
|
||||
ClientAuth: tls.RequestClientCert,
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
if err := server.ListenAndServeTLS(conf.ServerCertFile, conf.ServerKeyFile); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
logrus.Fatalf("registry server failed: %v", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
// same deal for metrics - these are not used if the extra metrics listener is not enabled.
|
||||
metrics := k3smetrics.DefaultMetrics
|
||||
metrics.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
|
||||
return https.Start(ctx, nodeConfig, nil)
|
||||
}
|
||||
|
||||
// and for pprof as well
|
||||
pprof := profile.DefaultProfiler
|
||||
pprof.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
|
||||
return https.Start(ctx, nodeConfig, nil)
|
||||
}
|
||||
|
||||
return agent.Run(contextCtx, cfg)
|
||||
|
|
|
@ -20,6 +20,7 @@ type Agent struct {
|
|||
LBServerPort int
|
||||
ResolvConf string
|
||||
DataDir string
|
||||
BindAddress string
|
||||
NodeIP cli.StringSlice
|
||||
NodeExternalIP cli.StringSlice
|
||||
NodeName string
|
||||
|
@ -36,6 +37,7 @@ type Agent struct {
|
|||
VPNAuth string
|
||||
VPNAuthFile string
|
||||
Debug bool
|
||||
EnablePProf bool
|
||||
Rootless bool
|
||||
RootlessAlreadyUnshared bool
|
||||
WithNodeID bool
|
||||
|
@ -226,6 +228,16 @@ var (
|
|||
Usage: "(agent/containerd) Disables containerd's fallback default registry endpoint when a mirror is configured for that registry",
|
||||
Destination: &AgentConfig.ContainerdNoDefault,
|
||||
}
|
||||
EnablePProfFlag = &cli.BoolFlag{
|
||||
Name: "enable-pprof",
|
||||
Usage: "(experimental) Enable pprof endpoint on supervisor port",
|
||||
Destination: &AgentConfig.EnablePProf,
|
||||
}
|
||||
BindAddressFlag = &cli.StringFlag{
|
||||
Name: "bind-address",
|
||||
Usage: "(listener) " + version.Program + " bind address (default: 0.0.0.0)",
|
||||
Destination: &AgentConfig.BindAddress,
|
||||
}
|
||||
)
|
||||
|
||||
func NewAgentCommand(action func(ctx *cli.Context) error) cli.Command {
|
||||
|
@ -278,6 +290,7 @@ func NewAgentCommand(action func(ctx *cli.Context) error) cli.Command {
|
|||
DisableDefaultRegistryEndpointFlag,
|
||||
AirgapExtraRegistryFlag,
|
||||
NodeIPFlag,
|
||||
BindAddressFlag,
|
||||
NodeExternalIPFlag,
|
||||
ResolvConfFlag,
|
||||
FlannelIfaceFlag,
|
||||
|
@ -286,6 +299,7 @@ func NewAgentCommand(action func(ctx *cli.Context) error) cli.Command {
|
|||
ExtraKubeletArgs,
|
||||
ExtraKubeProxyArgs,
|
||||
// Experimental flags
|
||||
EnablePProfFlag,
|
||||
&cli.BoolFlag{
|
||||
Name: "rootless",
|
||||
Usage: "(experimental) Run rootless",
|
||||
|
|
|
@ -48,8 +48,6 @@ type Server struct {
|
|||
HelmJobImage string
|
||||
TLSSan cli.StringSlice
|
||||
TLSSanSecurity bool
|
||||
BindAddress string
|
||||
EnablePProf bool
|
||||
ExtraAPIArgs cli.StringSlice
|
||||
ExtraEtcdArgs cli.StringSlice
|
||||
ExtraSchedulerArgs cli.StringSlice
|
||||
|
@ -87,6 +85,7 @@ type Server struct {
|
|||
EncryptSkip bool
|
||||
SystemDefaultRegistry string
|
||||
StartupHooks []StartupHook
|
||||
SupervisorMetrics bool
|
||||
EtcdSnapshotName string
|
||||
EtcdDisableSnapshots bool
|
||||
EtcdExposeMetrics bool
|
||||
|
@ -178,11 +177,7 @@ var ServerFlags = []cli.Flag{
|
|||
VModule,
|
||||
LogFile,
|
||||
AlsoLogToStderr,
|
||||
&cli.StringFlag{
|
||||
Name: "bind-address",
|
||||
Usage: "(listener) " + version.Program + " bind address (default: 0.0.0.0)",
|
||||
Destination: &ServerConfig.BindAddress,
|
||||
},
|
||||
BindAddressFlag,
|
||||
&cli.IntFlag{
|
||||
Name: "https-listen-port",
|
||||
Usage: "(listener) HTTPS listen port",
|
||||
|
@ -493,9 +488,14 @@ var ServerFlags = []cli.Flag{
|
|||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "embedded-registry",
|
||||
Usage: "(experimental/components) Enable embedded distributed container registry; requires use of embedded containerd",
|
||||
Usage: "(experimental/components) Enable embedded distributed container registry; requires use of embedded containerd; when enabled agents will also listen on the supervisor port",
|
||||
Destination: &ServerConfig.EmbeddedRegistry,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "supervisor-metrics",
|
||||
Usage: "(experimental/components) Enable serving " + version.Program + " internal metrics on the supervisor port; when enabled agents will also listen on the supervisor port",
|
||||
Destination: &ServerConfig.SupervisorMetrics,
|
||||
},
|
||||
NodeNameFlag,
|
||||
WithNodeIDFlag,
|
||||
NodeLabels,
|
||||
|
@ -534,11 +534,7 @@ var ServerFlags = []cli.Flag{
|
|||
Destination: &ServerConfig.EncryptSecrets,
|
||||
},
|
||||
// Experimental flags
|
||||
&cli.BoolFlag{
|
||||
Name: "enable-pprof",
|
||||
Usage: "(experimental) Enable pprof endpoint on supervisor port",
|
||||
Destination: &ServerConfig.EnablePProf,
|
||||
},
|
||||
EnablePProfFlag,
|
||||
&cli.BoolFlag{
|
||||
Name: "rootless",
|
||||
Usage: "(experimental) Run rootless",
|
||||
|
|
|
@ -12,13 +12,16 @@ import (
|
|||
systemd "github.com/coreos/go-systemd/v22/daemon"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/k3s-io/k3s/pkg/agent"
|
||||
"github.com/k3s-io/k3s/pkg/agent/https"
|
||||
"github.com/k3s-io/k3s/pkg/agent/loadbalancer"
|
||||
"github.com/k3s-io/k3s/pkg/cli/cmds"
|
||||
"github.com/k3s-io/k3s/pkg/clientaccess"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/datadir"
|
||||
"github.com/k3s-io/k3s/pkg/etcd"
|
||||
k3smetrics "github.com/k3s-io/k3s/pkg/metrics"
|
||||
"github.com/k3s-io/k3s/pkg/proctitle"
|
||||
"github.com/k3s-io/k3s/pkg/profile"
|
||||
"github.com/k3s-io/k3s/pkg/rootless"
|
||||
"github.com/k3s-io/k3s/pkg/server"
|
||||
"github.com/k3s-io/k3s/pkg/spegel"
|
||||
|
@ -30,7 +33,6 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
kubeapiserverflag "k8s.io/component-base/cli/flag"
|
||||
"k8s.io/kubernetes/pkg/controlplane/apiserver/options"
|
||||
utilsnet "k8s.io/utils/net"
|
||||
|
@ -136,12 +138,11 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
|
|||
serverConfig.ControlConfig.ServiceLBNamespace = cfg.ServiceLBNamespace
|
||||
serverConfig.ControlConfig.SANs = util.SplitStringSlice(cfg.TLSSan)
|
||||
serverConfig.ControlConfig.SANSecurity = cfg.TLSSanSecurity
|
||||
serverConfig.ControlConfig.BindAddress = cfg.BindAddress
|
||||
serverConfig.ControlConfig.BindAddress = cmds.AgentConfig.BindAddress
|
||||
serverConfig.ControlConfig.SupervisorPort = cfg.SupervisorPort
|
||||
serverConfig.ControlConfig.HTTPSPort = cfg.HTTPSPort
|
||||
serverConfig.ControlConfig.APIServerPort = cfg.APIServerPort
|
||||
serverConfig.ControlConfig.APIServerBindAddress = cfg.APIServerBindAddress
|
||||
serverConfig.ControlConfig.EnablePProf = cfg.EnablePProf
|
||||
serverConfig.ControlConfig.ExtraAPIArgs = cfg.ExtraAPIArgs
|
||||
serverConfig.ControlConfig.ExtraControllerArgs = cfg.ExtraControllerArgs
|
||||
serverConfig.ControlConfig.ExtraEtcdArgs = cfg.ExtraEtcdArgs
|
||||
|
@ -174,6 +175,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
|
|||
serverConfig.ControlConfig.EncryptSecrets = cfg.EncryptSecrets
|
||||
serverConfig.ControlConfig.EtcdExposeMetrics = cfg.EtcdExposeMetrics
|
||||
serverConfig.ControlConfig.EtcdDisableSnapshots = cfg.EtcdDisableSnapshots
|
||||
serverConfig.ControlConfig.SupervisorMetrics = cfg.SupervisorMetrics
|
||||
serverConfig.ControlConfig.VLevel = cmds.LogConfig.VLevel
|
||||
serverConfig.ControlConfig.VModule = cmds.LogConfig.VModule
|
||||
|
||||
|
@ -406,6 +408,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
|
|||
}
|
||||
|
||||
tlsMinVersionArg := getArgValueFromList("tls-min-version", serverConfig.ControlConfig.ExtraAPIArgs)
|
||||
serverConfig.ControlConfig.MinTLSVersion = tlsMinVersionArg
|
||||
serverConfig.ControlConfig.TLSMinVersion, err = kubeapiserverflag.TLSVersion(tlsMinVersionArg)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "invalid tls-min-version")
|
||||
|
@ -435,6 +438,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
|
|||
}
|
||||
serverConfig.ControlConfig.ExtraAPIArgs = append(serverConfig.ControlConfig.ExtraAPIArgs, "tls-cipher-suites="+strings.Join(tlsCipherSuites, ","))
|
||||
}
|
||||
serverConfig.ControlConfig.CipherSuites = tlsCipherSuites
|
||||
serverConfig.ControlConfig.TLSCipherSuites, err = kubeapiserverflag.TLSCipherSuites(tlsCipherSuites)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "invalid tls-cipher-suites")
|
||||
|
@ -556,28 +560,36 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
|
|||
go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
|
||||
}
|
||||
|
||||
// Until the agent is run and retrieves config from the server, we won't know
|
||||
// if the embedded registry is enabled. If it is not enabled, these are not
|
||||
// used as the registry is never started.
|
||||
registry := spegel.DefaultRegistry
|
||||
registry.Bootstrapper = spegel.NewChainingBootstrapper(
|
||||
spegel.NewServerBootstrapper(&serverConfig.ControlConfig),
|
||||
spegel.NewAgentBootstrapper(cfg.ServerURL, token, agentConfig.DataDir),
|
||||
spegel.NewSelfBootstrapper(),
|
||||
)
|
||||
registry.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
|
||||
return https.Start(ctx, nodeConfig, serverConfig.ControlConfig.Runtime)
|
||||
}
|
||||
|
||||
// same deal for metrics - these are not used if the extra metrics listener is not enabled.
|
||||
metrics := k3smetrics.DefaultMetrics
|
||||
metrics.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
|
||||
return https.Start(ctx, nodeConfig, serverConfig.ControlConfig.Runtime)
|
||||
}
|
||||
|
||||
// and for pprof as well
|
||||
pprof := profile.DefaultProfiler
|
||||
pprof.Router = func(ctx context.Context, nodeConfig *config.Node) (*mux.Router, error) {
|
||||
return https.Start(ctx, nodeConfig, serverConfig.ControlConfig.Runtime)
|
||||
}
|
||||
|
||||
if cfg.DisableAgent {
|
||||
agentConfig.ContainerRuntimeEndpoint = "/dev/null"
|
||||
return agent.RunStandalone(ctx, agentConfig)
|
||||
}
|
||||
|
||||
if cfg.EmbeddedRegistry {
|
||||
conf := spegel.DefaultRegistry
|
||||
conf.Bootstrapper = spegel.NewChainingBootstrapper(
|
||||
spegel.NewServerBootstrapper(&serverConfig.ControlConfig),
|
||||
spegel.NewAgentBootstrapper(cfg.ServerURL, token, agentConfig.DataDir),
|
||||
spegel.NewSelfBootstrapper(),
|
||||
)
|
||||
conf.HandlerFunc = func(_ *spegel.Config, router *mux.Router) error {
|
||||
router.NotFoundHandler = serverConfig.ControlConfig.Runtime.Handler
|
||||
serverConfig.ControlConfig.Runtime.Handler = router
|
||||
return nil
|
||||
}
|
||||
conf.AuthFunc = func() authenticator.Request {
|
||||
return serverConfig.ControlConfig.Runtime.Authenticator
|
||||
}
|
||||
}
|
||||
|
||||
return agent.Run(ctx, agentConfig)
|
||||
}
|
||||
|
||||
|
|
|
@ -4,17 +4,16 @@ import (
|
|||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/util"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
"github.com/rancher/dynamiclistener"
|
||||
"github.com/rancher/dynamiclistener/factory"
|
||||
|
@ -24,7 +23,6 @@ import (
|
|||
"github.com/rancher/wrangler/pkg/generated/controllers/core"
|
||||
"github.com/sirupsen/logrus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilsnet "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
// newListener returns a new TCP listener and HTTP request handler using dynamiclistener.
|
||||
|
@ -43,11 +41,7 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler,
|
|||
os.Remove(filepath.Join(c.config.DataDir, "tls/dynamic-cert.json"))
|
||||
}
|
||||
}
|
||||
ip := c.config.BindAddress
|
||||
if utilsnet.IsIPv6String(ip) {
|
||||
ip = fmt.Sprintf("[%s]", ip)
|
||||
}
|
||||
tcp, err := dynamiclistener.NewTCPListener(ip, c.config.SupervisorPort)
|
||||
tcp, err := util.ListenWithLoopback(ctx, c.config.BindAddress, strconv.Itoa(c.config.SupervisorPort))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -114,17 +108,6 @@ func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if c.config.EnablePProf {
|
||||
mux := mux.NewRouter().SkipClean(true)
|
||||
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
||||
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
||||
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
||||
mux.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
|
||||
mux.NotFoundHandler = handler
|
||||
handler = mux
|
||||
}
|
||||
|
||||
// Create a HTTP server with the registered request handlers, using logrus for logging
|
||||
server := http.Server{
|
||||
Handler: handler,
|
||||
|
|
|
@ -41,6 +41,8 @@ type Node struct {
|
|||
ImageServiceEndpoint string
|
||||
NoFlannel bool
|
||||
SELinux bool
|
||||
EnablePProf bool
|
||||
SupervisorMetrics bool
|
||||
EmbeddedRegistry bool
|
||||
FlannelBackend string
|
||||
FlannelConfFile string
|
||||
|
@ -128,6 +130,8 @@ type Agent struct {
|
|||
AirgapExtraRegistry []string
|
||||
DisableCCM bool
|
||||
DisableNPC bool
|
||||
MinTLSVersion string
|
||||
CipherSuites []string
|
||||
Rootless bool
|
||||
ProtectKernelDefaults bool
|
||||
DisableServiceLB bool
|
||||
|
@ -159,6 +163,7 @@ type CriticalControlArgs struct {
|
|||
EgressSelectorMode string `cli:"egress-selector-mode"`
|
||||
ServiceIPRange *net.IPNet `cli:"service-cidr"`
|
||||
ServiceIPRanges []*net.IPNet `cli:"service-cidr"`
|
||||
SupervisorMetrics bool `cli:"supervisor-metrics"`
|
||||
}
|
||||
|
||||
type Control struct {
|
||||
|
@ -191,7 +196,6 @@ type Control struct {
|
|||
DisableServiceLB bool
|
||||
Rootless bool
|
||||
ServiceLBNamespace string
|
||||
EnablePProf bool
|
||||
ExtraAPIArgs []string
|
||||
ExtraControllerArgs []string
|
||||
ExtraCloudControllerArgs []string
|
||||
|
@ -208,8 +212,10 @@ type Control struct {
|
|||
ClusterResetRestorePath string
|
||||
EncryptForce bool
|
||||
EncryptSkip bool
|
||||
TLSMinVersion uint16
|
||||
TLSCipherSuites []uint16
|
||||
MinTLSVersion string
|
||||
CipherSuites []string
|
||||
TLSMinVersion uint16 `json:"-"`
|
||||
TLSCipherSuites []uint16 `json:"-"`
|
||||
EtcdSnapshotName string `json:"-"`
|
||||
EtcdDisableSnapshots bool `json:"-"`
|
||||
EtcdExposeMetrics bool `json:"-"`
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/k3s-io/k3s/pkg/agent/https"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
)
|
||||
|
||||
// DefaultRegisterer is the implementation of the
|
||||
// prometheus Registerer interface that all metrics operations
|
||||
// will use.
|
||||
var DefaultRegisterer = legacyregistry.Registerer()
|
||||
|
||||
// DefaultGatherer is the implementation of the
|
||||
// prometheus Gatherere interface that all metrics operations
|
||||
// will use.
|
||||
var DefaultGatherer = legacyregistry.DefaultGatherer
|
||||
|
||||
// DefaultMetrics is the default instance of a Metrics server
|
||||
var DefaultMetrics = &Config{
|
||||
Router: func(context.Context, *config.Node) (*mux.Router, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
},
|
||||
}
|
||||
|
||||
// Config holds fields for the metrics listener
|
||||
type Config struct {
|
||||
// Router will be called to add the metrics API handler to an existing router.
|
||||
Router https.RouterFunc
|
||||
}
|
||||
|
||||
// Start starts binds the metrics API to an existing HTTP router.
|
||||
func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
|
||||
mRouter, err := c.Router(ctx, nodeConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mRouter.Handle("/metrics", promhttp.HandlerFor(DefaultGatherer, promhttp.HandlerOpts{}))
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package profile
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http/pprof"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/k3s-io/k3s/pkg/agent/https"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
)
|
||||
|
||||
// DefaultProfiler the default instance of a performance profiling server
|
||||
var DefaultProfiler = &Config{
|
||||
Router: func(context.Context, *config.Node) (*mux.Router, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
},
|
||||
}
|
||||
|
||||
// Config holds fields for the pprof listener
|
||||
type Config struct {
|
||||
// Router will be called to add the pprof API handler to an existing router.
|
||||
Router https.RouterFunc
|
||||
}
|
||||
|
||||
// Start starts binds the pprof API to an existing HTTP router.
|
||||
func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
|
||||
mRouter, err := c.Router(ctx, nodeConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mRouter.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
||||
mRouter.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
||||
mRouter.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||
mRouter.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
||||
mRouter.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
|
||||
return nil
|
||||
}
|
|
@ -13,13 +13,12 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/containerd/containerd/remotes/docker"
|
||||
"github.com/k3s-io/k3s/pkg/agent/https"
|
||||
"github.com/k3s-io/k3s/pkg/clientaccess"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
"github.com/rancher/dynamiclistener/cert"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
"k8s.io/apiserver/pkg/authentication/request/union"
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
|
@ -43,11 +42,8 @@ import (
|
|||
// DefaultRegistry is the default instance of a Spegel distributed registry
|
||||
var DefaultRegistry = &Config{
|
||||
Bootstrapper: NewSelfBootstrapper(),
|
||||
HandlerFunc: func(_ *Config, _ *mux.Router) error {
|
||||
return errors.New("not implemented")
|
||||
},
|
||||
AuthFunc: func() authenticator.Request {
|
||||
return union.New(nil)
|
||||
Router: func(context.Context, *config.Node) (*mux.Router, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -60,9 +56,6 @@ var (
|
|||
resolveLatestTag = false
|
||||
)
|
||||
|
||||
type authFunc func() authenticator.Request
|
||||
type handlerFunc func(config *Config, router *mux.Router) error
|
||||
|
||||
// Config holds fields for a distributed registry
|
||||
type Config struct {
|
||||
ClientCAFile string
|
||||
|
@ -89,10 +82,7 @@ type Config struct {
|
|||
Bootstrapper routing.Bootstrapper
|
||||
|
||||
// HandlerFunc will be called to add the registry API handler to an existing router.
|
||||
HandlerFunc handlerFunc
|
||||
|
||||
// Authenticator will be called to retrieve an authenticator used to validate the request to the registry API.
|
||||
AuthFunc authFunc
|
||||
Router https.RouterFunc
|
||||
}
|
||||
|
||||
// These values are not currently configurable
|
||||
|
@ -237,13 +227,12 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
|
|||
// Track images available in containerd and publish via p2p router
|
||||
go state.Track(ctx, ociClient, router, resolveLatestTag)
|
||||
|
||||
mRouter := mux.NewRouter().SkipClean(true)
|
||||
mRouter.Use(c.authMiddleware())
|
||||
mRouter.PathPrefix("/v2").Handler(regSvr.Handler)
|
||||
mRouter.PathPrefix("/v1-" + version.Program + "/p2p").Handler(c.peerInfo())
|
||||
if err := c.HandlerFunc(c, mRouter); err != nil {
|
||||
mRouter, err := c.Router(ctx, nodeConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mRouter.PathPrefix("/v2").Handler(regSvr.Handler)
|
||||
mRouter.PathPrefix("/v1-" + version.Program + "/p2p").Handler(c.peerInfo())
|
||||
|
||||
// Wait up to 5 seconds for the p2p network to find peers. This will return
|
||||
// immediately if the node is bootstrapping from itself.
|
||||
|
@ -269,16 +258,3 @@ func (c *Config) peerInfo() http.HandlerFunc {
|
|||
fmt.Fprintf(resp, "%s/p2p/%s", info.Addrs[0].String(), info.ID.String())
|
||||
})
|
||||
}
|
||||
|
||||
// authMiddleware calls the configured authenticator to gate access to the registry API
|
||||
func (c *Config) authMiddleware() mux.MiddlewareFunc {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
|
||||
if _, ok, err := c.AuthFunc().AuthenticateRequest(req); !ok || err != nil {
|
||||
http.Error(resp, "Unauthorized", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
next.ServeHTTP(resp, req)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
111
pkg/util/net.go
111
pkg/util/net.go
|
@ -1,12 +1,15 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rancher/wrangler/pkg/merr"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli"
|
||||
apinet "k8s.io/apimachinery/pkg/util/net"
|
||||
|
@ -319,3 +322,111 @@ func getIPFromInterface(ifaceName string) (string, error) {
|
|||
|
||||
return "", fmt.Errorf("can't find ip for interface %s", ifaceName)
|
||||
}
|
||||
|
||||
type multiListener struct {
|
||||
listeners []net.Listener
|
||||
closing chan struct{}
|
||||
conns chan acceptRes
|
||||
}
|
||||
|
||||
type acceptRes struct {
|
||||
conn net.Conn
|
||||
err error
|
||||
}
|
||||
|
||||
// explicit interface check
|
||||
var _ net.Listener = &multiListener{}
|
||||
|
||||
var loopbacks = []string{"127.0.0.1", "::1"}
|
||||
|
||||
// ListenWithLoopback listens on the given address, as well as on IPv4 and IPv6 loopback addresses.
|
||||
// If the address is a wildcard, the listener is return unwrapped.
|
||||
func ListenWithLoopback(ctx context.Context, addr string, port string) (net.Listener, error) {
|
||||
lc := &net.ListenConfig{
|
||||
KeepAlive: 3 * time.Minute,
|
||||
Control: permitReuse,
|
||||
}
|
||||
l, err := lc.Listen(ctx, "tcp", net.JoinHostPort(addr, port))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If we're listening on a wildcard address, we don't need to wrap with the other loopback addresses
|
||||
switch addr {
|
||||
case "", "::", "0.0.0.0":
|
||||
return l, nil
|
||||
}
|
||||
|
||||
ml := &multiListener{
|
||||
listeners: []net.Listener{l},
|
||||
closing: make(chan struct{}),
|
||||
conns: make(chan acceptRes),
|
||||
}
|
||||
|
||||
for _, laddr := range loopbacks {
|
||||
if laddr == addr {
|
||||
continue
|
||||
}
|
||||
if l, err := lc.Listen(ctx, "tcp", net.JoinHostPort(laddr, port)); err == nil {
|
||||
ml.listeners = append(ml.listeners, l)
|
||||
} else {
|
||||
logrus.Debugf("Failed to listen on %s: %v", net.JoinHostPort(laddr, port), err)
|
||||
}
|
||||
}
|
||||
|
||||
for i := range ml.listeners {
|
||||
go ml.accept(ml.listeners[i])
|
||||
}
|
||||
|
||||
return ml, nil
|
||||
}
|
||||
|
||||
// Addr returns the address of the non-loopback address that this multiListener is listening on
|
||||
func (ml *multiListener) Addr() net.Addr {
|
||||
return ml.listeners[0].Addr()
|
||||
}
|
||||
|
||||
// Close closes all the listeners
|
||||
func (ml *multiListener) Close() error {
|
||||
close(ml.closing)
|
||||
var errs merr.Errors
|
||||
for i := range ml.listeners {
|
||||
err := ml.listeners[i].Close()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return merr.NewErrors(errs)
|
||||
}
|
||||
|
||||
// Accept returns a Conn/err pair from one of the waiting listeners
|
||||
func (ml *multiListener) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case res, ok := <-ml.conns:
|
||||
if ok {
|
||||
return res.conn, res.err
|
||||
}
|
||||
return nil, fmt.Errorf("connection channel closed")
|
||||
case <-ml.closing:
|
||||
return nil, fmt.Errorf("listener closed")
|
||||
}
|
||||
}
|
||||
|
||||
// accept runs a loop, accepting connections and trying to send on the result channel
|
||||
func (ml *multiListener) accept(listener net.Listener) {
|
||||
for {
|
||||
conn, err := listener.Accept()
|
||||
r := acceptRes{
|
||||
conn: conn,
|
||||
err: err,
|
||||
}
|
||||
select {
|
||||
case ml.conns <- r:
|
||||
case <-ml.closing:
|
||||
if r.err == nil {
|
||||
r.conn.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// permitReuse enables port and address sharing on the socket
|
||||
func permitReuse(network, addr string, conn syscall.RawConn) error {
|
||||
return conn.Control(func(fd uintptr) {
|
||||
syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1)
|
||||
syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEADDR, 1)
|
||||
})
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
//go:build windows
|
||||
// +build windows
|
||||
|
||||
package util
|
||||
|
||||
import "syscall"
|
||||
|
||||
// permitReuse is a no-op; port and address reuse is not supported on Windows
|
||||
func permitReuse(network, addr string, conn syscall.RawConn) error {
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue