diff --git a/pkg/kubelet/container/container_reference_manager.go b/pkg/kubelet/container/container_reference_manager.go index 0bcec8861e..0cb3097b0f 100644 --- a/pkg/kubelet/container/container_reference_manager.go +++ b/pkg/kubelet/container/container_reference_manager.go @@ -46,8 +46,6 @@ func (c *RefManager) SetRef(id string, ref *api.ObjectReference) { } // ClearRef forgets the given container id and its associated container reference. -// TODO(yifan): This is currently never called. Consider to remove this function, -// or figure out when to clear the references. func (c *RefManager) ClearRef(id string) { c.Lock() defer c.Unlock() diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 4f51b492f2..d4511314d4 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -1297,6 +1297,7 @@ func (dm *DockerManager) killContainer(containerID types.UID, container *api.Con } else { // TODO: pass reason down here, and state, or move this call up the stack. dm.recorder.Eventf(ref, "Killing", "Killing with docker id %v", util.ShortenString(ID, 12)) + dm.containerRefManager.ClearRef(ID) } return err } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 87255b66a4..7c8a9249e5 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -17,6 +17,7 @@ limitations under the License. package rkt import ( + "bytes" "encoding/json" "fmt" "io" @@ -183,11 +184,13 @@ func (r *runtime) buildCommand(args ...string) *exec.Cmd { func (r *runtime) runCommand(args ...string) ([]string, error) { glog.V(4).Info("rkt: Run command:", args) - output, err := r.buildCommand(args...).Output() - if err != nil { - return nil, err + var stdout, stderr bytes.Buffer + cmd := r.buildCommand(args...) 
+ cmd.Stdout, cmd.Stderr = &stdout, &stderr + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("failed to run %v: %v\nstdout: %v\nstderr: %v", args, err, stdout.String(), stderr.String()) } - return strings.Split(strings.TrimSpace(string(output)), "\n"), nil + return strings.Split(strings.TrimSpace(stdout.String()), "\n"), nil } // makePodServiceFileName constructs the unit file name for a pod using its UID. @@ -532,16 +535,17 @@ func serviceFilePath(serviceName string) string { // 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid. // 2. Creates the unit file and save it under systemdUnitDir. // -// On success, it will return a string that represents name of the unit file. -func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, error) { +// On success, it will return a string that represents the name of the unit file +// and the runtime pod. +func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *kubecontainer.Pod, error) { // Generate the pod manifest from the pod spec. manifest, err := r.makePodManifest(pod, pullSecrets) if err != nil { - return "", err + return "", nil, err } manifestFile, err := ioutil.TempFile("", fmt.Sprintf("manifest-%s-", pod.Name)) if err != nil { - return "", err + return "", nil, err } defer func() { manifestFile.Close() @@ -552,31 +556,31 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, er data, err := json.Marshal(manifest) if err != nil { - return "", err + return "", nil, err } // Since File.Write returns error if the written length is less than len(data), // so check error is enough for us. if _, err := manifestFile.Write(data); err != nil { - return "", err + return "", nil, err } // Run 'rkt prepare' to get the rkt UUID. cmds := []string{"prepare", "--quiet", "--pod-manifest", manifestFile.Name()} output, err := r.runCommand(cmds...) 
if err != nil { - return "", err + return "", nil, err } if len(output) != 1 { - return "", fmt.Errorf("invalid output from 'rkt prepare': %v", output) + return "", nil, fmt.Errorf("invalid output from 'rkt prepare': %v", output) } uuid := output[0] glog.V(4).Infof("'rkt prepare' returns %q", uuid) // Create systemd service file for the rkt pod. - p := apiPodToruntimePod(uuid, pod) - b, err := json.Marshal(p) + runtimePod := apiPodToruntimePod(uuid, pod) + b, err := json.Marshal(runtimePod) if err != nil { - return "", err + return "", nil, err } var runPrepared string @@ -614,21 +618,57 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, er glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, pod.Name) serviceFile, err := os.Create(serviceFilePath(serviceName)) if err != nil { - return "", err + return "", nil, err } defer serviceFile.Close() _, err = io.Copy(serviceFile, unit.Serialize(units)) if err != nil { - return "", err + return "", nil, err } if needReload { if err := r.systemd.Reload(); err != nil { - return "", err + return "", nil, err } } - return serviceName, nil + return serviceName, runtimePod, nil +} + +// generateEvents is a helper function that generates some container +// life cycle events for containers in a pod. +func (r *runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, failure error) { + // Set up container references. + for _, c := range runtimePod.Containers { + containerID := string(c.ID) + id, err := parseContainerID(containerID) + if err != nil { + glog.Warningf("Invalid container ID %q", containerID) + continue + } + + ref, ok := r.containerRefManager.GetRef(containerID) + if !ok { + glog.Warningf("No ref for container %q", containerID) + continue + } + + // Note that 'rkt id' is the pod id. 
+ uuid := util.ShortenString(id.uuid, 8) + switch reason { + case "Created": + r.recorder.Eventf(ref, "Created", "Created with rkt id %v", uuid) + case "Started": + r.recorder.Eventf(ref, "Started", "Started with rkt id %v", uuid) + case "Failed": + r.recorder.Eventf(ref, "Failed", "Failed to start with rkt id %v with error %v", uuid, failure) + case "Killing": + r.recorder.Eventf(ref, "Killing", "Killing with rkt id %v", uuid) + default: + glog.Errorf("rkt: Unexpected event %q", reason) + } + } + return } // RunPod first creates the unit file for a pod, and then @@ -636,19 +676,42 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, er func (r *runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name) - name, err := r.preparePod(pod, pullSecrets) - if err != nil { - return err + name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets) + + // Set container references and generate events. + // If preparePod fails, then send out 'failed' events for each container. + // Otherwise, store the container references so we can use them later to send events. + for i, c := range pod.Spec.Containers { + ref, err := kubecontainer.GenerateContainerRef(pod, &c) + if err != nil { + glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, c.Name, err) + continue + } + if prepareErr != nil { + r.recorder.Eventf(ref, "Failed", "Failed to create rkt container with error: %v", prepareErr) + continue + } + containerID := string(runtimePod.Containers[i].ID) + r.containerRefManager.SetRef(containerID, ref) } + if prepareErr != nil { + return prepareErr + } + + r.generateEvents(runtimePod, "Created", nil) + // TODO(yifan): This is the old version of go-systemd. Should update when libcontainer updates // its version of go-systemd. 
// RestartUnit has the same effect as StartUnit if the unit is not running, besides it can restart // a unit if the unit file is changed and reloaded. - _, err = r.systemd.RestartUnit(name, "replace") - if err != nil { + if _, err := r.systemd.RestartUnit(name, "replace"); err != nil { + r.generateEvents(runtimePod, "Failed", err) return err } + + r.generateEvents(runtimePod, "Started", nil) + return nil } @@ -726,9 +789,17 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { } // KillPod invokes 'systemctl kill' to kill the unit that runs the pod. +// TODO(yifan): Handle network plugin. func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error { glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name) + serviceName := makePodServiceFileName(runningPod.ID) + r.generateEvents(&runningPod, "Killing", nil) + for _, c := range runningPod.Containers { + id := string(c.ID) + r.containerRefManager.ClearRef(id) + } + // TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout. r.systemd.KillUnit(serviceName, int32(syscall.SIGKILL)) // Remove the systemd service file as well. @@ -948,7 +1019,6 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus } if restartPod { - // TODO(yifan): Handle network plugin. if err := r.KillPod(pod, runningPod); err != nil { return err }