kubelet/rkt: Add RunPod() for rkt.

pull/6/head
Yifan Gu 2015-04-30 13:34:46 -07:00
parent 262c34e7db
commit 1636fd2e1c
1 changed files with 153 additions and 0 deletions

View File

@ -19,17 +19,27 @@ package rkt
import (
"encoding/json"
"fmt"
"hash/adler32"
"io"
"io/ioutil"
"os"
"os/exec"
"path"
"strings"
"syscall"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
appcschema "github.com/appc/spec/schema"
appctypes "github.com/appc/spec/schema/types"
"github.com/coreos/go-systemd/dbus"
"github.com/coreos/go-systemd/unit"
"github.com/coreos/rkt/store"
"github.com/golang/glog"
)
@ -382,3 +392,146 @@ func (r *Runtime) getImageID(imageName string) (string, error) {
}
return last, nil
}
func newUnitOption(section, name, value string) *unit.UnitOption {
return &unit.UnitOption{Section: section, Name: name, Value: value}
}
// TODO(yifan): Move this duplicated function to container runtime.
// hashContainer computes the hash of one api.Container.
func hashContainer(container *api.Container) uint64 {
hash := adler32.New()
util.DeepHashObject(hash, *container)
return uint64(hash.Sum32())
}
// TODO(yifan): Remove the receiver once we can solve the appName->imageID problem.
func (r *Runtime) apiPodToRuntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod {
p := &kubecontainer.Pod{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
}
for i := range pod.Spec.Containers {
c := &pod.Spec.Containers[i]
imageID, err := r.getImageID(c.Image)
if err != nil {
glog.Warningf("rkt: Cannot get image id: %v", err)
}
p.Containers = append(p.Containers, &kubecontainer.Container{
ID: types.UID(buildContainerID(&containerID{uuid, c.Name, imageID})),
Name: c.Name,
Image: c.Image,
Hash: hashContainer(c),
Created: time.Now().Unix(),
})
}
return p
}
// preparePod will:
//
// 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid.
// 2. Creates the unit file and save it under systemdUnitDir.
//
// On success, it will return a string that represents name of the unit file
// and a boolean that indicates if the unit file needs to be reloaded (whether
// the file is already existed).
func (r *Runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (string, bool, error) {
cmds := []string{"prepare", "--quiet", "--pod-manifest"}
// Generate the pod manifest from the pod spec.
manifest, err := r.makePodManifest(pod, volumeMap)
if err != nil {
return "", false, err
}
manifestFile, err := ioutil.TempFile("", "manifest")
if err != nil {
return "", false, err
}
defer func() {
manifestFile.Close()
if err := os.Remove(manifestFile.Name()); err != nil {
glog.Warningf("rkt: Cannot remove temp manifest file %q: %v", manifestFile.Name(), err)
}
}()
data, err := json.Marshal(manifest)
if err != nil {
return "", false, err
}
// Since File.Write returns error if the written length is less than len(data),
// so check error is enough for us.
if _, err := manifestFile.Write(data); err != nil {
return "", false, err
}
cmds = append(cmds, manifestFile.Name())
output, err := r.runCommand(cmds...)
if err != nil {
return "", false, err
}
if len(output) != 1 {
return "", false, fmt.Errorf("cannot get uuid from 'rkt prepare'")
}
uuid := output[0]
glog.V(4).Infof("'rkt prepare' returns %q.", uuid)
p := r.apiPodToRuntimePod(uuid, pod)
b, err := json.Marshal(p)
if err != nil {
return "", false, err
}
runPrepared := fmt.Sprintf("%s run-prepared --private-net=%v %s", r.absPath, pod.Spec.HostNetwork, uuid)
units := []*unit.UnitOption{
newUnitOption(unitKubernetesSection, unitRktID, uuid),
newUnitOption(unitKubernetesSection, unitPodName, string(b)),
newUnitOption("Service", "ExecStart", runPrepared),
}
// Save the unit file under systemd's service directory.
// TODO(yifan) Garbage collect 'dead' service files.
needReload := false
unitName := makePodServiceFileName(pod.UID)
if _, err := os.Stat(path.Join(systemdServiceDir, unitName)); err == nil {
needReload = true
}
unitFile, err := os.Create(path.Join(systemdServiceDir, unitName))
if err != nil {
return "", false, err
}
defer unitFile.Close()
_, err = io.Copy(unitFile, unit.Serialize(units))
if err != nil {
return "", false, err
}
return unitName, needReload, nil
}
// RunPod first creates the unit file for a pod, and then calls
// StartUnit over d-bus.
func (r *Runtime) RunPod(pod *api.Pod, volumeMap map[string]volume.Volume) error {
glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name)
name, needReload, err := r.preparePod(pod, volumeMap)
if err != nil {
return err
}
if needReload {
// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
r.systemd.KillUnit(name, int32(syscall.SIGKILL))
if err := r.systemd.Reload(); err != nil {
return err
}
}
// TODO(yifan): This is the old version of go-systemd. Should update when libcontainer updates
// its version of go-systemd.
_, err = r.systemd.StartUnit(name, "replace")
if err != nil {
return err
}
return nil
}