mirror of https://github.com/k3s-io/k3s

Revert "Merge pull request #31297 from mikedanese/revert-kubelet"

This reverts the revert of #30090 and #31282.

parent c958d3d4fd
commit 2e989a3c38
@@ -40,7 +40,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/apis/componentconfig"
-	kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
+	v1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 	"k8s.io/kubernetes/pkg/capabilities"
 	"k8s.io/kubernetes/pkg/client/chaosclient"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -64,6 +64,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/network"
 	"k8s.io/kubernetes/pkg/kubelet/server"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/pkg/runtime"
 	utilconfig "k8s.io/kubernetes/pkg/util/config"
 	"k8s.io/kubernetes/pkg/util/configz"
 	"k8s.io/kubernetes/pkg/util/crypto"
@@ -73,7 +74,7 @@ import (
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 	"k8s.io/kubernetes/pkg/util/oom"
 	"k8s.io/kubernetes/pkg/util/rlimit"
-	"k8s.io/kubernetes/pkg/util/runtime"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/version"
 	"k8s.io/kubernetes/pkg/volume"
@@ -307,6 +308,117 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
 	}, nil
 }
 
+func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) {
+	clientConfig, err := CreateAPIServerClientConfig(s)
+	if err == nil {
+		kubeClient, err := clientset.NewForConfig(clientConfig)
+		if err != nil {
+			return nil, err
+		}
+		return kubeClient, nil
+	}
+	return nil, err
+}
+
+// Tries to download the kubelet-<node-name> configmap from "kube-system" namespace via the API server and returns a JSON string or error
+func getRemoteKubeletConfig(s *options.KubeletServer, kcfg *KubeletConfig) (string, error) {
+	// TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request
+	kubeClient, err := getKubeClient(s)
+	if err != nil {
+		return "", err
+	}
+
+	configmap, err := func() (*api.ConfigMap, error) {
+		var nodename string
+		hostname := nodeutil.GetHostname(s.HostnameOverride)
+
+		if kcfg != nil && kcfg.Cloud != nil {
+			instances, ok := kcfg.Cloud.Instances()
+			if !ok {
+				err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename.")
+				return nil, err
+			}
+			nodename, err = instances.CurrentNodeName(hostname)
+			if err != nil {
+				err = fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
+				return nil, err
+			}
+			// look for kubelet-<node-name> configmap from "kube-system"
+			configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", nodename))
+			if err != nil {
+				return nil, err
+			}
+			return configmap, nil
+		}
+		// No cloud provider yet, so can't get the nodename via Cloud.Instances().CurrentNodeName(hostname), try just using the hostname
+		configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", hostname))
+		if err != nil {
+			return nil, fmt.Errorf("cloud provider was nil, and attempt to use hostname to find config resulted in: %v", err)
+		}
+		return configmap, nil
+	}()
+	if err != nil {
+		return "", err
+	}
+
+	// When we create the KubeletConfiguration configmap, we put a json string
+	// representation of the config in a `kubelet.config` key.
+	jsonstr, ok := configmap.Data["kubelet.config"]
+	if !ok {
+		return "", fmt.Errorf("KubeletConfiguration configmap did not contain a value with key `kubelet.config`")
+	}
+
+	return jsonstr, nil
+}
+
+func startKubeletConfigSyncLoop(s *options.KubeletServer, currentKC string) {
+	glog.Infof("Starting Kubelet configuration sync loop")
+	go func() {
+		wait.PollInfinite(30*time.Second, func() (bool, error) {
+			glog.Infof("Checking API server for new Kubelet configuration.")
+			remoteKC, err := getRemoteKubeletConfig(s, nil)
+			if err == nil {
+				// Detect new config by comparing with the last JSON string we extracted.
+				if remoteKC != currentKC {
+					glog.Info("Found new Kubelet configuration via API server, restarting!")
+					os.Exit(0)
+				}
+			} else {
+				glog.Infof("Did not find a configuration for this Kubelet via API server: %v", err)
+			}
+			return false, nil // Always return (false, nil) so we poll forever.
+		})
+	}()
+}
+
+// Try to check for config on the API server, return that config if we get it, and start
+// a background thread that checks for updates to configs.
+func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletConfiguration, error) {
+	jsonstr, err := getRemoteKubeletConfig(s, nil)
+	if err == nil {
+		// We will compare future API server config against the config we just got (jsonstr):
+		startKubeletConfigSyncLoop(s, jsonstr)
+
+		// Convert json from API server to external type struct, and convert that to internal type struct
+		extKC := v1alpha1.KubeletConfiguration{}
+		err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(jsonstr), &extKC)
+		if err != nil {
+			return nil, err
+		}
+		kc := componentconfig.KubeletConfiguration{}
+		err = api.Scheme.Convert(&extKC, &kc, nil)
+		if err != nil {
+			return nil, err
+		}
+		return &kc, nil
+	} else {
+		// Couldn't get a configuration from the API server yet.
+		// Restart as soon as anything comes back from the API server.
+		startKubeletConfigSyncLoop(s, "")
+		return nil, err
+	}
+}
+
 // Run runs the specified KubeletServer for the given KubeletConfig. This should never exit.
 // The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer.
 // Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
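For orientation, a minimal sketch of the ConfigMap that getRemoteKubeletConfig above expects: a `kubelet-<node-name>` object in the `kube-system` namespace whose `kubelet.config` key holds a JSON-serialized v1alpha1.KubeletConfiguration. The sketch is not part of this commit; it mirrors the createConfigMap helper in the e2e test further down in this diff, and the package name, helper name, and node name "mynode" are placeholders.

// Sketch only (not part of this commit): how the kubelet-<node-name> ConfigMap
// read by getRemoteKubeletConfig is assembled. Names here are placeholders.
package kubeletconfigsketch

import (
	"encoding/json"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/componentconfig"
	"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
)

func exampleKubeletConfigMap(kubeCfg *componentconfig.KubeletConfiguration) (*api.ConfigMap, error) {
	// Convert the internal config to the external v1alpha1 type before serializing,
	// the same way the e2e test's createConfigMap does.
	kubeCfgExt := v1alpha1.KubeletConfiguration{}
	if err := api.Scheme.Convert(kubeCfg, &kubeCfgExt, nil); err != nil {
		return nil, err
	}
	data, err := json.Marshal(kubeCfgExt)
	if err != nil {
		return nil, err
	}
	return &api.ConfigMap{
		ObjectMeta: api.ObjectMeta{Name: "kubelet-mynode"}, // kubelet-<node-name>
		Data: map[string]string{
			"kubelet.config": string(data), // the key getRemoteKubeletConfig looks up
		},
	}, nil
}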
@@ -327,6 +439,22 @@ func checkPermissions() error {
 	return nil
 }
 
+func setConfigz(cz *configz.Config, kc *componentconfig.KubeletConfiguration) {
+	tmp := v1alpha1.KubeletConfiguration{}
+	api.Scheme.Convert(kc, &tmp, nil)
+	cz.Set(tmp)
+}
+
+func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, error) {
+	cz, err := configz.New("componentconfig")
+	if err == nil {
+		setConfigz(cz, kc)
+	} else {
+		glog.Errorf("unable to register configz: %s", err)
+	}
+	return cz, err
+}
+
 func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
 	if s.ExitOnLockContention && s.LockFilePath == "" {
 		return errors.New("cannot exit on lock file contention: no lock file specified")
@@ -345,18 +473,38 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
 			}
 		}
 	}
-	if c, err := configz.New("componentconfig"); err == nil {
-		c.Set(s.KubeletConfiguration)
-	} else {
-		glog.Errorf("unable to register configz: %s", err)
-	}
+	// Register current configuration with /configz endpoint
+	cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
 
 	if kcfg == nil {
+		if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
+			// Look for config on the API server. If it exists, replace s.KubeletConfiguration
+			// with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.
+
+			// Don't do dynamic Kubelet configuration in runonce mode
+			if s.RunOnce == false {
+				// For now we only do dynamic config when kcfg is passed as nil, because we don't want to disturb
+				// any special relationship between the values in s (KubeletServer) and kcfg (KubeletConfig).
+				remoteKC, err := initKubeletConfigSync(s)
+				if err == nil {
+					// Update s (KubeletServer) with new config from API server
+					s.KubeletConfiguration = *remoteKC
+					// Ensure that /configz is up to date with the new config
+					if cfgzErr != nil {
+						glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
+					} else {
+						setConfigz(cfgz, &s.KubeletConfiguration)
+					}
+				}
+			}
+		}
+
 		var kubeClient, eventClient *clientset.Clientset
 		var autoDetectCloudProvider bool
 		var cloud cloudprovider.Interface
 
-		if s.CloudProvider == kubeExternal.AutoDetectCloudProvider {
+		if s.CloudProvider == v1alpha1.AutoDetectCloudProvider {
 			autoDetectCloudProvider = true
 		} else {
 			cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
@@ -441,7 +589,8 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
 		glog.Error(err)
 	}
 
-	runtime.ReallyCrash = s.ReallyCrashForTesting
+	utilruntime.ReallyCrash = s.ReallyCrashForTesting
+
 	rand.Seed(time.Now().UTC().UnixNano())
 
 	// TODO(vmarmol): Do this through container config.
@@ -40,6 +40,7 @@ const (
 	// AllAlpha=true,NewFeature=false will result in newFeature=false
 	allAlphaGate             = "AllAlpha"
 	externalTrafficLocalOnly = "AllowExtTrafficLocalEndpoints"
+	dynamicKubeletConfig     = "DynamicKubeletConfig"
 )
 
 var (
@@ -48,6 +49,7 @@ var (
 	knownFeatures = map[string]featureSpec{
 		allAlphaGate:             {false, alpha},
 		externalTrafficLocalOnly: {false, alpha},
+		dynamicKubeletConfig:     {false, alpha},
 	}
 
 	// Special handling for a few gates.
@@ -92,6 +94,7 @@ type FeatureGate interface {
 	ExternalTrafficLocalOnly() bool
 
 	// TODO: Define accessors for each non-API alpha feature.
+	DynamicKubeletConfig() bool
 }
 
 // featureGate implements FeatureGate as well as pflag.Value for flag parsing.
@@ -165,6 +168,11 @@ func (f *featureGate) ExternalTrafficLocalOnly() bool {
 	return f.lookup(externalTrafficLocalOnly)
 }
 
+// DynamicKubeletConfig returns value for dynamicKubeletConfig
+func (f *featureGate) DynamicKubeletConfig() bool {
+	return f.lookup(dynamicKubeletConfig)
+}
+
 func (f *featureGate) lookup(key string) bool {
 	defaultValue := f.known[key].enabled
 	if f.enabled != nil {
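For orientation, a minimal sketch of how the new gate is read; it is not part of this commit. The gate is populated from a component's --feature-gates flag (the node e2e harness below starts the kubelet with "DynamicKubeletConfig=true"), and the accessor used here is the one added in the hunk above; the standalone main function is purely illustrative.

// Sketch only (not part of this commit): querying the new alpha gate. With no
// --feature-gates flag set, it defaults to false per the knownFeatures entry above.
package main

import (
	"fmt"

	utilconfig "k8s.io/kubernetes/pkg/util/config"
)

func main() {
	// The same check run() in the kubelet performs before syncing config from the API server.
	fmt.Println("dynamic kubelet config enabled:", utilconfig.DefaultFeatureGate.DynamicKubeletConfig())
}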
@@ -0,0 +1,195 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e_node
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"time"
+
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/apis/componentconfig"
+	"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
+	"k8s.io/kubernetes/test/e2e/framework"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+// This test is marked [Disruptive] because the Kubelet temporarily goes down as part of of this test.
+var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:dynamicKubeletConfig] [Serial] [Disruptive]", func() {
+	f := framework.NewDefaultFramework("dynamic-kubelet-configuration-test")
+
+	Context("When a configmap called `kubelet-<node-name>` is added to the `kube-system` namespace", func() {
+		It("The Kubelet on that node should restart to take up the new config", func() {
+			const (
+				restartGap = 40 * time.Second
+			)
+
+			// Get the current KubeletConfiguration (known to be valid) by
+			// querying the configz endpoint for the current node.
+			resp := pollConfigz(2*time.Minute, 5*time.Second)
+			kubeCfg, err := decodeConfigz(resp)
+			framework.ExpectNoError(err)
+			glog.Infof("KubeletConfiguration - Initial values: %+v", *kubeCfg)
+
+			// Change a safe value e.g. file check frequency.
+			// Make sure we're providing a value distinct from the current one.
+			oldFileCheckFrequency := kubeCfg.FileCheckFrequency.Duration
+			newFileCheckFrequency := 11 * time.Second
+			if kubeCfg.FileCheckFrequency.Duration == newFileCheckFrequency {
+				newFileCheckFrequency = 10 * time.Second
+			}
+			kubeCfg.FileCheckFrequency.Duration = newFileCheckFrequency
+
+			// Use the new config to create a new kube-<node-name> configmap in `kube-system` namespace.
+			_, err = createConfigMap(f, kubeCfg)
+			framework.ExpectNoError(err)
+
+			// Give the Kubelet time to see that there is new config and restart. If we don't do this,
+			// the Kubelet will still have the old config when we poll, and the test will fail.
+			time.Sleep(restartGap)
+
+			// Use configz to get the new config.
+			resp = pollConfigz(2*time.Minute, 5*time.Second)
+			kubeCfg, err = decodeConfigz(resp)
+			framework.ExpectNoError(err)
+			glog.Infof("KubeletConfiguration - After modification of FileCheckFrequency: %+v", *kubeCfg)
+
+			// We expect to see the new value in the new config.
+			Expect(kubeCfg.FileCheckFrequency.Duration).To(Equal(newFileCheckFrequency))
+
+			// Change the config back to what it originally was.
+			kubeCfg.FileCheckFrequency.Duration = oldFileCheckFrequency
+			_, err = updateConfigMap(f, kubeCfg)
+			framework.ExpectNoError(err)
+
+			// Give the Kubelet time to see that there is new config and restart. If we don't do this,
+			// the Kubelet will still have the old config when we poll, and the test will fail.
+			time.Sleep(restartGap)
+
+			// User configz to get the new config.
+			resp = pollConfigz(2*time.Minute, 5*time.Second)
+			kubeCfg, err = decodeConfigz(resp)
+			framework.ExpectNoError(err)
+			glog.Infof("KubeletConfiguration - After restoration of FileCheckFrequency: %+v", *kubeCfg)
+
+			// We expect to see the original value restored in the new config.
+			Expect(kubeCfg.FileCheckFrequency.Duration).To(Equal(oldFileCheckFrequency))
+		})
+	})
+})
+
+// This function either causes the test to fail, or it returns a status 200 response.
+func pollConfigz(timeout time.Duration, pollInterval time.Duration) *http.Response {
+	endpoint := fmt.Sprintf("http://127.0.0.1:8080/api/v1/proxy/nodes/%s/configz", framework.TestContext.NodeName)
+	client := &http.Client{}
+	req, err := http.NewRequest("GET", endpoint, nil)
+	framework.ExpectNoError(err)
+	req.Header.Add("Accept", "application/json")
+
+	var resp *http.Response
+	Eventually(func() bool {
+		resp, err = client.Do(req)
+		if err != nil {
+			glog.Errorf("Failed to get /configz, retrying. Error: %v", err)
+			return false
+		}
+		if resp.StatusCode != 200 {
+			glog.Errorf("/configz response status not 200, retrying. Response was: %+v", resp)
+			return false
+		}
+		return true
+	}, timeout, pollInterval).Should(Equal(true))
+	return resp
+}
+
+// Decodes the http response from /configz and returns a componentconfig.KubeletConfiguration (internal type).
+func decodeConfigz(resp *http.Response) (*componentconfig.KubeletConfiguration, error) {
+	// This hack because /configz reports the following structure:
+	// {"componentconfig": {the JSON representation of v1alpha1.KubeletConfiguration}}
+	type configzWrapper struct {
+		ComponentConfig v1alpha1.KubeletConfiguration `json:"componentconfig"`
+	}
+
+	configz := configzWrapper{}
+	kubeCfg := componentconfig.KubeletConfiguration{}
+
+	contentsBytes, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	err = json.Unmarshal(contentsBytes, &configz)
+	if err != nil {
+		return nil, err
+	}
+
+	err = api.Scheme.Convert(&configz.ComponentConfig, &kubeCfg, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return &kubeCfg, nil
+}
+
+// Uses KubeletConfiguration to create a `kubelet-<node-name>` ConfigMap in the "kube-system" namespace.
+func createConfigMap(f *framework.Framework, kubeCfg *componentconfig.KubeletConfiguration) (*api.ConfigMap, error) {
+	kubeCfgExt := v1alpha1.KubeletConfiguration{}
+	api.Scheme.Convert(kubeCfg, &kubeCfgExt, nil)
+
+	bytes, err := json.Marshal(kubeCfgExt)
+	framework.ExpectNoError(err)
+
+	cmap, err := f.Client.ConfigMaps("kube-system").Create(&api.ConfigMap{
+		ObjectMeta: api.ObjectMeta{
+			Name: fmt.Sprintf("kubelet-%s", framework.TestContext.NodeName),
+		},
+		Data: map[string]string{
+			"kubelet.config": string(bytes),
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+	return cmap, nil
+}
+
+// Similar to createConfigMap, except this updates an existing ConfigMap.
+func updateConfigMap(f *framework.Framework, kubeCfg *componentconfig.KubeletConfiguration) (*api.ConfigMap, error) {
+	kubeCfgExt := v1alpha1.KubeletConfiguration{}
+	api.Scheme.Convert(kubeCfg, &kubeCfgExt, nil)
+
+	bytes, err := json.Marshal(kubeCfgExt)
+	framework.ExpectNoError(err)
+
+	cmap, err := f.Client.ConfigMaps("kube-system").Update(&api.ConfigMap{
+		ObjectMeta: api.ObjectMeta{
+			Name: fmt.Sprintf("kubelet-%s", framework.TestContext.NodeName),
+		},
+		Data: map[string]string{
+			"kubelet.config": string(bytes),
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+	return cmap, nil
+}
@@ -87,7 +87,7 @@ func (e *E2EServices) Start() error {
 		"--manifest-path", framework.TestContext.ManifestPath,
 		"--eviction-hard", framework.TestContext.EvictionHard,
 	)
-	e.services = newServer("services", startCmd, nil, getHealthCheckURLs(), servicesLogFile)
+	e.services = newServer("services", startCmd, nil, nil, getHealthCheckURLs(), servicesLogFile, false)
 	return e.services.start()
 }
 
@@ -335,7 +335,7 @@ func (es *e2eService) startNamespaceController() error {
 }
 
 func (es *e2eService) startKubeletServer() (*server, error) {
-	var killCommand *exec.Cmd
+	var killCommand, restartCommand *exec.Cmd
 	cmdArgs := []string{}
 	if systemdRun, err := exec.LookPath("systemd-run"); err == nil {
 		// On systemd services, detection of a service / unit works reliably while
@@ -343,8 +343,9 @@ func (es *e2eService) startKubeletServer() (*server, error) {
 		// Since kubelet will typically be run as a service it also makes more
 		// sense to test it that way
 		unitName := fmt.Sprintf("kubelet-%d.service", rand.Int31())
-		cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, getKubeletServerBin())
+		cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, "--remain-after-exit", getKubeletServerBin())
 		killCommand = exec.Command("sudo", "systemctl", "kill", unitName)
+		restartCommand = exec.Command("sudo", "systemctl", "restart", unitName)
 		es.logFiles["kubelet.log"] = logFileData{
 			journalctlCommand: []string{"-u", unitName},
 		}
@@ -372,6 +373,7 @@ func (es *e2eService) startKubeletServer() (*server, error) {
 		"--pod-cidr=10.180.0.0/24", // Assign a fixed CIDR to the node because there is no node controller.
 		"--eviction-hard", framework.TestContext.EvictionHard,
 		"--eviction-pressure-transition-period", "30s",
+		"--feature-gates", "DynamicKubeletConfig=true", // TODO(mtaufen): Eventually replace with a value from the framework.TestContext
 	)
 	if framework.TestContext.CgroupsPerQOS {
 		// TODO: enable this when the flag is stable and available in kubelet.
@@ -394,8 +396,10 @@ func (es *e2eService) startKubeletServer() (*server, error) {
 		"kubelet",
 		cmd,
 		killCommand,
+		restartCommand,
 		[]string{kubeletHealthCheckURL},
-		"kubelet.log")
+		"kubelet.log",
+		true)
 	return server, server.start()
 }
 
@@ -408,20 +412,32 @@ type server struct {
 	// killCommand is the command used to stop the server. It is not required. If it
 	// is not specified, `sudo kill` will be used to stop the server.
 	killCommand *exec.Cmd
+	// restartCommand is the command used to restart the server. If provided, it will be used
+	// instead of startCommand when restarting the server.
+	restartCommand *exec.Cmd
 	// healthCheckUrls is the urls used to check whether the server is ready.
 	healthCheckUrls []string
 	// outFilename is the name of the log file. The stdout and stderr of the server
 	// will be redirected to this file.
 	outFilename string
+	// restartOnExit determines whether a restart loop is launched with the server
+	restartOnExit bool
+	// Writing to this channel, if it is not nil, stops the restart loop.
+	// When tearing down a server, you should check for this channel and write to it if it exists.
+	stopRestartingCh chan<- bool
+	// Read from this to confirm that the restart loop has stopped.
+	ackStopRestartingCh <-chan bool
 }
 
-func newServer(name string, start, kill *exec.Cmd, urls []string, filename string) *server {
+func newServer(name string, start, kill, restart *exec.Cmd, urls []string, filename string, restartOnExit bool) *server {
 	return &server{
 		name:            name,
 		startCommand:    start,
 		killCommand:     kill,
+		restartCommand:  restart,
 		healthCheckUrls: urls,
 		outFilename:     filename,
+		restartOnExit:   restartOnExit,
 	}
 }
 
@@ -434,8 +450,8 @@ func commandToString(c *exec.Cmd) string {
 }
 
 func (s *server) String() string {
-	return fmt.Sprintf("server %q start-command: `%s`, kill-command: `%s`, health-check: %v, output-file: %q", s.name,
-		commandToString(s.startCommand), commandToString(s.killCommand), s.healthCheckUrls, s.outFilename)
+	return fmt.Sprintf("server %q start-command: `%s`, kill-command: `%s`, restart-command: `%s`, health-check: %v, output-file: %q", s.name,
+		commandToString(s.startCommand), commandToString(s.killCommand), commandToString(s.restartCommand), s.healthCheckUrls, s.outFilename)
 }
 
 // readinessCheck checks whether services are ready via the health check urls. Once there is
@@ -481,8 +497,23 @@ func readinessCheck(urls []string, errCh <-chan error) error {
 	return fmt.Errorf("e2e service readiness check timeout %v", *serverStartTimeout)
 }
 
+// Note: restartOnExit == true requires len(s.healthCheckUrls) > 0 to work properly.
 func (s *server) start() error {
 	errCh := make(chan error)
+
+	var stopRestartingCh, ackStopRestartingCh chan bool
+	if s.restartOnExit {
+		if len(s.healthCheckUrls) == 0 {
+			return fmt.Errorf("Tried to start %s which has s.restartOnExit == true, but no health check urls provided.", s)
+		}
+
+		stopRestartingCh = make(chan bool)
+		ackStopRestartingCh = make(chan bool)
+
+		s.stopRestartingCh = stopRestartingCh
+		s.ackStopRestartingCh = ackStopRestartingCh
+	}
+
 	go func() {
 		defer close(errCh)
 
@@ -496,10 +527,9 @@ func (s *server) start() error {
 		defer outfile.Close()
 		defer outfile.Sync()
 
-		cmd := s.startCommand
 		// Set the command to write the output file
-		cmd.Stdout = outfile
-		cmd.Stderr = outfile
+		s.startCommand.Stdout = outfile
+		s.startCommand.Stderr = outfile
 
 		// Death of this test process should kill the server as well.
 		attrs := &syscall.SysProcAttr{}
@@ -511,14 +541,96 @@
 			errCh <- fmt.Errorf("failed to set Pdeathsig field (non-linux build)")
 			return
 		}
-		cmd.SysProcAttr = attrs
+		s.startCommand.SysProcAttr = attrs
 
-		// Run the command
-		err = cmd.Run()
+		// Start the command
+		err = s.startCommand.Start()
 		if err != nil {
-			errCh <- fmt.Errorf("failed to run server start command %q: %v", commandToString(cmd), err)
+			errCh <- fmt.Errorf("failed to run %s: %v", s, err)
 			return
 		}
+		if !s.restartOnExit {
+			// If we aren't planning on restarting, ok to Wait() here to release resources.
+			// Otherwise, we Wait() in the restart loop.
+			err = s.startCommand.Wait()
+			if err != nil {
+				errCh <- fmt.Errorf("failed to run %s: %v", s, err)
+				return
+			}
+		} else {
+			// New stuff
+			usedStartCmd := true
+			for {
+				// Wait for an initial health check to pass, so that we are sure the server started.
+				err := readinessCheck(s.healthCheckUrls, nil)
+				if err != nil {
+					if usedStartCmd {
+						s.startCommand.Wait() // Release resources if necessary.
+					}
+					// This should not happen, immediately stop the e2eService process.
+					glog.Fatalf("restart loop readinessCheck failed for %s", s)
+				}
+
+				// Initial health check passed, wait until a health check fails again.
+			stillAlive:
+				for {
+					select {
+					case <-stopRestartingCh:
+						ackStopRestartingCh <- true
+						return
+					case <-time.After(time.Second):
+						for _, url := range s.healthCheckUrls {
+							resp, err := http.Get(url)
+							if err != nil || resp.StatusCode != http.StatusOK {
+								break stillAlive
+							}
+						}
+					}
+				}
+
+				if usedStartCmd {
+					s.startCommand.Wait() // Release resources from last cmd
+					usedStartCmd = false
+				}
+				if s.restartCommand != nil {
+					// Always make a fresh copy of restartCommand before running, we may have to restart multiple times
+					s.restartCommand = &exec.Cmd{
+						Path:        s.restartCommand.Path,
+						Args:        s.restartCommand.Args,
+						Env:         s.restartCommand.Env,
+						Dir:         s.restartCommand.Dir,
+						Stdin:       s.restartCommand.Stdin,
+						Stdout:      s.restartCommand.Stdout,
+						Stderr:      s.restartCommand.Stderr,
+						ExtraFiles:  s.restartCommand.ExtraFiles,
+						SysProcAttr: s.restartCommand.SysProcAttr,
+					}
+					err = s.restartCommand.Run() // Run and wait for exit. This command is assumed to have short duration, e.g. systemctl restart
+					if err != nil {
+						// This should not happen, immediately stop the e2eService process.
+						glog.Fatalf("restarting %s with restartCommand failed. Error: %v.", s, err)
+					}
+				} else {
+					s.startCommand = &exec.Cmd{
+						Path:        s.startCommand.Path,
+						Args:        s.startCommand.Args,
+						Env:         s.startCommand.Env,
+						Dir:         s.startCommand.Dir,
+						Stdin:       s.startCommand.Stdin,
+						Stdout:      s.startCommand.Stdout,
+						Stderr:      s.startCommand.Stderr,
+						ExtraFiles:  s.startCommand.ExtraFiles,
+						SysProcAttr: s.startCommand.SysProcAttr,
+					}
+					err = s.startCommand.Start()
+					usedStartCmd = true
+					if err != nil {
+						// This should not happen, immediately stop the e2eService process.
+						glog.Fatalf("restarting %s with startCommand failed. Error: %v.", s, err)
+					}
+				}
+			}
+		}
 	}()
 
 	return readinessCheck(s.healthCheckUrls, errCh)
@@ -528,6 +640,12 @@ func (s *server) kill() error {
 	name := s.name
 	cmd := s.startCommand
 
+	// If s has a restart loop, turn it off.
+	if s.restartOnExit {
+		s.stopRestartingCh <- true
+		<-s.ackStopRestartingCh
+	}
+
 	if s.killCommand != nil {
 		return s.killCommand.Run()
 	}
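One detail worth spelling out about the restart loop added to (*server).start() above: an os/exec Cmd cannot be started a second time, which is why the loop rebuilds the command from its fields ("Always make a fresh copy") before every restart. A small self-contained illustration follows; it is not part of this commit, and the echo command is just a stand-in.

// Sketch only (not part of this commit): why the restart loop copies exec.Cmd
// fields instead of reusing the struct — a started Cmd refuses to run again.
package main

import (
	"fmt"
	"os/exec"
)

func main() {
	cmd := exec.Command("echo", "hello")
	fmt.Println("first run: ", cmd.Run()) // nil on success
	fmt.Println("second run:", cmd.Run()) // error: the Cmd has already been started

	// What the loop above does instead: build a fresh Cmd from the old one's fields.
	fresh := &exec.Cmd{Path: cmd.Path, Args: cmd.Args, Env: cmd.Env, Dir: cmd.Dir}
	fmt.Println("fresh run: ", fresh.Run()) // nil on success
}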