mirror of https://github.com/k3s-io/k3s
Merge pull request #27349 from resouer/delete
Automatic merge from submit-queue Generates DELETE pod update operations fixes #27105 Generates DELETE pod update operations to make the code and log more intuitive. 1. main refactoring is in `kubelet/config` 2. kubelet will log if it received DELETE, just like other OPs cc @Random-Liu :)pull/6/head
commit
72f6493376
|
@ -80,5 +80,5 @@ func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) {
|
||||||
|
|
||||||
//TODO(jdef) revisit this if/when executor failover lands
|
//TODO(jdef) revisit this if/when executor failover lands
|
||||||
// Force kubelet to delete all pods.
|
// Force kubelet to delete all pods.
|
||||||
kl.HandlePodDeletions(kl.GetPods())
|
kl.HandlePodRemoves(kl.GetPods())
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,10 +43,10 @@ const (
|
||||||
// PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
|
// PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
|
||||||
// any change occurs.
|
// any change occurs.
|
||||||
PodConfigNotificationSnapshot
|
PodConfigNotificationSnapshot
|
||||||
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE message whenever pods are
|
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are
|
||||||
// changed, and a SET message if there are any additions or removals.
|
// changed, and a SET message if there are any additions or removals.
|
||||||
PodConfigNotificationSnapshotAndUpdates
|
PodConfigNotificationSnapshotAndUpdates
|
||||||
// PodConfigNotificationIncremental delivers ADD, UPDATE, REMOVE, RECONCILE to the update channel.
|
// PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel.
|
||||||
PodConfigNotificationIncremental
|
PodConfigNotificationIncremental
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -152,14 +152,14 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
||||||
defer s.updateLock.Unlock()
|
defer s.updateLock.Unlock()
|
||||||
|
|
||||||
seenBefore := s.sourcesSeen.Has(source)
|
seenBefore := s.sourcesSeen.Has(source)
|
||||||
adds, updates, deletes, reconciles := s.merge(source, change)
|
adds, updates, deletes, removes, reconciles := s.merge(source, change)
|
||||||
firstSet := !seenBefore && s.sourcesSeen.Has(source)
|
firstSet := !seenBefore && s.sourcesSeen.Has(source)
|
||||||
|
|
||||||
// deliver update notifications
|
// deliver update notifications
|
||||||
switch s.mode {
|
switch s.mode {
|
||||||
case PodConfigNotificationIncremental:
|
case PodConfigNotificationIncremental:
|
||||||
if len(deletes.Pods) > 0 {
|
if len(removes.Pods) > 0 {
|
||||||
s.updates <- *deletes
|
s.updates <- *removes
|
||||||
}
|
}
|
||||||
if len(adds.Pods) > 0 {
|
if len(adds.Pods) > 0 {
|
||||||
s.updates <- *adds
|
s.updates <- *adds
|
||||||
|
@ -167,9 +167,12 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
||||||
if len(updates.Pods) > 0 {
|
if len(updates.Pods) > 0 {
|
||||||
s.updates <- *updates
|
s.updates <- *updates
|
||||||
}
|
}
|
||||||
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 {
|
if len(deletes.Pods) > 0 {
|
||||||
|
s.updates <- *deletes
|
||||||
|
}
|
||||||
|
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
|
||||||
// Send an empty update when first seeing the source and there are
|
// Send an empty update when first seeing the source and there are
|
||||||
// no ADD or UPDATE pods from the source. This signals kubelet that
|
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
|
||||||
// the source is ready.
|
// the source is ready.
|
||||||
s.updates <- *adds
|
s.updates <- *adds
|
||||||
}
|
}
|
||||||
|
@ -179,15 +182,18 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
case PodConfigNotificationSnapshotAndUpdates:
|
case PodConfigNotificationSnapshotAndUpdates:
|
||||||
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
|
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
|
||||||
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
|
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
|
||||||
}
|
}
|
||||||
if len(updates.Pods) > 0 {
|
if len(updates.Pods) > 0 {
|
||||||
s.updates <- *updates
|
s.updates <- *updates
|
||||||
}
|
}
|
||||||
|
if len(deletes.Pods) > 0 {
|
||||||
|
s.updates <- *deletes
|
||||||
|
}
|
||||||
|
|
||||||
case PodConfigNotificationSnapshot:
|
case PodConfigNotificationSnapshot:
|
||||||
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
|
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
|
||||||
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
|
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,13 +206,14 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, reconciles *kubetypes.PodUpdate) {
|
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
|
||||||
s.podLock.Lock()
|
s.podLock.Lock()
|
||||||
defer s.podLock.Unlock()
|
defer s.podLock.Unlock()
|
||||||
|
|
||||||
addPods := []*api.Pod{}
|
addPods := []*api.Pod{}
|
||||||
updatePods := []*api.Pod{}
|
updatePods := []*api.Pod{}
|
||||||
deletePods := []*api.Pod{}
|
deletePods := []*api.Pod{}
|
||||||
|
removePods := []*api.Pod{}
|
||||||
reconcilePods := []*api.Pod{}
|
reconcilePods := []*api.Pod{}
|
||||||
|
|
||||||
pods := s.pods[source]
|
pods := s.pods[source]
|
||||||
|
@ -228,11 +235,13 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
||||||
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
|
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
|
||||||
if existing, found := oldPods[name]; found {
|
if existing, found := oldPods[name]; found {
|
||||||
pods[name] = existing
|
pods[name] = existing
|
||||||
needUpdate, needReconcile := checkAndUpdatePod(existing, ref)
|
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
|
||||||
if needUpdate {
|
if needUpdate {
|
||||||
updatePods = append(updatePods, existing)
|
updatePods = append(updatePods, existing)
|
||||||
} else if needReconcile {
|
} else if needReconcile {
|
||||||
reconcilePods = append(reconcilePods, existing)
|
reconcilePods = append(reconcilePods, existing)
|
||||||
|
} else if needGracefulDelete {
|
||||||
|
deletePods = append(deletePods, existing)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -244,9 +253,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
||||||
|
|
||||||
update := change.(kubetypes.PodUpdate)
|
update := change.(kubetypes.PodUpdate)
|
||||||
switch update.Op {
|
switch update.Op {
|
||||||
case kubetypes.ADD, kubetypes.UPDATE:
|
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
|
||||||
if update.Op == kubetypes.ADD {
|
if update.Op == kubetypes.ADD {
|
||||||
glog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
|
glog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
|
||||||
|
} else if update.Op == kubetypes.DELETE {
|
||||||
|
glog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods)
|
||||||
} else {
|
} else {
|
||||||
glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
|
glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
|
||||||
}
|
}
|
||||||
|
@ -259,7 +270,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
||||||
if existing, found := pods[name]; found {
|
if existing, found := pods[name]; found {
|
||||||
// this is a delete
|
// this is a delete
|
||||||
delete(pods, name)
|
delete(pods, name)
|
||||||
deletePods = append(deletePods, existing)
|
removePods = append(removePods, existing)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// this is a no-op
|
// this is a no-op
|
||||||
|
@ -275,7 +286,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
||||||
for name, existing := range oldPods {
|
for name, existing := range oldPods {
|
||||||
if _, found := pods[name]; !found {
|
if _, found := pods[name]; !found {
|
||||||
// this is a delete
|
// this is a delete
|
||||||
deletePods = append(deletePods, existing)
|
removePods = append(removePods, existing)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,10 +299,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
||||||
|
|
||||||
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
|
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
|
||||||
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
|
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
|
||||||
deletes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(deletePods), Source: source}
|
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
|
||||||
|
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
|
||||||
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
|
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
|
||||||
|
|
||||||
return adds, updates, deletes, reconciles
|
return adds, updates, deletes, removes, reconciles
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *podStorage) markSourceSet(source string) {
|
func (s *podStorage) markSourceSet(source string) {
|
||||||
|
@ -413,10 +425,13 @@ func podsDifferSemantically(existing, ref *api.Pod) bool {
|
||||||
|
|
||||||
// checkAndUpdatePod updates existing, and:
|
// checkAndUpdatePod updates existing, and:
|
||||||
// * if ref makes a meaningful change, returns needUpdate=true
|
// * if ref makes a meaningful change, returns needUpdate=true
|
||||||
|
// * if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
|
||||||
// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
|
// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
|
||||||
// * else return both false
|
// * else return all false
|
||||||
// Now, needUpdate and needReconcile should never be both true
|
// Now, needUpdate, needGracefulDelete and needReconcile should never be both true
|
||||||
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) {
|
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
|
||||||
|
|
||||||
|
// 1. this is a reconcile
|
||||||
// TODO: it would be better to update the whole object and only preserve certain things
|
// TODO: it would be better to update the whole object and only preserve certain things
|
||||||
// like the source annotation or the UID (to ensure safety)
|
// like the source annotation or the UID (to ensure safety)
|
||||||
if !podsDifferSemantically(existing, ref) {
|
if !podsDifferSemantically(existing, ref) {
|
||||||
|
@ -431,7 +446,6 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// this is an update
|
|
||||||
|
|
||||||
// Overwrite the first-seen time with the existing one. This is our own
|
// Overwrite the first-seen time with the existing one. This is our own
|
||||||
// internal annotation, there is no need to update.
|
// internal annotation, there is no need to update.
|
||||||
|
@ -443,7 +457,15 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool)
|
||||||
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
|
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
|
||||||
existing.Status = ref.Status
|
existing.Status = ref.Status
|
||||||
updateAnnotations(existing, ref)
|
updateAnnotations(existing, ref)
|
||||||
needUpdate = true
|
|
||||||
|
// 2. this is an graceful delete
|
||||||
|
if ref.DeletionTimestamp != nil {
|
||||||
|
needGracefulDelete = true
|
||||||
|
} else {
|
||||||
|
// 3. this is an update
|
||||||
|
needUpdate = true
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,8 +22,10 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
"k8s.io/kubernetes/pkg/conversion"
|
"k8s.io/kubernetes/pkg/conversion"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
|
@ -248,6 +250,25 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
|
||||||
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
|
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewPodAddedDelete(t *testing.T) {
|
||||||
|
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
|
||||||
|
|
||||||
|
// should register an add
|
||||||
|
addedPod := CreateValidPod("foo", "new")
|
||||||
|
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, addedPod)
|
||||||
|
channel <- podUpdate
|
||||||
|
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, addedPod))
|
||||||
|
|
||||||
|
// mark this pod as deleted
|
||||||
|
timestamp := unversioned.NewTime(time.Now())
|
||||||
|
deletedPod := CreateValidPod("foo", "new")
|
||||||
|
deletedPod.ObjectMeta.DeletionTimestamp = ×tamp
|
||||||
|
podUpdate = CreatePodUpdate(kubetypes.DELETE, TestSource, deletedPod)
|
||||||
|
channel <- podUpdate
|
||||||
|
// the existing pod should be gracefully deleted
|
||||||
|
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.DELETE, TestSource, addedPod))
|
||||||
|
}
|
||||||
|
|
||||||
func TestNewPodAddedUpdatedSet(t *testing.T) {
|
func TestNewPodAddedUpdatedSet(t *testing.T) {
|
||||||
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
|
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
|
||||||
|
|
||||||
|
|
|
@ -160,7 +160,7 @@ const (
|
||||||
type SyncHandler interface {
|
type SyncHandler interface {
|
||||||
HandlePodAdditions(pods []*api.Pod)
|
HandlePodAdditions(pods []*api.Pod)
|
||||||
HandlePodUpdates(pods []*api.Pod)
|
HandlePodUpdates(pods []*api.Pod)
|
||||||
HandlePodDeletions(pods []*api.Pod)
|
HandlePodRemoves(pods []*api.Pod)
|
||||||
HandlePodReconcile(pods []*api.Pod)
|
HandlePodReconcile(pods []*api.Pod)
|
||||||
HandlePodSyncs(pods []*api.Pod)
|
HandlePodSyncs(pods []*api.Pod)
|
||||||
HandlePodCleanups() error
|
HandlePodCleanups() error
|
||||||
|
@ -1673,7 +1673,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
|
||||||
// pod - the pod to sync
|
// pod - the pod to sync
|
||||||
// mirrorPod - the mirror pod for the pod to sync, if it is a static pod
|
// mirrorPod - the mirror pod for the pod to sync, if it is a static pod
|
||||||
// podStatus - the current status (TODO: always from the status manager?)
|
// podStatus - the current status (TODO: always from the status manager?)
|
||||||
// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE)
|
// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE, DELETE)
|
||||||
//
|
//
|
||||||
// The workflow is:
|
// The workflow is:
|
||||||
// * If the pod is being created, record pod worker start latency
|
// * If the pod is being created, record pod worker start latency
|
||||||
|
@ -2321,13 +2321,18 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
|
||||||
handler.HandlePodUpdates(u.Pods)
|
handler.HandlePodUpdates(u.Pods)
|
||||||
case kubetypes.REMOVE:
|
case kubetypes.REMOVE:
|
||||||
glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
|
glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
|
||||||
handler.HandlePodDeletions(u.Pods)
|
handler.HandlePodRemoves(u.Pods)
|
||||||
case kubetypes.RECONCILE:
|
case kubetypes.RECONCILE:
|
||||||
glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
|
glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
|
||||||
handler.HandlePodReconcile(u.Pods)
|
handler.HandlePodReconcile(u.Pods)
|
||||||
|
case kubetypes.DELETE:
|
||||||
|
glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
|
||||||
|
// DELETE is treated as a UPDATE because of graceful deletion.
|
||||||
|
handler.HandlePodUpdates(u.Pods)
|
||||||
case kubetypes.SET:
|
case kubetypes.SET:
|
||||||
// TODO: Do we want to support this?
|
// TODO: Do we want to support this?
|
||||||
glog.Errorf("Kubelet does not support snapshot update")
|
glog.Errorf("Kubelet does not support snapshot update")
|
||||||
|
|
||||||
}
|
}
|
||||||
case e := <-plegCh:
|
case e := <-plegCh:
|
||||||
// PLEG event for a pod; sync it.
|
// PLEG event for a pod; sync it.
|
||||||
|
@ -2463,9 +2468,9 @@ func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandlePodDeletions is the callback in the SyncHandler interface for pods
|
// HandlePodRemoves is the callback in the SyncHandler interface for pods
|
||||||
// being deleted from a config source.
|
// being removed from a config source.
|
||||||
func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
|
func (kl *Kubelet) HandlePodRemoves(pods []*api.Pod) {
|
||||||
start := kl.clock.Now()
|
start := kl.clock.Now()
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
kl.podManager.DeletePod(pod)
|
kl.podManager.DeletePod(pod)
|
||||||
|
|
|
@ -35,6 +35,8 @@ const (
|
||||||
SET PodOperation = iota
|
SET PodOperation = iota
|
||||||
// Pods with the given ids are new to this source
|
// Pods with the given ids are new to this source
|
||||||
ADD
|
ADD
|
||||||
|
// Pods with the given ids are gracefully deleted from this source
|
||||||
|
DELETE
|
||||||
// Pods with the given ids have been removed from this source
|
// Pods with the given ids have been removed from this source
|
||||||
REMOVE
|
REMOVE
|
||||||
// Pods with the given ids have been updated in this source
|
// Pods with the given ids have been updated in this source
|
||||||
|
|
Loading…
Reference in New Issue