Merge pull request #13061 from yifan-gu/rkt_patch_events

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-09-01 04:10:18 -07:00
commit 49475c3431
3 changed files with 96 additions and 27 deletions

View File

@ -46,8 +46,6 @@ func (c *RefManager) SetRef(id string, ref *api.ObjectReference) {
}
// ClearRef forgets the given container id and its associated container reference.
// TODO(yifan): This is currently never called. Consider removing this function,
// or figure out when to clear the references.
func (c *RefManager) ClearRef(id string) {
c.Lock()
defer c.Unlock()

View File

@ -1297,6 +1297,7 @@ func (dm *DockerManager) killContainer(containerID types.UID, container *api.Con
} else {
// TODO: pass reason down here, and state, or move this call up the stack.
dm.recorder.Eventf(ref, "Killing", "Killing with docker id %v", util.ShortenString(ID, 12))
dm.containerRefManager.ClearRef(ID)
}
return err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package rkt
import (
"bytes"
"encoding/json"
"fmt"
"io"
@ -183,11 +184,13 @@ func (r *runtime) buildCommand(args ...string) *exec.Cmd {
// runCommand executes the command produced by buildCommand for the given
// arguments and returns its standard output, with surrounding whitespace
// trimmed and then split on newlines.
func (r *runtime) runCommand(args ...string) ([]string, error) {
glog.V(4).Info("rkt: Run command:", args)
output, err := r.buildCommand(args...).Output()
if err != nil {
return nil, err
// Capture stdout and stderr separately so both can be reported on failure.
var stdout, stderr bytes.Buffer
cmd := r.buildCommand(args...)
cmd.Stdout, cmd.Stderr = &stdout, &stderr
if err := cmd.Run(); err != nil {
// The returned error carries the arguments, the underlying error and
// everything the command wrote to stdout and stderr.
return nil, fmt.Errorf("failed to run %v: %v\nstdout: %v\nstderr: %v", args, err, stdout.String(), stderr.String())
}
return strings.Split(strings.TrimSpace(string(output)), "\n"), nil
// Note: strings.Split on an empty string yields a one-element slice
// containing "", so empty output does not produce an empty result.
return strings.Split(strings.TrimSpace(stdout.String()), "\n"), nil
}
// makePodServiceFileName constructs the unit file name for a pod using its UID.
@ -532,16 +535,17 @@ func serviceFilePath(serviceName string) string {
// 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid.
// 2. Create the unit file and save it under systemdUnitDir.
//
// On success, it will return a string that represents name of the unit file.
func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, error) {
// On success, it will return a string that represents name of the unit file
// and the runtime pod.
func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *kubecontainer.Pod, error) {
// Generate the pod manifest from the pod spec.
manifest, err := r.makePodManifest(pod, pullSecrets)
if err != nil {
return "", err
return "", nil, err
}
manifestFile, err := ioutil.TempFile("", fmt.Sprintf("manifest-%s-", pod.Name))
if err != nil {
return "", err
return "", nil, err
}
defer func() {
manifestFile.Close()
@ -552,31 +556,31 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, er
data, err := json.Marshal(manifest)
if err != nil {
return "", err
return "", nil, err
}
// File.Write returns an error if fewer than len(data) bytes are written,
// so checking the error is sufficient here.
if _, err := manifestFile.Write(data); err != nil {
return "", err
return "", nil, err
}
// Run 'rkt prepare' to get the rkt UUID.
cmds := []string{"prepare", "--quiet", "--pod-manifest", manifestFile.Name()}
output, err := r.runCommand(cmds...)
if err != nil {
return "", err
return "", nil, err
}
if len(output) != 1 {
return "", fmt.Errorf("invalid output from 'rkt prepare': %v", output)
return "", nil, fmt.Errorf("invalid output from 'rkt prepare': %v", output)
}
uuid := output[0]
glog.V(4).Infof("'rkt prepare' returns %q", uuid)
// Create systemd service file for the rkt pod.
p := apiPodToruntimePod(uuid, pod)
b, err := json.Marshal(p)
runtimePod := apiPodToruntimePod(uuid, pod)
b, err := json.Marshal(runtimePod)
if err != nil {
return "", err
return "", nil, err
}
var runPrepared string
@ -614,21 +618,57 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, er
glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, pod.Name)
serviceFile, err := os.Create(serviceFilePath(serviceName))
if err != nil {
return "", err
return "", nil, err
}
defer serviceFile.Close()
_, err = io.Copy(serviceFile, unit.Serialize(units))
if err != nil {
return "", err
return "", nil, err
}
if needReload {
if err := r.systemd.Reload(); err != nil {
return "", err
return "", nil, err
}
}
return serviceName, nil
return serviceName, runtimePod, nil
}
// generateEvents records one life cycle event for every container in
// runtimePod. The reason selects which event is sent: "Created", "Started",
// "Failed" or "Killing". The failure argument only appears in the message of
// "Failed" events. Any other reason is logged as an error and no event is
// recorded for that container.
//
// Containers whose ID cannot be parsed, or for which the container ref
// manager holds no reference, are skipped with a warning.
func (r *runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, failure error) {
	for _, container := range runtimePod.Containers {
		rawID := string(container.ID)

		parsed, err := parseContainerID(rawID)
		if err != nil {
			glog.Warningf("Invalid container ID %q", rawID)
			continue
		}

		// An event needs an object reference to be attached to; without
		// one there is nothing to report against.
		ref, found := r.containerRefManager.GetRef(rawID)
		if !found {
			glog.Warningf("No ref for container %q", rawID)
			continue
		}

		// The 'rkt id' shown in the message is the pod uuid, passed through
		// util.ShortenString with a length of 8.
		shortID := util.ShortenString(parsed.uuid, 8)

		switch reason {
		case "Killing":
			r.recorder.Eventf(ref, "Killing", "Killing with rkt id %v", shortID)
		case "Failed":
			r.recorder.Eventf(ref, "Failed", "Failed to start with rkt id %v with error %v", shortID, failure)
		case "Started":
			r.recorder.Eventf(ref, "Started", "Started with rkt id %v", shortID)
		case "Created":
			r.recorder.Eventf(ref, "Created", "Created with rkt id %v", shortID)
		default:
			glog.Errorf("rkt: Unexpected event %q", reason)
		}
	}
}
// RunPod first creates the unit file for a pod, and then
@ -636,19 +676,42 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, er
func (r *runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name)
name, err := r.preparePod(pod, pullSecrets)
if err != nil {
return err
name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets)
// Set container references and generate events.
// If preparePod fails, then send out 'Failed' events for each container.
// Otherwise, store the container references so we can use them later to send events.
for i, c := range pod.Spec.Containers {
// NOTE(review): &c is the address of the range variable; this is only safe
// if GenerateContainerRef does not retain the pointer — confirm.
ref, err := kubecontainer.GenerateContainerRef(pod, &c)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, c.Name, err)
continue
}
if prepareErr != nil {
r.recorder.Eventf(ref, "Failed", "Failed to create rkt container with error: %v", prepareErr)
continue
}
// runtimePod is only dereferenced when preparePod succeeded.
// NOTE(review): indexing by i assumes runtimePod.Containers has the same
// length and order as pod.Spec.Containers — confirm against apiPodToruntimePod.
containerID := string(runtimePod.Containers[i].ID)
r.containerRefManager.SetRef(containerID, ref)
}
// The preparePod error is returned only after the per-container 'Failed'
// events above have been sent.
if prepareErr != nil {
return prepareErr
}
r.generateEvents(runtimePod, "Created", nil)
// TODO(yifan): This is the old version of go-systemd. Should update when libcontainer updates
// its version of go-systemd.
// RestartUnit has the same effect as StartUnit if the unit is not running, besides it can restart
// a unit if the unit file is changed and reloaded.
_, err = r.systemd.RestartUnit(name, "replace")
if err != nil {
if _, err := r.systemd.RestartUnit(name, "replace"); err != nil {
// Report the start failure on every container before returning it.
r.generateEvents(runtimePod, "Failed", err)
return err
}
r.generateEvents(runtimePod, "Started", nil)
return nil
}
@ -726,9 +789,17 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
}
// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
// TODO(yifan): Handle network plugin.
func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)
serviceName := makePodServiceFileName(runningPod.ID)
r.generateEvents(&runningPod, "Killing", nil)
for _, c := range runningPod.Containers {
id := string(c.ID)
r.containerRefManager.ClearRef(id)
}
// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
r.systemd.KillUnit(serviceName, int32(syscall.SIGKILL))
// Remove the systemd service file as well.
@ -948,7 +1019,6 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
}
if restartPod {
// TODO(yifan): Handle network plugin.
if err := r.KillPod(pod, runningPod); err != nil {
return err
}