mirror of https://github.com/k3s-io/k3s
Scaler and reaper for petset
parent 4601ac7643
commit 30f3cb9d26
@@ -22,6 +22,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/apis/apps"
 	"k8s.io/kubernetes/pkg/apis/batch"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -72,6 +73,17 @@ func ReplicaSetHasDesiredReplicas(c ExtensionsInterface, replicaSet *extensions.
 	}
 }
 
+func PetSetHasDesiredPets(c AppsInterface, petset *apps.PetSet) wait.ConditionFunc {
+	// TODO: Differentiate between 0 pets and a really quick scale down using generation.
+	return func() (bool, error) {
+		ps, err := c.PetSets(petset.Namespace).Get(petset.Name)
+		if err != nil {
+			return false, err
+		}
+		return ps.Status.Replicas == ps.Spec.Replicas, nil
+	}
+}
+
 // JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count
 // for a job equals the current active counts or is less by an appropriate successful/unsuccessful count.
 func JobHasDesiredParallelism(c BatchInterface, job *batch.Job) wait.ConditionFunc {
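PetSetHasDesiredPets follows the pattern of the other condition helpers in this file: it returns a wait.ConditionFunc that re-fetches the PetSet and reports whether status has caught up with spec. Because it only compares counts, a scale back to a previous size can be mistaken for completion, which is what the generation TODO above is about. A minimal in-package sketch of how a caller might poll it (the helper name and durations are illustrative, not part of the commit):

	// waitForPets polls every second, for up to two minutes, until the
	// PetSet's status.replicas matches spec.replicas.
	func waitForPets(c Interface, petset *apps.PetSet) error {
		return wait.Poll(time.Second, 2*time.Minute, PetSetHasDesiredPets(c.Apps(), petset))
	}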
@@ -24,6 +24,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/apis/apps"
 	"k8s.io/kubernetes/pkg/apis/batch"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -51,6 +52,8 @@ func ScalerFor(kind unversioned.GroupKind, c client.Interface) (Scaler, error) {
 		return &ReplicaSetScaler{c.Extensions()}, nil
 	case extensions.Kind("Job"), batch.Kind("Job"):
 		return &JobScaler{c.Batch()}, nil // Either kind of job can be scaled with Batch interface.
+	case apps.Kind("PetSet"):
+		return &PetSetScaler{c.Apps()}, nil
 	case extensions.Kind("Deployment"):
 		return &DeploymentScaler{c.Extensions()}, nil
 	}
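ScalerFor is the factory kubectl uses to map a GroupKind onto a Scaler implementation; this hunk simply registers the new PetSet case. A hedged sketch of the call path (helper and client variable names are illustrative):

	// scaleWeb resizes the PetSet "web" in "default" to 3 replicas with no
	// preconditions; nil retry/waitForReplicas fall back to Scale's defaults.
	func scaleWeb(kubeClient client.Interface) error {
		scaler, err := ScalerFor(apps.Kind("PetSet"), kubeClient)
		if err != nil {
			return err // no scaler registered for this kind
		}
		return scaler.Scale("default", "web", 3, nil, nil, nil)
	}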
@@ -126,6 +129,17 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s
 	}
 }
 
+// ValidatePetSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
+func (precondition *ScalePrecondition) ValidatePetSet(ps *apps.PetSet) error {
+	if precondition.Size != -1 && int(ps.Spec.Replicas) != precondition.Size {
+		return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(ps.Spec.Replicas))}
+	}
+	if len(precondition.ResourceVersion) != 0 && ps.ResourceVersion != precondition.ResourceVersion {
+		return PreconditionError{"resource version", precondition.ResourceVersion, ps.ResourceVersion}
+	}
+	return nil
+}
+
 // ValidateReplicationController ensures that the preconditions match. Returns nil if they are valid, an error otherwise
 func (precondition *ScalePrecondition) ValidateReplicationController(controller *api.ReplicationController) error {
 	if precondition.Size != -1 && int(controller.Spec.Replicas) != precondition.Size {
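A ScalePrecondition guards a scale against racing writers: a Size of -1 skips the replica-count check and an empty ResourceVersion skips the version check. A small sketch of the new PetSet variant, using only types from this diff (the helper name is illustrative):

	// guardedValidate only passes if the PetSet still has exactly one
	// replica and has not been modified since it was read.
	func guardedValidate(ps *apps.PetSet) error {
		pre := &ScalePrecondition{Size: 1, ResourceVersion: ps.ResourceVersion}
		return pre.ValidatePetSet(ps) // a PreconditionError on mismatch
	}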
@@ -276,6 +290,56 @@ func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error {
 	return nil
 }
 
+type PetSetScaler struct {
+	c client.AppsInterface
+}
+
+func (scaler *PetSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error {
+	ps, err := scaler.c.PetSets(namespace).Get(name)
+	if err != nil {
+		return ScaleError{ScaleGetFailure, "Unknown", err}
+	}
+	if preconditions != nil {
+		if err := preconditions.ValidatePetSet(ps); err != nil {
+			return err
+		}
+	}
+	ps.Spec.Replicas = int(newSize)
+	if _, err := scaler.c.PetSets(namespace).Update(ps); err != nil {
+		if errors.IsConflict(err) {
+			return ScaleError{ScaleUpdateConflictFailure, ps.ResourceVersion, err}
+		}
+		return ScaleError{ScaleUpdateFailure, ps.ResourceVersion, err}
+	}
+	return nil
+}
+
+func (scaler *PetSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
+	if preconditions == nil {
+		preconditions = &ScalePrecondition{-1, ""}
+	}
+	if retry == nil {
+		// Make it try only once, immediately.
+		retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
+	}
+	cond := ScaleCondition(scaler, preconditions, namespace, name, newSize)
+	if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil {
+		return err
+	}
+	if waitForReplicas != nil {
+		ps, err := scaler.c.PetSets(namespace).Get(name)
+		if err != nil {
+			return err
+		}
+		err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.PetSetHasDesiredPets(scaler.c, ps))
+		if err == wait.ErrWaitTimeout {
+			return fmt.Errorf("timed out waiting for %q to be synced", name)
+		}
+		return err
+	}
+	return nil
+}
+
 type JobScaler struct {
 	c client.BatchInterface
 }
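Scale runs in two phases: wait.Poll over ScaleCondition retries the optimistic spec update (ScaleSimple) through conflicts, and, when waitForReplicas is non-nil, a second poll over PetSetHasDesiredPets blocks until the controller has actually created or deleted pets. An in-package sketch with explicit retry parameters (helper name and durations are illustrative):

	// scaleAndWait returns once spec *and* status report 5 pets, or errors
	// out when either phase times out.
	func scaleAndWait(c client.Interface) error {
		scaler := &PetSetScaler{c.Apps()}
		retry := NewRetryParams(500*time.Millisecond, 30*time.Second)
		waitForPets := NewRetryParams(time.Second, 5*time.Minute)
		return scaler.Scale("default", "web", 5, nil, retry, waitForPets)
	}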
@@ -25,6 +25,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/meta"
 	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/apis/apps"
 	"k8s.io/kubernetes/pkg/apis/batch"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -82,6 +83,9 @@ func ReaperFor(kind unversioned.GroupKind, c client.Interface) (Reaper, error) {
 	case extensions.Kind("Job"), batch.Kind("Job"):
 		return &JobReaper{c, Interval, Timeout}, nil
 
+	case apps.Kind("PetSet"):
+		return &PetSetReaper{c, Interval, Timeout}, nil
+
 	case extensions.Kind("Deployment"):
 		return &DeploymentReaper{c, Interval, Timeout}, nil
 
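ReaperFor mirrors ScalerFor on the deletion side: kubectl's cascading delete asks it for a Reaper and then calls Stop. A hedged sketch (helper and client variable names are illustrative):

	// deleteWeb tears down the PetSet "web"; timeout 0 lets the reaper
	// derive a deadline from the PetSet's size, nil means default grace.
	func deleteWeb(kubeClient client.Interface) error {
		reaper, err := ReaperFor(apps.Kind("PetSet"), kubeClient)
		if err != nil {
			return err // no reaper registered for this kind
		}
		return reaper.Stop("default", "web", 0, nil)
	}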
@@ -119,6 +123,10 @@ type PodReaper struct {
 type ServiceReaper struct {
 	client.Interface
 }
+type PetSetReaper struct {
+	client.Interface
+	pollInterval, timeout time.Duration
+}
 
 type objInterface interface {
 	Delete(name string) error
@@ -307,6 +315,7 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio
 		if err != nil {
 			return false, nil
 		}
+
 		return updatedDS.Status.CurrentNumberScheduled+updatedDS.Status.NumberMisscheduled == 0, nil
 	}); err != nil {
 		return err
@@ -315,6 +324,53 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio
 	return reaper.Extensions().DaemonSets(namespace).Delete(name)
 }
 
+func (reaper *PetSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
+	petsets := reaper.Apps().PetSets(namespace)
+	scaler, err := ScalerFor(apps.Kind("PetSet"), *reaper)
+	if err != nil {
+		return err
+	}
+	ps, err := petsets.Get(name)
+	if err != nil {
+		return err
+	}
+	if timeout == 0 {
+		numPets := ps.Spec.Replicas
+		timeout = Timeout + time.Duration(10*numPets)*time.Second
+	}
+	retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
+	waitForPetSet := NewRetryParams(reaper.pollInterval, timeout)
+	if err = scaler.Scale(namespace, name, 0, nil, retry, waitForPetSet); err != nil {
+		return err
+	}
+
+	// TODO: This shouldn't be needed, see corresponding TODO in PetSetHasDesiredPets.
+	// PetSet should track generation number.
+	pods := reaper.Pods(namespace)
+	selector, _ := unversioned.LabelSelectorAsSelector(ps.Spec.Selector)
+	options := api.ListOptions{LabelSelector: selector}
+	podList, err := pods.List(options)
+	if err != nil {
+		return err
+	}
+
+	errList := []error{}
+	for _, pod := range podList.Items {
+		if err := pods.Delete(pod.Name, gracePeriod); err != nil {
+			if !errors.IsNotFound(err) {
+				errList = append(errList, err)
+			}
+		}
+	}
+	if len(errList) > 0 {
+		return utilerrors.NewAggregate(errList)
+	}
+
+	// TODO: Cleanup volumes? We don't want to accidentally delete volumes from
+	// stop, so just leave this up to the petset.
+	return petsets.Delete(name, nil)
+}
+
 func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
 	jobs := reaper.Batch().Jobs(namespace)
 	pods := reaper.Pods(namespace)
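The teardown order is deliberate: scale to zero, wait for the pets to drain, delete any leftover pods by label selector, then delete the PetSet itself, leaving volumes untouched. A direct in-package construction might look like this (the helper name, durations, and the zero-grace api.NewDeleteOptions call are illustrative, not part of the commit):

	// stopNow deletes pods immediately (grace period 0); passing timeout 0
	// makes Stop compute Timeout plus 10 seconds per pet.
	func stopNow(kubeClient client.Interface) error {
		reaper := &PetSetReaper{kubeClient, time.Second, 5 * time.Minute}
		return reaper.Stop("default", "web", 0, api.NewDeleteOptions(0))
	}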