mirror of https://github.com/k3s-io/k3s
controller: statefulsets respect observed generation
parent
113cd7da0a
commit
dbab67aa32
|
@ -2747,6 +2747,7 @@ runTests() {
|
||||||
hpa_max_field=".spec.maxReplicas"
|
hpa_max_field=".spec.maxReplicas"
|
||||||
hpa_cpu_field=".spec.targetCPUUtilizationPercentage"
|
hpa_cpu_field=".spec.targetCPUUtilizationPercentage"
|
||||||
statefulset_replicas_field=".spec.replicas"
|
statefulset_replicas_field=".spec.replicas"
|
||||||
|
statefulset_observed_generation=".status.observedGeneration"
|
||||||
job_parallelism_field=".spec.parallelism"
|
job_parallelism_field=".spec.parallelism"
|
||||||
deployment_replicas=".spec.replicas"
|
deployment_replicas=".spec.replicas"
|
||||||
secret_data=".data"
|
secret_data=".data"
|
||||||
|
@ -3180,10 +3181,12 @@ runTests() {
|
||||||
### Scale statefulset test with current-replicas and replicas
|
### Scale statefulset test with current-replicas and replicas
|
||||||
# Pre-condition: 0 replicas
|
# Pre-condition: 0 replicas
|
||||||
kube::test::get_object_assert 'statefulset nginx' "{{$statefulset_replicas_field}}" '0'
|
kube::test::get_object_assert 'statefulset nginx' "{{$statefulset_replicas_field}}" '0'
|
||||||
|
kube::test::wait_object_assert 'statefulset nginx' "{{$statefulset_observed_generation}}" '1'
|
||||||
# Command: Scale up
|
# Command: Scale up
|
||||||
kubectl scale --current-replicas=0 --replicas=1 statefulset nginx "${kube_flags[@]}"
|
kubectl scale --current-replicas=0 --replicas=1 statefulset nginx "${kube_flags[@]}"
|
||||||
# Post-condition: 1 replica, named nginx-0
|
# Post-condition: 1 replica, named nginx-0
|
||||||
kube::test::get_object_assert 'statefulset nginx' "{{$statefulset_replicas_field}}" '1'
|
kube::test::get_object_assert 'statefulset nginx' "{{$statefulset_replicas_field}}" '1'
|
||||||
|
kube::test::wait_object_assert 'statefulset nginx' "{{$statefulset_observed_generation}}" '2'
|
||||||
# Typically we'd wait and confirm that N>1 replicas are up, but this framework
|
# Typically we'd wait and confirm that N>1 replicas are up, but this framework
|
||||||
# doesn't start the scheduler, so pet-0 will block all others.
|
# doesn't start the scheduler, so pet-0 will block all others.
|
||||||
# TODO: test robust scaling in an e2e.
|
# TODO: test robust scaling in an e2e.
|
||||||
|
|
|
@ -49,10 +49,10 @@ type StatefulPodControlInterface interface {
|
||||||
// DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
|
// DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
|
||||||
// the returned error is nil.
|
// the returned error is nil.
|
||||||
DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
|
DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
|
||||||
// UpdateStatefulSetStatus Updates the Status.Replicas of a StatefulSet. set is an in-out parameter, and any
|
// UpdateStatefulSetStatus updates the status of a StatefulSet. set is an in-out parameter, and any
|
||||||
// updates made to the set are made visible as mutations to the parameter. If the method is successful, the
|
// updates made to the set are made visible as mutations to the parameter. If the method is successful, the
|
||||||
// returned error is nil, and set has its Status.Replicas field set to replicas.
|
// returned error is nil, and set has its status updated.
|
||||||
UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error
|
UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRealStatefulPodControl(
|
func NewRealStatefulPodControl(
|
||||||
|
@ -150,9 +150,10 @@ func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (spc *realStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error {
|
func (spc *realStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error {
|
||||||
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||||
set.Status.Replicas = replicas
|
set.Status.Replicas = replicas
|
||||||
|
set.Status.ObservedGeneration = &generation
|
||||||
_, err := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
|
_, err := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
podapi "k8s.io/kubernetes/pkg/api/v1/pod"
|
podapi "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
|
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||||
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
|
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
|
||||||
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||||
|
@ -470,7 +471,7 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) {
|
||||||
update := action.(core.UpdateAction)
|
update := action.(core.UpdateAction)
|
||||||
return true, update.GetObject(), nil
|
return true, update.GetObject(), nil
|
||||||
})
|
})
|
||||||
if err := control.UpdateStatefulSetReplicas(set, 2); err != nil {
|
if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil {
|
||||||
t.Errorf("Error returned on successful status update: %s", err)
|
t.Errorf("Error returned on successful status update: %s", err)
|
||||||
}
|
}
|
||||||
if set.Status.Replicas != 2 {
|
if set.Status.Replicas != 2 {
|
||||||
|
@ -482,6 +483,24 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStatefulPodControlUpdatesObservedGeneration(t *testing.T) {
|
||||||
|
recorder := record.NewFakeRecorder(10)
|
||||||
|
set := newStatefulSet(3)
|
||||||
|
fakeClient := &fake.Clientset{}
|
||||||
|
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
|
||||||
|
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
|
||||||
|
update := action.(core.UpdateAction)
|
||||||
|
sts := update.GetObject().(*apps.StatefulSet)
|
||||||
|
if sts.Status.ObservedGeneration == nil || *sts.Status.ObservedGeneration != int64(3) {
|
||||||
|
t.Errorf("expected observedGeneration to be synced with generation for statefulset %q", sts.Name)
|
||||||
|
}
|
||||||
|
return true, sts, nil
|
||||||
|
})
|
||||||
|
if err := control.UpdateStatefulSetStatus(set, 2, 3); err != nil {
|
||||||
|
t.Errorf("Error returned on successful status update: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
|
func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
|
||||||
recorder := record.NewFakeRecorder(10)
|
recorder := record.NewFakeRecorder(10)
|
||||||
set := newStatefulSet(3)
|
set := newStatefulSet(3)
|
||||||
|
@ -493,7 +512,7 @@ func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
|
||||||
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
|
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
|
||||||
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
|
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
|
||||||
})
|
})
|
||||||
if err := control.UpdateStatefulSetReplicas(set, 2); err == nil {
|
if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil {
|
||||||
t.Error("Failed update did not return error")
|
t.Error("Failed update did not return error")
|
||||||
}
|
}
|
||||||
events := collectEvents(recorder.Events)
|
events := collectEvents(recorder.Events)
|
||||||
|
@ -520,7 +539,7 @@ func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) {
|
||||||
return true, update.GetObject(), nil
|
return true, update.GetObject(), nil
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
if err := control.UpdateStatefulSetReplicas(set, 2); err != nil {
|
if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil {
|
||||||
t.Errorf("UpdateStatefulSetStatus returned an error: %s", err)
|
t.Errorf("UpdateStatefulSetStatus returned an error: %s", err)
|
||||||
}
|
}
|
||||||
if set.Status.Replicas != 2 {
|
if set.Status.Replicas != 2 {
|
||||||
|
@ -544,7 +563,7 @@ func TestStatefulPodControlUpdateReplicasConflictFailure(t *testing.T) {
|
||||||
update := action.(core.UpdateAction)
|
update := action.(core.UpdateAction)
|
||||||
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
|
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
|
||||||
})
|
})
|
||||||
if err := control.UpdateStatefulSetReplicas(set, 2); err == nil {
|
if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil {
|
||||||
t.Error("UpdateStatefulSetStatus failed to return an error on get failure")
|
t.Error("UpdateStatefulSetStatus failed to return an error on get failure")
|
||||||
}
|
}
|
||||||
events := collectEvents(recorder.Events)
|
events := collectEvents(recorder.Events)
|
||||||
|
|
|
@ -99,14 +99,14 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
|
||||||
sort.Sort(ascendingOrdinal(condemned))
|
sort.Sort(ascendingOrdinal(condemned))
|
||||||
|
|
||||||
// if the current number of replicas has changed update the statefulSets replicas
|
// if the current number of replicas has changed update the statefulSets replicas
|
||||||
if set.Status.Replicas != int32(ready) {
|
if set.Status.Replicas != int32(ready) || set.Status.ObservedGeneration == nil || set.Generation > *set.Status.ObservedGeneration {
|
||||||
obj, err := api.Scheme.Copy(set)
|
obj, err := api.Scheme.Copy(set)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to copy set: %v", err)
|
return fmt.Errorf("unable to copy set: %v", err)
|
||||||
}
|
}
|
||||||
set = obj.(*apps.StatefulSet)
|
set = obj.(*apps.StatefulSet)
|
||||||
|
|
||||||
if err := ssc.podControl.UpdateStatefulSetReplicas(set, int32(ready)); err != nil {
|
if err := ssc.podControl.UpdateStatefulSetStatus(set, int32(ready), set.Generation); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -769,13 +769,14 @@ func (spc *fakeStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (spc *fakeStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error {
|
func (spc *fakeStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error {
|
||||||
defer spc.updateStatusTracker.inc()
|
defer spc.updateStatusTracker.inc()
|
||||||
if spc.updateStatusTracker.errorReady() {
|
if spc.updateStatusTracker.errorReady() {
|
||||||
defer spc.updateStatusTracker.reset()
|
defer spc.updateStatusTracker.reset()
|
||||||
return spc.updateStatusTracker.err
|
return spc.updateStatusTracker.err
|
||||||
}
|
}
|
||||||
set.Status.Replicas = replicas
|
set.Status.Replicas = replicas
|
||||||
|
set.Status.ObservedGeneration = &generation
|
||||||
spc.setsIndexer.Update(set)
|
spc.setsIndexer.Update(set)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue