mirror of https://github.com/k3s-io/k3s
Pod cache needs to be namespace-aware
parent
996c0e44d6
commit
9e60bf1e43
|
@ -60,7 +60,7 @@ var (
|
|||
|
||||
type fakePodInfoGetter struct{}
|
||||
|
||||
func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
|
||||
func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
|
||||
// This is a horrible hack to get around the fact that we can't provide
|
||||
// different port numbers per kubelet...
|
||||
var c client.PodInfoGetter
|
||||
|
@ -76,9 +76,9 @@ func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
|
|||
Port: 10251,
|
||||
}
|
||||
default:
|
||||
glog.Fatalf("Can't get info for: '%v', '%v'", host, podID)
|
||||
glog.Fatalf("Can't get info for: '%v', '%v'", host, podNamespace, podID)
|
||||
}
|
||||
return c.GetPodInfo("localhost", podID)
|
||||
return c.GetPodInfo("localhost", podNamespace, podID)
|
||||
}
|
||||
|
||||
type delegateHandler struct {
|
||||
|
@ -182,11 +182,11 @@ func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc {
|
|||
podInfo := fakePodInfoGetter{}
|
||||
return func() (bool, error) {
|
||||
for i := range pods.Items {
|
||||
host, id := pods.Items[i].CurrentState.Host, pods.Items[i].ID
|
||||
host, id, namespace := pods.Items[i].CurrentState.Host, pods.Items[i].ID, pods.Items[i].Namespace
|
||||
if len(host) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
if _, err := podInfo.GetPodInfo(host, id); err != nil {
|
||||
if _, err := podInfo.GetPodInfo(host, namespace, id); err != nil {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ var ErrPodInfoNotAvailable = errors.New("no pod info available")
|
|||
type PodInfoGetter interface {
|
||||
// GetPodInfo returns information about all containers which are part
|
||||
// Returns an api.PodInfo, or an error if one occurs.
|
||||
GetPodInfo(host, podID string) (api.PodInfo, error)
|
||||
GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error)
|
||||
}
|
||||
|
||||
// HTTPPodInfoGetter is the default implementation of PodInfoGetter, accesses the kubelet over HTTP.
|
||||
|
@ -46,13 +46,14 @@ type HTTPPodInfoGetter struct {
|
|||
}
|
||||
|
||||
// GetPodInfo gets information about the specified pod.
|
||||
func (c *HTTPPodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
|
||||
func (c *HTTPPodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
|
||||
request, err := http.NewRequest(
|
||||
"GET",
|
||||
fmt.Sprintf(
|
||||
"http://%s/podInfo?podID=%s",
|
||||
"http://%s/podInfo?podID=%s&podNamespace=%s",
|
||||
net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10)),
|
||||
podID),
|
||||
podID,
|
||||
podNamespace),
|
||||
nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -85,6 +86,6 @@ type FakePodInfoGetter struct {
|
|||
}
|
||||
|
||||
// GetPodInfo is a fake implementation of PodInfoGetter.GetPodInfo.
|
||||
func (c *FakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
|
||||
func (c *FakePodInfoGetter) GetPodInfo(host, podNamespace string, podID string) (api.PodInfo, error) {
|
||||
return c.data, c.err
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ func TestHTTPPodInfoGetter(t *testing.T) {
|
|||
Client: http.DefaultClient,
|
||||
Port: uint(port),
|
||||
}
|
||||
gotObj, err := podInfoGetter.GetPodInfo(parts[0], "foo")
|
||||
gotObj, err := podInfoGetter.GetPodInfo(parts[0], api.NamespaceDefault, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ func TestHTTPPodInfoGetterNotFound(t *testing.T) {
|
|||
Client: http.DefaultClient,
|
||||
Port: uint(port),
|
||||
}
|
||||
_, err = podInfoGetter.GetPodInfo(parts[0], "foo")
|
||||
_, err = podInfoGetter.GetPodInfo(parts[0], api.NamespaceDefault, "foo")
|
||||
if err != ErrPodInfoNotAvailable {
|
||||
t.Errorf("Expected %#v, Got %#v", ErrPodInfoNotAvailable, err)
|
||||
}
|
||||
|
|
|
@ -46,33 +46,38 @@ func NewPodCache(info client.PodInfoGetter, pods pod.Registry) *PodCache {
|
|||
}
|
||||
}
|
||||
|
||||
// makePodCacheKey constructs a key for use in a map to address a pod with specified namespace and id
|
||||
func makePodCacheKey(podNamespace, podID string) string {
|
||||
return podNamespace + "." + podID
|
||||
}
|
||||
|
||||
// GetPodInfo implements the PodInfoGetter.GetPodInfo.
|
||||
// The returned value should be treated as read-only.
|
||||
// TODO: Remove the host from this call, it's totally unnecessary.
|
||||
func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) {
|
||||
func (p *PodCache) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
|
||||
p.podLock.Lock()
|
||||
defer p.podLock.Unlock()
|
||||
value, ok := p.podInfo[podID]
|
||||
value, ok := p.podInfo[makePodCacheKey(podNamespace, podID)]
|
||||
if !ok {
|
||||
return nil, client.ErrPodInfoNotAvailable
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
func (p *PodCache) updatePodInfo(host, id string) error {
|
||||
info, err := p.containerInfo.GetPodInfo(host, id)
|
||||
func (p *PodCache) updatePodInfo(host, podNamespace, podID string) error {
|
||||
info, err := p.containerInfo.GetPodInfo(host, podNamespace, podID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.podLock.Lock()
|
||||
defer p.podLock.Unlock()
|
||||
p.podInfo[id] = info
|
||||
p.podInfo[makePodCacheKey(podNamespace, podID)] = info
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off.
|
||||
func (p *PodCache) UpdateAllContainers() {
|
||||
var ctx api.Context
|
||||
ctx := api.NewContext()
|
||||
pods, err := p.pods.ListPods(ctx, labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("Error synchronizing container list: %v", err)
|
||||
|
@ -82,7 +87,7 @@ func (p *PodCache) UpdateAllContainers() {
|
|||
if pod.CurrentState.Host == "" {
|
||||
continue
|
||||
}
|
||||
err := p.updatePodInfo(pod.CurrentState.Host, pod.ID)
|
||||
err := p.updatePodInfo(pod.CurrentState.Host, pod.Namespace, pod.ID)
|
||||
if err != nil && err != client.ErrPodInfoNotAvailable {
|
||||
glog.Errorf("Error synchronizing container: %v", err)
|
||||
}
|
||||
|
|
|
@ -25,27 +25,59 @@ import (
|
|||
)
|
||||
|
||||
type FakePodInfoGetter struct {
|
||||
host string
|
||||
id string
|
||||
data api.PodInfo
|
||||
err error
|
||||
host string
|
||||
id string
|
||||
namespace string
|
||||
data api.PodInfo
|
||||
err error
|
||||
}
|
||||
|
||||
func (f *FakePodInfoGetter) GetPodInfo(host, id string) (api.PodInfo, error) {
|
||||
func (f *FakePodInfoGetter) GetPodInfo(host, namespace, id string) (api.PodInfo, error) {
|
||||
f.host = host
|
||||
f.id = id
|
||||
f.namespace = namespace
|
||||
return f.data, f.err
|
||||
}
|
||||
|
||||
func TestPodCacheGetDifferentNamespace(t *testing.T) {
|
||||
cache := NewPodCache(nil, nil)
|
||||
|
||||
expectedDefault := api.PodInfo{
|
||||
"foo": api.ContainerStatus{},
|
||||
}
|
||||
expectedOther := api.PodInfo{
|
||||
"bar": api.ContainerStatus{},
|
||||
}
|
||||
|
||||
cache.podInfo[makePodCacheKey(api.NamespaceDefault, "foo")] = expectedDefault
|
||||
cache.podInfo[makePodCacheKey("other", "foo")] = expectedOther
|
||||
|
||||
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(info, expectedDefault) {
|
||||
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expectedOther, info)
|
||||
}
|
||||
|
||||
info, err = cache.GetPodInfo("host", "other", "foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(info, expectedOther) {
|
||||
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expectedOther, info)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodCacheGet(t *testing.T) {
|
||||
cache := NewPodCache(nil, nil)
|
||||
|
||||
expected := api.PodInfo{
|
||||
"foo": api.ContainerStatus{},
|
||||
}
|
||||
cache.podInfo["foo"] = expected
|
||||
cache.podInfo[makePodCacheKey(api.NamespaceDefault, "foo")] = expected
|
||||
|
||||
info, err := cache.GetPodInfo("host", "foo")
|
||||
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
}
|
||||
|
@ -57,7 +89,7 @@ func TestPodCacheGet(t *testing.T) {
|
|||
func TestPodCacheGetMissing(t *testing.T) {
|
||||
cache := NewPodCache(nil, nil)
|
||||
|
||||
info, err := cache.GetPodInfo("host", "foo")
|
||||
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected non-error: %#v", err)
|
||||
}
|
||||
|
@ -75,13 +107,13 @@ func TestPodGetPodInfoGetter(t *testing.T) {
|
|||
}
|
||||
cache := NewPodCache(&fake, nil)
|
||||
|
||||
cache.updatePodInfo("host", "foo")
|
||||
cache.updatePodInfo("host", api.NamespaceDefault, "foo")
|
||||
|
||||
if fake.host != "host" || fake.id != "foo" {
|
||||
if fake.host != "host" || fake.id != "foo" || fake.namespace != api.NamespaceDefault {
|
||||
t.Errorf("Unexpected access: %#v", fake)
|
||||
}
|
||||
|
||||
info, err := cache.GetPodInfo("host", "foo")
|
||||
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
}
|
||||
|
@ -92,7 +124,7 @@ func TestPodGetPodInfoGetter(t *testing.T) {
|
|||
|
||||
func TestPodUpdateAllContainers(t *testing.T) {
|
||||
pod := api.Pod{
|
||||
TypeMeta: api.TypeMeta{ID: "foo"},
|
||||
TypeMeta: api.TypeMeta{ID: "foo", Namespace: api.NamespaceDefault},
|
||||
CurrentState: api.PodState{
|
||||
Host: "machine",
|
||||
},
|
||||
|
@ -111,11 +143,11 @@ func TestPodUpdateAllContainers(t *testing.T) {
|
|||
|
||||
cache.UpdateAllContainers()
|
||||
|
||||
if fake.host != "machine" || fake.id != "foo" {
|
||||
if fake.host != "machine" || fake.id != "foo" || fake.namespace != api.NamespaceDefault {
|
||||
t.Errorf("Unexpected access: %#v", fake)
|
||||
}
|
||||
|
||||
info, err := cache.GetPodInfo("machine", "foo")
|
||||
info, err := cache.GetPodInfo("machine", api.NamespaceDefault, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
}
|
||||
|
|
|
@ -208,13 +208,13 @@ func (rs *REST) fillPodInfo(pod *api.Pod) {
|
|||
// Get cached info for the list currently.
|
||||
// TODO: Optionally use fresh info
|
||||
if rs.podCache != nil {
|
||||
info, err := rs.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID)
|
||||
info, err := rs.podCache.GetPodInfo(pod.CurrentState.Host, pod.Namespace, pod.ID)
|
||||
if err != nil {
|
||||
if err != client.ErrPodInfoNotAvailable {
|
||||
glog.Errorf("Error getting container info from cache: %#v", err)
|
||||
}
|
||||
if rs.podInfoGetter != nil {
|
||||
info, err = rs.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.ID)
|
||||
info, err = rs.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.Namespace, pod.ID)
|
||||
}
|
||||
if err != nil {
|
||||
if err != client.ErrPodInfoNotAvailable {
|
||||
|
|
|
@ -597,7 +597,7 @@ type FakePodInfoGetter struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (f *FakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
|
||||
func (f *FakePodInfoGetter) GetPodInfo(host, podNamespace string, podID string) (api.PodInfo, error) {
|
||||
return f.info, f.err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue