diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go
index 8ea2cac360..47890f8e47 100644
--- a/cmd/kubelet/app/server.go
+++ b/cmd/kubelet/app/server.go
@@ -40,7 +40,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/apis/componentconfig"
-	kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
+	v1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 	"k8s.io/kubernetes/pkg/capabilities"
 	"k8s.io/kubernetes/pkg/client/chaosclient"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -64,6 +64,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/network"
 	"k8s.io/kubernetes/pkg/kubelet/server"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/pkg/runtime"
 	utilconfig "k8s.io/kubernetes/pkg/util/config"
 	"k8s.io/kubernetes/pkg/util/configz"
 	"k8s.io/kubernetes/pkg/util/crypto"
@@ -73,7 +74,7 @@ import (
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 	"k8s.io/kubernetes/pkg/util/oom"
 	"k8s.io/kubernetes/pkg/util/rlimit"
-	"k8s.io/kubernetes/pkg/util/runtime"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/version"
 	"k8s.io/kubernetes/pkg/volume"
@@ -307,6 +308,117 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
 	}, nil
 }
 
+func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) {
+	clientConfig, err := CreateAPIServerClientConfig(s)
+	if err == nil {
+		kubeClient, err := clientset.NewForConfig(clientConfig)
+		if err != nil {
+			return nil, err
+		}
+		return kubeClient, nil
+	}
+	return nil, err
+}
+
+// Tries to download the kubelet-<node-name> configmap from the "kube-system" namespace via the API server
+// and returns a JSON string or an error.
+func getRemoteKubeletConfig(s *options.KubeletServer, kcfg *KubeletConfig) (string, error) {
+	// TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request
+	kubeClient, err := getKubeClient(s)
+	if err != nil {
+		return "", err
+	}
+
+	configmap, err := func() (*api.ConfigMap, error) {
+		var nodename string
+		hostname := nodeutil.GetHostname(s.HostnameOverride)
+
+		if kcfg != nil && kcfg.Cloud != nil {
+			instances, ok := kcfg.Cloud.Instances()
+			if !ok {
+				err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename")
+				return nil, err
+			}
+			nodename, err = instances.CurrentNodeName(hostname)
+			if err != nil {
+				err = fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
+				return nil, err
+			}
+			// look for the kubelet-<node-name> configmap in "kube-system"
+			configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", nodename))
+			if err != nil {
+				return nil, err
+			}
+			return configmap, nil
+		}
+		// No cloud provider yet, so we can't get the nodename via Cloud.Instances().CurrentNodeName(hostname);
+		// try just using the hostname.
+		configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", hostname))
+		if err != nil {
+			return nil, fmt.Errorf("cloud provider was nil, and attempt to use hostname to find config resulted in: %v", err)
+		}
+		return configmap, nil
+	}()
+	if err != nil {
+		return "", err
+	}
+
+	// When we create the KubeletConfiguration configmap, we put a JSON string
+	// representation of the config in a `kubelet.config` key.
+	jsonstr, ok := configmap.Data["kubelet.config"]
+	if !ok {
+		return "", fmt.Errorf("KubeletConfiguration configmap did not contain a value with key `kubelet.config`")
+	}
+
+	return jsonstr, nil
+}
+
+func startKubeletConfigSyncLoop(s *options.KubeletServer, currentKC string) {
+	glog.Infof("Starting Kubelet configuration sync loop")
+	go func() {
+		wait.PollInfinite(30*time.Second, func() (bool, error) {
+			glog.Infof("Checking API server for new Kubelet configuration.")
+			remoteKC, err := getRemoteKubeletConfig(s, nil)
+			if err == nil {
+				// Detect new config by comparing with the last JSON string we extracted.
+				if remoteKC != currentKC {
+					glog.Info("Found new Kubelet configuration via API server, restarting!")
+					os.Exit(0)
+				}
+			} else {
+				glog.Infof("Did not find a configuration for this Kubelet via API server: %v", err)
+			}
+			return false, nil // Always return (false, nil) so we poll forever.
+		})
+	}()
+}
+
+// Checks for config on the API server; if it exists, returns that config and starts
+// a background thread that checks for updates to the config.
+func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletConfiguration, error) {
+	jsonstr, err := getRemoteKubeletConfig(s, nil)
+	if err == nil {
+		// We will compare future API server config against the config we just got (jsonstr):
+		startKubeletConfigSyncLoop(s, jsonstr)
+
+		// Convert the JSON from the API server to the external type struct, then convert that to the internal type struct.
+		extKC := v1alpha1.KubeletConfiguration{}
+		err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(jsonstr), &extKC)
+		if err != nil {
+			return nil, err
+		}
+		kc := componentconfig.KubeletConfiguration{}
+		err = api.Scheme.Convert(&extKC, &kc, nil)
+		if err != nil {
+			return nil, err
+		}
+		return &kc, nil
+	} else {
+		// Couldn't get a configuration from the API server yet.
+		// Restart as soon as anything comes back from the API server.
+		startKubeletConfigSyncLoop(s, "")
+		return nil, err
+	}
+}
+
 // Run runs the specified KubeletServer for the given KubeletConfig. This should never exit.
 // The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer.
 // Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
@@ -327,6 +439,22 @@ func checkPermissions() error {
 	return nil
 }
 
+func setConfigz(cz *configz.Config, kc *componentconfig.KubeletConfiguration) {
+	tmp := v1alpha1.KubeletConfiguration{}
+	api.Scheme.Convert(kc, &tmp, nil)
+	cz.Set(tmp)
+}
+
+func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, error) {
+	cz, err := configz.New("componentconfig")
+	if err == nil {
+		setConfigz(cz, kc)
+	} else {
+		glog.Errorf("unable to register configz: %s", err)
+	}
+	return cz, err
+}
+
 func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
 	if s.ExitOnLockContention && s.LockFilePath == "" {
 		return errors.New("cannot exit on lock file contention: no lock file specified")
 	}
@@ -345,18 +473,38 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
 			}
 		}
 	}
-	if c, err := configz.New("componentconfig"); err == nil {
-		c.Set(s.KubeletConfiguration)
-	} else {
-		glog.Errorf("unable to register configz: %s", err)
-	}
+
+	// Register current configuration with /configz endpoint
+	cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
 
 	if kcfg == nil {
+		if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
+			// Look for config on the API server. If it exists, replace s.KubeletConfiguration
+			// with it and continue.
+			// initKubeletConfigSync also starts the background thread that checks for new config.
+
+			// Don't do dynamic Kubelet configuration in runonce mode
+			if !s.RunOnce {
+				// For now we only do dynamic config when kcfg is passed as nil, because we don't want to disturb
+				// any special relationship between the values in s (KubeletServer) and kcfg (KubeletConfig).
+				remoteKC, err := initKubeletConfigSync(s)
+				if err == nil {
+					// Update s (KubeletServer) with the new config from the API server
+					s.KubeletConfiguration = *remoteKC
+					// Ensure that /configz is up to date with the new config
+					if cfgzErr != nil {
+						glog.Errorf("was unable to register configz earlier due to %s; will not be able to set it now", cfgzErr)
+					} else {
+						setConfigz(cfgz, &s.KubeletConfiguration)
+					}
+				}
+			}
+		}
+
 		var kubeClient, eventClient *clientset.Clientset
 		var autoDetectCloudProvider bool
 		var cloud cloudprovider.Interface
-		if s.CloudProvider == kubeExternal.AutoDetectCloudProvider {
+		if s.CloudProvider == v1alpha1.AutoDetectCloudProvider {
 			autoDetectCloudProvider = true
 		} else {
 			cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
@@ -441,7 +589,8 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
 		glog.Error(err)
 	}
 
-	runtime.ReallyCrash = s.ReallyCrashForTesting
+	utilruntime.ReallyCrash = s.ReallyCrashForTesting
+
 	rand.Seed(time.Now().UTC().UnixNano())
 
 	// TODO(vmarmol): Do this through container config.
diff --git a/pkg/util/config/feature_gate.go b/pkg/util/config/feature_gate.go
index af80923cbf..70f68d3581 100644
--- a/pkg/util/config/feature_gate.go
+++ b/pkg/util/config/feature_gate.go
@@ -40,6 +40,7 @@ const (
 	// AllAlpha=true,NewFeature=false will result in newFeature=false
 	allAlphaGate             = "AllAlpha"
 	externalTrafficLocalOnly = "AllowExtTrafficLocalEndpoints"
+	dynamicKubeletConfig     = "DynamicKubeletConfig"
 )
 
 var (
@@ -48,6 +49,7 @@ var (
 	knownFeatures = map[string]featureSpec{
 		allAlphaGate:             {false, alpha},
 		externalTrafficLocalOnly: {false, alpha},
+		dynamicKubeletConfig:     {false, alpha},
 	}
 
 	// Special handling for a few gates.
@@ -92,6 +94,7 @@ type FeatureGate interface {
 	ExternalTrafficLocalOnly() bool
 
 	// TODO: Define accessors for each non-API alpha feature.
+	DynamicKubeletConfig() bool
 }
 
 // featureGate implements FeatureGate as well as pflag.Value for flag parsing.
@@ -165,6 +168,11 @@ func (f *featureGate) ExternalTrafficLocalOnly() bool {
 	return f.lookup(externalTrafficLocalOnly)
 }
 
+// DynamicKubeletConfig returns the value of the dynamicKubeletConfig feature gate.
+func (f *featureGate) DynamicKubeletConfig() bool {
+	return f.lookup(dynamicKubeletConfig)
+}
+
 func (f *featureGate) lookup(key string) bool {
 	defaultValue := f.known[key].enabled
 	if f.enabled != nil {
diff --git a/test/e2e_node/dynamic_kubelet_configuration_test.go b/test/e2e_node/dynamic_kubelet_configuration_test.go
new file mode 100644
index 0000000000..a3f1a3464e
--- /dev/null
+++ b/test/e2e_node/dynamic_kubelet_configuration_test.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e_node
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"time"
+
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/apis/componentconfig"
+	"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
+	"k8s.io/kubernetes/test/e2e/framework"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+// This test is marked [Disruptive] because the Kubelet temporarily goes down as part of this test.
+var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:dynamicKubeletConfig] [Serial] [Disruptive]", func() {
+	f := framework.NewDefaultFramework("dynamic-kubelet-configuration-test")
+
+	Context("When a configmap called `kubelet-<node-name>` is added to the `kube-system` namespace", func() {
+		It("The Kubelet on that node should restart to take up the new config", func() {
+			const (
+				restartGap = 40 * time.Second
+			)
+
+			// Get the current KubeletConfiguration (known to be valid) by
+			// querying the configz endpoint for the current node.
+			resp := pollConfigz(2*time.Minute, 5*time.Second)
+			kubeCfg, err := decodeConfigz(resp)
+			framework.ExpectNoError(err)
+			glog.Infof("KubeletConfiguration - Initial values: %+v", *kubeCfg)
+
+			// Change a safe value, e.g. the file check frequency.
+			// Make sure we're providing a value distinct from the current one.
+			oldFileCheckFrequency := kubeCfg.FileCheckFrequency.Duration
+			newFileCheckFrequency := 11 * time.Second
+			if kubeCfg.FileCheckFrequency.Duration == newFileCheckFrequency {
+				newFileCheckFrequency = 10 * time.Second
+			}
+			kubeCfg.FileCheckFrequency.Duration = newFileCheckFrequency
+
+			// Use the new config to create a new kubelet-<node-name> configmap in the `kube-system` namespace.
+			_, err = createConfigMap(f, kubeCfg)
+			framework.ExpectNoError(err)
+
+			// Give the Kubelet time to see that there is new config and restart. If we don't do this,
+			// the Kubelet will still have the old config when we poll, and the test will fail.
+			time.Sleep(restartGap)
+
+			// Use configz to get the new config.
+			resp = pollConfigz(2*time.Minute, 5*time.Second)
+			kubeCfg, err = decodeConfigz(resp)
+			framework.ExpectNoError(err)
+			glog.Infof("KubeletConfiguration - After modification of FileCheckFrequency: %+v", *kubeCfg)
+
+			// We expect to see the new value in the new config.
+			Expect(kubeCfg.FileCheckFrequency.Duration).To(Equal(newFileCheckFrequency))
+
+			// Change the config back to what it originally was.
+			kubeCfg.FileCheckFrequency.Duration = oldFileCheckFrequency
+			_, err = updateConfigMap(f, kubeCfg)
+			framework.ExpectNoError(err)
+
+			// Give the Kubelet time to see that there is new config and restart. If we don't do this,
+			// the Kubelet will still have the old config when we poll, and the test will fail.
+			time.Sleep(restartGap)
+
+			// Use configz to get the new config.
+			resp = pollConfigz(2*time.Minute, 5*time.Second)
+			kubeCfg, err = decodeConfigz(resp)
+			framework.ExpectNoError(err)
+			glog.Infof("KubeletConfiguration - After restoration of FileCheckFrequency: %+v", *kubeCfg)
+
+			// We expect to see the original value restored in the new config.
+			Expect(kubeCfg.FileCheckFrequency.Duration).To(Equal(oldFileCheckFrequency))
+		})
+	})
+})
+
+// This function either causes the test to fail, or it returns a status 200 response.
+func pollConfigz(timeout time.Duration, pollInterval time.Duration) *http.Response {
+	endpoint := fmt.Sprintf("http://127.0.0.1:8080/api/v1/proxy/nodes/%s/configz", framework.TestContext.NodeName)
+	client := &http.Client{}
+	req, err := http.NewRequest("GET", endpoint, nil)
+	framework.ExpectNoError(err)
+	req.Header.Add("Accept", "application/json")
+
+	var resp *http.Response
+	Eventually(func() bool {
+		resp, err = client.Do(req)
+		if err != nil {
+			glog.Errorf("Failed to get /configz, retrying. Error: %v", err)
+			return false
+		}
+		if resp.StatusCode != 200 {
+			glog.Errorf("/configz response status not 200, retrying. Response was: %+v", resp)
+			return false
+		}
+		return true
+	}, timeout, pollInterval).Should(Equal(true))
+	return resp
+}
+
+// Decodes the HTTP response from /configz and returns a componentconfig.KubeletConfiguration (internal type).
+func decodeConfigz(resp *http.Response) (*componentconfig.KubeletConfiguration, error) {
+	// This hack exists because /configz reports the following structure:
+	// {"componentconfig": {the JSON representation of v1alpha1.KubeletConfiguration}}
+	type configzWrapper struct {
+		ComponentConfig v1alpha1.KubeletConfiguration `json:"componentconfig"`
+	}
+
+	configz := configzWrapper{}
+	kubeCfg := componentconfig.KubeletConfiguration{}
+
+	contentsBytes, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	err = json.Unmarshal(contentsBytes, &configz)
+	if err != nil {
+		return nil, err
+	}
+
+	err = api.Scheme.Convert(&configz.ComponentConfig, &kubeCfg, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return &kubeCfg, nil
+}
+
+// Uses the given KubeletConfiguration to create a `kubelet-<node-name>` ConfigMap in the "kube-system" namespace.
+func createConfigMap(f *framework.Framework, kubeCfg *componentconfig.KubeletConfiguration) (*api.ConfigMap, error) {
+	kubeCfgExt := v1alpha1.KubeletConfiguration{}
+	api.Scheme.Convert(kubeCfg, &kubeCfgExt, nil)
+
+	bytes, err := json.Marshal(kubeCfgExt)
+	framework.ExpectNoError(err)
+
+	cmap, err := f.Client.ConfigMaps("kube-system").Create(&api.ConfigMap{
+		ObjectMeta: api.ObjectMeta{
+			Name: fmt.Sprintf("kubelet-%s", framework.TestContext.NodeName),
+		},
+		Data: map[string]string{
+			"kubelet.config": string(bytes),
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+	return cmap, nil
+}
+
+// Similar to createConfigMap, except this updates an existing ConfigMap.
+func updateConfigMap(f *framework.Framework, kubeCfg *componentconfig.KubeletConfiguration) (*api.ConfigMap, error) { + kubeCfgExt := v1alpha1.KubeletConfiguration{} + api.Scheme.Convert(kubeCfg, &kubeCfgExt, nil) + + bytes, err := json.Marshal(kubeCfgExt) + framework.ExpectNoError(err) + + cmap, err := f.Client.ConfigMaps("kube-system").Update(&api.ConfigMap{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("kubelet-%s", framework.TestContext.NodeName), + }, + Data: map[string]string{ + "kubelet.config": string(bytes), + }, + }) + if err != nil { + return nil, err + } + return cmap, nil +} diff --git a/test/e2e_node/e2e_service.go b/test/e2e_node/e2e_service.go index 28f8dfd3b5..b45e86bbd0 100644 --- a/test/e2e_node/e2e_service.go +++ b/test/e2e_node/e2e_service.go @@ -87,7 +87,7 @@ func (e *E2EServices) Start() error { "--manifest-path", framework.TestContext.ManifestPath, "--eviction-hard", framework.TestContext.EvictionHard, ) - e.services = newServer("services", startCmd, nil, getHealthCheckURLs(), servicesLogFile) + e.services = newServer("services", startCmd, nil, nil, getHealthCheckURLs(), servicesLogFile, false) return e.services.start() } @@ -335,7 +335,7 @@ func (es *e2eService) startNamespaceController() error { } func (es *e2eService) startKubeletServer() (*server, error) { - var killCommand *exec.Cmd + var killCommand, restartCommand *exec.Cmd cmdArgs := []string{} if systemdRun, err := exec.LookPath("systemd-run"); err == nil { // On systemd services, detection of a service / unit works reliably while @@ -343,8 +343,9 @@ func (es *e2eService) startKubeletServer() (*server, error) { // Since kubelet will typically be run as a service it also makes more // sense to test it that way unitName := fmt.Sprintf("kubelet-%d.service", rand.Int31()) - cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, getKubeletServerBin()) + cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, "--remain-after-exit", getKubeletServerBin()) killCommand = exec.Command("sudo", "systemctl", "kill", unitName) + restartCommand = exec.Command("sudo", "systemctl", "restart", unitName) es.logFiles["kubelet.log"] = logFileData{ journalctlCommand: []string{"-u", unitName}, } @@ -372,6 +373,7 @@ func (es *e2eService) startKubeletServer() (*server, error) { "--pod-cidr=10.180.0.0/24", // Assign a fixed CIDR to the node because there is no node controller. "--eviction-hard", framework.TestContext.EvictionHard, "--eviction-pressure-transition-period", "30s", + "--feature-gates", "DynamicKubeletConfig=true", // TODO(mtaufen): Eventually replace with a value from the framework.TestContext ) if framework.TestContext.CgroupsPerQOS { // TODO: enable this when the flag is stable and available in kubelet. @@ -394,8 +396,10 @@ func (es *e2eService) startKubeletServer() (*server, error) { "kubelet", cmd, killCommand, + restartCommand, []string{kubeletHealthCheckURL}, - "kubelet.log") + "kubelet.log", + true) return server, server.start() } @@ -408,20 +412,32 @@ type server struct { // killCommand is the command used to stop the server. It is not required. If it // is not specified, `sudo kill` will be used to stop the server. killCommand *exec.Cmd + // restartCommand is the command used to restart the server. If provided, it will be used + // instead of startCommand when restarting the server. + restartCommand *exec.Cmd // healthCheckUrls is the urls used to check whether the server is ready. healthCheckUrls []string // outFilename is the name of the log file. 
The stdout and stderr of the server
 	// will be redirected to this file.
 	outFilename string
+	// restartOnExit determines whether a restart loop is launched with the server
+	restartOnExit bool
+	// Writing to this channel, if it is not nil, stops the restart loop.
+	// When tearing down a server, you should check for this channel and write to it if it exists.
+	stopRestartingCh chan<- bool
+	// Read from this to confirm that the restart loop has stopped.
+	ackStopRestartingCh <-chan bool
 }
 
-func newServer(name string, start, kill *exec.Cmd, urls []string, filename string) *server {
+func newServer(name string, start, kill, restart *exec.Cmd, urls []string, filename string, restartOnExit bool) *server {
 	return &server{
 		name:            name,
 		startCommand:    start,
 		killCommand:     kill,
+		restartCommand:  restart,
 		healthCheckUrls: urls,
 		outFilename:     filename,
+		restartOnExit:   restartOnExit,
 	}
 }
 
@@ -434,8 +450,8 @@ func commandToString(c *exec.Cmd) string {
 }
 
 func (s *server) String() string {
-	return fmt.Sprintf("server %q start-command: `%s`, kill-command: `%s`, health-check: %v, output-file: %q", s.name,
-		commandToString(s.startCommand), commandToString(s.killCommand), s.healthCheckUrls, s.outFilename)
+	return fmt.Sprintf("server %q start-command: `%s`, kill-command: `%s`, restart-command: `%s`, health-check: %v, output-file: %q", s.name,
+		commandToString(s.startCommand), commandToString(s.killCommand), commandToString(s.restartCommand), s.healthCheckUrls, s.outFilename)
 }
 
 // readinessCheck checks whether services are ready via the health check urls. Once there is
@@ -481,8 +497,23 @@
 	return fmt.Errorf("e2e service readiness check timeout %v", *serverStartTimeout)
 }
 
+// Note: restartOnExit == true requires len(s.healthCheckUrls) > 0 to work properly.
 func (s *server) start() error {
 	errCh := make(chan error)
+
+	var stopRestartingCh, ackStopRestartingCh chan bool
+	if s.restartOnExit {
+		if len(s.healthCheckUrls) == 0 {
+			return fmt.Errorf("tried to start %s with restartOnExit == true, but no health check urls were provided", s)
+		}
+
+		stopRestartingCh = make(chan bool)
+		ackStopRestartingCh = make(chan bool)
+
+		s.stopRestartingCh = stopRestartingCh
+		s.ackStopRestartingCh = ackStopRestartingCh
+	}
+
 	go func() {
 		defer close(errCh)
@@ -496,10 +527,9 @@
 		defer outfile.Close()
 		defer outfile.Sync()
 
-		cmd := s.startCommand
 		// Set the command to write the output file
-		cmd.Stdout = outfile
-		cmd.Stderr = outfile
+		s.startCommand.Stdout = outfile
+		s.startCommand.Stderr = outfile
 
 		// Death of this test process should kill the server as well.
 		attrs := &syscall.SysProcAttr{}
@@ -511,14 +541,96 @@
 			errCh <- fmt.Errorf("failed to set Pdeathsig field (non-linux build)")
 			return
 		}
-		cmd.SysProcAttr = attrs
+		s.startCommand.SysProcAttr = attrs
 
-		// Run the command
-		err = cmd.Run()
+		// Start the command
+		err = s.startCommand.Start()
 		if err != nil {
-			errCh <- fmt.Errorf("failed to run server start command %q: %v", commandToString(cmd), err)
+			errCh <- fmt.Errorf("failed to run %s: %v", s, err)
 			return
 		}
+		if !s.restartOnExit {
+			// If we aren't planning on restarting, it's OK to Wait() here to release resources.
+			// Otherwise, we Wait() in the restart loop.
+			err = s.startCommand.Wait()
+			if err != nil {
+				errCh <- fmt.Errorf("failed to run %s: %v", s, err)
+				return
+			}
+		} else {
+			// Restart loop: once the server is up, watch its health checks and
+			// restart it whenever it goes down, until told to stop.
+			usedStartCmd := true
+			for {
+				// Wait for an initial health check to pass, so that we are sure the server started.
+				err := readinessCheck(s.healthCheckUrls, nil)
+				if err != nil {
+					if usedStartCmd {
+						s.startCommand.Wait() // Release resources if necessary.
+					}
+					// This should not happen, immediately stop the e2eService process.
+					glog.Fatalf("restart loop readinessCheck failed for %s", s)
+				}
+
+				// Initial health check passed, wait until a health check fails again.
+			stillAlive:
+				for {
+					select {
+					case <-stopRestartingCh:
+						ackStopRestartingCh <- true
+						return
+					case <-time.After(time.Second):
+						for _, url := range s.healthCheckUrls {
+							resp, err := http.Get(url)
+							if err != nil || resp.StatusCode != http.StatusOK {
+								break stillAlive
+							}
+						}
+					}
+				}
+
+				if usedStartCmd {
+					s.startCommand.Wait() // Release resources from the last command.
+					usedStartCmd = false
+				}
+				if s.restartCommand != nil {
+					// Always make a fresh copy of restartCommand before running; we may have to restart multiple times.
+					s.restartCommand = &exec.Cmd{
+						Path:        s.restartCommand.Path,
+						Args:        s.restartCommand.Args,
+						Env:         s.restartCommand.Env,
+						Dir:         s.restartCommand.Dir,
+						Stdin:       s.restartCommand.Stdin,
+						Stdout:      s.restartCommand.Stdout,
+						Stderr:      s.restartCommand.Stderr,
+						ExtraFiles:  s.restartCommand.ExtraFiles,
+						SysProcAttr: s.restartCommand.SysProcAttr,
+					}
+					err = s.restartCommand.Run() // Run and wait for exit. This command is assumed to have a short duration, e.g. systemctl restart.
+					if err != nil {
+						// This should not happen, immediately stop the e2eService process.
+						glog.Fatalf("restarting %s with restartCommand failed. Error: %v.", s, err)
+					}
+				} else {
+					s.startCommand = &exec.Cmd{
+						Path:        s.startCommand.Path,
+						Args:        s.startCommand.Args,
+						Env:         s.startCommand.Env,
+						Dir:         s.startCommand.Dir,
+						Stdin:       s.startCommand.Stdin,
+						Stdout:      s.startCommand.Stdout,
+						Stderr:      s.startCommand.Stderr,
+						ExtraFiles:  s.startCommand.ExtraFiles,
+						SysProcAttr: s.startCommand.SysProcAttr,
+					}
+					err = s.startCommand.Start()
+					usedStartCmd = true
+					if err != nil {
+						// This should not happen, immediately stop the e2eService process.
+						glog.Fatalf("restarting %s with startCommand failed. Error: %v.", s, err)
+					}
+				}
+			}
+		}
 	}()
 
 	return readinessCheck(s.healthCheckUrls, errCh)
@@ -528,6 +640,12 @@ func (s *server) kill() error {
 	name := s.name
 	cmd := s.startCommand
 
+	// If s has a restart loop, turn it off.
+	if s.restartOnExit {
+		s.stopRestartingCh <- true
+		<-s.ackStopRestartingCh
+	}
+
 	if s.killCommand != nil {
 		return s.killCommand.Run()
 	}
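
For illustration, a minimal sketch (not part of the patch) of the client side of this feature: publishing a KubeletConfiguration as the kubelet-<node-name> ConfigMap that getRemoteKubeletConfig above polls for. It mirrors createConfigMap from the e2e test and uses the same internal clientset APIs as the diff; the function name publishKubeletConfig and its arguments are illustrative, and it assumes the target kubelet runs with --feature-gates=DynamicKubeletConfig=true.

package example

import (
	"encoding/json"
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/componentconfig"
	"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

// publishKubeletConfig creates the kubelet-<node-name> ConfigMap in the
// "kube-system" namespace. Hypothetical helper; the clientset c and nodeName
// are assumed to be built by the caller.
func publishKubeletConfig(c *clientset.Clientset, nodeName string, kc *componentconfig.KubeletConfiguration) error {
	// Convert the internal type to the external (v1alpha1) type; the kubelet
	// decodes the `kubelet.config` value back into v1alpha1.KubeletConfiguration.
	ext := v1alpha1.KubeletConfiguration{}
	if err := api.Scheme.Convert(kc, &ext, nil); err != nil {
		return err
	}
	b, err := json.Marshal(ext)
	if err != nil {
		return err
	}
	_, err = c.CoreClient.ConfigMaps("kube-system").Create(&api.ConfigMap{
		ObjectMeta: api.ObjectMeta{
			Name: fmt.Sprintf("kubelet-%s", nodeName),
		},
		Data: map[string]string{
			"kubelet.config": string(b),
		},
	})
	return err
}

If the kubelet on that node has the gate enabled, the sync loop added in server.go should notice the new ConfigMap within its 30-second poll interval and exit, and a supervisor such as the systemd-based restart loop in e2e_service.go brings it back up with the new configuration.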