Delete pod_cache and rely on updating pod status by kublet.

pull/6/head
Filip Grzadkowski 2015-03-24 13:35:38 +01:00
parent 455fe8235b
commit 74da3b14b0
11 changed files with 44 additions and 1165 deletions

View File

@ -195,7 +195,6 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
ReadOnlyPort: portNumber,
PublicAddress: publicAddress,
CacheTimeout: 2 * time.Second,
SyncPodStatus: true,
EnableV1Beta3: true,
})
handler.delegate = m.Handler

View File

@ -73,7 +73,6 @@ type APIServer struct {
RuntimeConfig util.ConfigurationMap
KubeletConfig client.KubeletConfig
ClusterName string
SyncPodStatus bool
EnableProfiling bool
}
@ -94,7 +93,6 @@ func NewAPIServer() *APIServer {
EnableLogsSupport: true,
MasterServiceNamespace: api.NamespaceDefault,
ClusterName: "kubernetes",
SyncPodStatus: true,
RuntimeConfig: make(util.ConfigurationMap),
KubeletConfig: client.KubeletConfig{
@ -149,7 +147,6 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.AllowPrivileged, "allow_privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.Var(&s.PortalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
fs.StringVar(&s.MasterServiceNamespace, "master_service_namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.BoolVar(&s.SyncPodStatus, "sync_pod_status", s.SyncPodStatus, "If true, periodically fetch pods statuses from kubelets.")
fs.Var(&s.RuntimeConfig, "runtime_config", "A set of key=value pairs that describe runtime configuration that may be passed to the apiserver.")
client.BindKubeletClientConfigFlags(fs, &s.KubeletConfig)
fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
@ -250,7 +247,6 @@ func (s *APIServer) Run(_ []string) error {
EnableV1Beta3: v1beta3,
MasterServiceNamespace: s.MasterServiceNamespace,
ClusterName: s.ClusterName,
SyncPodStatus: s.SyncPodStatus,
}
m := master.New(config)

View File

@ -1423,6 +1423,11 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, runningPod kubeconta
if !hasMirrorPod && isStaticPod(pod) {
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
// To make sure we will properly update static pod status we need to delete
// it from status manager. Otherwise it is possible that we will miss manual
// deletion of mirror pod in apiserver and will never reset its status to
// Running after recreating it.
kl.statusManager.DeletePodStatus(podFullName)
if err := kl.podManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err)
}
@ -1800,6 +1805,29 @@ func (kl *Kubelet) GetHostname() string {
return kl.hostname
}
// Returns host IP or nil in case of error.
func (kl *Kubelet) GetHostIP() (net.IP, error) {
node, err := kl.GetNode()
if err != nil {
return nil, fmt.Errorf("Cannot get node: %v", err)
}
addresses := node.Status.Addresses
addressMap := make(map[api.NodeAddressType][]api.NodeAddress)
for i := range addresses {
addressMap[addresses[i].Type] = append(addressMap[addresses[i].Type], addresses[i])
}
if addresses, ok := addressMap[api.NodeLegacyHostIP]; ok {
return net.ParseIP(addresses[0].Address), nil
}
if addresses, ok := addressMap[api.NodeInternalIP]; ok {
return net.ParseIP(addresses[0].Address), nil
}
if addresses, ok := addressMap[api.NodeExternalIP]; ok {
return net.ParseIP(addresses[0].Address), nil
}
return nil, fmt.Errorf("Host IP unknown; known addresses: %v", addresses)
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pods.
func (kl *Kubelet) GetPods() []api.Pod {
@ -2012,7 +2040,12 @@ func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) {
podStatus.Info[c.Name] = containerStatus
}
podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.Info)...)
podStatus.Host = kl.hostname
podStatus.Host = kl.GetHostname()
hostIP, err := kl.GetHostIP()
if err != nil {
return api.PodStatus{}, fmt.Errorf("Cannot get host IP: %v", err)
}
podStatus.HostIP = hostIP.String()
return *podStatus, nil
}

View File

@ -114,9 +114,6 @@ type Config struct {
// The name of the cluster.
ClusterName string
// If true we will periodically probe pods statuses.
SyncPodStatus bool
}
// Master contains state for a Kubernetes cluster master/api server.
@ -369,20 +366,6 @@ func (m *Master) init(c *Config) {
m.nodeRegistry = registry
nodeStorage := minion.NewStorage(m.nodeRegistry, c.KubeletClient)
// TODO: unify the storage -> registry and storage -> client patterns
nodeStorageClient := RESTStorageToNodes(nodeStorage)
podCache := NewPodCache(
c.KubeletClient,
nodeStorageClient.Nodes(),
podRegistry,
)
if c.SyncPodStatus {
go util.Forever(podCache.UpdateAllContainers, m.cacheTimeout)
go util.Forever(podCache.GarbageCollectPodStatus, time.Minute*30)
// Note the pod cache needs access to an un-decorated RESTStorage
podStorage = podStorage.WithPodStatus(podCache)
}
controllerStorage := controlleretcd.NewREST(c.EtcdHelper)

View File

@ -1,277 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
// PodCache contains both a cache of container information, as well as the mechanism for keeping
// that cache up to date.
type PodCache struct {
containerInfo client.PodInfoGetter
pods pod.Registry
// For confirming existance of a node
nodes client.NodeInterface
// lock protects access to all fields below
lock sync.Mutex
// cached pod statuses.
podStatus map[objKey]api.PodStatus
// nodes that we know exist. Cleared at the beginning of each
// UpdateAllPods call.
currentNodes map[objKey]api.NodeStatus
}
type objKey struct {
namespace, name string
}
// NewPodCache returns a new PodCache which watches container information
// registered in the given PodRegistry.
// TODO(lavalamp): pods should be a client.PodInterface.
func NewPodCache(info client.PodInfoGetter, nodes client.NodeInterface, pods pod.Registry) *PodCache {
return &PodCache{
containerInfo: info,
pods: pods,
nodes: nodes,
currentNodes: map[objKey]api.NodeStatus{},
podStatus: map[objKey]api.PodStatus{},
}
}
// GetPodStatus gets the stored pod status.
func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
status := p.getPodStatusInternal(namespace, name)
if status != nil {
return status, nil
}
return p.updateCacheAndReturn(namespace, name)
}
func (p *PodCache) updateCacheAndReturn(namespace, name string) (*api.PodStatus, error) {
pod, err := p.pods.GetPod(api.WithNamespace(api.NewContext(), namespace), name)
if err != nil {
return nil, err
}
if err := p.updatePodStatus(pod); err != nil {
return nil, err
}
status := p.getPodStatusInternal(namespace, name)
if status == nil {
glog.Warningf("nil status after successful update. that's odd... (%s %s)", namespace, name)
return nil, client.ErrPodInfoNotAvailable
}
return status, nil
}
func (p *PodCache) getPodStatusInternal(namespace, name string) *api.PodStatus {
p.lock.Lock()
defer p.lock.Unlock()
value, ok := p.podStatus[objKey{namespace, name}]
if !ok {
return nil
}
// Make a copy
return &value
}
func (p *PodCache) ClearPodStatus(namespace, name string) {
p.lock.Lock()
defer p.lock.Unlock()
delete(p.podStatus, objKey{namespace, name})
}
func (p *PodCache) getNodeStatusInCache(name string) (*api.NodeStatus, bool) {
p.lock.Lock()
defer p.lock.Unlock()
nodeStatus, cacheHit := p.currentNodes[objKey{"", name}]
return &nodeStatus, cacheHit
}
// lock must *not* be held
func (p *PodCache) getNodeStatus(name string) (*api.NodeStatus, error) {
nodeStatus, cacheHit := p.getNodeStatusInCache(name)
if cacheHit {
return nodeStatus, nil
}
// TODO: suppose there's N concurrent requests for node "foo"; in that case
// it might be useful to block all of them and only look up "foo" once.
// (This code will make up to N lookups.) One way of doing that would be to
// have a pool of M mutexes and require that before looking up "foo" you must
// lock mutex hash("foo") % M.
node, err := p.nodes.Get(name)
if err != nil {
glog.Errorf("Unexpected error verifying node existence: %+v", err)
return nil, err
}
p.lock.Lock()
defer p.lock.Unlock()
p.currentNodes[objKey{"", name}] = node.Status
return &node.Status, nil
}
func (p *PodCache) clearNodeStatus() {
p.lock.Lock()
defer p.lock.Unlock()
p.currentNodes = map[objKey]api.NodeStatus{}
}
func (p *PodCache) getHostAddress(addresses []api.NodeAddress) string {
addressMap := make(map[api.NodeAddressType][]api.NodeAddress)
for i := range addresses {
addressMap[addresses[i].Type] = append(addressMap[addresses[i].Type], addresses[i])
}
if addresses, ok := addressMap[api.NodeLegacyHostIP]; ok {
return addresses[0].Address
}
if addresses, ok := addressMap[api.NodeInternalIP]; ok {
return addresses[0].Address
}
if addresses, ok := addressMap[api.NodeExternalIP]; ok {
return addresses[0].Address
}
return ""
}
// TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
// entire pod?
func (p *PodCache) updatePodStatus(pod *api.Pod) error {
newStatus, err := p.computePodStatus(pod)
p.lock.Lock()
defer p.lock.Unlock()
// Map accesses must be locked.
if err == nil {
p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
}
return err
}
// computePodStatus always returns a new status, even if it also returns a non-nil error.
// TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
// entire pod?
func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
newStatus := pod.Status
if pod.Status.Host == "" {
// Not assigned.
newStatus.Phase = api.PodPending
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
return newStatus, nil
}
nodeStatus, err := p.getNodeStatus(pod.Status.Host)
// Assigned to non-existing node.
if err != nil || len(nodeStatus.Conditions) == 0 {
glog.V(5).Infof("node doesn't exist: %v %v, setting pod %q status to unknown", err, nodeStatus, pod.Name)
newStatus.Phase = api.PodUnknown
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
return newStatus, nil
}
// Assigned to an unhealthy node.
for _, condition := range nodeStatus.Conditions {
if (condition.Type == api.NodeReady || condition.Type == api.NodeReachable) && condition.Status == api.ConditionFalse {
glog.V(5).Infof("node status: %v, setting pod %q status to unknown", condition, pod.Name)
newStatus.Phase = api.PodUnknown
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
return newStatus, nil
}
}
result, err := p.containerInfo.GetPodStatus(pod.Status.Host, pod.Namespace, pod.Name)
if err != nil {
glog.V(5).Infof("error getting pod %s status: %v, retry later", pod.Name, err)
} else {
newStatus.HostIP = p.getHostAddress(nodeStatus.Addresses)
newStatus.Info = result.Status.Info
newStatus.PodIP = result.Status.PodIP
newStatus.Phase = result.Status.Phase
newStatus.Conditions = result.Status.Conditions
}
return newStatus, err
}
func (p *PodCache) GarbageCollectPodStatus() {
pods, err := p.pods.ListPods(api.NewContext(), labels.Everything())
if err != nil {
glog.Errorf("Error getting pod list: %v", err)
return
}
keys := map[objKey]bool{}
for _, pod := range pods.Items {
keys[objKey{pod.Namespace, pod.Name}] = true
}
p.lock.Lock()
defer p.lock.Unlock()
for key := range p.podStatus {
if _, found := keys[key]; !found {
glog.Infof("Deleting orphaned cache entry: %v", key)
delete(p.podStatus, key)
}
}
}
// UpdateAllContainers updates information about all containers.
// Callers should let one call to UpdateAllContainers finish before
// calling again, or risk having new info getting clobbered by delayed
// old info.
func (p *PodCache) UpdateAllContainers() {
// TODO: this is silly, we should pro-actively update the pod status when
// the API server makes changes.
p.clearNodeStatus()
ctx := api.NewContext()
pods, err := p.pods.ListPods(ctx, labels.Everything())
if err != nil {
glog.Errorf("Error getting pod list: %v", err)
return
}
// TODO: this algorithm is 1 goroutine & RPC per pod. With a little work,
// it should be possible to make it 1 per *node*, which will be important
// at very large scales. (To be clear, the goroutines shouldn't matter--
// it's the RPCs that need to be minimized.)
var wg sync.WaitGroup
for i := range pods.Items {
pod := &pods.Items[i]
wg.Add(1)
go func() {
defer util.HandleCrash()
defer wg.Done()
err := p.updatePodStatus(pod)
if err != nil && err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error getting info for pod %v/%v: %v", pod.Namespace, pod.Name, err)
}
}()
}
wg.Wait()
}

View File

@ -1,525 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"reflect"
"sync"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
type podInfoCall struct {
host string
namespace string
name string
}
type podInfoResponse struct {
useCount int
data api.PodStatusResult
err error
}
type podInfoCalls map[podInfoCall]*podInfoResponse
type FakePodInfoGetter struct {
calls podInfoCalls
lock sync.Mutex
// default data/error to return, or you can add
// responses to specific calls-- that will take precedence.
data api.PodStatusResult
err error
}
func (f *FakePodInfoGetter) GetPodStatus(host, namespace, name string) (api.PodStatusResult, error) {
f.lock.Lock()
defer f.lock.Unlock()
if f.calls == nil {
f.calls = podInfoCalls{}
}
key := podInfoCall{host, namespace, name}
call, ok := f.calls[key]
if !ok {
f.calls[key] = &podInfoResponse{
0, f.data, f.err,
}
call = f.calls[key]
}
call.useCount++
return call.data, call.err
}
func TestPodCacheGetDifferentNamespace(t *testing.T) {
cache := NewPodCache(nil, nil, nil)
expectedDefault := api.PodStatus{
Info: api.PodInfo{
"foo": api.ContainerStatus{},
},
}
expectedOther := api.PodStatus{
Info: api.PodInfo{
"bar": api.ContainerStatus{},
},
}
cache.podStatus[objKey{api.NamespaceDefault, "foo"}] = expectedDefault
cache.podStatus[objKey{"other", "foo"}] = expectedOther
info, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %+v", err)
}
if !reflect.DeepEqual(info, &expectedDefault) {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", &expectedOther, info)
}
info, err = cache.GetPodStatus("other", "foo")
if err != nil {
t.Errorf("Unexpected error: %+v", err)
}
if !reflect.DeepEqual(info, &expectedOther) {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", &expectedOther, info)
}
}
func TestPodCacheGet(t *testing.T) {
cache := NewPodCache(nil, nil, nil)
expected := api.PodStatus{
Info: api.PodInfo{
"foo": api.ContainerStatus{},
},
}
cache.podStatus[objKey{api.NamespaceDefault, "foo"}] = expected
info, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %+v", err)
}
if !reflect.DeepEqual(info, &expected) {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", &expected, info)
}
}
func TestPodCacheDelete(t *testing.T) {
config := podCacheTestConfig{
err: client.ErrPodInfoNotAvailable,
}
cache := config.Construct()
expected := api.PodStatus{
Info: api.PodInfo{
"foo": api.ContainerStatus{},
},
}
cache.podStatus[objKey{api.NamespaceDefault, "foo"}] = expected
info, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %+v", err)
}
if !reflect.DeepEqual(info, &expected) {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", &expected, info)
}
cache.ClearPodStatus(api.NamespaceDefault, "foo")
_, err = cache.GetPodStatus(api.NamespaceDefault, "foo")
if err == nil {
t.Errorf("Unexpected non-error after deleting")
}
if err != client.ErrPodInfoNotAvailable {
t.Errorf("Unexpected error: %v, expecting: %v", err, client.ErrPodInfoNotAvailable)
}
}
func TestPodCacheGetMissing(t *testing.T) {
pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
config := podCacheTestConfig{
kubeletContainerInfo: api.PodStatus{
Info: api.PodInfo{"bar": api.ContainerStatus{}}},
nodes: []api.Node{*makeHealthyNode("machine", "1.2.3.5")},
pod: pod1,
}
cache := config.Construct()
status, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %+v", err)
}
if status == nil {
t.Errorf("Unexpected non-status.")
}
}
type podCacheTestConfig struct {
nodes []api.Node
pods []api.Pod
pod *api.Pod
err error
kubeletContainerInfo api.PodStatus
// Construct will fill in these fields
fakePodInfo *FakePodInfoGetter
fakeNodes *client.Fake
fakePods *registrytest.PodRegistry
}
func (c *podCacheTestConfig) Construct() *PodCache {
c.fakePodInfo = &FakePodInfoGetter{
data: api.PodStatusResult{
Status: c.kubeletContainerInfo,
},
}
c.fakeNodes = &client.Fake{
MinionsList: api.NodeList{
Items: c.nodes,
},
}
c.fakePods = registrytest.NewPodRegistry(&api.PodList{Items: c.pods})
c.fakePods.Pod = c.pod
c.fakePods.Err = c.err
return NewPodCache(
c.fakePodInfo,
c.fakeNodes.Nodes(),
c.fakePods,
)
}
func makePod(namespace, name, host string, containers ...string) *api.Pod {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: name},
Status: api.PodStatus{Host: host},
}
for _, c := range containers {
pod.Spec.Containers = append(pod.Spec.Containers, api.Container{Name: c})
}
return pod
}
func makeHealthyNode(name string, ip string) *api.Node {
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: name},
Status: api.NodeStatus{
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip},
},
Conditions: []api.NodeCondition{
{Type: api.NodeReady, Status: api.ConditionTrue},
},
},
}
}
func makeUnhealthyNode(name string) *api.Node {
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: name},
Status: api.NodeStatus{Conditions: []api.NodeCondition{
{Type: api.NodeReady, Status: api.ConditionFalse},
}},
}
}
func TestPodUpdateAllContainersClearsNodeStatus(t *testing.T) {
node := makeHealthyNode("machine", "1.2.3.5")
pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
config := podCacheTestConfig{
kubeletContainerInfo: api.PodStatus{
Info: api.PodInfo{"bar": api.ContainerStatus{}}},
nodes: []api.Node{*node},
pods: []api.Pod{*pod1, *pod2},
}
cache := config.Construct()
if len(cache.currentNodes) != 0 {
t.Errorf("unexpected node cache: %v", cache.currentNodes)
}
key := objKey{"", "machine"}
cache.currentNodes[key] = makeUnhealthyNode("machine").Status
cache.UpdateAllContainers()
if len(cache.currentNodes) != 1 {
t.Errorf("unexpected empty node cache: %v", cache.currentNodes)
}
if !reflect.DeepEqual(cache.currentNodes[key], node.Status) {
t.Errorf("unexpected status:\n%#v\nexpected:\n%#v\n", cache.currentNodes[key], node.Status)
}
}
func TestPodUpdateAllContainers(t *testing.T) {
pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
config := podCacheTestConfig{
kubeletContainerInfo: api.PodStatus{
Info: api.PodInfo{"bar": api.ContainerStatus{}}},
nodes: []api.Node{*makeHealthyNode("machine", "1.2.3.5")},
pods: []api.Pod{*pod1, *pod2},
}
cache := config.Construct()
cache.UpdateAllContainers()
call1 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "foo"}]
call2 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "baz"}]
if call1 == nil || call1.useCount != 1 {
t.Errorf("Expected 1 call for 'foo': %+v", config.fakePodInfo.calls)
}
if call2 == nil || call2.useCount != 1 {
t.Errorf("Expected 1 call for 'baz': %+v", config.fakePodInfo.calls)
}
if len(config.fakePodInfo.calls) != 2 {
t.Errorf("Expected 2 calls: %+v", config.fakePodInfo.calls)
}
status, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
if e, a := config.kubeletContainerInfo.Info, status.Info; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", e, a)
}
if e, a := "1.2.3.5", status.HostIP; e != a {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", e, a)
}
if e, a := 1, len(config.fakeNodes.Actions); e != a {
t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions)
} else {
if e, a := "get-minion", config.fakeNodes.Actions[0].Action; e != a {
t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions)
}
}
}
func TestFillPodStatusNoHost(t *testing.T) {
pod := makePod(api.NamespaceDefault, "foo", "", "bar")
config := podCacheTestConfig{
kubeletContainerInfo: api.PodStatus{},
nodes: []api.Node{*makeHealthyNode("machine", "")},
pods: []api.Pod{*pod},
}
cache := config.Construct()
err := cache.updatePodStatus(&config.pods[0])
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
if e, a := api.PodPending, status.Phase; e != a {
t.Errorf("Expected: %+v, Got %+v", e, a)
}
}
func TestFillPodStatusMissingMachine(t *testing.T) {
pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
config := podCacheTestConfig{
kubeletContainerInfo: api.PodStatus{},
nodes: []api.Node{},
pods: []api.Pod{*pod},
}
cache := config.Construct()
err := cache.updatePodStatus(&config.pods[0])
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
if e, a := api.PodUnknown, status.Phase; e != a {
t.Errorf("Expected: %+v, Got %+v", e, a)
}
}
func TestFillPodInfoNoData(t *testing.T) {
pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
expectedIP := ""
config := podCacheTestConfig{
kubeletContainerInfo: api.PodStatus{
Phase: api.PodPending,
Host: "machine",
HostIP: "ip of machine",
Info: api.PodInfo{
leaky.PodInfraContainerName: {},
},
},
nodes: []api.Node{*makeHealthyNode("machine", "ip of machine")},
pods: []api.Pod{*pod},
}
cache := config.Construct()
err := cache.updatePodStatus(&config.pods[0])
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
if e, a := &config.kubeletContainerInfo, status; !reflect.DeepEqual(e, a) {
t.Errorf("Expected: %+v, Got %+v", e, a)
}
if status.PodIP != expectedIP {
t.Errorf("Expected %s, Got %s", expectedIP, status.PodIP)
}
}
func TestPodPhaseWithBadNode(t *testing.T) {
desiredState := api.PodSpec{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
RestartPolicy: api.RestartPolicyAlways,
}
runningState := api.ContainerStatus{
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
stoppedState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{},
},
}
tests := []struct {
pod *api.Pod
nodes []api.Node
status api.PodPhase
test string
}{
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Host: "machine-two",
},
},
[]api.Node{},
api.PodUnknown,
"no info, but bad machine",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine-two",
},
},
[]api.Node{},
api.PodUnknown,
"all running but minion is missing",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": stoppedState,
"containerB": stoppedState,
},
Host: "machine-two",
},
},
[]api.Node{},
api.PodUnknown,
"all stopped but minion missing",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine-two",
},
},
[]api.Node{*makeUnhealthyNode("machine-two")},
api.PodUnknown,
"all running but minion is unhealthy",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": stoppedState,
"containerB": stoppedState,
},
Host: "machine-two",
},
},
[]api.Node{*makeUnhealthyNode("machine-two")},
api.PodUnknown,
"all stopped but minion is unhealthy",
},
}
for _, test := range tests {
config := podCacheTestConfig{
kubeletContainerInfo: test.pod.Status,
nodes: test.nodes,
pods: []api.Pod{*test.pod},
}
cache := config.Construct()
cache.UpdateAllContainers()
status, err := cache.GetPodStatus(test.pod.Namespace, test.pod.Name)
if err != nil {
t.Errorf("%v: Unexpected error %v", test.test, err)
continue
}
if e, a := test.status, status.Phase; e != a {
t.Errorf("In test %s, expected %v, got %v", test.test, e, a)
}
}
}
func TestGarbageCollection(t *testing.T) {
pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
config := podCacheTestConfig{
pods: []api.Pod{*pod1, *pod2},
}
cache := config.Construct()
expected := api.PodStatus{
Info: api.PodInfo{
"extra": api.ContainerStatus{},
},
}
cache.podStatus[objKey{api.NamespaceDefault, "extra"}] = expected
cache.GarbageCollectPodStatus()
status, found := cache.podStatus[objKey{api.NamespaceDefault, "extra"}]
if found {
t.Errorf("unexpectedly found: %v for key %v", status, objKey{api.NamespaceDefault, "extra"})
}
}

View File

@ -1,100 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// RESTStorageToNodes will take a RESTStorage object and return a client interface
// which will work for any use expecting a client.Nodes() interface. The advantage
// of doing this in server code is that it doesn't require an actual trip through
// the network.
//
// TODO: considering that the only difference between the various client types
// and RESTStorage type is the type of the arguments, maybe use "go generate" to
// write a specialized adaptor for every client type?
//
// TODO: this also means that pod and node API endpoints have to be colocated in the same
// process
func RESTStorageToNodes(storage rest.Storage) client.NodesInterface {
return &nodeAdaptor{storage}
}
type nodeAdaptor struct {
storage rest.Storage
}
func (n *nodeAdaptor) Nodes() client.NodeInterface {
return n
}
// Create creates a new node.
func (n *nodeAdaptor) Create(minion *api.Node) (*api.Node, error) {
return nil, errors.New("direct creation not implemented")
// TODO: apiserver should expose newOperation to make this easier.
// the actual code that should go here would start like this:
//
// ctx := api.NewDefaultContext()
// out, err := n.storage.Create(ctx, minion)
// if err != nil {
// return nil, err
// }
}
// List lists all the nodes in the cluster.
func (n *nodeAdaptor) List() (*api.NodeList, error) {
ctx := api.NewContext()
obj, err := n.storage.(rest.Lister).List(ctx, labels.Everything(), fields.Everything())
if err != nil {
return nil, err
}
return obj.(*api.NodeList), nil
}
// Get gets an existing node.
func (n *nodeAdaptor) Get(name string) (*api.Node, error) {
ctx := api.NewContext()
obj, err := n.storage.(rest.Getter).Get(ctx, name)
if err != nil {
return nil, err
}
return obj.(*api.Node), nil
}
// Delete deletes an existing node.
// TODO: implement
func (n *nodeAdaptor) Delete(name string) error {
return errors.New("direct deletion not implemented")
}
// Update updates an existing node.
func (n *nodeAdaptor) Update(minion *api.Node) (*api.Node, error) {
return nil, errors.New("direct update not implemented")
}
// Watch watches for nodes.
func (n *nodeAdaptor) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return nil, errors.New("direct watch not implemented")
}

View File

@ -85,15 +85,6 @@ func (r *REST) ResourceLocation(ctx api.Context, name string) (*url.URL, http.Ro
return pod.ResourceLocation(r, ctx, name)
}
// WithPodStatus returns a rest object that decorates returned responses with extra
// status information.
func (r *REST) WithPodStatus(cache pod.PodStatusGetter) *REST {
store := *r
store.Decorator = pod.PodStatusDecorator(cache)
store.AfterDelete = rest.AllFuncs(store.AfterDelete, pod.PodStatusReset(cache))
return &store
}
// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
type BindingREST struct {
store *etcdgeneric.Etcd

View File

@ -28,7 +28,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@ -39,27 +38,6 @@ import (
"github.com/coreos/go-etcd/etcd"
)
type fakeCache struct {
requestedNamespace string
requestedName string
clearedNamespace string
clearedName string
statusToReturn *api.PodStatus
errorToReturn error
}
func (f *fakeCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
f.requestedNamespace = namespace
f.requestedName = name
return f.statusToReturn, f.errorToReturn
}
func (f *fakeCache) ClearPodStatus(namespace, name string) {
f.clearedNamespace = namespace
f.clearedName = name
}
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true
@ -70,7 +48,6 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient, h := newHelper(t)
storage, bindingStorage, statusStorage := NewStorage(h)
storage = storage.WithPodStatus(&fakeCache{statusToReturn: &api.PodStatus{}})
return storage, bindingStorage, statusStorage, fakeEtcdClient, h
}
@ -113,8 +90,6 @@ func TestStorage(t *testing.T) {
func TestCreate(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
test := resttest.New(t, storage, fakeEtcdClient.SetError)
pod := validNewPod()
pod.ObjectMeta = api.ObjectMeta{}
@ -133,8 +108,6 @@ func TestCreate(t *testing.T) {
func TestDelete(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
test := resttest.New(t, storage, fakeEtcdClient.SetError)
createFn := func() runtime.Object {
@ -182,8 +155,6 @@ func TestCreateRegistryError(t *testing.T) {
func TestCreateSetsFields(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
pod := validNewPod()
_, err := storage.Create(api.NewDefaultContext(), pod)
if err != fakeEtcdClient.Err {
@ -206,8 +177,6 @@ func TestListError(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage, _, _ := NewStorage(helper)
cache := &fakeCache{}
storage = storage.WithPodStatus(cache)
pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
if err != fakeEtcdClient.Err {
t.Fatalf("Expected %#v, Got %#v", fakeEtcdClient.Err, err)
@ -217,39 +186,6 @@ func TestListError(t *testing.T) {
}
}
func TestListCacheError(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Data["/registry/pods/default"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
}),
},
},
},
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable}
storage = storage.WithPodStatus(cache)
pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("Expected no error, got %#v", err)
}
pl := pods.(*api.PodList)
if len(pl.Items) != 1 {
t.Fatalf("Unexpected 0-len pod list: %+v", pl)
}
if e, a := api.PodUnknown, pl.Items[0].Status.Phase; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestListEmptyPodList(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.ChangeIndex = 1
@ -259,8 +195,6 @@ func TestListEmptyPodList(t *testing.T) {
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{}
storage = storage.WithPodStatus(cache)
pods, err := storage.List(api.NewContext(), labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -283,7 +217,7 @@ func TestListPodList(t *testing.T) {
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
Status: api.PodStatus{Phase: api.PodRunning, Host: "machine"},
}),
},
{
@ -297,8 +231,6 @@ func TestListPodList(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
storage = storage.WithPodStatus(cache)
podsObj, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
pods := podsObj.(*api.PodList)
@ -348,8 +280,6 @@ func TestListPodListSelection(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
storage = storage.WithPodStatus(cache)
ctx := api.NewDefaultContext()
@ -433,6 +363,7 @@ func TestPodDecode(t *testing.T) {
func TestGet(t *testing.T) {
expect := validNewPod()
expect.Status.Phase = api.PodRunning
expect.Status.Host = "machine"
fakeEtcdClient, helper := newHelper(t)
@ -444,8 +375,6 @@ func TestGet(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
storage = storage.WithPodStatus(cache)
obj, err := storage.Get(api.WithNamespace(api.NewContext(), "test"), "foo")
pod := obj.(*api.Pod)
@ -453,47 +382,16 @@ func TestGet(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
expect.Status.Phase = api.PodRunning
if e, a := expect, pod; !api.Semantic.DeepEqual(e, a) {
t.Errorf("Unexpected pod: %s", util.ObjectDiff(e, a))
}
}
func TestGetCacheError(t *testing.T) {
expect := validNewPod()
expect.Status.Host = "machine"
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Data["/registry/pods/default/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(latest.Codec, expect),
},
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable}
storage = storage.WithPodStatus(cache)
obj, err := storage.Get(api.NewDefaultContext(), "foo")
pod := obj.(*api.Pod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expect.Status.Phase = api.PodUnknown
if e, a := expect, pod; !api.Semantic.DeepEqual(e, a) {
t.Errorf("unexpected object: %s", util.ObjectDiff(e, a))
}
}
// TODO: remove, this is covered by RESTTest.TestCreate
func TestPodStorageValidatesCreate(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
pod := validNewPod()
pod.Labels = map[string]string{
@ -512,8 +410,6 @@ func TestPodStorageValidatesCreate(t *testing.T) {
func TestCreatePod(t *testing.T) {
_, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
pod := validNewPod()
obj, err := storage.Create(api.NewDefaultContext(), pod)
@ -536,8 +432,6 @@ func TestCreatePod(t *testing.T) {
func TestCreateWithConflictingNamespace(t *testing.T) {
_, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{}
storage = storage.WithPodStatus(cache)
pod := validNewPod()
pod.Namespace = "not-default"
@ -567,8 +461,6 @@ func TestUpdateWithConflictingNamespace(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{}
storage = storage.WithPodStatus(cache)
pod := validChangedPod()
pod.Namespace = "not-default"
@ -594,6 +486,7 @@ func TestResourceLocation(t *testing.T) {
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP,
@ -601,6 +494,7 @@ func TestResourceLocation(t *testing.T) {
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo:12345",
location: expectedIP + ":12345",
@ -613,6 +507,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr"},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP,
@ -625,6 +520,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP + ":9376",
@ -637,6 +533,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo:12345",
location: expectedIP + ":12345",
@ -650,6 +547,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP + ":9376",
@ -663,6 +561,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 1234}}},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP + ":9376",
@ -679,8 +578,6 @@ func TestResourceLocation(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{PodIP: expectedIP}}
storage = storage.WithPodStatus(cache)
redirector := rest.Redirector(storage)
location, _, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query)
@ -719,16 +616,11 @@ func TestDeletePod(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
result, err := storage.Delete(api.NewDefaultContext(), "foo", nil)
_, err := storage.Delete(api.NewDefaultContext(), "foo", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if cache.clearedNamespace != "default" || cache.clearedName != "foo" {
t.Fatalf("Unexpected cache delete: %s %s %#v", cache.clearedName, cache.clearedNamespace, result)
}
}
// TestEtcdGetDifferentNamespace ensures same-name pods in different namespaces do not clash

View File

@ -25,7 +25,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -92,40 +91,6 @@ func (podStatusStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.Val
return validation.ValidatePodStatusUpdate(obj.(*api.Pod), old.(*api.Pod))
}
// PodStatusGetter is an interface used by Pods to fetch and retrieve status info.
type PodStatusGetter interface {
GetPodStatus(namespace, name string) (*api.PodStatus, error)
ClearPodStatus(namespace, name string)
}
// PodStatusDecorator returns a function that updates pod.Status based
// on the provided pod cache.
func PodStatusDecorator(cache PodStatusGetter) rest.ObjectFunc {
return func(obj runtime.Object) error {
pod := obj.(*api.Pod)
host := pod.Status.Host
if status, err := cache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
}
} else {
pod.Status = *status
}
pod.Status.Host = host
return nil
}
}
// PodStatusReset returns a function that clears the pod cache when the object
// is deleted.
func PodStatusReset(cache PodStatusGetter) rest.ObjectFunc {
return func(obj runtime.Object) error {
pod := obj.(*api.Pod)
cache.ClearPodStatus(pod.Namespace, pod.Name)
return nil
}
}
// MatchPod returns a generic matcher for a given label and field selector.
func MatchPod(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {

View File

@ -1,78 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pod
import (
"fmt"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
type fakeCache struct {
requestedNamespace string
requestedName string
clearedNamespace string
clearedName string
statusToReturn *api.PodStatus
errorToReturn error
}
func (f *fakeCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
f.requestedNamespace = namespace
f.requestedName = name
return f.statusToReturn, f.errorToReturn
}
func (f *fakeCache) ClearPodStatus(namespace, name string) {
f.clearedNamespace = namespace
f.clearedName = name
}
func TestPodStatusDecorator(t *testing.T) {
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
pod := &api.Pod{}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if pod.Status.Phase != api.PodRunning {
t.Errorf("unexpected pod: %#v", pod)
}
pod = &api.Pod{
Status: api.PodStatus{
Host: "foo",
},
}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if pod.Status.Phase != api.PodRunning || pod.Status.Host != "foo" {
t.Errorf("unexpected pod: %#v", pod)
}
}
func TestPodStatusDecoratorError(t *testing.T) {
cache := &fakeCache{errorToReturn: fmt.Errorf("test error")}
pod := &api.Pod{}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if pod.Status.Phase != api.PodUnknown {
t.Errorf("unexpected pod: %#v", pod)
}
}