Implements StatefulSet update

Implements history utilities for ControllerRevision in the controller/history package
StatefulSetStatus now has additional fields for consistency with DaemonSet and Deployment
StatefulSetStatus.Replicas now represents the current number of created Pods, and StatefulSetStatus.ReadyReplicas is the current number of ready Pods
pull/6/head
Kenneth Owens 2017-06-04 15:30:31 -07:00
parent 4a01f44b73
commit 1b55f57391
26 changed files with 4789 additions and 448 deletions

View File

@ -723,6 +723,13 @@ func appsFuncs(t apitesting.TestingCommon) []interface{} {
if len(s.Spec.PodManagementPolicy) == 0 {
s.Spec.PodManagementPolicy = apps.OrderedReadyPodManagement
}
if len(s.Spec.UpdateStrategy.Type) == 0 {
s.Spec.UpdateStrategy.Type = apps.RollingUpdateStatefulSetStrategyType
}
if s.Spec.RevisionHistoryLimit == nil {
s.Spec.RevisionHistoryLimit = new(int32)
*s.Spec.RevisionHistoryLimit = 10
}
},
}
}

View File

@ -60,6 +60,54 @@ const (
ParallelPodManagement = "Parallel"
)
// StatefulSetUpdateStrategy indicates the strategy that the StatefulSet
// controller will use to perform updates. It includes any additional parameters
// necessary to perform the update for the indicated strategy.
type StatefulSetUpdateStrategy struct {
// Type indicates the type of the StatefulSetUpdateStrategy. Validation
// accepts PartitionStatefulSetStrategyType,
// RollingUpdateStatefulSetStrategyType, and OnDeleteStatefulSetStrategyType,
// and reports an empty Type as a missing required field.
Type StatefulSetUpdateStrategyType
// Partition is used to communicate the ordinal at which to partition
// the StatefulSet when Type is PartitionStatefulSetStrategyType. This
// value must be set when Type is PartitionStatefulSetStrategyType,
// and it must be nil otherwise; validation rejects a non-nil Partition
// for the RollingUpdate and OnDelete strategies.
Partition *PartitionStatefulSetStrategy
}
// StatefulSetUpdateStrategyType is a string enumeration type that enumerates
// all possible update strategies for the StatefulSet controller.
type StatefulSetUpdateStrategyType string

// Every constant below is declared with an explicit StatefulSetUpdateStrategyType.
// A constant written as `Name = "value"` does not inherit the type of the
// preceding constant; it would be an untyped string constant and would lose
// the enumeration type.
const (
	// PartitionStatefulSetStrategyType indicates that updates will only be
	// applied to a partition of the StatefulSet. This is useful for canaries
	// and phased roll outs. When a scale operation is performed with this
	// strategy, new Pods will be created from the specification version indicated
	// by the StatefulSet's currentRevision if their ordinal is less than the supplied
	// Partition's ordinal. Otherwise, they will be created from the specification
	// version indicated by the StatefulSet's updateRevision.
	PartitionStatefulSetStrategyType StatefulSetUpdateStrategyType = "Partition"
	// RollingUpdateStatefulSetStrategyType indicates that update will be
	// applied to all Pods in the StatefulSet with respect to the StatefulSet
	// ordering constraints. When a scale operation is performed with this
	// strategy, new Pods will be created from the specification version indicated
	// by the StatefulSet's updateRevision.
	RollingUpdateStatefulSetStrategyType StatefulSetUpdateStrategyType = "RollingUpdate"
	// OnDeleteStatefulSetStrategyType triggers the legacy behavior. Version
	// tracking and ordered rolling restarts are disabled. Pods are recreated
	// from the StatefulSetSpec when they are manually deleted. When a scale
	// operation is performed with this strategy, new Pods will be created from
	// the specification version indicated by the StatefulSet's currentRevision.
	OnDeleteStatefulSetStrategyType StatefulSetUpdateStrategyType = "OnDelete"
)
// PartitionStatefulSetStrategy contains the parameters used with the
// PartitionStatefulSetStrategyType.
type PartitionStatefulSetStrategy struct {
// Ordinal indicates the ordinal at which the StatefulSet should be
// partitioned. Validation requires it to be non-negative.
Ordinal int32
}
// A StatefulSetSpec is the specification of a StatefulSet.
type StatefulSetSpec struct {
// Replicas is the desired number of replicas of the given Template.
@ -109,16 +157,47 @@ type StatefulSetSpec struct {
// all pods at once.
// +optional
PodManagementPolicy PodManagementPolicyType
// updateStrategy indicates the StatefulSetUpdateStrategy that will be
// employed to update Pods in the StatefulSet when a revision is made to
// Template.
UpdateStrategy StatefulSetUpdateStrategy
// revisionHistoryLimit is the maximum number of revisions that will
// be maintained in the StatefulSet's revision history. The revision history
// consists of all revisions not represented by a currently applied
// StatefulSetSpec version. The default value is 10.
RevisionHistoryLimit *int32
}
// StatefulSetStatus represents the current state of a StatefulSet.
type StatefulSetStatus struct {
// observedGeneration is the most recent generation observed for this StatefulSet. It corresponds to the
// StatefulSet's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration *int64
// replicas is the number of Pods created by the StatefulSet controller.
Replicas int32
// readyReplicas is the number of Pods created by the StatefulSet controller that have a Ready Condition.
ReadyReplicas int32
// currentReplicas is the number of Pods created by the StatefulSet controller from the StatefulSet version
// indicated by currentRevision.
CurrentReplicas int32
// updatedReplicas is the number of Pods created by the StatefulSet controller from the StatefulSet version
// indicated by updateRevision.
UpdatedReplicas int32
// currentRevision, if not empty, indicates the version of the StatefulSet used to generate Pods in the
// sequence [0,currentReplicas).
CurrentRevision string
// updateRevision, if not empty, indicates the version of the StatefulSet used to generate Pods in the
// sequence [replicas-updatedReplicas,replicas).
UpdateRevision string
}
// StatefulSetList is a collection of StatefulSets.

View File

@ -37,6 +37,8 @@ func addConversionFuncs(scheme *runtime.Scheme) error {
err := scheme.AddConversionFuncs(
Convert_v1beta1_StatefulSetSpec_To_apps_StatefulSetSpec,
Convert_apps_StatefulSetSpec_To_v1beta1_StatefulSetSpec,
Convert_v1beta1_StatefulSetUpdateStrategy_To_apps_StatefulSetUpdateStrategy,
Convert_apps_StatefulSetUpdateStrategy_To_v1beta1_StatefulSetUpdateStrategy,
// extensions
// TODO: below conversions should be dropped in favor of auto-generated
// ones, see https://github.com/kubernetes/kubernetes/issues/39865
@ -109,6 +111,15 @@ func Convert_v1beta1_StatefulSetSpec_To_apps_StatefulSetSpec(in *StatefulSetSpec
} else {
out.VolumeClaimTemplates = nil
}
if err := Convert_v1beta1_StatefulSetUpdateStrategy_To_apps_StatefulSetUpdateStrategy(&in.UpdateStrategy, &out.UpdateStrategy, s); err != nil {
return err
}
if in.RevisionHistoryLimit != nil {
out.RevisionHistoryLimit = new(int32)
*out.RevisionHistoryLimit = *in.RevisionHistoryLimit
} else {
out.RevisionHistoryLimit = nil
}
out.ServiceName = in.ServiceName
out.PodManagementPolicy = apps.PodManagementPolicyType(in.PodManagementPolicy)
return nil
@ -140,8 +151,39 @@ func Convert_apps_StatefulSetSpec_To_v1beta1_StatefulSetSpec(in *apps.StatefulSe
} else {
out.VolumeClaimTemplates = nil
}
if in.RevisionHistoryLimit != nil {
out.RevisionHistoryLimit = new(int32)
*out.RevisionHistoryLimit = *in.RevisionHistoryLimit
} else {
out.RevisionHistoryLimit = nil
}
out.ServiceName = in.ServiceName
out.PodManagementPolicy = PodManagementPolicyType(in.PodManagementPolicy)
if err := Convert_apps_StatefulSetUpdateStrategy_To_v1beta1_StatefulSetUpdateStrategy(&in.UpdateStrategy, &out.UpdateStrategy, s); err != nil {
return err
}
return nil
}
// Convert_v1beta1_StatefulSetUpdateStrategy_To_apps_StatefulSetUpdateStrategy
// copies a v1beta1 update strategy into its internal apps counterpart. The
// Type is cast to the internal enumeration and the optional Partition is
// copied into a fresh value so that out never shares a pointer with in.
// The scope argument is unused and the function always returns nil.
func Convert_v1beta1_StatefulSetUpdateStrategy_To_apps_StatefulSetUpdateStrategy(in *StatefulSetUpdateStrategy, out *apps.StatefulSetUpdateStrategy, s conversion.Scope) error {
	var partition *apps.PartitionStatefulSetStrategy
	if src := in.Partition; src != nil {
		partition = &apps.PartitionStatefulSetStrategy{Ordinal: src.Ordinal}
	}
	out.Type = apps.StatefulSetUpdateStrategyType(in.Type)
	out.Partition = partition
	return nil
}
// Convert_apps_StatefulSetUpdateStrategy_To_v1beta1_StatefulSetUpdateStrategy
// copies an internal apps update strategy into its v1beta1 counterpart. The
// Type is cast to the v1beta1 enumeration and the optional Partition is
// copied into a fresh value so that out never shares a pointer with in.
// The scope argument is unused and the function always returns nil.
func Convert_apps_StatefulSetUpdateStrategy_To_v1beta1_StatefulSetUpdateStrategy(in *apps.StatefulSetUpdateStrategy, out *StatefulSetUpdateStrategy, s conversion.Scope) error {
	var partition *PartitionStatefulSetStrategy
	if src := in.Partition; src != nil {
		partition = &PartitionStatefulSetStrategy{Ordinal: src.Ordinal}
	}
	out.Type = StatefulSetUpdateStrategyType(in.Type)
	out.Partition = partition
	return nil
}

View File

@ -30,6 +30,10 @@ func SetDefaults_StatefulSet(obj *StatefulSet) {
if len(obj.Spec.PodManagementPolicy) == 0 {
obj.Spec.PodManagementPolicy = OrderedReadyPodManagement
}
if obj.Spec.UpdateStrategy.Type == "" {
obj.Spec.UpdateStrategy.Type = OnDeleteStatefulSetStrategyType
}
labels := obj.Spec.Template.Labels
if labels != nil {
if obj.Spec.Selector == nil {
@ -45,6 +49,11 @@ func SetDefaults_StatefulSet(obj *StatefulSet) {
obj.Spec.Replicas = new(int32)
*obj.Spec.Replicas = 1
}
if obj.Spec.RevisionHistoryLimit == nil {
obj.Spec.RevisionHistoryLimit = new(int32)
*obj.Spec.RevisionHistoryLimit = 10
}
}
// SetDefaults_Deployment sets additional defaults compared to its counterpart

View File

@ -26,6 +26,7 @@ import (
const (
// StatefulSetInitAnnotation if present, and set to false, indicates that a Pod's readiness should be ignored.
StatefulSetInitAnnotation = "pod.alpha.kubernetes.io/initialized"
// StatefulSetRevisionLabel is the key "statefulset.beta.kubernetes.io/revision".
// NOTE(review): presumably a Pod label recording the StatefulSet revision the
// Pod was created from — confirm against the StatefulSet controller.
StatefulSetRevisionLabel = "statefulset.beta.kubernetes.io/revision"
)
// ScaleSpec describes the attributes of a scale subresource
@ -111,6 +112,54 @@ const (
ParallelPodManagement = "Parallel"
)
// StatefulSetUpdateStrategy indicates the strategy that the StatefulSet
// controller will use to perform updates. It includes any additional parameters
// necessary to perform the update for the indicated strategy.
type StatefulSetUpdateStrategy struct {
// Type indicates the type of the StatefulSetUpdateStrategy. When left
// empty, SetDefaults_StatefulSet sets it to OnDeleteStatefulSetStrategyType.
// NOTE(review): the protobuf casttype below names "StatefulSetStrategyType",
// which differs from the Go type StatefulSetUpdateStrategyType — confirm
// this matches the generated protobuf code before changing either.
Type StatefulSetUpdateStrategyType `json:"type,omitempty" protobuf:"bytes,1,opt,name=type,casttype=StatefulSetStrategyType"`
// Partition is used to communicate the ordinal at which to partition
// the StatefulSet when Type is PartitionStatefulSetStrategyType. This
// value must be set when Type is PartitionStatefulSetStrategyType,
// and it must be nil otherwise.
Partition *PartitionStatefulSetStrategy `json:"partition,omitempty" protobuf:"bytes,2,opt,name=partition"`
}
// StatefulSetUpdateStrategyType is a string enumeration type that enumerates
// all possible update strategies for the StatefulSet controller.
type StatefulSetUpdateStrategyType string

// Every constant below is declared with an explicit StatefulSetUpdateStrategyType.
// A constant written as `Name = "value"` does not inherit the type of the
// preceding constant; it would be an untyped string constant and would lose
// the enumeration type.
const (
	// PartitionStatefulSetStrategyType indicates that updates will only be
	// applied to a partition of the StatefulSet. This is useful for canaries
	// and phased roll outs. When a scale operation is performed with this
	// strategy, new Pods will be created from the specification version indicated
	// by the StatefulSet's currentRevision if their ordinal is less than the supplied
	// Partition's ordinal. Otherwise, they will be created from the specification
	// version indicated by the StatefulSet's updateRevision.
	PartitionStatefulSetStrategyType StatefulSetUpdateStrategyType = "Partition"
	// RollingUpdateStatefulSetStrategyType indicates that update will be
	// applied to all Pods in the StatefulSet with respect to the StatefulSet
	// ordering constraints. When a scale operation is performed with this
	// strategy, new Pods will be created from the specification version indicated
	// by the StatefulSet's updateRevision.
	RollingUpdateStatefulSetStrategyType StatefulSetUpdateStrategyType = "RollingUpdate"
	// OnDeleteStatefulSetStrategyType triggers the legacy behavior. Version
	// tracking and ordered rolling restarts are disabled. Pods are recreated
	// from the StatefulSetSpec when they are manually deleted. When a scale
	// operation is performed with this strategy, new Pods will be created from
	// the specification version indicated by the StatefulSet's currentRevision.
	OnDeleteStatefulSetStrategyType StatefulSetUpdateStrategyType = "OnDelete"
)
// PartitionStatefulSetStrategy contains the parameters used with the
// PartitionStatefulSetStrategyType.
type PartitionStatefulSetStrategy struct {
// Ordinal indicates the ordinal at which the StatefulSet should be
// partitioned. The JSON tag has no omitempty, so an Ordinal of 0 is
// still serialized.
Ordinal int32 `json:"ordinal" protobuf:"varint,1,opt,name=ordinal"`
}
// A StatefulSetSpec is the specification of a StatefulSet.
type StatefulSetSpec struct {
// replicas is the desired number of replicas of the given Template.
@ -160,16 +209,47 @@ type StatefulSetSpec struct {
// all pods at once.
// +optional
PodManagementPolicy PodManagementPolicyType `json:"podManagementPolicy,omitempty" protobuf:"bytes,6,opt,name=podManagementPolicy,casttype=PodManagementPolicyType"`
// updateStrategy indicates the StatefulSetUpdateStrategy that will be
// employed to update Pods in the StatefulSet when a revision is made to
// Template.
UpdateStrategy StatefulSetUpdateStrategy `json:"updateStrategy,omitempty" protobuf:"bytes,7,opt,name=updateStrategy"`
// revisionHistoryLimit is the maximum number of revisions that will
// be maintained in the StatefulSet's revision history. The revision history
// consists of all revisions not represented by a currently applied
// StatefulSetSpec version. The default value is 10.
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty" protobuf:"varint,8,opt,name=revisionHistoryLimit"`
}
// StatefulSetStatus represents the current state of a StatefulSet.
type StatefulSetStatus struct {
// observedGeneration is the most recent generation observed for this StatefulSet. It corresponds to the
// StatefulSet's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration *int64 `json:"observedGeneration,omitempty" protobuf:"varint,1,opt,name=observedGeneration"`
// replicas is the number of Pods created by the StatefulSet controller.
Replicas int32 `json:"replicas" protobuf:"varint,2,opt,name=replicas"`
// readyReplicas is the number of Pods created by the StatefulSet controller that have a Ready Condition.
ReadyReplicas int32 `json:"readyReplicas,omitempty" protobuf:"varint,3,opt,name=readyReplicas"`
// currentReplicas is the number of Pods created by the StatefulSet controller from the StatefulSet version
// indicated by currentRevision.
CurrentReplicas int32 `json:"currentReplicas,omitempty" protobuf:"varint,4,opt,name=currentReplicas"`
// updatedReplicas is the number of Pods created by the StatefulSet controller from the StatefulSet version
// indicated by updateRevision.
UpdatedReplicas int32 `json:"updatedReplicas,omitempty" protobuf:"varint,5,opt,name=updatedReplicas"`
// currentRevision, if not empty, indicates the version of the StatefulSet used to generate Pods in the
// sequence [0,currentReplicas).
CurrentRevision string `json:"currentRevision,omitempty" protobuf:"bytes,6,opt,name=currentRevision"`
// updateRevision, if not empty, indicates the version of the StatefulSet used to generate Pods in the
// sequence [replicas-updatedReplicas,replicas).
UpdateRevision string `json:"updateRevision,omitempty" protobuf:"bytes,7,opt,name=updateRevision"`
}
// StatefulSetList is a collection of StatefulSets.

View File

@ -75,6 +75,40 @@ func ValidateStatefulSetSpec(spec *apps.StatefulSetSpec, fldPath *field.Path) fi
allErrs = append(allErrs, field.Invalid(fldPath.Child("podManagementPolicy"), spec.PodManagementPolicy, fmt.Sprintf("must be '%s' or '%s'", apps.OrderedReadyPodManagement, apps.ParallelPodManagement)))
}
switch spec.UpdateStrategy.Type {
case "":
allErrs = append(allErrs, field.Required(fldPath.Child("updateStrategy"), ""))
case apps.OnDeleteStatefulSetStrategyType, apps.RollingUpdateStatefulSetStrategyType:
if spec.UpdateStrategy.Partition != nil {
allErrs = append(
allErrs,
field.Invalid(
fldPath.Child("updateStrategy").Child("partition"),
spec.UpdateStrategy.Partition.Ordinal,
fmt.Sprintf("only allowed for updateStrategy '%s'", apps.PartitionStatefulSetStrategyType)))
}
case apps.PartitionStatefulSetStrategyType:
if spec.UpdateStrategy.Partition == nil {
allErrs = append(
allErrs,
field.Required(
fldPath.Child("updateStrategy").Child("partition"),
fmt.Sprintf("required for updateStrategy '%s'", apps.PartitionStatefulSetStrategyType)))
break
}
allErrs = append(allErrs,
apivalidation.ValidateNonnegativeField(
int64(spec.UpdateStrategy.Partition.Ordinal),
fldPath.Child("updateStrategy").Child("partition").Child("ordinal"))...)
default:
allErrs = append(allErrs,
field.Invalid(fldPath.Child("updateStrategy"), spec.UpdateStrategy,
fmt.Sprintf("must be '%s', '%s', or '%s'",
apps.RollingUpdateStatefulSetStrategyType,
apps.OnDeleteStatefulSetStrategyType,
apps.PartitionStatefulSetStrategyType)))
}
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(spec.Replicas), fldPath.Child("replicas"))...)
if spec.Selector == nil {
allErrs = append(allErrs, field.Required(fldPath.Child("selector"), ""))
@ -113,20 +147,21 @@ func ValidateStatefulSet(statefulSet *apps.StatefulSet) field.ErrorList {
func ValidateStatefulSetUpdate(statefulSet, oldStatefulSet *apps.StatefulSet) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&statefulSet.ObjectMeta, &oldStatefulSet.ObjectMeta, field.NewPath("metadata"))
// TODO: For now we're taking the safe route and disallowing all updates to
// spec except for Replicas, for scaling, and Template.Spec.containers.image
// for rolling-update. Enable others on a case by case basis.
restoreReplicas := statefulSet.Spec.Replicas
statefulSet.Spec.Replicas = oldStatefulSet.Spec.Replicas
restoreContainers := statefulSet.Spec.Template.Spec.Containers
statefulSet.Spec.Template.Spec.Containers = oldStatefulSet.Spec.Template.Spec.Containers
restoreTemplate := statefulSet.Spec.Template
statefulSet.Spec.Template = oldStatefulSet.Spec.Template
restoreStrategy := statefulSet.Spec.UpdateStrategy
statefulSet.Spec.UpdateStrategy = oldStatefulSet.Spec.UpdateStrategy
if !reflect.DeepEqual(statefulSet.Spec, oldStatefulSet.Spec) {
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to statefulset spec for fields other than 'replicas' and 'containers' are forbidden."))
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to statefulset spec for fields other than 'replicas', 'template', and 'updateStrategy' are forbidden."))
}
statefulSet.Spec.Replicas = restoreReplicas
statefulSet.Spec.Template.Spec.Containers = restoreContainers
statefulSet.Spec.Template = restoreTemplate
statefulSet.Spec.UpdateStrategy = restoreStrategy
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(statefulSet.Spec.Replicas), field.NewPath("spec", "replicas"))...)
containerErrs, _ := apivalidation.ValidateContainerUpdates(statefulSet.Spec.Template.Spec.Containers, oldStatefulSet.Spec.Template.Spec.Containers, field.NewPath("spec").Child("template").Child("containers"))

View File

@ -59,6 +59,7 @@ func TestValidateStatefulSet(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
{
@ -67,6 +68,39 @@ func TestValidateStatefulSet(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "abc-123", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.ParallelPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "abc-123", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.OnDeleteStatefulSetStrategyType},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "abc-123", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
Replicas: 3,
UpdateStrategy: apps.StatefulSetUpdateStrategy{
Type: apps.PartitionStatefulSetStrategyType,
Partition: func() *apps.PartitionStatefulSetStrategy {
return &apps.PartitionStatefulSetStrategy{Ordinal: 2}
}()},
},
},
}
@ -83,6 +117,7 @@ func TestValidateStatefulSet(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"missing-namespace": {
@ -91,6 +126,7 @@ func TestValidateStatefulSet(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"empty selector": {
@ -98,6 +134,7 @@ func TestValidateStatefulSet(t *testing.T) {
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.OrderedReadyPodManagement,
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"selector_doesnt_match": {
@ -106,6 +143,7 @@ func TestValidateStatefulSet(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"invalid manifest": {
@ -113,6 +151,7 @@ func TestValidateStatefulSet(t *testing.T) {
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"negative_replicas": {
@ -121,6 +160,7 @@ func TestValidateStatefulSet(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Replicas: -1,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"invalid_label": {
@ -135,6 +175,7 @@ func TestValidateStatefulSet(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"invalid_label 2": {
@ -148,6 +189,7 @@ func TestValidateStatefulSet(t *testing.T) {
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.OrderedReadyPodManagement,
Template: invalidPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"invalid_annotation": {
@ -162,6 +204,7 @@ func TestValidateStatefulSet(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"invalid restart policy 1": {
@ -182,6 +225,7 @@ func TestValidateStatefulSet(t *testing.T) {
Labels: validLabels,
},
},
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"invalid restart policy 2": {
@ -202,6 +246,42 @@ func TestValidateStatefulSet(t *testing.T) {
Labels: validLabels,
},
},
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
"invalid udpate strategy": {
ObjectMeta: metav1.ObjectMeta{Name: "abc-123", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
Replicas: 3,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: "foo"},
},
},
"partitioned rolling update": {
ObjectMeta: metav1.ObjectMeta{Name: "abc-123", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
Replicas: 3,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType,
Partition: func() *apps.PartitionStatefulSetStrategy {
return &apps.PartitionStatefulSetStrategy{Ordinal: 2}
}()},
},
},
"empty partition": {
ObjectMeta: metav1.ObjectMeta{Name: "abc-123", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
Replicas: 3,
UpdateStrategy: apps.StatefulSetUpdateStrategy{
Type: apps.PartitionStatefulSetStrategyType,
Partition: nil},
},
},
}
@ -222,7 +302,10 @@ func TestValidateStatefulSet(t *testing.T) {
field != "spec.template.labels" &&
field != "metadata.annotations" &&
field != "metadata.labels" &&
field != "status.replicas" {
field != "status.replicas" &&
field != "spec.updateStrategy" &&
field != "spec.updateStrategy.partition" &&
field != "spec.updateStrategy.partition.ordinal" {
t.Errorf("%s: missing prefix for: %v", k, errs[i])
}
}
@ -280,6 +363,7 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
update: apps.StatefulSet{
@ -289,6 +373,7 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
Replicas: 3,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
},
@ -305,8 +390,9 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
old: apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
update: apps.StatefulSet{
@ -316,6 +402,7 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
Replicas: 2,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: readWriteVolumePodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
},
@ -323,16 +410,18 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
old: apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
update: apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
Replicas: 3,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
Replicas: 3,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
},
@ -340,8 +429,9 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
old: apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
update: apps.StatefulSet{
@ -351,24 +441,7 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
Replicas: 3,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
},
},
},
"updates to a field other than spec.Replicas": {
old: apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
},
},
update: apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
PodManagementPolicy: apps.OrderedReadyPodManagement,
Replicas: 1,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: readWriteVolumePodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
},
@ -376,8 +449,9 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
old: apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
update: apps.StatefulSet{
@ -387,6 +461,7 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
Replicas: 2,
Selector: &metav1.LabelSelector{MatchLabels: invalidLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
},
@ -397,14 +472,16 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
update: apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: apps.StatefulSetSpec{
Replicas: 2,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: invalidPodTemplate.Template,
Replicas: 2,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: invalidPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
},
@ -423,6 +500,7 @@ func TestValidateStatefulSetUpdate(t *testing.T) {
Replicas: -1,
Selector: &metav1.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
},
},

16
pkg/controller/history/OWNERS Executable file
View File

@ -0,0 +1,16 @@
approvers:
- bprashanth
- enisoc
- foxish
- janetkuo
- kargakis
- kow3ns
- smarterclayton
reviewers:
- bprashanth
- enisoc
- foxish
- janetkuo
- kargakis
- kow3ns
- smarterclayton

View File

@ -0,0 +1,490 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package history
import (
"bytes"
"fmt"
"hash/fnv"
"sort"
"strconv"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
"k8s.io/kubernetes/pkg/controller"
hashutil "k8s.io/kubernetes/pkg/util/hash"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/client/retry"
)
// ControllerRevisionHashLabel is the label used to indicate the hash value of a ControllerRevision's Data.
// NewControllerRevision stores the hash under this key as a base 10 string, and EqualRevision reads it back to
// cheaply rule out revisions whose hashes differ.
const ControllerRevisionHashLabel = "controller.kubernetes.io/hash"
// ControllerRevisionName builds the Name of a ControllerRevision as "<prefix>-<hash>", with hash rendered in
// base 10. A prefix longer than 223 bytes is cut down to its first 223 bytes so that the resulting name is no
// larger than 253 bytes.
func ControllerRevisionName(prefix string, hash uint32) string {
	const maxPrefixLen = 223
	name := prefix
	if len(name) > maxPrefixLen {
		name = name[:maxPrefixLen]
	}
	return fmt.Sprintf("%s-%d", name, hash)
}
// NewControllerRevision builds a ControllerRevision whose ControllerRef points at parent, recorded as being of
// parentKind. The revision carries the labels obtained by converting selector to a label map, holds data as its
// Data, and has its Revision set to revision. Its Name is derived from parent's name and the hash of the revision,
// and the same hash is stored under ControllerRevisionHashLabel. When the returned error is non-nil (the selector
// could not be converted to a label map) the returned ControllerRevision is nil and must not be used.
func NewControllerRevision(parent metav1.Object,
	parentKind schema.GroupVersionKind,
	selector labels.Selector,
	data runtime.RawExtension,
	revision int64) (*apps.ControllerRevision, error) {
	revisionLabels, err := labels.ConvertSelectorToLabelsMap(selector.String())
	if err != nil {
		return nil, err
	}

	// OwnerReference takes pointers to bools, so each flag needs an addressable variable of its own.
	blockDeletion, controlling := true, true
	ownerRef := metav1.OwnerReference{
		APIVersion:         parentKind.GroupVersion().String(),
		Kind:               parentKind.Kind,
		Name:               parent.GetName(),
		UID:                parent.GetUID(),
		BlockOwnerDeletion: &blockDeletion,
		Controller:         &controlling,
	}

	cr := &apps.ControllerRevision{
		ObjectMeta: metav1.ObjectMeta{
			Labels:          revisionLabels,
			OwnerReferences: []metav1.OwnerReference{ownerRef},
		},
		Data:     data,
		Revision: revision,
	}

	// The name and the hash label are both derived from the probe-less hash of the revision.
	dataHash := HashControllerRevision(cr, nil)
	cr.Name = ControllerRevisionName(parent.GetName(), dataHash)
	cr.Labels[ControllerRevisionHashLabel] = strconv.FormatInt(int64(dataHash), 10)
	return cr, nil
}
// HashControllerRevision computes a 32 bit FNV hash over the contents of revision's Data: the Raw bytes when there
// are any, followed by a deep hash of Data.Object when it is set. If probe is not nil, its base 10 string form is
// written to the hash as well, which lets callers derive a different hash for the same Data.
func HashControllerRevision(revision *apps.ControllerRevision, probe *uint32) uint32 {
	hasher := fnv.New32()
	if raw := revision.Data.Raw; len(raw) > 0 {
		hasher.Write(raw)
	}
	if obj := revision.Data.Object; obj != nil {
		hashutil.DeepHashObject(hasher, obj)
	}
	if probe != nil {
		probeText := strconv.FormatInt(int64(*probe), 10)
		hasher.Write([]byte(probeText))
	}
	return hasher.Sum32()
}
// SortControllerRevisions sorts revisions in place by their Revision, lowest first. The ordering is the one
// defined by byRevision and the sort is performed with sort.Sort.
func SortControllerRevisions(revisions []*apps.ControllerRevision) {
sort.Sort(byRevision(revisions))
}
// EqualRevision returns true if lhs and rhs are either both nil, or both point to non-nil ControllerRevisions that
// contain semantically equivalent data. Otherwise this method returns false.
//
// When both revisions carry a parseable ControllerRevisionHashLabel and the two hashes differ, the revisions are
// reported as unequal without comparing their Data. In every other case the Raw bytes are compared byte for byte
// and the Objects are compared with apiequality.Semantic.DeepEqual.
func EqualRevision(lhs *apps.ControllerRevision, rhs *apps.ControllerRevision) bool {
	if lhs == nil || rhs == nil {
		return lhs == rhs
	}
	lhsHash, lhsOK := revisionHashLabel(lhs)
	rhsHash, rhsOK := revisionHashLabel(rhs)
	if lhsOK && rhsOK && lhsHash != rhsHash {
		return false
	}
	return bytes.Equal(lhs.Data.Raw, rhs.Data.Raw) && apiequality.Semantic.DeepEqual(lhs.Data.Object, rhs.Data.Object)
}

// revisionHashLabel returns the hash recorded in revision's ControllerRevisionHashLabel. The boolean is false when
// the label is absent or is not a base 10 unsigned 32 bit integer.
func revisionHashLabel(revision *apps.ControllerRevision) (uint32, bool) {
	value, found := revision.Labels[ControllerRevisionHashLabel]
	if !found {
		return 0, false
	}
	// The label holds a uint32 hash, so it must be parsed as unsigned: a signed 32 bit parse rejects every
	// hash of 2^31 or more and would silently disable the hash comparison for those revisions.
	hash, err := strconv.ParseUint(value, 10, 32)
	if err != nil {
		return 0, false
	}
	return uint32(hash), true
}
// FindEqualRevisions collects every ControllerRevision in revisions that EqualRevision reports as equal to needle.
// Matches appear in the result in the same order as in revisions; the result is nil when nothing matches.
func FindEqualRevisions(revisions []*apps.ControllerRevision, needle *apps.ControllerRevision) []*apps.ControllerRevision {
	var matches []*apps.ControllerRevision
	for _, candidate := range revisions {
		if !EqualRevision(candidate, needle) {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches
}
// byRevision adapts a slice of ControllerRevisions to sort.Interface, ordering elements by ascending Revision.
type byRevision []*apps.ControllerRevision

// Len reports the number of revisions in the slice.
func (revs byRevision) Len() int {
	return len(revs)
}

// Less reports whether the revision at index i has a lower Revision than the one at index j.
func (revs byRevision) Less(i, j int) bool {
	left, right := revs[i], revs[j]
	return left.Revision < right.Revision
}

// Swap exchanges the revisions at indices i and j.
func (revs byRevision) Swap(i, j int) {
	revs[j], revs[i] = revs[i], revs[j]
}
// Interface provides an interface allowing for management of a Controller's history as realized by recorded
// ControllerRevisions. An instance of Interface can be retrieved from NewHistory. Implementations must treat all
// pointer parameters as "in" parameters, and they must not mutate them.
type Interface interface {
// ListControllerRevisions lists all ControllerRevisions matching selector that are either controlled by parent
// or have no controller at all. If the returned error is nil the returned slice of ControllerRevisions is
// valid. If the returned error is not nil, the returned slice is not valid.
ListControllerRevisions(parent metav1.Object, selector labels.Selector) ([]*apps.ControllerRevision, error)
// CreateControllerRevision attempts to create the revision as owned by parent via a ControllerRef. If a name
// collision occurs, a unique identifier is added to the hash of the revision and it is renamed using
// ControllerRevisionName. Implementations may cease to attempt to retry creation after some number of attempts
// and return an error. If the returned error is not nil, creation failed. If the returned error is nil, the
// returned ControllerRevision has been created.
CreateControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error)
// DeleteControllerRevision attempts to delete revision. If the returned error is not nil, deletion has failed.
DeleteControllerRevision(revision *apps.ControllerRevision) error
// UpdateControllerRevision updates revision such that its Revision is equal to newRevision. Implementations
// may retry on conflict. If the returned error is nil, the update was successful and the returned
// ControllerRevision is valid. If the returned error is not nil, the update failed and the returned
// ControllerRevision is invalid.
UpdateControllerRevision(revision *apps.ControllerRevision, newRevision int64) (*apps.ControllerRevision, error)
// AdoptControllerRevision attempts to adopt revision by adding a ControllerRef indicating that the parent
// Object of parentKind is the owner of revision. If revision already has a controller, an error is returned.
// If the resource patch fails, an error is returned. If no error is returned, the returned ControllerRevision
// is valid.
AdoptControllerRevision(parent metav1.Object, parentKind schema.GroupVersionKind, revision *apps.ControllerRevision) (*apps.ControllerRevision, error)
// ReleaseControllerRevision attempts to release parent's ownership of revision by removing parent from the
// OwnerReferences of revision. If an error is returned, parent remains the owner of revision. If no error is
// returned, the returned ControllerRevision is valid; it may be nil, as both implementations in this package
// return a nil ControllerRevision and a nil error when revision no longer exists.
ReleaseControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error)
}
// NewHistory builds the production implementation of Interface. Writes (create, update, delete, patch) go to the
// API Server through client, while ControllerRevisions are listed and re-read through lister. Use this constructor
// for all scenarios other than testing.
func NewHistory(client clientset.Interface, lister appslisters.ControllerRevisionLister) Interface {
	return &realHistory{
		client: client,
		lister: lister,
	}
}
// NewFakeHistory builds an implementation of Interface intended for tests. It creates, updates, and deletes
// ControllerRevisions directly in informer's indexer and lists them through informer's lister, so no API Server
// is involved.
func NewFakeHistory(informer appsinformers.ControllerRevisionInformer) Interface {
	indexer := informer.Informer().GetIndexer()
	return &fakeHistory{
		indexer: indexer,
		lister:  informer.Lister(),
	}
}
// realHistory is the implementation of Interface returned by NewHistory.
type realHistory struct {
// client is used for create, update, delete, and patch requests against ControllerRevisions.
client clientset.Interface
// lister is used to list ControllerRevisions and to re-read a revision after a failed update.
lister appslisters.ControllerRevisionLister
}
// ListControllerRevisions lists the ControllerRevisions in parent's namespace that match selector and keeps those
// that either have no controller or are controlled by parent (matched by UID).
func (rh *realHistory) ListControllerRevisions(parent metav1.Object, selector labels.Selector) ([]*apps.ControllerRevision, error) {
	candidates, err := rh.lister.ControllerRevisions(parent.GetNamespace()).List(selector)
	if err != nil {
		return nil, err
	}
	var owned []*apps.ControllerRevision
	for _, candidate := range candidates {
		ref := controller.GetControllerOf(candidate)
		// Skip revisions that are controlled by some other object.
		if ref != nil && ref.UID != parent.GetUID() {
			continue
		}
		owned = append(owned, candidate)
	}
	return owned, nil
}
// CreateControllerRevision creates a deep copy of revision in parent's namespace through the API client. The copy
// is named from parent's name and the hash of revision. Whenever the API Server reports that the name already
// exists, the hash is recomputed with an incremented probe and creation is attempted again under the new name;
// any other outcome (success or a different error) is returned to the caller as is.
func (rh *realHistory) CreateControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
	// Work on a copy so the caller's revision is never modified.
	copied, err := scheme.Scheme.DeepCopy(revision)
	if err != nil {
		return nil, err
	}
	clone := copied.(*apps.ControllerRevision)
	for probe := uint32(0); ; probe++ {
		// The first attempt hashes without a probe; later attempts mix the probe in to resolve collisions.
		var collisionProbe *uint32
		if probe > 0 {
			collisionProbe = &probe
		}
		hash := HashControllerRevision(revision, collisionProbe)
		// Only the name changes between attempts; the labels of the copy are left as supplied.
		clone.Name = ControllerRevisionName(parent.GetName(), hash)
		created, err := rh.client.Apps().ControllerRevisions(parent.GetNamespace()).Create(clone)
		if !errors.IsAlreadyExists(err) {
			return created, err
		}
	}
}
// UpdateControllerRevision sets the Revision of a deep copy of revision to newRevision and writes that copy
// through the API client, retrying with retry.RetryOnConflict and retry.DefaultBackoff. The input revision is not
// mutated. The working copy is returned together with the error from the retry loop; per the Interface contract
// it is only valid when that error is nil.
func (rh *realHistory) UpdateControllerRevision(revision *apps.ControllerRevision, newRevision int64) (*apps.ControllerRevision, error) {
obj, err := scheme.Scheme.DeepCopy(revision)
if err != nil {
return nil, err
}
clone := obj.(*apps.ControllerRevision)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// Nothing to write if the working copy already has the desired Revision. This also ends the loop when a
// copy re-read from the lister after a failed attempt already carries newRevision.
if clone.Revision == newRevision {
return nil
}
clone.Revision = newRevision
updated, updateErr := rh.client.Apps().ControllerRevisions(clone.Namespace).Update(clone)
// On success the object returned by the client is discarded and the local working copy is kept.
if updateErr == nil {
return nil
}
// Keep whatever object the client returned alongside the error, if any.
if updated != nil {
clone = updated
}
// Re-read the revision from the lister so that the next attempt starts from the lister's view of the object.
// A failed Get is ignored and the current working copy is retried as is.
if updated, err := rh.lister.ControllerRevisions(clone.Namespace).Get(clone.Name); err == nil {
// make a copy so we don't mutate the shared cache
obj, err := scheme.Scheme.DeepCopy(updated)
if err != nil {
return err
}
clone = obj.(*apps.ControllerRevision)
}
// Hand the original update error back to RetryOnConflict, which decides whether to try again.
return updateErr
})
return clone, err
}
// DeleteControllerRevision deletes revision, addressed by its namespace and name, through the API client without
// passing any delete options. The client's error is returned unchanged.
func (rh *realHistory) DeleteControllerRevision(revision *apps.ControllerRevision) error {
	revisions := rh.client.Apps().ControllerRevisions(revision.Namespace)
	return revisions.Delete(revision.Name, nil)
}
// AdoptControllerRevision makes parent, of kind parentKind, the controller of revision by sending a strategic
// merge patch that adds an owner reference with controller and blockOwnerDeletion set to true. A revision that
// already has a controller is rejected with an error before any request is sent.
func (rh *realHistory) AdoptControllerRevision(parent metav1.Object, parentKind schema.GroupVersionKind, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
	// Refuse to adopt a revision that is already controlled by some object.
	owner := controller.GetControllerOf(revision)
	if owner != nil {
		return nil, fmt.Errorf("attempt to adopt revision owned by %v", owner)
	}
	revisions := rh.client.Apps().ControllerRevisions(parent.GetNamespace())
	// The patch also carries the revision's own UID under metadata.uid.
	// NOTE(review): presumably this makes the patch fail if the revision was deleted and recreated - confirm.
	patch := fmt.Sprintf(
		`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
		parentKind.GroupVersion().String(), parentKind.Kind,
		parent.GetName(), parent.GetUID(), revision.UID)
	return revisions.Patch(revision.GetName(), types.StrategicMergePatchType, []byte(patch))
}
// ReleaseControllerRevision removes parent's entry from revision's owner references by sending a strategic merge
// patch with a "$patch":"delete" directive keyed on parent's UID. NotFound and Invalid errors from the patch are
// swallowed and reported as a nil ControllerRevision with a nil error; any other error is returned together with
// whatever object the client returned.
func (rh *realHistory) ReleaseControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
	revisions := rh.client.Apps().ControllerRevisions(revision.GetNamespace())
	// Strategic merge patch that deletes the owner reference whose uid is parent's UID.
	patch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, parent.GetUID(), revision.UID)
	released, err := revisions.Patch(revision.GetName(), types.StrategicMergePatchType, []byte(patch))
	switch {
	case err == nil:
		return released, nil
	case errors.IsNotFound(err), errors.IsInvalid(err):
		// NotFound: the revision has been deleted. Invalid: the parent no longer owns the revision, or the
		// revision has no owner. Both cases are ignored.
		return nil, nil
	default:
		return released, err
	}
}
// fakeHistory is the implementation of Interface returned by NewFakeHistory. It never talks to an API Server.
type fakeHistory struct {
// indexer is the store that creates, updates, and deletes are applied to directly.
indexer cache.Indexer
// lister is used to list ControllerRevisions.
lister appslisters.ControllerRevisionLister
}
// ListControllerRevisions lists the ControllerRevisions in parent's namespace that match selector and keeps those
// that either have no controller or are controlled by parent (matched by UID).
func (fh *fakeHistory) ListControllerRevisions(parent metav1.Object, selector labels.Selector) ([]*apps.ControllerRevision, error) {
	candidates, err := fh.lister.ControllerRevisions(parent.GetNamespace()).List(selector)
	if err != nil {
		return nil, err
	}
	var owned []*apps.ControllerRevision
	for _, candidate := range candidates {
		ref := controller.GetControllerOf(candidate)
		// Skip revisions that are controlled by some other object.
		if ref != nil && ref.UID != parent.GetUID() {
			continue
		}
		owned = append(owned, candidate)
	}
	return owned, nil
}
// addRevision stores revision in the fake's indexer. If an object is already stored under revision's
// namespace/name key, that stored ControllerRevision is returned together with an AlreadyExists error for the
// "controllerrevisions" resource, the same resource name the other errors produced by fakeHistory use. Otherwise
// revision is written to the indexer and returned along with the indexer's error, if any.
func (fh *fakeHistory) addRevision(revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(revision)
	if err != nil {
		return nil, err
	}
	obj, found, err := fh.indexer.GetByKey(key)
	if err != nil {
		return nil, err
	}
	if found {
		foundRevision := obj.(*apps.ControllerRevision)
		return foundRevision, errors.NewAlreadyExists(apps.Resource("controllerrevisions"), revision.Name)
	}
	return revision, fh.indexer.Update(revision)
}
// CreateControllerRevision stores a deep copy of revision in the fake's indexer. The copy is placed in parent's
// namespace and named from parent's name and the hash of revision. Whenever addRevision reports that the name
// already exists, the hash is recomputed with an incremented probe and the add is attempted again under the new
// name; any other outcome (success or a different error) is returned to the caller as is.
func (fh *fakeHistory) CreateControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
	// Work on a copy so the caller's revision is never modified.
	copied, err := scheme.Scheme.DeepCopy(revision)
	if err != nil {
		return nil, err
	}
	clone := copied.(*apps.ControllerRevision)
	clone.Namespace = parent.GetNamespace()
	for probe := uint32(0); ; probe++ {
		// The first attempt hashes without a probe; later attempts mix the probe in to resolve collisions.
		var collisionProbe *uint32
		if probe > 0 {
			collisionProbe = &probe
		}
		hash := HashControllerRevision(revision, collisionProbe)
		// Only the name changes between attempts; the labels of the copy are left as supplied.
		clone.Name = ControllerRevisionName(parent.GetName(), hash)
		created, err := fh.addRevision(clone)
		if !errors.IsAlreadyExists(err) {
			return created, err
		}
	}
}
// DeleteControllerRevision removes the object stored under revision's namespace/name key from the fake's indexer.
// A NotFound error is returned when nothing is stored under that key.
func (fh *fakeHistory) DeleteControllerRevision(revision *apps.ControllerRevision) error {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(revision)
	if err != nil {
		return err
	}
	stored, exists, err := fh.indexer.GetByKey(key)
	switch {
	case err != nil:
		return err
	case !exists:
		return errors.NewNotFound(apps.Resource("controllerrevisions"), revision.Name)
	}
	// Delete the object that is actually in the store rather than the caller's copy.
	return fh.indexer.Delete(stored)
}
// UpdateControllerRevision writes a deep copy of revision, with its Revision set to newRevision, to the fake's
// indexer. The copy is returned together with the indexer's error, if any; the input revision is not mutated.
func (fh *fakeHistory) UpdateControllerRevision(revision *apps.ControllerRevision, newRevision int64) (*apps.ControllerRevision, error) {
	copied, err := scheme.Scheme.DeepCopy(revision)
	if err != nil {
		return nil, err
	}
	updated := copied.(*apps.ControllerRevision)
	updated.Revision = newRevision
	if err := fh.indexer.Update(updated); err != nil {
		return updated, err
	}
	return updated, nil
}
// AdoptControllerRevision makes parent, of kind parentKind, the controller of revision inside the fake's indexer.
// It fails if revision already has a controller, and returns a NotFound error if no object is stored under
// revision's key. On the happy path a deep copy of revision with the new owner reference appended is written to
// the indexer and returned together with the indexer's error, if any.
func (fh *fakeHistory) AdoptControllerRevision(parent metav1.Object, parentKind schema.GroupVersionKind, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
	// Refuse to adopt a revision that is already controlled by some object.
	owner := controller.GetControllerOf(revision)
	if owner != nil {
		return nil, fmt.Errorf("attempt to adopt revision owned by %v", owner)
	}
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(revision)
	if err != nil {
		return nil, err
	}
	// Only existence is checked here; the stored object itself is not used.
	_, exists, err := fh.indexer.GetByKey(key)
	switch {
	case err != nil:
		return nil, err
	case !exists:
		return nil, errors.NewNotFound(apps.Resource("controllerrevisions"), revision.Name)
	}
	copied, err := scheme.Scheme.DeepCopy(revision)
	if err != nil {
		return nil, err
	}
	adopted := copied.(*apps.ControllerRevision)
	// OwnerReference takes pointers to bools, so each flag needs an addressable variable of its own.
	blockDeletion, controlling := true, true
	ownerRef := metav1.OwnerReference{
		APIVersion:         parentKind.GroupVersion().String(),
		Kind:               parentKind.Kind,
		Name:               parent.GetName(),
		UID:                parent.GetUID(),
		BlockOwnerDeletion: &blockDeletion,
		Controller:         &controlling,
	}
	adopted.OwnerReferences = append(adopted.OwnerReferences, ownerRef)
	return adopted, fh.indexer.Update(adopted)
}
// ReleaseControllerRevision drops every owner reference whose UID is parent's UID from a deep copy of revision
// and writes that copy to the fake's indexer. If nothing is stored under revision's key, a nil ControllerRevision
// and a nil error are returned. Otherwise the copy is returned together with the indexer's error, if any.
func (fh *fakeHistory) ReleaseControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(revision)
	if err != nil {
		return nil, err
	}
	// Only existence is checked here; the stored object itself is not used.
	_, exists, err := fh.indexer.GetByKey(key)
	switch {
	case err != nil:
		return nil, err
	case !exists:
		// A revision that is already gone is treated as released.
		return nil, nil
	}
	copied, err := scheme.Scheme.DeepCopy(revision)
	if err != nil {
		return nil, err
	}
	released := copied.(*apps.ControllerRevision)
	// Rebuild the owner references without parent's entries; the list ends up nil when none remain.
	var kept []metav1.OwnerReference
	for _, ref := range released.OwnerReferences {
		if ref.UID != parent.GetUID() {
			kept = append(kept, ref)
		}
	}
	released.OwnerReferences = kept
	return released, fh.indexer.Update(released)
}

File diff suppressed because it is too large Load Diff

View File

@ -49,10 +49,6 @@ type StatefulPodControlInterface interface {
// DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
// the returned error is nil.
DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
// UpdateStatefulSetStatus updates the status of a StatefulSet. set is an in-out parameter, and any
// updates made to the set are made visible as mutations to the parameter. If the method is successful, the
// returned error is nil, and set has its status updated.
UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error
}
func NewRealStatefulPodControl(
@ -93,7 +89,6 @@ func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod
func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
attemptedUpdate := false
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// assume the Pod is consistent
consistent := true
@ -149,30 +144,6 @@ func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
return err
}
// UpdateStatefulSetStatus writes replicas and generation into set's Status (Replicas and ObservedGeneration) and
// sends the result with UpdateStatus, retrying with retry.RetryOnConflict and retry.DefaultBackoff. It returns
// nil once an update succeeds, and otherwise the error produced by the retry loop.
func (spc *realStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
set.Status.Replicas = replicas
set.Status.ObservedGeneration = &generation
_, updateErr := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
if updateErr == nil {
return nil
}
// The update failed: re-read the StatefulSet from the lister so that the next attempt uses the lister's copy.
// Failures to read or copy are reported through utilruntime.HandleError and do not replace updateErr.
if updated, err := spc.setLister.StatefulSets(set.Namespace).Get(set.Name); err == nil {
// make a copy so we don't mutate the shared cache
if copy, err := scheme.Scheme.DeepCopy(updated); err == nil {
// NOTE(review): this rebinds the local variable set to the copy, so status written on later attempts
// appears to land on the copy rather than on the object the caller passed in - confirm against callers.
set = copy.(*apps.StatefulSet)
} else {
utilruntime.HandleError(fmt.Errorf("error copying updated StatefulSet: %v", err))
}
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", set.Namespace, set.Name, err))
}
// Hand the original update error back to RetryOnConflict, which decides whether to try again.
return updateErr
})
}
// recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
// have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning.
func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {

View File

@ -29,9 +29,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)
@ -461,116 +459,6 @@ func TestStatefulPodControlDeleteFailure(t *testing.T) {
}
}
// TestStatefulPodControlUpdatesSetStatus checks that a status update accepted by the fake client returns no
// error, leaves set.Status.Replicas at the requested value, and records no events.
func TestStatefulPodControlUpdatesSetStatus(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
// The reactor accepts every statefulsets update and echoes the submitted object back.
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), nil
})
if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
if set.Status.Replicas != 2 {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount)
}
}
// TestStatefulPodControlUpdatesObservedGeneration checks that the generation passed to UpdateStatefulSetStatus
// (3 here) is what the client receives in Status.ObservedGeneration.
func TestStatefulPodControlUpdatesObservedGeneration(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
// The reactor inspects the submitted object and fails the test if ObservedGeneration is unset or not 3.
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
sts := update.GetObject().(*apps.StatefulSet)
if sts.Status.ObservedGeneration == nil || *sts.Status.ObservedGeneration != int64(3) {
t.Errorf("expected observedGeneration to be synced with generation for statefulset %q", sts.Name)
}
return true, sts, nil
})
if err := control.UpdateStatefulSetStatus(set, 2, 3); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
}
// TestStatefulPodControlUpdateReplicasFailure checks that an internal error from the client is returned by
// UpdateStatefulSetStatus and that no events are recorded.
func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
// Back the lister with an indexer holding set, so the re-read after the failed update can find it.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, nil, recorder)
// The reactor rejects every statefulsets update with an internal error.
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil {
t.Error("Failed update did not return error")
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount)
}
}
// TestStatefulPodControlUpdateReplicasConflict checks that a single conflict followed by a successful update
// yields no error, leaves set.Status.Replicas at the requested value, and records no events.
func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
// conflict records whether the reactor has already answered with a conflict.
conflict := false
fakeClient := &fake.Clientset{}
// Back the lister with an indexer holding set, so the re-read after the conflict can find it.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, nil, recorder)
// The reactor answers the first update with a conflict and accepts every later one.
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
if !conflict {
conflict = true
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
} else {
return true, update.GetObject(), nil
}
})
if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil {
t.Errorf("UpdateStatefulSetStatus returned an error: %s", err)
}
if set.Status.Replicas != 2 {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount)
}
}
// TestStatefulPodControlUpdateReplicasConflictFailure checks that UpdateStatefulSetStatus returns an error when
// every update attempt is answered with a conflict, and that no events are recorded.
func TestStatefulPodControlUpdateReplicasConflictFailure(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
// Back the lister with an indexer holding set, so the re-read after each conflict can find it.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, nil, recorder)
// The reactor answers every statefulsets update with a conflict.
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
})
if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil {
t.Error("UpdateStatefulSetStatus failed to return an error on get failure")
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount)
}
}
func collectEvents(source <-chan string) []string {
done := false
events := make([]string, 0)

View File

@ -40,6 +40,7 @@ import (
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"github.com/golang/glog"
)
@ -80,6 +81,7 @@ func NewStatefulSetController(
podInformer coreinformers.PodInformer,
setInformer appsinformers.StatefulSetInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
revInformer appsinformers.ControllerRevisionInformer,
kubeClient clientset.Interface,
) *StatefulSetController {
eventBroadcaster := record.NewBroadcaster()
@ -95,8 +97,9 @@ func NewStatefulSetController(
setInformer.Lister(),
podInformer.Lister(),
pvcInformer.Lister(),
recorder,
),
recorder),
NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
history.NewHistory(kubeClient, revInformer.Lister()),
),
pvcListerSynced: pvcInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
@ -305,6 +308,32 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, s
return cm.ClaimPods(pods, filter)
}
// adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector.
//
// The revisions listed for set include both revisions that set already controls and revisions with no
// controller. Only the latter are handed to AdoptOrphanRevisions: adopting a revision that already has a
// controller is rejected with an error, so passing the full list would make every sync fail as soon as a single
// orphan shows up next to an already owned revision. When there is nothing to adopt, no API request is made.
func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error {
	revisions, err := ssc.control.ListRevisions(set)
	if err != nil {
		return err
	}
	var orphans []*apps.ControllerRevision
	for i := range revisions {
		if controller.GetControllerOf(revisions[i]) == nil {
			orphans = append(orphans, revisions[i])
		}
	}
	if len(orphans) == 0 {
		return nil
	}
	// Re-read the StatefulSet straight from the API Server and compare UIDs before adopting, so that a set
	// that was deleted and recreated under the same name does not adopt on behalf of the old object.
	fresh, err := ssc.kubeClient.AppsV1beta1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if fresh.UID != set.UID {
		return fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
	}
	return ssc.control.AdoptOrphanRevisions(set, orphans)
}
// getStatefulSetsForPod returns a list of StatefulSets that potentially match
// a given pod.
func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
@ -406,6 +435,10 @@ func (ssc *StatefulSetController) sync(key string) error {
return nil
}
if err := ssc.adoptOrphanRevisions(set); err != nil {
return err
}
pods, err := ssc.getPodsForStatefulSet(set, selector)
if err != nil {
return err

View File

@ -17,12 +17,14 @@ limitations under the License.
package statefulset
import (
"fmt"
"math"
"sort"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/controller/history"
"github.com/golang/glog"
)
@ -36,18 +38,30 @@ type StatefulSetControlInterface interface {
// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error
// ListRevisions returns a slice of the ControllerRevisions that represent the revisions of set. If the returned
// error is nil, the returned slice of ControllerRevisions is valid.
ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
// AdoptOrphanRevisions adopts any orphaned ControllerRevisions that match set's Selector. If all adoptions are
// successful the returned error is nil.
AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
}
// NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
// implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
// and delete Pods and to create PersistentVolumeClaims. You should use an instance returned from
// NewRealStatefulPodControl() for any scenario other than testing.
func NewDefaultStatefulSetControl(podControl StatefulPodControlInterface) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl}
// and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
// to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
// scenario other than testing.
func NewDefaultStatefulSetControl(
podControl StatefulPodControlInterface,
statusUpdater StatefulSetStatusUpdaterInterface,
controllerHistory history.Interface) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory}
}
type defaultStatefulSetControl struct {
podControl StatefulPodControlInterface
podControl StatefulPodControlInterface
statusUpdater StatefulSetStatusUpdaterInterface
controllerHistory history.Interface
}
// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
@ -57,20 +71,223 @@ type defaultStatefulSetControl struct {
// in no particular order. Clients using the burst strategy should be careful to ensure they
// understand the consistency implications of having unpredictable numbers of pods available.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
	// Gather the set's revision history, ordered by Revision.
	revisions, err := ssc.ListRevisions(set)
	if err != nil {
		return err
	}
	history.SortControllerRevisions(revisions)

	// Work out which revision is current and which one the set is being updated to.
	current, update, err := ssc.getStatefulSetRevisions(set, revisions)
	if err != nil {
		return err
	}

	// Run the main update step; it yields the status to report for the set.
	status, err := ssc.updateStatefulSet(set, current, update, pods)
	if err != nil {
		return err
	}

	// Persist the computed status.
	if err := ssc.updateStatefulSetStatus(set, status); err != nil {
		return err
	}

	glog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
		set.Namespace,
		set.Name,
		status.Replicas,
		status.ReadyReplicas,
		status.CurrentReplicas,
		status.UpdatedReplicas)

	glog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
		set.Namespace,
		set.Name,
		status.CurrentRevision,
		status.UpdateRevision)

	// Finally, trim the history down to the set's revision history limit.
	return ssc.truncateHistory(set, pods, revisions, current, update)
}
// ListRevisions converts set's label selector into a Selector and asks the controller history for the
// ControllerRevisions that match it on behalf of set. A selector that cannot be converted is reported as an error.
func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
	selector, selectorErr := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if selectorErr != nil {
		return nil, selectorErr
	}
	revisions, err := ssc.controllerHistory.ListControllerRevisions(set, selector)
	return revisions, err
}
// AdoptOrphanRevisions asks the controller history to adopt each entry of revisions on behalf of set, in order.
// Each successfully adopted revision replaces its entry in the supplied slice. The first adoption error stops the
// loop and is returned; entries before it have already been adopted and replaced.
func (ssc *defaultStatefulSetControl) AdoptOrphanRevisions(
	set *apps.StatefulSet,
	revisions []*apps.ControllerRevision) error {
	for idx, revision := range revisions {
		adopted, err := ssc.controllerHistory.AdoptControllerRevision(set, controllerKind, revision)
		if err != nil {
			return err
		}
		revisions[idx] = adopted
	}
	return nil
}
// truncateHistory deletes non-live ControllerRevisions from set's history until at most RevisionHistoryLimit
// non-live revisions remain. A revision is live when it is current, when it is update, or when its name is the
// revision recorded on one of pods. Deletion proceeds from the front of revisions, so the caller must supply
// revisions sorted by Revision for the oldest entries to be removed first. The first deletion error stops the
// process and is returned; a nil error means the operation was successful.
func (ssc *defaultStatefulSetControl) truncateHistory(
	set *apps.StatefulSet,
	pods []*v1.Pod,
	revisions []*apps.ControllerRevision,
	current *apps.ControllerRevision,
	update *apps.ControllerRevision) error {
	// Names of all revisions that are still in use and must be kept.
	live := map[string]bool{current.Name: true, update.Name: true}
	for _, pod := range pods {
		live[getPodRevision(pod)] = true
	}
	// Everything else is purely historic, kept in the order supplied.
	nonLive := make([]*apps.ControllerRevision, 0, len(revisions))
	for _, revision := range revisions {
		if live[revision.Name] {
			continue
		}
		nonLive = append(nonLive, revision)
	}
	excess := len(nonLive) - int(*set.Spec.RevisionHistoryLimit)
	if excess <= 0 {
		return nil
	}
	// Delete from the front of the list until only the permitted number of historic revisions is left.
	for _, stale := range nonLive[:excess] {
		if err := ssc.controllerHistory.DeleteControllerRevision(stale); err != nil {
			return err
		}
	}
	return nil
}
// getStatefulSetRevisions returns the current and update ControllerRevisions for set, in that order. A candidate
// revision is built from set's present state and compared against revisions:
//   - if the newest existing revision is equal to it, that revision is the update revision;
//   - if only an older revision is equal to it, that revision's Revision is bumped to the candidate's number
//     (a roll back) and the result is the update revision;
//   - otherwise the candidate is created and becomes the update revision.
//
// The current revision is the entry of revisions whose Name matches set.Status.CurrentRevision; when there is no
// such entry the update revision is used for both. revisions is sorted in place by this method.
func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
	set *apps.StatefulSet,
	revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, error) {
	history.SortControllerRevisions(revisions)
	total := len(revisions)
	// build the revision that describes the set as it is now
	candidate, err := newRevision(set, nextRevision(revisions))
	if err != nil {
		return nil, nil, err
	}
	matches := history.FindEqualRevisions(revisions, candidate)
	matchCount := len(matches)
	var updateRevision *apps.ControllerRevision
	switch {
	case matchCount > 0 && history.EqualRevision(revisions[total-1], matches[matchCount-1]):
		// the newest revision already describes the set, nothing has changed
		updateRevision = revisions[total-1]
	case matchCount > 0:
		// an older revision describes the set: roll back by giving it the next Revision number
		updateRevision, err = ssc.controllerHistory.UpdateControllerRevision(
			matches[matchCount-1],
			candidate.Revision)
	default:
		// nothing in the history describes the set, so record the candidate
		updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, candidate)
	}
	if err != nil {
		return nil, nil, err
	}
	// look up the revision recorded as current in the set's status
	var currentRevision *apps.ControllerRevision
	for _, revision := range revisions {
		if revision.Name == set.Status.CurrentRevision {
			currentRevision = revision
		}
	}
	// with no recorded current revision the history is initialized from the update revision
	if currentRevision == nil {
		currentRevision = updateRevision
	}
	return currentRevision, updateRevision, nil
}
// updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
// the set in order to conform the system to the target state for the set. The target state always contains
// set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
// RollingUpdateStatefulSetStrategyType then all Pods in the set must be at set.Status.CurrentRevision.
// If the UpdateStrategy.Type for the set is OnDeleteStatefulSetStrategyType, the target state implies nothing about
// the revisions of Pods in the set. If the UpdateStrategy.Type for the set is PartitionStatefulSetStrategyType, then
// all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
// Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
// update must be recorded. If the error is not nil, the method should be retried until successful.
func (ssc *defaultStatefulSetControl) updateStatefulSet(
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
// get the current and update revisions of the set.
currentSet, err := applyRevision(set, currentRevision)
if err != nil {
return nil, err
}
updateSet, err := applyRevision(set, updateRevision)
if err != nil {
return nil, err
}
// set the generation, and revisions in the returned status
status := apps.StatefulSetStatus{}
status.ObservedGeneration = new(int64)
*status.ObservedGeneration = set.Generation
status.CurrentRevision = currentRevision.Name
status.UpdateRevision = updateRevision.Name
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
replicas := make([]*v1.Pod, replicaCount)
// slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod)
condemned := make([]*v1.Pod, 0, len(pods))
ready := 0
unhealthy := 0
firstUnhealthyOrdinal := math.MaxInt32
var firstUnhealthyPod *v1.Pod
// First we partition pods into two lists valid replicas and condemned Pods
for i := range pods {
// count the total number of Pods in the set
status.Replicas++
// count the number of running and ready replicas
if isRunningAndReady(pods[i]) {
ready++
status.ReadyReplicas++
}
// count the number of current and update replicas
if isCreated(pods[i]) && !isTerminating(pods[i]) {
if getPodRevision(pods[i]) == currentRevision.Name {
status.CurrentReplicas++
} else if getPodRevision(pods[i]) == updateRevision.Name {
status.UpdatedReplicas++
}
}
if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
// if the ordinal of the pod is within the range of the current number of replicas,
// insert it at the indirection of its ordinal
@ -83,45 +300,53 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
}
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
for ord := 0; ord < replicaCount; ord++ {
if replicas[ord] == nil {
replicas[ord] = newStatefulSetPod(set, ord)
}
}
// count the number of unhealthy pods
for i := range replicas {
if !isHealthy(replicas[i]) {
unhealthy++
}
}
for i := range condemned {
if !isHealthy(condemned[i]) {
unhealthy++
replicas[ord] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name, ord)
}
}
// sort the condemned Pods by their ordinals
sort.Sort(ascendingOrdinal(condemned))
// if the current number of replicas has changed update the statefulSets replicas
if set.Status.Replicas != int32(ready) || set.Status.ObservedGeneration == nil || set.Generation > *set.Status.ObservedGeneration {
obj, err := scheme.Scheme.Copy(set)
if err != nil {
return fmt.Errorf("unable to copy set: %v", err)
// find the first unhealthy Pod
for i := range replicas {
if !isHealthy(replicas[i]) {
unhealthy++
if ord := getOrdinal(replicas[i]); ord < firstUnhealthyOrdinal {
firstUnhealthyOrdinal = ord
firstUnhealthyPod = replicas[i]
}
}
set = obj.(*apps.StatefulSet)
}
if err := ssc.podControl.UpdateStatefulSetStatus(set, int32(ready), set.Generation); err != nil {
return err
for i := range condemned {
if !isHealthy(condemned[i]) {
unhealthy++
if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal {
firstUnhealthyOrdinal = ord
firstUnhealthyPod = condemned[i]
}
}
}
if unhealthy > 0 {
glog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
set.Namespace,
set.Name,
unhealthy,
firstUnhealthyPod.Name)
}
// If the StatefulSet is being deleted, don't do anything other than updating
// status.
if set.DeletionTimestamp != nil {
return nil
return &status, nil
}
monotonic := !allowsBurst(set)
@ -130,20 +355,41 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
for i := range replicas {
// delete and recreate failed pods
if isFailed(replicas[i]) {
glog.V(2).Infof("StatefulSet %s is recreating failed Pod %s", set.Name, replicas[i].Name)
glog.V(4).Infof("StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return err
return &status, err
}
replicas[i] = newStatefulSetPod(set, i)
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas--
} else if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas--
}
status.Replicas--
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
i)
}
// If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) {
if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
return err
return &status, err
}
status.Replicas++
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas++
} else if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas++
}
// if the set does not allow bursting, return immediately
if monotonic {
return nil
return &status, nil
}
// pod created, no more work possible for this round
continue
@ -151,15 +397,23 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic {
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to Terminate", set.Name, replicas[i].Name)
return nil
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to Terminate",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// If we have a Pod that has been created but is not running and ready we can not make progress.
// We must ensure that, for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Running and Ready.
if !isRunningAndReady(replicas[i]) && monotonic {
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to be Running and Ready", set.Name, replicas[i].Name)
return nil
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// Enforce the StatefulSet invariants
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
@ -168,31 +422,120 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
// Make a deep copy so we don't mutate the shared cache
copy, err := scheme.Scheme.DeepCopy(replicas[i])
if err != nil {
return err
return &status, err
}
replica := copy.(*v1.Pod)
if err := ssc.podControl.UpdateStatefulPod(set, replica); err != nil {
return err
if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
return &status, err
}
}
// At this point, all of the current Replicas are Running and Ready, we can consider termination.
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
// We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
// Note that we do not resurrect Pods in this interval.
if unhealthy > 0 && monotonic {
glog.V(2).Infof("StatefulSet %s is waiting on %d Pods", set.Name, unhealthy)
return nil
}
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
// updates.
for target := len(condemned) - 1; target >= 0; target-- {
glog.V(2).Infof("StatefulSet %s terminating Pod %s", set.Name, condemned[target])
// wait for terminating pods to expire
if isTerminating(condemned[target]) {
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
set.Namespace,
set.Name,
condemned[target].Name)
// block if we are in monotonic mode
if monotonic {
return &status, nil
}
continue
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
set.Namespace,
set.Name,
firstUnhealthyPod.Name)
return &status, nil
}
glog.V(4).Infof("StatefulSet %s/%s terminating Pod %s for scale dowm",
set.Namespace,
set.Name,
condemned[target].Name)
if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
return err
return &status, err
}
if getPodRevision(condemned[target]) == currentRevision.Name {
status.CurrentReplicas--
} else if getPodRevision(condemned[target]) == updateRevision.Name {
status.UpdatedReplicas--
}
if monotonic {
return nil
return &status, nil
}
}
// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
return &status, nil
}
// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
updateMin := 0
if set.Spec.UpdateStrategy.Type == apps.PartitionStatefulSetStrategyType {
updateMin = int(set.Spec.UpdateStrategy.Partition.Ordinal)
}
// we terminate the Pod with the largest ordinal that does not match the update revision.
for target := len(replicas) - 1; target >= updateMin; target-- {
// all replicas should be healthy before an update progresses; we allow termination of the firstUnhealthy
// Pod in any state to allow for rolling back a failed update.
if !isRunningAndReady(replicas[target]) && replicas[target] != firstUnhealthyPod {
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to update",
set.Namespace,
set.Name,
firstUnhealthyPod.Name)
return &status, nil
}
if getPodRevision(replicas[target]) != updateRevision.Name {
glog.V(4).Infof("StatefulSet %s/%s terminating Pod %s for update",
set.Namespace,
set.Name,
replicas[target].Name)
err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
status.CurrentReplicas--
return &status, err
}
}
return &status, nil
}
// updateStatefulSetStatus records status as set's Status. status is first passed through completeRollingUpdate,
// which may mutate it. If inconsistentStatus then reports no difference from set's existing Status, nothing is
// written. Otherwise a copy of set is made and handed to the status updater, so the set supplied by the caller
// is not modified by the write. A nil error means the update succeeded or was not needed.
func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
	set *apps.StatefulSet,
	status *apps.StatefulSetStatus) error {
	completeRollingUpdate(set, status)
	if inconsistentStatus(set, status) {
		// write through a copy rather than the object we were given
		obj, err := scheme.Scheme.Copy(set)
		if err != nil {
			return err
		}
		return ssc.statusUpdater.UpdateStatefulSetStatus(obj.(*apps.StatefulSet), status)
	}
	return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,78 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statefulset
import (
"fmt"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/retry"
)
// StatefulSetStatusUpdaterInterface is an interface used to update the StatefulSetStatus associated with a StatefulSet.
// For any use other than testing, clients should create an instance using NewRealStatefulSetStatusUpdater.
type StatefulSetStatusUpdaterInterface interface {
// UpdateStatefulSetStatus sets the set's Status to status. Implementations are required to retry on conflicts,
// but fail on other errors. If the returned error is nil set's Status has been successfully set to status.
// The implementation in this file assigns to set.Status on the object it is given, so callers holding an object
// they must not modify should pass a copy.
UpdateStatefulSetStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) error
}
// NewRealStatefulSetStatusUpdater builds the StatefulSetStatusUpdaterInterface implementation that writes status
// through client and uses setLister to look up the StatefulSet again after a failed write.
func NewRealStatefulSetStatusUpdater(
	client clientset.Interface,
	setLister appslisters.StatefulSetLister) StatefulSetStatusUpdaterInterface {
	updater := realStatefulSetStatusUpdater{
		client:    client,
		setLister: setLister,
	}
	return &updater
}
// realStatefulSetStatusUpdater is the StatefulSetStatusUpdaterInterface implementation returned by
// NewRealStatefulSetStatusUpdater.
type realStatefulSetStatusUpdater struct {
// client is used to issue the UpdateStatus call for the StatefulSet.
client clientset.Interface
// setLister is consulted after a failed update to fetch the StatefulSet again before the next attempt.
setLister appslisters.StatefulSetLister
}
// UpdateStatefulSetStatus assigns status to set.Status and submits it with UpdateStatus, retrying through
// retry.RetryOnConflict with retry.DefaultRetry. After a failed attempt the StatefulSet is fetched from the lister
// and a deep copy of it replaces set for the next attempt; failures to fetch or copy are reported through
// utilruntime.HandleError and the previous object is reused. The error of the failed update is what each attempt
// returns to the retry helper.
func (ssu *realStatefulSetStatusUpdater) UpdateStatefulSetStatus(
	set *apps.StatefulSet,
	status *apps.StatefulSetStatus) error {
	attempt := func() error {
		set.Status = *status
		_, updateErr := ssu.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
		if updateErr == nil {
			return nil
		}
		// refresh our view of the set so that a retry is made against the lister's version
		fresh, getErr := ssu.setLister.StatefulSets(set.Namespace).Get(set.Name)
		if getErr != nil {
			utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", set.Namespace, set.Name, getErr))
			return updateErr
		}
		// make a copy so we don't mutate the shared cache
		cloned, copyErr := scheme.Scheme.DeepCopy(fresh)
		if copyErr != nil {
			utilruntime.HandleError(fmt.Errorf("error copying updated StatefulSet: %v", copyErr))
			return updateErr
		}
		set = cloned.(*apps.StatefulSet)
		return updateErr
	}
	// don't wait due to limited number of clients, but backoff after the default number of steps
	return retry.RetryOnConflict(retry.DefaultRetry, attempt)
}
// Compile-time assertion that *realStatefulSetStatusUpdater satisfies StatefulSetStatusUpdaterInterface.
var _ StatefulSetStatusUpdaterInterface = &realStatefulSetStatusUpdater{}

View File

@ -0,0 +1,141 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statefulset
import (
"errors"
"testing"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
)
func TestStatefulSetUpdaterUpdatesSetStatus(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(1)
return &i
}(), Replicas: 2}
fakeClient := &fake.Clientset{}
updater := NewRealStatefulSetStatusUpdater(fakeClient, nil)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), nil
})
if err := updater.UpdateStatefulSetStatus(set, &status); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
if set.Status.Replicas != 2 {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
}
func TestStatefulSetStatusUpdaterUpdatesObservedGeneration(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(3)
return &i
}(), Replicas: 2}
fakeClient := &fake.Clientset{}
updater := NewRealStatefulSetStatusUpdater(fakeClient, nil)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
sts := update.GetObject().(*apps.StatefulSet)
if sts.Status.ObservedGeneration == nil || *sts.Status.ObservedGeneration != int64(3) {
t.Errorf("expected observedGeneration to be synced with generation for statefulset %q", sts.Name)
}
return true, sts, nil
})
if err := updater.UpdateStatefulSetStatus(set, &status); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
}
func TestStatefulSetStatusUpdaterUpdateReplicasFailure(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(3)
return &i
}(), Replicas: 2}
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
updater := NewRealStatefulSetStatusUpdater(fakeClient, setLister)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
if err := updater.UpdateStatefulSetStatus(set, &status); err == nil {
t.Error("Failed update did not return error")
}
}
func TestStatefulSetStatusUpdaterUpdateReplicasConflict(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(3)
return &i
}(), Replicas: 2}
conflict := false
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
updater := NewRealStatefulSetStatusUpdater(fakeClient, setLister)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
if !conflict {
conflict = true
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
} else {
return true, update.GetObject(), nil
}
})
if err := updater.UpdateStatefulSetStatus(set, &status); err != nil {
t.Errorf("UpdateStatefulSetStatus returned an error: %s", err)
}
if set.Status.Replicas != 2 {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
}
func TestStatefulSetStatusUpdaterUpdateReplicasConflictFailure(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(3)
return &i
}(), Replicas: 2}
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
updater := NewRealStatefulSetStatusUpdater(fakeClient, setLister)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
})
if err := updater.UpdateStatefulSetStatus(set, &status); err == nil {
t.Error("UpdateStatefulSetStatus failed to return an error on get failure")
}
}

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
)
func alwaysReady() bool { return true }
@ -578,15 +579,18 @@ func newFakeStatefulSetController(initialObjects ...runtime.Object) (*StatefulSe
client := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
fpc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewStatefulSetController(
informerFactory.Core().V1().Pods(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Apps().V1beta1().ControllerRevisions(),
client,
)
ssh := history.NewFakeHistory(informerFactory.Apps().V1beta1().ControllerRevisions())
ssc.podListerSynced = alwaysReady
ssc.setListerSynced = alwaysReady
ssc.control = NewDefaultStatefulSetControl(fpc)
ssc.control = NewDefaultStatefulSetControl(fpc, ssu, ssh)
return ssc, fpc
}
@ -614,7 +618,7 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl
if err != nil {
return err
}
for set.Status.Replicas < *set.Spec.Replicas {
for set.Status.ReadyReplicas < *set.Spec.Replicas {
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
ord := len(pods) - 1
pod := getPodAtOrdinal(pods, ord)

View File

@ -17,15 +17,23 @@ limitations under the License.
package statefulset
import (
"encoding/json"
"fmt"
"regexp"
"strconv"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/scheme"
"github.com/golang/glog"
)
@ -36,6 +44,7 @@ const maxUpdateRetries = 10
// updateConflictError is the error used to indicate that the maximum number of retries against the API server have
// been attempted and we need to back off
var updateConflictError = fmt.Errorf("aborting update after %d attempts", maxUpdateRetries)
var patchCodec = api.Codecs.LegacyCodec(apps.SchemeGroupVersion)
// overlappingStatefulSets sorts a list of StatefulSets by creation timestamp, using their names as a tie breaker.
// Generally used to tie break between StatefulSets that have overlapping selectors.
@ -246,6 +255,23 @@ func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference {
}
}
// setPodRevision records revision on pod under the StatefulSetRevisionLabel key, creating pod's label map when it
// does not have one yet.
func setPodRevision(pod *v1.Pod, revision string) {
	if pod.Labels != nil {
		pod.Labels[apps.StatefulSetRevisionLabel] = revision
		return
	}
	pod.Labels = map[string]string{apps.StatefulSetRevisionLabel: revision}
}
// getPodRevision returns the value stored on pod under the StatefulSetRevisionLabel key. The empty string is
// returned when pod has no labels or no such label.
func getPodRevision(pod *v1.Pod) string {
	// indexing a nil map yields the zero value, so a Pod without labels needs no special case
	return pod.Labels[apps.StatefulSetRevisionLabel]
}
// newStatefulSetPod returns a new Pod conforming to the set's Spec with an identity generated from ordinal.
func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, newControllerRef(set))
@ -255,6 +281,122 @@ func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
return pod
}
// newVersionedStatefulSetPod creates the Pod for ordinal from either the current or the update revision of a
// StatefulSet and labels it with the name of the revision it was built from. currentSet and updateSet are the
// set as it looks at the current and update revisions; currentRevision and updateRevision are the names of those
// revisions. The current revision is chosen when
//   - the strategy is RollingUpdateStatefulSetStrategyType and ordinal is below currentSet.Status.CurrentReplicas, or
//   - the strategy is PartitionStatefulSetStrategyType and ordinal is below the partition's Ordinal.
//
// In every other case the update revision is chosen. The strategy is read from currentSet.
func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod {
	useCurrent := false
	switch currentSet.Spec.UpdateStrategy.Type {
	case apps.RollingUpdateStatefulSetStrategyType:
		useCurrent = ordinal < int(currentSet.Status.CurrentReplicas)
	case apps.PartitionStatefulSetStrategyType:
		useCurrent = ordinal < int(currentSet.Spec.UpdateStrategy.Partition.Ordinal)
	}
	source, revision := updateSet, updateRevision
	if useCurrent {
		source, revision = currentSet, currentRevision
	}
	pod := newStatefulSetPod(source, ordinal)
	setPodRevision(pod, revision)
	return pod
}
// getPatch returns a strategic merge patch that can be applied to restore a StatefulSet to a
// previous version. If the returned error is nil the patch is valid. The current state that we save is just the
// PodSpecTemplate. We can modify this later to encompass more state (or less) and remain compatible with previously
// recorded patches.
//
// The patch has the shape {"spec":{"template":{...,"$patch":"replace"}}}: the encoded template of set with a
// "$patch":"replace" directive added so that applying it replaces the template wholesale rather than merging into it.
// An error is returned if set cannot be encoded, if the encoded form cannot be decoded as a JSON object, or if it
// does not contain a spec.template object.
func getPatch(set *apps.StatefulSet) ([]byte, error) {
	data, err := runtime.Encode(patchCodec, set)
	if err != nil {
		return nil, err
	}
	var raw map[string]interface{}
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil, err
	}
	// checked assertions: a malformed encoding is reported as an error rather than a panic
	spec, ok := raw["spec"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("encoded StatefulSet %s/%s has no spec object", set.Namespace, set.Name)
	}
	template, ok := spec["template"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("encoded StatefulSet %s/%s has no spec.template object", set.Namespace, set.Name)
	}
	template["$patch"] = "replace"
	patch := map[string]interface{}{
		"spec": map[string]interface{}{
			"template": template,
		},
	}
	return json.Marshal(patch)
}
// newRevision builds a ControllerRevision for set whose Data is the patch produced by getPatch and whose Revision
// is revision. The ControllerRevision is constructed with history.NewControllerRevision using set's label selector.
// StatefulSet revisions are stored as patches so that a saved state can later be re-applied to a StatefulSet. An
// error is returned if the patch cannot be produced or the selector cannot be converted; otherwise the result of
// history.NewControllerRevision is returned as is.
func newRevision(set *apps.StatefulSet, revision int64) (*apps.ControllerRevision, error) {
	data, err := getPatch(set)
	if err != nil {
		return nil, err
	}
	sel, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		return nil, err
	}
	payload := runtime.RawExtension{Raw: data}
	return history.NewControllerRevision(set, controllerKind, sel, payload, revision)
}
// applyRevision returns a new StatefulSet constructed by restoring the state in revision to set. If the returned error
// is nil, the returned StatefulSet is valid. set itself is not modified: a deep copy is encoded, the patch stored in
// revision.Data.Raw is applied to the encoding as a strategic merge patch, and the result is decoded.
//
// The patched document is decoded into an empty StatefulSet rather than into the copy of set. json.Unmarshal keeps
// existing map entries and leaves fields untouched when they are absent from the input, so decoding over a populated
// object could retain state (for example map entries in the template) that the revision does not contain.
// Encoding failures are returned as errors instead of panicking.
func applyRevision(set *apps.StatefulSet, revision *apps.ControllerRevision) (*apps.StatefulSet, error) {
	obj, err := scheme.Scheme.DeepCopy(set)
	if err != nil {
		return nil, err
	}
	clone := obj.(*apps.StatefulSet)
	encoded, err := runtime.Encode(patchCodec, clone)
	if err != nil {
		return nil, err
	}
	// clone is passed as the data struct that describes the patched type
	patched, err := strategicpatch.StrategicMergePatch(encoded, revision.Data.Raw, clone)
	if err != nil {
		return nil, err
	}
	restored := &apps.StatefulSet{}
	if err := json.Unmarshal(patched, restored); err != nil {
		return nil, err
	}
	return restored, nil
}
// nextRevision returns the Revision number to use for the next ControllerRevision: 1 when revisions is empty,
// otherwise one more than the Revision of the last element. revisions must be sorted by Revision so that the last
// element carries the largest value.
func nextRevision(revisions []*apps.ControllerRevision) int64 {
	if last := len(revisions) - 1; last >= 0 {
		return revisions[last].Revision + 1
	}
	return 1
}
// inconsistentStatus reports whether status differs from set's recorded Status in a way that requires a write.
// It returns true if set's Status has no ObservedGeneration, if the ObservedGeneration of status is greater than
// the one recorded in set's Status, or if any of the replica counts or revision names differ.
func inconsistentStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) bool {
	recorded := &set.Status
	if recorded.ObservedGeneration == nil {
		return true
	}
	if *status.ObservedGeneration > *recorded.ObservedGeneration {
		return true
	}
	countsDiffer := status.Replicas != recorded.Replicas ||
		status.CurrentReplicas != recorded.CurrentReplicas ||
		status.ReadyReplicas != recorded.ReadyReplicas ||
		status.UpdatedReplicas != recorded.UpdatedReplicas
	revisionsDiffer := status.CurrentRevision != recorded.CurrentRevision ||
		status.UpdateRevision != recorded.UpdateRevision
	return countsDiffer || revisionsDiffer
}
// completeRollingUpdate marks a rolling update as finished in status. It only acts when set uses
// RollingUpdateStatefulSetStrategyType and status shows every replica as both updated and ready. In that case
// status's CurrentReplicas is set to its UpdatedReplicas and its CurrentRevision is set to its UpdateRevision;
// UpdatedReplicas and UpdateRevision themselves are left unchanged. Otherwise status is not modified.
func completeRollingUpdate(set *apps.StatefulSet, status *apps.StatefulSetStatus) {
	if set.Spec.UpdateStrategy.Type != apps.RollingUpdateStatefulSetStrategyType {
		return
	}
	if status.UpdatedReplicas != status.Replicas || status.ReadyReplicas != status.Replicas {
		return
	}
	status.CurrentReplicas = status.UpdatedReplicas
	status.CurrentRevision = status.UpdateRevision
}
// ascendingOrdinal is a sort.Interface that sorts a list of Pods based on the ordinals extracted
// from each Pod. Pods that have not been constructed by a StatefulSet have an ordinal of -1, and are therefore pushed
// to the front of the list.

View File

@ -32,6 +32,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
)
func TestGetParentNameAndOrdinal(t *testing.T) {
@ -287,6 +288,26 @@ func TestNewPodControllerRef(t *testing.T) {
}
}
// TestCreateApplyRevision checks that a revision round-trips: a revision is created from a
// StatefulSet, the set's template is mutated, the revision is applied back, and a revision created
// from the restored set is expected to be equal (per history.EqualRevision) to the first one.
func TestCreateApplyRevision(t *testing.T) {
	set := newStatefulSet(1)
	snapshot, err := newRevision(set, 1)
	if err != nil {
		t.Fatal(err)
	}

	// Change the template so the live set no longer matches the snapshot.
	set.Spec.Template.Spec.Containers[0].Name = "foo"

	restored, err := applyRevision(set, snapshot)
	if err != nil {
		t.Fatal(err)
	}
	roundTripped, err := newRevision(restored, 2)
	if err != nil {
		t.Fatal(err)
	}
	if history.EqualRevision(snapshot, roundTripped) {
		return
	}
	t.Errorf("wanted %v got %v", string(snapshot.Data.Raw), string(roundTripped.Data.Raw))
}
func newPVC(name string) v1.PersistentVolumeClaim {
return v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
@ -354,6 +375,11 @@ func newStatefulSetWithVolumes(replicas int, name string, petMounts []v1.VolumeM
Template: template,
VolumeClaimTemplates: claims,
ServiceName: "governingsvc",
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
RevisionHistoryLimit: func() *int32 {
limit := int32(2)
return &limit
}(),
},
}
}

View File

@ -76,7 +76,8 @@ func validNewStatefulSet() *apps.StatefulSet {
DNSPolicy: api.DNSClusterFirst,
},
},
Replicas: 7,
Replicas: 7,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
Status: apps.StatefulSetStatus{},
}

View File

@ -54,6 +54,7 @@ func TestStatefulSetStrategy(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{MatchLabels: validSelector},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
Status: apps.StatefulSetStatus{Replicas: 3},
}
@ -74,6 +75,7 @@ func TestStatefulSetStrategy(t *testing.T) {
PodManagementPolicy: apps.OrderedReadyPodManagement,
Selector: ps.Spec.Selector,
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
Status: apps.StatefulSetStatus{Replicas: 4},
}
@ -122,9 +124,10 @@ func TestStatefulSetStatusStrategy(t *testing.T) {
oldPS := &apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault, ResourceVersion: "10"},
Spec: apps.StatefulSetSpec{
Replicas: 3,
Selector: &metav1.LabelSelector{MatchLabels: validSelector},
Template: validPodTemplate.Template,
Replicas: 3,
Selector: &metav1.LabelSelector{MatchLabels: validSelector},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
Status: apps.StatefulSetStatus{
Replicas: 1,
@ -133,9 +136,10 @@ func TestStatefulSetStatusStrategy(t *testing.T) {
newPS := &apps.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault, ResourceVersion: "9"},
Spec: apps.StatefulSetSpec{
Replicas: 1,
Selector: &metav1.LabelSelector{MatchLabels: validSelector},
Template: validPodTemplate.Template,
Replicas: 1,
Selector: &metav1.LabelSelector{MatchLabels: validSelector},
Template: validPodTemplate.Template,
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
},
Status: apps.StatefulSetStatus{
Replicas: 2,

View File

@ -264,6 +264,7 @@ func init() {
rbac.NewRule("get", "list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
rbac.NewRule("update").Groups(appsGroup).Resources("statefulsets/status").RuleOrDie(),
rbac.NewRule("get", "create", "delete", "update", "patch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
rbac.NewRule("get", "create", "delete", "update", "patch", "list", "watch").Groups(appsGroup).Resources("controllerrevisions").RuleOrDie(),
rbac.NewRule("get", "create").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(),
eventsRule(),
},

View File

@ -975,6 +975,18 @@ items:
- get
- patch
- update
- apiGroups:
- apps
resources:
- controllerrevisions
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:

View File

@ -312,6 +312,22 @@ func (s *StatefulSetTester) waitForRunning(numStatefulPods int32, ss *apps.State
}
}
// WaitForState periodically polls for the ss and its pods until the until function returns either true or an error.
// On every poll the StatefulSet is re-fetched by ss's namespace and name, its pods are listed with GetPodList, and
// both are handed to until. Polling uses StatefulSetPoll and StatefulSetTimeout; if polling ends with an error
// (including one returned by the Get call or by until) the test is failed via Failf.
func (s *StatefulSetTester) WaitForState(ss *apps.StatefulSet, until func(*apps.StatefulSet, *v1.PodList) (bool, error)) {
	pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
		func() (bool, error) {
			ssGet, err := s.c.Apps().StatefulSets(ss.Namespace).Get(ss.Name, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			podList := s.GetPodList(ssGet)
			return until(ssGet, podList)
		})
	if pollErr != nil {
		// The awaited condition is caller-defined, so the message must not claim it was about pods running.
		Failf("Failed waiting for state update: %v", pollErr)
	}
}
// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready.
func (s *StatefulSetTester) WaitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) {
s.waitForRunning(numStatefulPods, ss, true)
@ -365,8 +381,33 @@ func (s *StatefulSetTester) SetHealthy(ss *apps.StatefulSet) {
}
}
// WaitForStatus waits for the ss.Status.Replicas to be equal to expectedReplicas
func (s *StatefulSetTester) WaitForStatus(ss *apps.StatefulSet, expectedReplicas int32) {
// WaitForStatusReadyReplicas waits for the ss.Status.ReadyReplicas to be equal to expectedReplicas.
// The StatefulSet is re-fetched on every poll (StatefulSetPoll interval, StatefulSetTimeout limit). A poll is
// only considered once the fetched status reports an ObservedGeneration that is at least ss.Generation. If
// polling ends with an error the test is failed via Failf.
func (s *StatefulSetTester) WaitForStatusReadyReplicas(ss *apps.StatefulSet, expectedReplicas int32) {
	Logf("Waiting for statefulset status.readyReplicas updated to %d", expectedReplicas)

	ns, name := ss.Namespace, ss.Name
	pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
		func() (bool, error) {
			ssGet, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			// ObservedGeneration is a pointer and may still be unset; treat that like a stale status and keep
			// polling instead of dereferencing nil.
			if ssGet.Status.ObservedGeneration == nil || *ssGet.Status.ObservedGeneration < ss.Generation {
				return false, nil
			}
			if ssGet.Status.ReadyReplicas != expectedReplicas {
				Logf("Waiting for stateful set status.readyReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.ReadyReplicas)
				return false, nil
			}
			return true, nil
		})
	if pollErr != nil {
		Failf("Failed waiting for stateful set status.readyReplicas updated to %d: %v", expectedReplicas, pollErr)
	}
}
// WaitForStatusReplicas waits for the ss.Status.Replicas to be equal to expectedReplicas
func (s *StatefulSetTester) WaitForStatusReplicas(ss *apps.StatefulSet, expectedReplicas int32) {
Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)
ns, name := ss.Namespace, ss.Name
@ -416,7 +457,7 @@ func DeleteAllStatefulSets(c clientset.Interface, ns string) {
if err := sst.Scale(&ss, 0); err != nil {
errList = append(errList, fmt.Sprintf("%v", err))
}
sst.WaitForStatus(&ss, 0)
sst.WaitForStatusReplicas(&ss, 0)
Logf("Deleting statefulset %v", ss.Name)
// Use OrphanDependents=false so it's deleted synchronously.
// We already made sure the Pods are gone inside Scale().
@ -561,6 +602,7 @@ func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulP
Volumes: vols,
},
},
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
VolumeClaimTemplates: claims,
ServiceName: governingSvcName,
},

View File

@ -31,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/pkg/api/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller"
@ -250,8 +249,11 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
It("should allow template updates", func() {
By("Creating stateful set " + ssName + " in namespace " + ns)
*(ss.Spec.Replicas) = 2
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 2, nil, nil, labels)
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
ss, err := c.Apps().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred())
@ -268,79 +270,21 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
})
Expect(err).NotTo(HaveOccurred())
updateIndex := 0
By(fmt.Sprintf("Deleting stateful pod at index %d", updateIndex))
sst.DeleteStatefulPodAtIndex(updateIndex, ss)
By("Waiting for all stateful pods to be running again")
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
By(fmt.Sprintf("Verifying stateful pod at index %d is updated", updateIndex))
verify := func(pod *v1.Pod) {
podImage := pod.Spec.Containers[0].Image
Expect(podImage).To(Equal(newImage), fmt.Sprintf("Expected stateful pod image %s updated to %s", podImage, newImage))
}
sst.VerifyPodAtIndex(updateIndex, ss, verify)
})
It("Scaling down before scale up is finished should wait until current pod will be running and ready before it will be removed", func() {
By("Creating stateful set " + ssName + " in namespace " + ns + ", and pausing scale operations after each pod")
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, labels)
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
framework.SetStatefulSetInitializedAnnotation(ss, "false")
ss, err := c.Apps().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred())
sst := framework.NewStatefulSetTester(c)
sst.WaitForRunningAndReady(1, ss)
By("Scaling up stateful set " + ssName + " to 3 replicas and pausing after 2nd pod")
sst.SetHealthy(ss)
sst.UpdateReplicas(ss, 3)
sst.WaitForRunningAndReady(2, ss)
By("Before scale up finished setting 2nd pod to be not ready by breaking readiness probe")
sst.BreakProbe(ss, testProbe)
sst.WaitForStatus(ss, 0)
sst.WaitForRunningAndNotReady(2, ss)
By("Continue scale operation after the 2nd pod, and scaling down to 1 replica")
sst.SetHealthy(ss)
sst.UpdateReplicas(ss, 1)
By("Verifying that the 2nd pod wont be removed if it is not running and ready")
sst.ConfirmStatefulPodCount(2, ss, 10*time.Second, true)
expectedPodName := ss.Name + "-1"
expectedPod, err := f.ClientSet.Core().Pods(ns).Get(expectedPodName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
By("Verifying the 2nd pod is removed only when it becomes running and ready")
sst.RestoreProbe(ss, testProbe)
watcher, err := f.ClientSet.Core().Pods(ns).Watch(metav1.SingleObject(
metav1.ObjectMeta{
Name: expectedPod.Name,
ResourceVersion: expectedPod.ResourceVersion,
},
))
Expect(err).NotTo(HaveOccurred())
_, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) {
pod := event.Object.(*v1.Pod)
if event.Type == watch.Deleted && pod.Name == expectedPodName {
return false, fmt.Errorf("Pod %v was deleted before enter running", pod.Name)
}
framework.Logf("Observed event %v for pod %v. Phase %v, Pod is ready %v",
event.Type, pod.Name, pod.Status.Phase, podutil.IsPodReady(pod))
if pod.Name != expectedPodName {
sst.WaitForState(ss, func(set *apps.StatefulSet, pods *v1.PodList) (bool, error) {
if len(pods.Items) < 2 {
return false, nil
}
if pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) {
return true, nil
for i := range pods.Items {
if pods.Items[i].Spec.Containers[0].Image != newImage {
framework.Logf("Waiting for pod %s to have image %s current image %s",
pods.Items[i].Name,
newImage,
pods.Items[i].Spec.Containers[0].Image)
return false, nil
}
}
return false, nil
return true, nil
})
Expect(err).NotTo(HaveOccurred())
})
It("Scaling should happen in predictable order and halt if any stateful pod is unhealthy", func() {
@ -367,7 +311,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
By("Confirming that stateful set scale up will halt with unhealthy stateful pod")
sst.BreakProbe(ss, testProbe)
sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss)
sst.WaitForStatus(ss, 0)
sst.WaitForStatusReadyReplicas(ss, 0)
sst.UpdateReplicas(ss, 3)
sst.ConfirmStatefulPodCount(1, ss, 10*time.Second, true)
@ -397,7 +341,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
Expect(err).NotTo(HaveOccurred())
sst.BreakProbe(ss, testProbe)
sst.WaitForStatus(ss, 0)
sst.WaitForStatusReadyReplicas(ss, 0)
sst.WaitForRunningAndNotReady(3, ss)
sst.UpdateReplicas(ss, 0)
sst.ConfirmStatefulPodCount(3, ss, 10*time.Second, true)
@ -442,7 +386,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
By("Confirming that stateful set scale up will not halt with unhealthy stateful pod")
sst.BreakProbe(ss, testProbe)
sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss)
sst.WaitForStatus(ss, 0)
sst.WaitForStatusReadyReplicas(ss, 0)
sst.UpdateReplicas(ss, 3)
sst.ConfirmStatefulPodCount(3, ss, 10*time.Second, false)
@ -452,7 +396,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
By("Scale down will not halt with unhealthy stateful pod")
sst.BreakProbe(ss, testProbe)
sst.WaitForStatus(ss, 0)
sst.WaitForStatusReadyReplicas(ss, 0)
sst.WaitForRunningAndNotReady(3, ss)
sst.UpdateReplicas(ss, 0)
sst.ConfirmStatefulPodCount(0, ss, 10*time.Second, false)
@ -460,7 +404,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
sst.RestoreProbe(ss, testProbe)
sst.Scale(ss, 0)
sst.WaitForStatus(ss, 0)
sst.WaitForStatusReadyReplicas(ss, 0)
})
It("Should recreate evicted statefulset", func() {