Merge pull request #17969 from dgonyeo/rkt_api_get_pods

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-12-10 23:13:37 -08:00
commit 6b8eb902ce
3 changed files with 522 additions and 69 deletions

View File

@ -32,6 +32,8 @@ type fakeRktInterface struct {
sync.Mutex
info rktapi.Info
images []*rktapi.Image
podFilter *rktapi.PodFilter
pods []*rktapi.Pod
called []string
err error
}
@ -55,11 +57,25 @@ func (f *fakeRktInterface) GetInfo(ctx context.Context, in *rktapi.GetInfoReques
}
func (f *fakeRktInterface) ListPods(ctx context.Context, in *rktapi.ListPodsRequest, opts ...grpc.CallOption) (*rktapi.ListPodsResponse, error) {
return nil, fmt.Errorf("Not implemented")
f.Lock()
defer f.Unlock()
f.called = append(f.called, "ListPods")
f.podFilter = in.Filter
return &rktapi.ListPodsResponse{f.pods}, f.err
}
func (f *fakeRktInterface) InspectPod(ctx context.Context, in *rktapi.InspectPodRequest, opts ...grpc.CallOption) (*rktapi.InspectPodResponse, error) {
return nil, fmt.Errorf("Not implemented")
f.Lock()
defer f.Unlock()
f.called = append(f.called, "InspectPod")
for _, pod := range f.pods {
if pod.Id == in.Id {
return &rktapi.InspectPodResponse{pod}, f.err
}
}
return &rktapi.InspectPodResponse{nil}, f.err
}
func (f *fakeRktInterface) ListImages(ctx context.Context, in *rktapi.ListImagesRequest, opts ...grpc.CallOption) (*rktapi.ListImagesResponse, error) {

View File

@ -72,6 +72,16 @@ const (
unitRktID = "RktID"
unitRestartCount = "RestartCount"
k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet"
k8sRktKubeletAnnoValue = "true"
k8sRktUIDAnno = "rkt.kubernetes.io/uid"
k8sRktNameAnno = "rkt.kubernetes.io/name"
k8sRktNamespaceAnno = "rkt.kubernetes.io/namespace"
//TODO: remove the creation time annotation once this is closed: https://github.com/coreos/rkt/issues/1789
k8sRktCreationTimeAnno = "rkt.kubernetes.io/created"
k8sRktContainerHashAnno = "rkt.kubernetes.io/containerhash"
k8sRktRestartCountAnno = "rkt.kubernetes.io/restartcount"
dockerPrefix = "docker://"
authDir = "auth.d"
@ -415,50 +425,60 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
var globalPortMappings []kubecontainer.PortMapping
manifest := appcschema.BlankPodManifest()
for _, c := range pod.Spec.Containers {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, err
}
imgManifest, err := r.getImageManifest(c.Image)
if err != nil {
return nil, err
}
if imgManifest.App == nil {
imgManifest.App = new(appctypes.App)
}
img, err := r.getImageByName(c.Image)
if err != nil {
return nil, err
}
hash, err := appctypes.NewHash(img.ID)
if err != nil {
return nil, err
}
opts, err := r.generator.GenerateRunContainerOptions(pod, &c)
if err != nil {
return nil, err
}
globalPortMappings = append(globalPortMappings, opts.PortMappings...)
if err := setApp(imgManifest.App, &c, opts); err != nil {
return nil, err
}
name, err := appctypes.SanitizeACName(c.Name)
if err != nil {
return nil, err
}
appName := appctypes.MustACName(name)
manifest.Apps = append(manifest.Apps, appcschema.RuntimeApp{
Name: *appName,
Image: appcschema.RuntimeImage{ID: *hash},
App: imgManifest.App,
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
Filter: kubernetesPodFilter(pod),
})
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}
restartCount := 0
for _, rktpod := range listResp.Pods {
//TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
if err != nil {
glog.Warningf("rkt: error while inspecting pod %s", rktpod.Id)
continue
}
if inspectResp.Pod == nil {
glog.Warningf("rkt: pod %s vanished?!", rktpod.Id)
continue
}
manifest := &appcschema.PodManifest{}
err = json.Unmarshal(inspectResp.Pod.Manifest, manifest)
if err != nil {
glog.Warningf("rkt: error unmatshaling pod manifest: %v", err)
continue
}
if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
num, err := strconv.Atoi(countString)
if err != nil {
glog.Warningf("rkt: error reading restart count on pod: %v", err)
continue
}
if num+1 > restartCount {
restartCount = num + 1
}
}
}
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktKubeletAnno), k8sRktKubeletAnnoValue)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktUIDAnno), string(pod.UID))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNameAnno), pod.Name)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNamespaceAnno), pod.Namespace)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktCreationTimeAnno), strconv.FormatInt(time.Now().Unix(), 10))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktRestartCountAnno), strconv.Itoa(restartCount))
for _, c := range pod.Spec.Containers {
app, portMappings, err := r.newAppcRuntimeApp(pod, c, pullSecrets)
if err != nil {
return nil, err
}
manifest.Apps = append(manifest.Apps, *app)
globalPortMappings = append(globalPortMappings, portMappings...)
}
volumeMap, ok := r.volumeGetter.GetVolumes(pod.UID)
@ -495,6 +515,80 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
return manifest, nil
}
func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets []api.Secret) (*appcschema.RuntimeApp, []kubecontainer.PortMapping, error) {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, nil, err
}
imgManifest, err := r.getImageManifest(c.Image)
if err != nil {
return nil, nil, err
}
if imgManifest.App == nil {
imgManifest.App = new(appctypes.App)
}
img, err := r.getImageByName(c.Image)
if err != nil {
return nil, nil, err
}
hash, err := appctypes.NewHash(img.ID)
if err != nil {
return nil, nil, err
}
opts, err := r.generator.GenerateRunContainerOptions(pod, &c)
if err != nil {
return nil, nil, err
}
if err := setApp(imgManifest.App, &c, opts); err != nil {
return nil, nil, err
}
name, err := appctypes.SanitizeACName(c.Name)
if err != nil {
return nil, nil, err
}
appName := appctypes.MustACName(name)
kubehash := kubecontainer.HashContainer(&c)
return &appcschema.RuntimeApp{
Name: *appName,
Image: appcschema.RuntimeImage{ID: *hash},
App: imgManifest.App,
Annotations: []appctypes.Annotation{
{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: strconv.FormatUint(kubehash, 10),
},
},
}, opts.PortMappings, nil
}
func kubernetesPodFilter(pod *api.Pod) *rktapi.PodFilter {
return &rktapi.PodFilter{
States: []rktapi.PodState{
//TODO: In the future some pods can remain running after some apps exit: https://github.com/appc/spec/pull/500
rktapi.PodState_POD_STATE_RUNNING,
rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING,
rktapi.PodState_POD_STATE_GARBAGE,
},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(pod.UID),
},
},
}
}
func newUnitOption(section, name, value string) *unit.UnitOption {
return &unit.UnitOption{Section: section, Name: name, Value: value}
}
@ -719,6 +813,79 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
return nil
}
// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
func (r *Runtime) convertRktPod(rktpod rktapi.Pod) (*kubecontainer.Pod, error) {
manifest := &appcschema.PodManifest{}
err := json.Unmarshal(rktpod.Manifest, manifest)
if err != nil {
return nil, err
}
podUID, ok := manifest.Annotations.Get(k8sRktUIDAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktUIDAnno)
}
podName, ok := manifest.Annotations.Get(k8sRktNameAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNameAnno)
}
podNamespace, ok := manifest.Annotations.Get(k8sRktNamespaceAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNamespaceAnno)
}
podCreatedString, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktCreationTimeAnno)
}
podCreated, err := strconv.ParseInt(podCreatedString, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse pod creation timestamp: %v", err)
}
var state kubecontainer.ContainerState
switch rktpod.State {
case rktapi.PodState_POD_STATE_RUNNING:
state = kubecontainer.ContainerStateRunning
case rktapi.PodState_POD_STATE_ABORTED_PREPARE, rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING, rktapi.PodState_POD_STATE_GARBAGE:
state = kubecontainer.ContainerStateExited
default:
state = kubecontainer.ContainerStateUnknown
}
kubepod := &kubecontainer.Pod{
ID: types.UID(podUID),
Name: podName,
Namespace: podNamespace,
}
for _, app := range rktpod.Apps {
manifest := &appcschema.ImageManifest{}
err := json.Unmarshal(app.Image.Manifest, manifest)
if err != nil {
return nil, err
}
containerHashString, ok := manifest.Annotations.Get(k8sRktContainerHashAnno)
if !ok {
return nil, fmt.Errorf("app is missing annotation %s", k8sRktContainerHashAnno)
}
containerHash, err := strconv.ParseUint(containerHashString, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse container's hash: %v", err)
}
kubepod.Containers = append(kubepod.Containers, &kubecontainer.Container{
ID: buildContainerID(&containerID{rktpod.Id, app.Name}),
Name: app.Name,
Image: app.Image.Name,
Hash: containerHash,
Created: podCreated,
State: state,
})
}
return kubepod, nil
}
// readServiceFile reads the service file and constructs the runtime pod and the rkt info.
func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) {
f, err := os.Open(serviceFilePath(serviceName))
@ -770,35 +937,43 @@ func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktI
func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
glog.V(4).Infof("Rkt getting pods")
units, err := r.systemd.ListUnits()
listReq := &rktapi.ListPodsRequest{
Filter: &rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
}
if !all {
listReq.Filter.States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}
}
listResp, err := r.apisvc.ListPods(context.Background(), listReq)
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}
var pods []*kubecontainer.Pod
for _, rktpod := range listResp.Pods {
//TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
if err != nil {
return nil, err
}
var pods []*kubecontainer.Pod
for _, u := range units {
if strings.HasPrefix(u.Name, kubernetesUnitPrefix) {
var state kubecontainer.ContainerState
switch {
case u.SubState == "running":
state = kubecontainer.ContainerStateRunning
default:
state = kubecontainer.ContainerStateExited
if inspectResp.Pod == nil {
return nil, fmt.Errorf("pod %s vanished?!", rktpod.Id)
}
if !all && state != kubecontainer.ContainerStateRunning {
continue
}
pod, _, err := r.readServiceFile(u.Name)
pod, err := r.convertRktPod(*inspectResp.Pod)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
}
for _, c := range pod.Containers {
c.State = state
}
pods = append(pods, pod)
}
}
return pods, nil
}

View File

@ -17,11 +17,15 @@ limitations under the License.
package rkt
import (
"encoding/json"
"fmt"
"testing"
appcschema "github.com/appc/spec/schema"
appctypes "github.com/appc/spec/schema/types"
rktapi "github.com/coreos/rkt/api/v1alpha"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/types"
)
func TestCheckVersion(t *testing.T) {
@ -197,3 +201,261 @@ func TestListImages(t *testing.T) {
fr.CleanCalls()
}
}
func TestGetPods(t *testing.T) {
fr := newFakeRktInterface()
fs := newFakeSystemd()
r := &Runtime{apisvc: fr, systemd: fs}
tests := []struct {
k8sUID types.UID
k8sName string
k8sNamespace string
k8sCreation int64
k8sRestart int
k8sContHashes []uint64
rktPodState rktapi.PodState
pods []*rktapi.Pod
}{
{},
{
k8sUID: types.UID("0"),
k8sName: "guestbook",
k8sNamespace: "default",
k8sCreation: 10000000000,
k8sRestart: 1,
k8sContHashes: []uint64{2353434678},
rktPodState: rktapi.PodState_POD_STATE_RUNNING,
pods: []*rktapi.Pod{
{
State: rktapi.PodState_POD_STATE_RUNNING,
Apps: []*rktapi.App{
{
Name: "test",
Image: &rktapi.Image{
Name: "test",
Manifest: mustMarshalImageManifest(
&appcschema.ImageManifest{
ACKind: appcschema.ImageManifestKind,
ACVersion: appcschema.AppContainerVersion,
Name: *appctypes.MustACIdentifier("test"),
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: "2353434678",
},
},
},
),
},
},
},
Manifest: mustMarshalPodManifest(
&appcschema.PodManifest{
ACKind: appcschema.PodManifestKind,
ACVersion: appcschema.AppContainerVersion,
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktKubeletAnno),
Value: k8sRktKubeletAnnoValue,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktUIDAnno),
Value: "0",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNameAnno),
Value: "guestbook",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNamespaceAnno),
Value: "default",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktCreationTimeAnno),
Value: "10000000000",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktRestartCountAnno),
Value: "1",
},
},
},
),
},
},
},
{
k8sUID: types.UID("1"),
k8sName: "test-pod",
k8sNamespace: "default",
k8sCreation: 10000000001,
k8sRestart: 3,
k8sContHashes: []uint64{2353434682, 8732645},
rktPodState: rktapi.PodState_POD_STATE_EXITED,
pods: []*rktapi.Pod{
{
State: rktapi.PodState_POD_STATE_EXITED,
Apps: []*rktapi.App{
{
Name: "test",
Image: &rktapi.Image{
Name: "test",
Manifest: mustMarshalImageManifest(
&appcschema.ImageManifest{
ACKind: appcschema.ImageManifestKind,
ACVersion: appcschema.AppContainerVersion,
Name: *appctypes.MustACIdentifier("test"),
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: "2353434682",
},
},
},
),
},
},
{
Name: "test2",
Image: &rktapi.Image{
Name: "test2",
Manifest: mustMarshalImageManifest(
&appcschema.ImageManifest{
ACKind: appcschema.ImageManifestKind,
ACVersion: appcschema.AppContainerVersion,
Name: *appctypes.MustACIdentifier("test2"),
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: "8732645",
},
},
},
),
},
},
},
Manifest: mustMarshalPodManifest(
&appcschema.PodManifest{
ACKind: appcschema.PodManifestKind,
ACVersion: appcschema.AppContainerVersion,
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktKubeletAnno),
Value: k8sRktKubeletAnnoValue,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktUIDAnno),
Value: "1",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNameAnno),
Value: "test-pod",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNamespaceAnno),
Value: "default",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktCreationTimeAnno),
Value: "10000000001",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktRestartCountAnno),
Value: "3",
},
},
},
),
},
},
},
}
for i, tt := range tests {
fr.pods = tt.pods
pods, err := r.GetPods(true)
if err != nil {
t.Errorf("%v", err)
}
assert.Equal(t, len(pods), len(tt.pods), fmt.Sprintf("test case %d: mismatched number of pods", i))
for j, pod := range pods {
assert.Equal(t, pod.ID, tt.k8sUID, fmt.Sprintf("test case %d: mismatched UIDs", i))
assert.Equal(t, pod.Name, tt.k8sName, fmt.Sprintf("test case %d: mismatched Names", i))
assert.Equal(t, pod.Namespace, tt.k8sNamespace, fmt.Sprintf("test case %d: mismatched Namespaces", i))
assert.Equal(t, len(pod.Containers), len(tt.pods[j].Apps), fmt.Sprintf("test case %d: mismatched number of containers", i))
for k, cont := range pod.Containers {
assert.Equal(t, cont.Created, tt.k8sCreation, fmt.Sprintf("test case %d: mismatched creation times", i))
assert.Equal(t, cont.Hash, tt.k8sContHashes[k], fmt.Sprintf("test case %d: mismatched container hashes", i))
}
}
var inspectPodCalls []string
for range pods {
inspectPodCalls = append(inspectPodCalls, "InspectPod")
}
assert.Equal(t, append([]string{"ListPods"}, inspectPodCalls...), fr.called, fmt.Sprintf("test case %d: unexpected called list", i))
fr.CleanCalls()
}
}
func TestGetPodsFilter(t *testing.T) {
fr := newFakeRktInterface()
fs := newFakeSystemd()
r := &Runtime{apisvc: fr, systemd: fs}
for _, test := range []struct {
All bool
ExpectedFilter *rktapi.PodFilter
}{
{
true,
&rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
},
{
false,
&rktapi.PodFilter{
States: []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
},
} {
_, err := r.GetPods(test.All)
if err != nil {
t.Errorf("%v", err)
}
assert.Equal(t, test.ExpectedFilter, fr.podFilter, "filters didn't match when all=%b", test.All)
}
}
func mustMarshalPodManifest(man *appcschema.PodManifest) []byte {
manblob, err := json.Marshal(man)
if err != nil {
panic(err)
}
return manblob
}
func mustMarshalImageManifest(man *appcschema.ImageManifest) []byte {
manblob, err := json.Marshal(man)
if err != nil {
panic(err)
}
return manblob
}