mirror of https://github.com/k3s-io/k3s
Merge pull request #66476 from mortent/IntegrationTestForDaemonSetHashCollision
Automatic merge from submit-queue (batch tested with PRs 67090, 67159, 66866, 62111, 66476). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Fix to handle hash collisions correctly for DaemonSets **What this PR does / why we need it**: This adds an integration test for the case where there is a hash collision when creating a ControllerRevision for a DaemonSet. It also fixes a shadowed variable that prevented this functionality from working as intended. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #62519 **Special notes for your reviewer**: /sig apps **Release note**: ```release-note Fixes an issue where updating a DaemonSet causes a hash collision. ```pull/8/head
commit
e26f5d19d4
|
@ -330,16 +330,16 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*
|
|||
}
|
||||
|
||||
history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(history)
|
||||
if errors.IsAlreadyExists(err) {
|
||||
if outerErr := err; errors.IsAlreadyExists(outerErr) {
|
||||
// TODO: Is it okay to get from historyLister?
|
||||
existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
|
||||
if getErr != nil {
|
||||
return nil, getErr
|
||||
}
|
||||
// Check if we already created it
|
||||
done, err := Match(ds, existedHistory)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
done, matchErr := Match(ds, existedHistory)
|
||||
if matchErr != nil {
|
||||
return nil, matchErr
|
||||
}
|
||||
if done {
|
||||
return existedHistory, nil
|
||||
|
@ -360,7 +360,7 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*
|
|||
return nil, updateErr
|
||||
}
|
||||
glog.V(2).Infof("Found a hash collision for DaemonSet %q - bumping collisionCount to %d to resolve it", ds.Name, *currDS.Status.CollisionCount)
|
||||
return nil, err
|
||||
return nil, outerErr
|
||||
}
|
||||
return history, err
|
||||
}
|
||||
|
|
|
@ -16,12 +16,14 @@ go_test(
|
|||
deps = [
|
||||
"//pkg/api/legacyscheme:go_default_library",
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/daemon:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/scheduler:go_default_library",
|
||||
"//pkg/scheduler/algorithm:go_default_library",
|
||||
"//pkg/scheduler/algorithmprovider:go_default_library",
|
||||
"//pkg/scheduler/factory:go_default_library",
|
||||
"//pkg/util/labels:go_default_library",
|
||||
"//pkg/util/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/api/apps/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
|
|
|
@ -41,6 +41,7 @@ import (
|
|||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/daemon"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/scheduler"
|
||||
|
@ -48,6 +49,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
||||
"k8s.io/kubernetes/pkg/util/metrics"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
@ -372,6 +374,52 @@ func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error {
|
|||
})
|
||||
}
|
||||
|
||||
func waitForDaemonSetAndControllerRevisionCreated(c clientset.Interface, name string, namespace string) error {
|
||||
return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
|
||||
ds, err := c.AppsV1().DaemonSets(namespace).Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if ds == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
revs, err := c.AppsV1().ControllerRevisions(namespace).List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if revs.Size() == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
for _, rev := range revs.Items {
|
||||
for _, oref := range rev.OwnerReferences {
|
||||
if oref.Kind == "DaemonSet" && oref.UID == ds.UID {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
func hashAndNameForDaemonSet(ds *apps.DaemonSet) (string, string) {
|
||||
hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount))
|
||||
name := ds.Name + "-" + hash
|
||||
return hash, name
|
||||
}
|
||||
|
||||
func validateDaemonSetCollisionCount(dsClient appstyped.DaemonSetInterface, dsName string, expCount int32, t *testing.T) {
|
||||
ds, err := dsClient.Get(dsName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to look up DaemonSet: %v", err)
|
||||
}
|
||||
collisionCount := ds.Status.CollisionCount
|
||||
if *collisionCount != expCount {
|
||||
t.Fatalf("Expected collisionCount to be %d, but found %d", expCount, *collisionCount)
|
||||
}
|
||||
}
|
||||
|
||||
func validateDaemonSetStatus(
|
||||
dsClient appstyped.DaemonSetInterface,
|
||||
dsName string,
|
||||
|
@ -740,3 +788,112 @@ func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T)
|
|||
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
|
||||
})
|
||||
}
|
||||
|
||||
// TestLaunchWithHashCollision tests that a DaemonSet can be updated even if there is a
|
||||
// hash collision with an existing ControllerRevision
|
||||
func TestLaunchWithHashCollision(t *testing.T) {
|
||||
server, closeFn, dc, informers, clientset := setup(t)
|
||||
defer closeFn()
|
||||
ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
|
||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||
|
||||
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
|
||||
podInformer := informers.Core().V1().Pods().Informer()
|
||||
nodeClient := clientset.CoreV1().Nodes()
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
informers.Start(stopCh)
|
||||
go dc.Run(1, stopCh)
|
||||
|
||||
setupScheduler(t, clientset, informers, stopCh)
|
||||
|
||||
// Create single node
|
||||
_, err := nodeClient.Create(newNode("single-node", nil))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create node: %v", err)
|
||||
}
|
||||
|
||||
// Create new DaemonSet with RollingUpdate strategy
|
||||
orgDs := newDaemonSet("foo", ns.Name)
|
||||
oneIntString := intstr.FromInt(1)
|
||||
orgDs.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{
|
||||
Type: apps.RollingUpdateDaemonSetStrategyType,
|
||||
RollingUpdate: &apps.RollingUpdateDaemonSet{
|
||||
MaxUnavailable: &oneIntString,
|
||||
},
|
||||
}
|
||||
ds, err := dsClient.Create(orgDs)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create DaemonSet: %v", err)
|
||||
}
|
||||
|
||||
// Wait for the DaemonSet to be created before proceeding
|
||||
err = waitForDaemonSetAndControllerRevisionCreated(clientset, ds.Name, ds.Namespace)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create DeamonSet: %v", err)
|
||||
}
|
||||
|
||||
ds, err = dsClient.Get(ds.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get DaemonSet: %v", err)
|
||||
}
|
||||
var orgCollisionCount int32
|
||||
if ds.Status.CollisionCount != nil {
|
||||
orgCollisionCount = *ds.Status.CollisionCount
|
||||
}
|
||||
|
||||
// Look up the ControllerRevision for the DaemonSet
|
||||
_, name := hashAndNameForDaemonSet(ds)
|
||||
revision, err := clientset.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
|
||||
if err != nil || revision == nil {
|
||||
t.Fatalf("Failed to look up ControllerRevision: %v", err)
|
||||
}
|
||||
|
||||
// Create a "fake" ControllerRevision that we know will create a hash collision when we make
|
||||
// the next update
|
||||
one := int64(1)
|
||||
ds.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
|
||||
|
||||
newHash, newName := hashAndNameForDaemonSet(ds)
|
||||
newRevision := &apps.ControllerRevision{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: newName,
|
||||
Namespace: ds.Namespace,
|
||||
Labels: labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, newHash),
|
||||
Annotations: ds.Annotations,
|
||||
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, apps.SchemeGroupVersion.WithKind("DaemonSet"))},
|
||||
},
|
||||
Data: revision.Data,
|
||||
Revision: revision.Revision + 1,
|
||||
}
|
||||
_, err = clientset.AppsV1().ControllerRevisions(ds.Namespace).Create(newRevision)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create ControllerRevision: %v", err)
|
||||
}
|
||||
|
||||
// Make an update of the DaemonSet which we know will create a hash collision when
|
||||
// the next ControllerRevision is created.
|
||||
_, err = dsClient.Update(ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to update DaemonSet: %v", err)
|
||||
}
|
||||
|
||||
// Wait for any pod with the latest Spec to exist
|
||||
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
|
||||
objects := podInformer.GetIndexer().List()
|
||||
for _, object := range objects {
|
||||
pod := object.(*v1.Pod)
|
||||
if *pod.Spec.TerminationGracePeriodSeconds == *ds.Spec.Template.Spec.TerminationGracePeriodSeconds {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to wait for Pods with the latest Spec to be created: %v", err)
|
||||
}
|
||||
|
||||
validateDaemonSetCollisionCount(dsClient, ds.Name, orgCollisionCount+1, t)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue