mirror of https://github.com/k3s-io/k3s
Use uid in config.go instead of pod full name.
parent
948e3754f8
commit
fbc320af28
|
@ -22,6 +22,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
@ -117,8 +118,8 @@ func (c *PodConfig) Sync() {
|
|||
// available, then this object should be considered authoritative.
|
||||
type podStorage struct {
|
||||
podLock sync.RWMutex
|
||||
// map of source name to pod name to pod reference
|
||||
pods map[string]map[string]*v1.Pod
|
||||
// map of source name to pod uid to pod reference
|
||||
pods map[string]map[types.UID]*v1.Pod
|
||||
mode PodConfigNotificationMode
|
||||
|
||||
// ensures that updates are delivered in strict order
|
||||
|
@ -139,7 +140,7 @@ type podStorage struct {
|
|||
// TODO: allow initialization of the current state of the store with snapshotted version.
|
||||
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
|
||||
return &podStorage{
|
||||
pods: make(map[string]map[string]*v1.Pod),
|
||||
pods: make(map[string]map[types.UID]*v1.Pod),
|
||||
mode: mode,
|
||||
updates: updates,
|
||||
sourcesSeen: sets.String{},
|
||||
|
@ -221,23 +222,22 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|||
|
||||
pods := s.pods[source]
|
||||
if pods == nil {
|
||||
pods = make(map[string]*v1.Pod)
|
||||
pods = make(map[types.UID]*v1.Pod)
|
||||
}
|
||||
|
||||
// updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
|
||||
// After updated, new pod will be stored in the pod cache *pods*.
|
||||
// Notice that *pods* and *oldPods* could be the same cache.
|
||||
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[string]*v1.Pod) {
|
||||
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
|
||||
filtered := filterInvalidPods(newPods, source, s.recorder)
|
||||
for _, ref := range filtered {
|
||||
name := kubecontainer.GetPodFullName(ref)
|
||||
// Annotate the pod with the source before any comparison.
|
||||
if ref.Annotations == nil {
|
||||
ref.Annotations = make(map[string]string)
|
||||
}
|
||||
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
|
||||
if existing, found := oldPods[name]; found {
|
||||
pods[name] = existing
|
||||
if existing, found := oldPods[ref.UID]; found {
|
||||
pods[ref.UID] = existing
|
||||
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
|
||||
if needUpdate {
|
||||
updatePods = append(updatePods, existing)
|
||||
|
@ -249,7 +249,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|||
continue
|
||||
}
|
||||
recordFirstSeenTime(ref)
|
||||
pods[name] = ref
|
||||
pods[ref.UID] = ref
|
||||
addPods = append(addPods, ref)
|
||||
}
|
||||
}
|
||||
|
@ -280,10 +280,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|||
case kubetypes.REMOVE:
|
||||
glog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods)
|
||||
for _, value := range update.Pods {
|
||||
name := kubecontainer.GetPodFullName(value)
|
||||
if existing, found := pods[name]; found {
|
||||
if existing, found := pods[value.UID]; found {
|
||||
// this is a delete
|
||||
delete(pods, name)
|
||||
delete(pods, value.UID)
|
||||
removePods = append(removePods, existing)
|
||||
continue
|
||||
}
|
||||
|
@ -295,10 +294,10 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|||
s.markSourceSet(source)
|
||||
// Clear the old map entries by just creating a new map
|
||||
oldPods := pods
|
||||
pods = make(map[string]*v1.Pod)
|
||||
pods = make(map[types.UID]*v1.Pod)
|
||||
updatePodsFunc(update.Pods, oldPods, pods)
|
||||
for name, existing := range oldPods {
|
||||
if _, found := pods[name]; !found {
|
||||
for uid, existing := range oldPods {
|
||||
if _, found := pods[uid]; !found {
|
||||
// this is a delete
|
||||
removePods = append(removePods, existing)
|
||||
}
|
||||
|
@ -339,9 +338,8 @@ func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecor
|
|||
// TODO: remove the conversion when validation is performed on versioned objects.
|
||||
internalPod := &api.Pod{}
|
||||
if err := v1.Convert_v1_Pod_To_api_Pod(pod, internalPod, nil); err != nil {
|
||||
name := kubecontainer.GetPodFullName(pod)
|
||||
glog.Warningf("Pod[%d] (%s) from %s failed to convert to v1, ignoring: %v", i+1, name, source, err)
|
||||
recorder.Eventf(pod, v1.EventTypeWarning, "FailedConversion", "Error converting pod %s from %s, ignoring: %v", name, source, err)
|
||||
glog.Warningf("Pod[%d] (%s) from %s failed to convert to v1, ignoring: %v", i+1, format.Pod(pod), source, err)
|
||||
recorder.Eventf(pod, v1.EventTypeWarning, "FailedConversion", "Error converting pod %s from %s, ignoring: %v", format.Pod(pod), source, err)
|
||||
continue
|
||||
}
|
||||
if errs := validation.ValidatePod(internalPod); len(errs) != 0 {
|
||||
|
@ -359,10 +357,9 @@ func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecor
|
|||
}
|
||||
}
|
||||
if len(errlist) > 0 {
|
||||
name := bestPodIdentString(pod)
|
||||
err := errlist.ToAggregate()
|
||||
glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, name, source, err)
|
||||
recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s, ignoring: %v", name, source, err)
|
||||
glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, format.Pod(pod), source, err)
|
||||
recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s, ignoring: %v", format.Pod(pod), source, err)
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, pod)
|
||||
|
@ -515,18 +512,6 @@ func (s *podStorage) MergedState() interface{} {
|
|||
return pods
|
||||
}
|
||||
|
||||
func bestPodIdentString(pod *v1.Pod) string {
|
||||
namespace := pod.Namespace
|
||||
if namespace == "" {
|
||||
namespace = "<empty-namespace>"
|
||||
}
|
||||
name := pod.Name
|
||||
if name == "" {
|
||||
name = "<empty-name>"
|
||||
}
|
||||
return fmt.Sprintf("%s.%s", name, namespace)
|
||||
}
|
||||
|
||||
func copyPods(sourcePods []*v1.Pod) []*v1.Pod {
|
||||
pods := []*v1.Pod{}
|
||||
for _, source := range sourcePods {
|
||||
|
|
|
@ -62,7 +62,7 @@ func (s sortedPods) Less(i, j int) bool {
|
|||
func CreateValidPod(name, namespace string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
UID: types.UID(name), // for the purpose of testing, this is unique enough
|
||||
UID: types.UID(name + namespace), // for the purpose of testing, this is unique enough
|
||||
Name: name,
|
||||
Namespace: namespace,
|
||||
},
|
||||
|
@ -248,7 +248,7 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
|
|||
channel <- podUpdate
|
||||
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
|
||||
|
||||
podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "new"}})
|
||||
podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new"))
|
||||
channel <- podUpdate
|
||||
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue