mirror of https://github.com/k3s-io/k3s
fix: failed to close kubelet->API connections on heartbeat failure
parent
4ccdc8b71b
commit
bbeb6f9df0
|
@ -133,6 +133,7 @@ go_library(
|
||||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
|
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/util/certificate:go_default_library",
|
"//staging/src/k8s.io/client-go/util/certificate:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/util/connrotation:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/util/keyutil:go_default_library",
|
"//staging/src/k8s.io/client-go/util/keyutil:go_default_library",
|
||||||
"//staging/src/k8s.io/cloud-provider:go_default_library",
|
"//staging/src/k8s.io/cloud-provider:go_default_library",
|
||||||
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
|
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
|
||||||
|
|
|
@ -54,6 +54,7 @@ import (
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
certutil "k8s.io/client-go/util/cert"
|
certutil "k8s.io/client-go/util/cert"
|
||||||
"k8s.io/client-go/util/certificate"
|
"k8s.io/client-go/util/certificate"
|
||||||
|
"k8s.io/client-go/util/connrotation"
|
||||||
"k8s.io/client-go/util/keyutil"
|
"k8s.io/client-go/util/keyutil"
|
||||||
cloudprovider "k8s.io/cloud-provider"
|
cloudprovider "k8s.io/cloud-provider"
|
||||||
cliflag "k8s.io/component-base/cli/flag"
|
cliflag "k8s.io/component-base/cli/flag"
|
||||||
|
@ -560,6 +561,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if closeAllConns == nil {
|
||||||
|
return errors.New("closeAllConns must be a valid function other than nil")
|
||||||
|
}
|
||||||
kubeDeps.OnHeartbeatFailure = closeAllConns
|
kubeDeps.OnHeartbeatFailure = closeAllConns
|
||||||
|
|
||||||
kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
|
kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
|
||||||
|
@ -795,8 +799,21 @@ func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
kubeClientConfigOverrides(s, clientConfig)
|
kubeClientConfigOverrides(s, clientConfig)
|
||||||
|
closeAllConns, err := updateDialer(clientConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return clientConfig, closeAllConns, nil
|
||||||
|
}
|
||||||
|
|
||||||
return clientConfig, nil, nil
|
// updateDialer instruments a restconfig with a dial. the returned function allows forcefully closing all active connections.
|
||||||
|
func updateDialer(clientConfig *restclient.Config) (func(), error) {
|
||||||
|
if clientConfig.Transport != nil || clientConfig.Dial != nil {
|
||||||
|
return nil, fmt.Errorf("there is already a transport or dialer configured")
|
||||||
|
}
|
||||||
|
d := connrotation.NewDialer((&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext)
|
||||||
|
clientConfig.Dial = d.DialContext
|
||||||
|
return d.CloseAll, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildClientCertificateManager creates a certificate manager that will use certConfig to request a client certificate
|
// buildClientCertificateManager creates a certificate manager that will use certConfig to request a client certificate
|
||||||
|
|
Loading…
Reference in New Issue