Sync static pods from Kubelet to the API server

Currently, API server is not aware of the static pods (manifests from
sources other than the API server, e.g. file and http) at all. This is
inconvenient since users cannot check the static pods through kubectl.
It is also sub-optimal because scheduler is unaware of the resource
consumption by these static pods on the node.

This change syncs the information back to the API server by creating a
mirror pod via API server for each static pod.

 - Kubelet creates containers for the static pod, as it would do
   normally.

 - If a mirror pod gets deleted, Kubelet will re-create one. The
   containers are sync'd to the static pods, so they will not be
   affected.

 - If a static pod gets removed from the source (e.g. manifest file
   removed from the directory), the orphaned mirror pod will be deleted.

Note that because events are associated with UID, and the mirror pod has
a different UID than the original static pod, the events will not be
shown for the mirror pod when running `kubectl describe pod
<mirror_pod>`.
pull/6/head
Yu-Ju Hong 2015-03-09 15:46:47 -07:00
parent 2c97522692
commit 929fb63b33
14 changed files with 554 additions and 79 deletions

View File

@ -19,6 +19,7 @@ limitations under the License.
package main
import (
"errors"
"fmt"
"io/ioutil"
"net"
@ -34,13 +35,14 @@ import (
kubeletapp "github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
@ -118,7 +120,7 @@ func (h *delegateHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
}
func startComponents(manifestURL string) (apiServerURL string) {
func startComponents(manifestURL string) (string, string) {
// Setup
servers := []string{}
glog.Infof("Creating etcd client pointing to %v", servers)
@ -215,21 +217,24 @@ func startComponents(manifestURL string) (apiServerURL string) {
cadvisorInterface := new(cadvisor.Fake)
// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
testRootDir := makeTempDirOrDie("kubelet_integ_1.", "")
configFilePath := makeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface)
kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.")
testRootDir = makeTempDirOrDie("kubelet_integ_2.", "")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface)
return apiServer.URL
kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "")
return apiServer.URL, configFilePath
}
func makeTempDirOrDie(prefix string) string {
tempDir, err := ioutil.TempDir("/tmp", prefix)
func makeTempDirOrDie(prefix string, baseDir string) string {
if baseDir == "" {
baseDir = "/tmp"
}
tempDir, err := ioutil.TempDir(baseDir, prefix)
if err != nil {
glog.Fatalf("Can't make a temp rootdir: %v", err)
}
@ -278,6 +283,60 @@ func podExists(c *client.Client, podNamespace string, podID string) wait.Conditi
}
}
func podNotFound(c *client.Client, podNamespace string, podID string) wait.ConditionFunc {
return func() (bool, error) {
_, err := c.Pods(podNamespace).Get(podID)
return apierrors.IsNotFound(err), nil
}
}
func podRunning(c *client.Client, podNamespace string, podID string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.Pods(podNamespace).Get(podID)
if err != nil {
return false, err
}
if pod.Status.Phase != api.PodRunning {
return false, errors.New(fmt.Sprintf("Pod status is %q", pod.Status.Phase))
}
return true, nil
}
}
func runStaticPodTest(c *client.Client, configFilePath string) {
manifest := `version: v1beta2
id: static-pod
containers:
- name: static-container
image: kubernetes/pause`
manifestFile, err := ioutil.TempFile(configFilePath, "")
defer os.Remove(manifestFile.Name())
ioutil.WriteFile(manifestFile.Name(), []byte(manifest), 0600)
// Wait for the mirror pod to be created.
hostname, _ := os.Hostname()
podName := fmt.Sprintf("static-pod-%s", hostname)
namespace := kubelet.NamespaceDefault
if err := wait.Poll(time.Second, time.Second*30,
podRunning(c, namespace, podName)); err != nil {
glog.Fatalf("FAILED: mirror pod has not been created or is not running: %v", err)
}
// Delete the mirror pod, and wait for it to be recreated.
c.Pods(namespace).Delete(podName)
if err = wait.Poll(time.Second, time.Second*30,
podRunning(c, namespace, podName)); err != nil {
glog.Fatalf("FAILED: mirror pod has not been re-created or is not running: %v", err)
}
// Remove the manifest file, and wait for the mirror pod to be deleted.
os.Remove(manifestFile.Name())
if err = wait.Poll(time.Second, time.Second*30,
podNotFound(c, namespace, podName)); err != nil {
glog.Fatalf("FAILED: mirror pod has not been deleted: %v", err)
}
}
func runReplicationControllerTest(c *client.Client) {
data, err := ioutil.ReadFile("cmd/integration/controller.json")
if err != nil {
@ -447,7 +506,7 @@ func runAtomicPutTest(c *client.Client) {
glog.Infof("Posting update (%s, %s)", l, v)
err = c.Put().Resource("services").Name(svc.Name).Body(&tmpSvc).Do().Error()
if err != nil {
if errors.IsConflict(err) {
if apierrors.IsConflict(err) {
glog.Infof("Conflict: (%s, %s)", l, v)
// This is what we expect.
continue
@ -733,7 +792,7 @@ func main() {
manifestURL := ServeCachedManifestFile()
apiServerURL := startComponents(manifestURL)
apiServerURL, configFilePath := startComponents(manifestURL)
// Ok. we're good to go.
glog.Infof("API Server started on %s", apiServerURL)
@ -754,6 +813,9 @@ func main() {
runSelfLinkTestOnNamespace(c, "")
runSelfLinkTestOnNamespace(c, "other")
},
func(c *client.Client) {
runStaticPodTest(c, configFilePath)
},
}
var wg sync.WaitGroup
wg.Add(len(testFuncs))
@ -782,11 +844,15 @@ func main() {
createdConts.Insert(p[:n-8])
}
}
// We expect 9: 2 infra containers + 2 containers from the replication controller +
// 1 infra container + 2 containers from the URL +
// 1 infra container + 1 container from the service test.
if len(createdConts) != 9 {
glog.Fatalf("Expected 9 containers; got %v\n\nlist of created containers:\n\n%#v\n\nDocker 1 Created:\n\n%#v\n\nDocker 2 Created:\n\n%#v\n\n", len(createdConts), createdConts.List(), fakeDocker1.Created, fakeDocker2.Created)
// We expect 9: 2 pod infra containers + 2 pods from the replication controller +
// 1 pod infra container + 2 pods from the URL +
// 1 pod infra container + 1 pod from the service test.
// In addition, runStaticPodTest creates 1 pod infra containers +
// 1 pod container from the mainfest file
// The total number of container created is 11
if len(createdConts) != 11 {
glog.Fatalf("Expected 11 containers; got %v\n\nlist of created containers:\n\n%#v\n\nDocker 1 Created:\n\n%#v\n\nDocker 2 Created:\n\n%#v\n\n", len(createdConts), createdConts.List(), fakeDocker1.Created, fakeDocker2.Created)
}
glog.Infof("OK - found created containers: %#v", createdConts.List())
}

View File

@ -244,7 +244,8 @@ func SimpleRunKubelet(client *client.Client,
masterServiceNamespace string,
volumePlugins []volume.Plugin,
tlsOptions *kubelet.TLSOptions,
cadvisorInterface cadvisor.Interface) {
cadvisorInterface cadvisor.Interface,
configFilePath string) {
kcfg := KubeletConfig{
KubeClient: client,
DockerClient: dockerClient,
@ -264,6 +265,7 @@ func SimpleRunKubelet(client *client.Client,
VolumePlugins: volumePlugins,
TLSOptions: tlsOptions,
CadvisorInterface: cadvisorInterface,
ConfigFile: configFilePath,
}
RunKubelet(&kcfg)
}

View File

@ -151,7 +151,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP
if err != nil {
glog.Fatalf("Failed to create cAdvisor: %v", err)
}
kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface)
kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "")
}
func newApiClient(addr net.IP, port int) *client.Client {

View File

@ -206,7 +206,7 @@ func extractFromFile(filename string) (api.Pod, error) {
if glog.V(4) {
glog.Infof("Got pod from file %q: %#v", filename, pod)
} else {
glog.V(1).Infof("Got pod from file %q: %s.%s (%s)", filename, pod.Namespace, pod.Name, pod.UID)
glog.V(5).Infof("Got pod from file %q: %s.%s (%s)", filename, pod.Namespace, pod.Name, pod.UID)
}
return pod, nil
}

View File

@ -78,19 +78,23 @@ const (
)
var (
// ErrNoKubeletContainers returned when there are not containers managed by the kubelet (ie: either no containers on the node, or none that the kubelet cares about).
// ErrNoKubeletContainers returned when there are not containers managed by
// the kubelet (ie: either no containers on the node, or none that the kubelet cares about).
ErrNoKubeletContainers = errors.New("no containers managed by kubelet")
// ErrContainerNotFound returned when a container in the given pod with the given container name was not found, amongst those managed by the kubelet.
// ErrContainerNotFound returned when a container in the given pod with the
// given container name was not found, amongst those managed by the kubelet.
ErrContainerNotFound = errors.New("no matching container")
)
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet,
startTime time.Time) error
}
type SourcesReadyFn func() bool
@ -206,6 +210,8 @@ func NewMainKubelet(
klet.podStatuses = make(map[string]api.PodStatus)
klet.mirrorManager = newBasicMirrorManager(klet.kubeClient)
return klet, nil
}
@ -235,6 +241,11 @@ type Kubelet struct {
// pods are immutable
podLock sync.RWMutex
pods []api.Pod
// Record the set of mirror pods (see mirror_manager.go for more details);
// similar to pods, this is not immutable and is protected by the same podLock.
// Note that Kubelet.pods do not contain mirror pods as they are filtered
// out beforehand.
mirrorPods util.StringSet
// Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events
@ -288,6 +299,9 @@ type Kubelet struct {
// A pod status cache currently used to store rejected pods and their statuses.
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus
// A mirror pod manager which provides helper functions.
mirrorManager mirrorManager
}
// getRootDir returns the full path to the directory under which kubelet can
@ -1240,7 +1254,7 @@ type podContainerChangesSpec struct {
containersToKeep map[dockertools.DockerID]int
}
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) {
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) {
podFullName := GetPodFullName(pod)
uid := pod.UID
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
@ -1343,10 +1357,10 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, containersInPod dock
}, nil
}
func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerContainers) error {
func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
uid := pod.UID
containerChanges, err := kl.computePodContainerChanges(pod, containersInPod)
containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, containersInPod)
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
if err != nil {
return err
@ -1416,6 +1430,13 @@ func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerConta
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
}
if !hasMirrorPod && isStaticPod(pod) {
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
if err := kl.mirrorManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err)
}
}
return nil
}
@ -1496,7 +1517,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
@ -1543,7 +1564,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, func() {
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.Has(podFullName), func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
@ -1604,6 +1625,9 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
return err
}
// Remove any orphaned mirror pods.
deleteOrphanedMirrorPods(pods, mirrorPods, kl.mirrorManager)
return err
}
@ -1704,18 +1728,18 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
}
pods, err := kl.GetPods()
pods, mirrorPods, err := kl.GetPods()
if err != nil {
glog.Errorf("Failed to get bound pods.")
return
}
if err := handler.SyncPods(pods, podSyncTypes, start); err != nil {
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
}
}
// Updated the Kubelet's internal pods with those provided by the update.
// Update the Kubelet's internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
kl.podLock.Lock()
@ -1723,6 +1747,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
newPods, newMirrorPods := filterAndCategorizePods(u.Pods)
// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
@ -1730,13 +1755,15 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
for i := range kl.pods {
existingPods[kl.pods[i].UID] = struct{}{}
}
for i := range u.Pods {
if _, ok := existingPods[u.Pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate
for _, pod := range newPods {
if _, ok := existingPods[pod.UID]; !ok {
podSyncTypes[pod.UID] = metrics.SyncPodCreate
}
}
// Actually update the pods.
kl.pods = newPods
kl.mirrorPods = newMirrorPods
kl.pods = u.Pods
kl.handleHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
@ -1746,7 +1773,8 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
kl.pods = updatePods(u.Pods, kl.pods)
allPods := updatePods(u.Pods, kl.pods)
kl.pods, kl.mirrorPods = filterAndCategorizePods(allPods)
kl.handleHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
@ -1818,11 +1846,12 @@ func (kl *Kubelet) GetHostname() string {
return kl.hostname
}
// GetPods returns all pods bound to the kubelet and their spec.
func (kl *Kubelet) GetPods() ([]api.Pod, error) {
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet, error) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
return append([]api.Pod{}, kl.pods...), nil
return append([]api.Pod{}, kl.pods...), kl.mirrorPods, nil
}
// GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found.

View File

@ -53,11 +53,12 @@ func init() {
}
type TestKubelet struct {
kubelet *Kubelet
fakeDocker *dockertools.FakeDockerClient
fakeCadvisor *cadvisor.Mock
fakeKubeClient *client.Fake
waitGroup *sync.WaitGroup
kubelet *Kubelet
fakeDocker *dockertools.FakeDockerClient
fakeCadvisor *cadvisor.Mock
fakeKubeClient *client.Fake
waitGroup *sync.WaitGroup
fakeMirrorManager *fakeMirrorManager
}
func newTestKubelet(t *testing.T) *TestKubelet {
@ -83,8 +84,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
waitGroup := new(sync.WaitGroup)
kubelet.podWorkers = newPodWorkers(
fakeDockerCache,
func(pod *api.Pod, containers dockertools.DockerContainers) error {
err := kubelet.syncPod(pod, containers)
func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error {
err := kubelet.syncPod(pod, hasMirrorPod, containers)
waitGroup.Done()
return err
},
@ -100,8 +101,9 @@ func newTestKubelet(t *testing.T) *TestKubelet {
}
mockCadvisor := &cadvisor.Mock{}
kubelet.cadvisor = mockCadvisor
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup}
mirrorManager := newFakeMirrorMananger()
kubelet.mirrorManager = mirrorManager
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, mirrorManager}
}
func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
@ -442,7 +444,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -475,7 +477,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -524,7 +526,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -577,7 +579,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -627,7 +629,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -684,7 +686,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -753,7 +755,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(2)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -793,7 +795,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
@ -801,7 +803,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@ -839,7 +841,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now())
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -893,7 +895,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
},
}
kubelet.pods = append(kubelet.pods, bound)
err := kubelet.syncPod(&bound, dockerContainers)
err := kubelet.syncPod(&bound, false, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -934,7 +936,7 @@ func TestSyncPodBadHash(t *testing.T) {
},
}
kubelet.pods = append(kubelet.pods, bound)
err := kubelet.syncPod(&bound, dockerContainers)
err := kubelet.syncPod(&bound, false, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -988,7 +990,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
},
}
kubelet.pods = append(kubelet.pods, bound)
err := kubelet.syncPod(&bound, dockerContainers)
err := kubelet.syncPod(&bound, false, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1678,7 +1680,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
},
}
kubelet.pods = append(kubelet.pods, bound)
err := kubelet.syncPod(&bound, dockerContainers)
err := kubelet.syncPod(&bound, false, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -2075,7 +2077,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
}, emptyPodUIDs, time.Now())
}, emptyPodUIDs, util.NewStringSet(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -3176,7 +3178,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now())
kl.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
}
@ -3388,3 +3390,57 @@ func TestUpdateNodeStatusError(t *testing.T) {
t.Errorf("unexpected actions: %v", kubeClient.Actions)
}
}
func TestCreateMirrorPod(t *testing.T) {
testKubelet := newTestKubelet(t)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorManager
pod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "bar",
Namespace: "foo",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "file",
},
},
}
kl.pods = append(kl.pods, pod)
hasMirrorPod := false
err := kl.syncPod(&pod, hasMirrorPod, dockertools.DockerContainers{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
podFullName := GetPodFullName(&pod)
if !manager.HasPod(podFullName) {
t.Errorf("expected mirror pod %q to be created", podFullName)
}
if manager.NumOfPods() != 1 || !manager.HasPod(podFullName) {
t.Errorf("expected one mirror pod %q, got %v", podFullName, manager.GetPods())
}
}
func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet := newTestKubelet(t)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorManager
orphanedPodNames := []string{"pod1_ns", "pod2_ns"}
mirrorPods := util.NewStringSet()
for _, name := range orphanedPodNames {
mirrorPods.Insert(name)
}
// Sync with an empty pod list to delete all mirror pods.
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, mirrorPods, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if manager.NumOfPods() != 0 {
t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
}
for _, name := range orphanedPodNames {
creates, deletes := manager.GetCounts(name)
if creates != 0 || deletes != 1 {
t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes)
}
}
}

View File

@ -0,0 +1,138 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
// Kubelet discover pod updates from 3 sources: file, http, and apiserver.
// Pods from non-apiserver sources are called static pods, and API server is
// not aware of the existence of static pods. In order to monitor the status of
// such pods, kubelet create a mirror pod for each static pod via the API
// server.
//
// A mirror pod has the same pod full name (name and namespace) as its static
// counterpart (albeit different metadata such as UID, etc). By leveraging the
// fact that kubelet reports the pod status using the pod full name, the status
// of the mirror pod always reflects the acutal status of the static pod.
// When a static pod gets deleted, the associated orphaned mirror pods will
// also be removed.
//
// This file includes functions to manage the mirror pods.
type mirrorManager interface {
CreateMirrorPod(api.Pod, string) error
DeleteMirrorPod(string) error
}
type basicMirrorManager struct {
// mirror pods are stored in the kubelet directly because they need to be
// in sync with the internal pods.
apiserverClient client.Interface
}
func newBasicMirrorManager(apiserverClient client.Interface) *basicMirrorManager {
return &basicMirrorManager{apiserverClient: apiserverClient}
}
// Creates a mirror pod.
func (self *basicMirrorManager) CreateMirrorPod(pod api.Pod, hostname string) error {
if self.apiserverClient == nil {
return nil
}
// Indicate that the pod should be scheduled to the current node.
pod.Spec.Host = hostname
pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType
_, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod)
return err
}
// Deletes a mirror pod.
func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error {
if self.apiserverClient == nil {
return nil
}
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
glog.Errorf("Failed to parse a pod full name %q", podFullName)
return err
}
glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
if err := self.apiserverClient.Pods(namespace).Delete(name); err != nil {
glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
}
return nil
}
// Delete all orphaned mirror pods.
func deleteOrphanedMirrorPods(pods []api.Pod, mirrorPods util.StringSet, manager mirrorManager) {
existingPods := util.NewStringSet()
for _, pod := range pods {
existingPods.Insert(GetPodFullName(&pod))
}
for podFullName := range mirrorPods {
if !existingPods.Has(podFullName) {
manager.DeleteMirrorPod(podFullName)
}
}
}
// Helper functions.
func getPodSource(pod *api.Pod) (string, error) {
if pod.Annotations != nil {
if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok {
return source, nil
}
}
return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
}
func isStaticPod(pod *api.Pod) bool {
source, err := getPodSource(pod)
return err == nil && source != ApiserverSource
}
func isMirrorPod(pod *api.Pod) bool {
if value, ok := pod.Annotations[ConfigMirrorAnnotationKey]; !ok {
return false
} else {
return value == MirrorType
}
}
// This function separate the mirror pods from regular pods to
// facilitate pods syncing and mirror pod creation/deletion.
func filterAndCategorizePods(pods []api.Pod) ([]api.Pod, util.StringSet) {
filteredPods := []api.Pod{}
mirrorPods := util.NewStringSet()
for _, pod := range pods {
name := GetPodFullName(&pod)
if isMirrorPod(&pod) {
mirrorPods.Insert(name)
} else {
filteredPods = append(filteredPods, pod)
}
}
return filteredPods, mirrorPods
}

View File

@ -0,0 +1,158 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"reflect"
"sync"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type fakeMirrorManager struct {
mirrorPodLock sync.RWMutex
// Note that a real mirror manager does not store the mirror pods in
// itself. This fake manager does this to track calls.
mirrorPods util.StringSet
createCounts map[string]int
deleteCounts map[string]int
}
func (self *fakeMirrorManager) CreateMirrorPod(pod api.Pod, _ string) error {
self.mirrorPodLock.Lock()
defer self.mirrorPodLock.Unlock()
podFullName := GetPodFullName(&pod)
self.mirrorPods.Insert(podFullName)
self.createCounts[podFullName]++
return nil
}
func (self *fakeMirrorManager) DeleteMirrorPod(podFullName string) error {
self.mirrorPodLock.Lock()
defer self.mirrorPodLock.Unlock()
self.mirrorPods.Delete(podFullName)
self.deleteCounts[podFullName]++
return nil
}
func newFakeMirrorMananger() *fakeMirrorManager {
m := fakeMirrorManager{}
m.mirrorPods = util.NewStringSet()
m.createCounts = make(map[string]int)
m.deleteCounts = make(map[string]int)
return &m
}
func (self *fakeMirrorManager) HasPod(podFullName string) bool {
self.mirrorPodLock.RLock()
defer self.mirrorPodLock.RUnlock()
return self.mirrorPods.Has(podFullName)
}
func (self *fakeMirrorManager) NumOfPods() int {
self.mirrorPodLock.RLock()
defer self.mirrorPodLock.RUnlock()
return self.mirrorPods.Len()
}
func (self *fakeMirrorManager) GetPods() []string {
self.mirrorPodLock.RLock()
defer self.mirrorPodLock.RUnlock()
return self.mirrorPods.List()
}
func (self *fakeMirrorManager) GetCounts(podFullName string) (int, int) {
self.mirrorPodLock.RLock()
defer self.mirrorPodLock.RUnlock()
return self.createCounts[podFullName], self.deleteCounts[podFullName]
}
// Tests that mirror pods are filtered out properly from the pod update.
func TestFilterOutMirrorPods(t *testing.T) {
mirrorPod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "987654321",
Name: "bar",
Namespace: "default",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
}
staticPod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "123456789",
Name: "bar",
Namespace: "default",
Annotations: map[string]string{ConfigSourceAnnotationKey: "file"},
},
}
expectedPods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "999999999",
Name: "taco",
Namespace: "default",
Annotations: map[string]string{ConfigSourceAnnotationKey: "api"},
},
},
staticPod,
}
updates := append(expectedPods, mirrorPod)
actualPods, actualMirrorPods := filterAndCategorizePods(updates)
if !reflect.DeepEqual(expectedPods, actualPods) {
t.Errorf("expected %#v, got %#v", expectedPods, actualPods)
}
if !actualMirrorPods.Has(GetPodFullName(&mirrorPod)) {
t.Errorf("mirror pod is not recorded")
}
}
func TestParsePodFullName(t *testing.T) {
type nameTuple struct {
Name string
Namespace string
}
successfulCases := map[string]nameTuple{
"bar_foo": {Name: "bar", Namespace: "foo"},
"bar.org_foo.com": {Name: "bar.org", Namespace: "foo.com"},
"bar-bar_foo": {Name: "bar-bar", Namespace: "foo"},
}
failedCases := []string{"barfoo", "bar_foo_foo", ""}
for podFullName, expected := range successfulCases {
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
t.Errorf("unexpected error when parsing the full name: %v", err)
continue
}
if name != expected.Name || namespace != expected.Namespace {
t.Errorf("expected name %q, namespace %q; got name %q, namespace %q",
expected.Name, expected.Namespace, name, namespace)
}
}
for _, podFullName := range failedCases {
_, _, err := ParsePodFullName(podFullName)
if err == nil {
t.Errorf("expected error when parsing the full name, got none")
}
}
}

View File

@ -28,7 +28,7 @@ import (
"github.com/golang/glog"
)
type syncPodFnType func(*api.Pod, dockertools.DockerContainers) error
type syncPodFnType func(*api.Pod, bool, dockertools.DockerContainers) error
type podWorkers struct {
// Protects podUpdates field.
@ -60,11 +60,15 @@ type workUpdate struct {
// The pod state to reflect.
pod *api.Pod
// Whether there exists a mirror pod for pod.
hasMirrorPod bool
// Function to call when the update is complete.
updateCompleteFn func()
}
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, recorder record.EventRecorder) *podWorkers {
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType,
recorder record.EventRecorder) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
@ -92,7 +96,8 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
return
}
err = p.syncPodFn(newWork.pod, containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod)))
err = p.syncPodFn(newWork.pod, newWork.hasMirrorPod,
containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod)))
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
@ -106,7 +111,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
}
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
func (p *podWorkers) UpdatePod(pod *api.Pod, updateComplete func()) {
func (p *podWorkers) UpdatePod(pod *api.Pod, hasMirrorPod bool, updateComplete func()) {
uid := pod.UID
var podUpdates chan workUpdate
var exists bool
@ -129,11 +134,13 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, updateComplete func()) {
p.isWorking[pod.UID] = true
podUpdates <- workUpdate{
pod: pod,
hasMirrorPod: hasMirrorPod,
updateCompleteFn: updateComplete,
}
} else {
p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
pod: pod,
hasMirrorPod: hasMirrorPod,
updateCompleteFn: updateComplete,
}
}

View File

@ -46,7 +46,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
podWorkers := newPodWorkers(
fakeDockerCache,
func(pod *api.Pod, containers dockertools.DockerContainers) error {
func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error {
func() {
lock.Lock()
defer lock.Unlock()
@ -54,7 +54,8 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
}()
return nil
},
recorder)
recorder,
)
return podWorkers, processed
}
@ -82,7 +83,7 @@ func TestUpdatePod(t *testing.T) {
numPods := 20
for i := 0; i < numPods; i++ {
for j := i; j < numPods; j++ {
podWorkers.UpdatePod(newPod(string(j), string(i)), func() {})
podWorkers.UpdatePod(newPod(string(j), string(i)), false, func() {})
}
}
drainWorkers(podWorkers, numPods)
@ -115,7 +116,7 @@ func TestForgetNonExistingPodWorkers(t *testing.T) {
numPods := 20
for i := 0; i < numPods; i++ {
podWorkers.UpdatePod(newPod(string(i), "name"), func() {})
podWorkers.UpdatePod(newPod(string(i), "name"), false, func() {})
}
drainWorkers(podWorkers, numPods)

View File

@ -104,7 +104,9 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
return nil
}
glog.Infof("pod %q containers not running: syncing", pod.Name)
if err = kl.syncPod(&pod, dockerContainers); err != nil {
// We don't create mirror pods in this mode; pass a dummy boolean value
// to sycnPod.
if err = kl.syncPod(&pod, false, dockerContainers); err != nil {
return fmt.Errorf("error syncing pod: %v", err)
}
if retry >= RunOnceMaxRetries {

View File

@ -38,6 +38,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
"github.com/golang/glog"
@ -84,7 +85,7 @@ type HostInterface interface {
GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
GetDockerVersion() ([]uint, error)
GetMachineInfo() (*cadvisorApi.MachineInfo, error)
GetPods() ([]api.Pod, error)
GetPods() ([]api.Pod, util.StringSet, error)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodStatus(name string, uid types.UID) (api.PodStatus, error)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
@ -258,9 +259,9 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
}
}
// handlePods returns a list of pod bounds to the Kubelet and their spec
// handlePods returns a list of pod bound to the Kubelet and their spec
func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
pods, err := s.host.GetPods()
pods, _, err := s.host.GetPods()
if err != nil {
s.error(w, err)
return

View File

@ -33,6 +33,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
cadvisorApi "github.com/google/cadvisor/info/v1"
@ -44,7 +45,7 @@ type fakeKubelet struct {
containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
machineInfoFunc func() (*cadvisorApi.MachineInfo, error)
podsFunc func() ([]api.Pod, error)
podsFunc func() ([]api.Pod, util.StringSet, error)
logFunc func(w http.ResponseWriter, req *http.Request)
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
dockerVersionFunc func() ([]uint, error)
@ -79,7 +80,7 @@ func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
return fk.machineInfoFunc()
}
func (fk *fakeKubelet) GetPods() ([]api.Pod, error) {
func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet, error) {
return fk.podsFunc()
}

View File

@ -18,11 +18,13 @@ package kubelet
import (
"fmt"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
const ConfigSourceAnnotationKey = "kubernetes.io/config.source"
const ConfigMirrorAnnotationKey = "kubernetes.io/config.mirror"
// PodOperation defines what changes will be made on a pod configuration.
type PodOperation int
@ -49,6 +51,9 @@ const (
// Updates from all sources
AllSource = "*"
// Used for ConfigMirrorAnnotationKey.
MirrorType = "mirror"
NamespaceDefault = api.NamespaceDefault
)
@ -67,7 +72,7 @@ type PodUpdate struct {
Source string
}
// GetPodFullName returns a name that uniquely identifies a pod across all config sources.
// GetPodFullName returns a name that uniquely identifies a pod.
func GetPodFullName(pod *api.Pod) string {
// Use underscore as the delimiter because it is not allowed in pod name
// (DNS subdomain format), while allowed in the container name format.
@ -78,3 +83,12 @@ func GetPodFullName(pod *api.Pod) string {
func BuildPodFullName(name, namespace string) string {
return name + "_" + namespace
}
// Parse the pod full name.
func ParsePodFullName(podFullName string) (string, string, error) {
parts := strings.Split(podFullName, "_")
if len(parts) != 2 {
return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName)
}
return parts[0], parts[1], nil
}