k3s/pkg/controller/deployment/deployment_controller.go

280 lines
9.7 KiB
Go
Raw Normal View History

2015-09-03 00:02:22 +00:00
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment
import (
"fmt"
"math"
2015-09-03 00:02:22 +00:00
"time"
"github.com/golang/glog"
2015-09-03 00:02:22 +00:00
"k8s.io/kubernetes/pkg/api"
2015-10-09 22:04:41 +00:00
"k8s.io/kubernetes/pkg/apis/extensions"
2015-09-29 23:55:06 +00:00
"k8s.io/kubernetes/pkg/client/record"
2015-09-03 00:02:22 +00:00
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
2015-10-19 21:08:35 +00:00
deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
2015-09-03 00:02:22 +00:00
)
type DeploymentController struct {
2015-09-29 23:55:06 +00:00
client client.Interface
2015-10-12 18:05:52 +00:00
expClient client.ExtensionsInterface
2015-09-29 23:55:06 +00:00
eventRecorder record.EventRecorder
2015-09-03 00:02:22 +00:00
}
func New(client client.Interface) *DeploymentController {
2015-09-29 23:55:06 +00:00
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(client.Events(""))
2015-09-03 00:02:22 +00:00
return &DeploymentController{
2015-09-29 23:55:06 +00:00
client: client,
2015-10-12 18:18:50 +00:00
expClient: client.Extensions(),
2015-09-29 23:55:06 +00:00
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
2015-09-03 00:02:22 +00:00
}
}
func (d *DeploymentController) Run(syncPeriod time.Duration) {
go util.Until(func() {
errs := d.reconcileDeployments()
for _, err := range errs {
glog.Errorf("Failed to reconcile: %v", err)
2015-09-03 00:02:22 +00:00
}
}, syncPeriod, util.NeverStop)
}
func (d *DeploymentController) reconcileDeployments() []error {
2015-09-03 00:02:22 +00:00
list, err := d.expClient.Deployments(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil {
return []error{fmt.Errorf("error listing deployments: %v", err)}
2015-09-03 00:02:22 +00:00
}
errs := []error{}
2015-09-03 00:02:22 +00:00
for _, deployment := range list.Items {
if err := d.reconcileDeployment(&deployment); err != nil {
errs = append(errs, fmt.Errorf("error in reconciling deployment %s: %v", deployment.Name, err))
2015-09-03 00:02:22 +00:00
}
}
return errs
2015-09-03 00:02:22 +00:00
}
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) reconcileDeployment(deployment *extensions.Deployment) error {
switch deployment.Spec.Strategy.Type {
2015-10-09 22:49:10 +00:00
case extensions.RecreateDeploymentStrategyType:
return d.reconcileRecreateDeployment(*deployment)
2015-10-09 22:49:10 +00:00
case extensions.RollingUpdateDeploymentStrategyType:
return d.reconcileRollingUpdateDeployment(*deployment)
}
return fmt.Errorf("unexpected deployment strategy type: %s", deployment.Spec.Strategy.Type)
}
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) reconcileRecreateDeployment(deployment extensions.Deployment) error {
// TODO: implement me.
return nil
}
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment extensions.Deployment) error {
newRC, err := d.getNewRC(deployment)
if err != nil {
return err
}
oldRCs, err := d.getOldRCs(deployment)
if err != nil {
return err
}
2015-09-29 23:55:06 +00:00
allRCs := append(oldRCs, newRC)
// Scale up, if we can.
2015-10-07 20:13:18 +00:00
scaledUp, err := d.reconcileNewRC(allRCs, newRC, deployment)
2015-09-03 00:02:22 +00:00
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return d.updateDeploymentStatus(allRCs, newRC, deployment)
}
// Scale down, if we can.
2015-10-07 20:13:18 +00:00
scaledDown, err := d.reconcileOldRCs(allRCs, oldRCs, newRC, deployment)
2015-09-03 00:02:22 +00:00
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return d.updateDeploymentStatus(allRCs, newRC, deployment)
2015-09-03 00:02:22 +00:00
}
// TODO: raise an event, neither scaled up nor down.
2015-09-03 00:02:22 +00:00
return nil
}
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
2015-10-19 21:08:35 +00:00
return deploymentutil.GetOldRCs(deployment, d.client)
2015-09-03 00:02:22 +00:00
}
// Returns an RC that matches the intent of the given deployment.
// It creates a new RC if required.
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) {
2015-10-19 21:08:35 +00:00
existingNewRC, err := deploymentutil.GetNewRC(deployment, d.client)
if err != nil || existingNewRC != nil {
return existingNewRC, err
2015-09-03 00:02:22 +00:00
}
// new RC does not exist, create one.
namespace := deployment.ObjectMeta.Namespace
2015-10-19 21:08:35 +00:00
podTemplateSpecHash := deploymentutil.GetPodTemplateSpecHash(deployment.Spec.Template)
2015-09-03 00:02:22 +00:00
rcName := fmt.Sprintf("deploymentrc-%d", podTemplateSpecHash)
2015-10-19 21:08:35 +00:00
newRCTemplate := deploymentutil.GetNewRCTemplate(deployment)
// Add podTemplateHash label to selector.
2015-10-19 21:08:35 +00:00
newRCSelector := deploymentutil.CloneAndAddLabel(deployment.Spec.Selector, deployment.Spec.UniqueLabelKey, podTemplateSpecHash)
newRC := api.ReplicationController{
2015-09-03 00:02:22 +00:00
ObjectMeta: api.ObjectMeta{
Name: rcName,
Namespace: namespace,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: newRCSelector,
Template: &newRCTemplate,
2015-09-03 00:02:22 +00:00
},
}
createdRC, err := d.client.ReplicationControllers(namespace).Create(&newRC)
2015-09-03 00:02:22 +00:00
if err != nil {
return nil, fmt.Errorf("error creating replication controller: %v", err)
}
return createdRC, nil
}
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
if newRC.Spec.Replicas == deployment.Spec.Replicas {
2015-10-07 20:13:18 +00:00
// Scaling not required.
return false, nil
}
2015-10-07 20:13:18 +00:00
if newRC.Spec.Replicas > deployment.Spec.Replicas {
// Scale down.
_, err := d.scaleRCAndRecordEvent(newRC, deployment.Spec.Replicas, deployment)
return true, err
}
// Check if we can scale up.
maxSurge, isPercent, err := util.GetIntOrPercentValue(&deployment.Spec.Strategy.RollingUpdate.MaxSurge)
if err != nil {
return false, fmt.Errorf("invalid value for MaxSurge: %v", err)
}
if isPercent {
maxSurge = util.GetValueFromPercent(maxSurge, deployment.Spec.Replicas)
}
// Find the total number of pods
2015-10-19 21:08:35 +00:00
currentPodCount := deploymentutil.GetReplicaCountForRCs(allRCs)
maxTotalPods := deployment.Spec.Replicas + maxSurge
if currentPodCount >= maxTotalPods {
// Cannot scale up.
return false, nil
}
// Scale up.
scaleUpCount := maxTotalPods - currentPodCount
2015-10-07 20:13:18 +00:00
// Do not exceed the number of desired replicas.
scaleUpCount = int(math.Min(float64(scaleUpCount), float64(deployment.Spec.Replicas-newRC.Spec.Replicas)))
2015-09-29 23:55:06 +00:00
newReplicasCount := newRC.Spec.Replicas + scaleUpCount
2015-10-07 20:13:18 +00:00
_, err = d.scaleRCAndRecordEvent(newRC, newReplicasCount, deployment)
return true, err
}
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
2015-10-19 21:08:35 +00:00
oldPodsCount := deploymentutil.GetReplicaCountForRCs(oldRCs)
if oldPodsCount == 0 {
// Cant scale down further
return false, nil
}
maxUnavailable, isPercent, err := util.GetIntOrPercentValue(&deployment.Spec.Strategy.RollingUpdate.MaxUnavailable)
if err != nil {
return false, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
}
if isPercent {
maxUnavailable = util.GetValueFromPercent(maxUnavailable, deployment.Spec.Replicas)
}
// Check if we can scale down.
minAvailable := deployment.Spec.Replicas - maxUnavailable
// Find the number of ready pods.
2015-10-19 21:08:35 +00:00
readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(d.client, allRCs)
2015-09-29 23:55:06 +00:00
if err != nil {
return false, fmt.Errorf("could not find available pods: %v", err)
}
if readyPodCount <= minAvailable {
// Cannot scale down.
return false, nil
}
totalScaleDownCount := readyPodCount - minAvailable
for _, targetRC := range oldRCs {
if totalScaleDownCount == 0 {
// No further scaling required.
break
}
if targetRC.Spec.Replicas == 0 {
// cannot scale down this RC.
continue
}
// Scale down.
scaleDownCount := int(math.Min(float64(targetRC.Spec.Replicas), float64(totalScaleDownCount)))
2015-09-29 23:55:06 +00:00
newReplicasCount := targetRC.Spec.Replicas - scaleDownCount
2015-10-07 20:13:18 +00:00
_, err = d.scaleRCAndRecordEvent(targetRC, newReplicasCount, deployment)
if err != nil {
return false, err
}
totalScaleDownCount -= scaleDownCount
}
return true, err
}
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) error {
2015-10-19 21:08:35 +00:00
totalReplicas := deploymentutil.GetReplicaCountForRCs(allRCs)
updatedReplicas := deploymentutil.GetReplicaCountForRCs([]*api.ReplicationController{newRC})
newDeployment := deployment
// TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods.
2015-10-09 22:49:10 +00:00
newDeployment.Status = extensions.DeploymentStatus{
Replicas: totalReplicas,
UpdatedReplicas: updatedReplicas,
}
2015-10-17 01:01:08 +00:00
_, err := d.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
return err
}
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) scaleRCAndRecordEvent(rc *api.ReplicationController, newScale int, deployment extensions.Deployment) (*api.ReplicationController, error) {
2015-10-07 20:13:18 +00:00
scalingOperation := "down"
if rc.Spec.Replicas < newScale {
scalingOperation = "up"
}
newRC, err := d.scaleRC(rc, newScale)
if err == nil {
d.eventRecorder.Eventf(&deployment, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale)
}
return newRC, err
}
func (d *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) {
// TODO: Using client for now, update to use store when it is ready.
rc.Spec.Replicas = newScale
return d.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc)
}
2015-10-09 22:49:10 +00:00
func (d *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
// TODO: Using client for now, update to use store when it is ready.
2015-10-12 18:18:50 +00:00
return d.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}