Merge pull request #3061 from jlowdermilk/rollingupdate

Add a rollingupdate lib and command to kubectl
pull/6/head
bgrant0607 2015-01-07 10:33:39 -08:00
commit 6e24273937
8 changed files with 576 additions and 5 deletions

View File

@ -251,7 +251,7 @@ func runReplicationControllerTest(c *client.Client) {
glog.Infof("Done creating replication controllers")
// Give the controllers some time to actually create the pods
if err := wait.Poll(time.Second, time.Second*30, c.ControllerHasDesiredReplicas(controller)); err != nil {
if err := wait.Poll(time.Second, time.Second*30, client.ControllerHasDesiredReplicas(c, &controller)); err != nil {
glog.Fatalf("FAILED: pods never created %v", err)
}

View File

@ -18,18 +18,17 @@ package client
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)
// ControllerHasDesiredReplicas returns a condition that will be true iff the desired replica count
// for a controller's ReplicaSelector equals the Replicas count.
func (c *Client) ControllerHasDesiredReplicas(controller api.ReplicationController) wait.ConditionFunc {
func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc {
return func() (bool, error) {
pods, err := c.Pods(controller.Namespace).List(labels.Set(controller.Spec.Selector).AsSelector())
ctrl, err := c.ReplicationControllers(controller.Namespace).Get(controller.Name)
if err != nil {
return false, err
}
return len(pods.Items) == controller.Spec.Replicas, nil
return ctrl.Status.Replicas == ctrl.Spec.Replicas, nil
}
}

View File

@ -118,6 +118,7 @@ Find more information at https://github.com/GoogleCloudPlatform/kubernetes.`,
cmds.AddCommand(NewCmdNamespace(out))
cmds.AddCommand(f.NewCmdLog(out))
cmds.AddCommand(f.NewCmdRollingUpdate(out))
if err := cmds.Execute(); err != nil {
os.Exit(1)

View File

@ -24,6 +24,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"github.com/golang/glog"
"github.com/spf13/cobra"
@ -82,6 +83,16 @@ func GetFlagInt(cmd *cobra.Command, flag string) int {
return v
}
func GetFlagDuration(cmd *cobra.Command, flag string) time.Duration {
f := cmd.Flags().Lookup(flag)
if f == nil {
glog.Fatalf("Flag accessed but not defined for command %s: %s", cmd.Name(), flag)
}
v, err := time.ParseDuration(f.Value.String())
checkErr(err)
return v
}
// Returns the first non-empty string out of the ones provided. If all
// strings are empty, returns an empty string.
func FirstNonEmptyString(args ...string) string {

View File

@ -0,0 +1,109 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"fmt"
"io"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
"github.com/spf13/cobra"
)
const (
updatePeriod = "1m0s"
timeout = "5m0s"
pollInterval = "3s"
)
func (f *Factory) NewCmdRollingUpdate(out io.Writer) *cobra.Command {
cmd := &cobra.Command{
Use: "rollingupdate <old-controller-name> -f <new-controller.json>",
Short: "Perform a rolling update of the given replicationController",
Long: `Perform a rolling update of the given replicationController.",
Replaces named controller with new controller, updating one pod at a time to use the
new PodTemplate. The new-controller.json must specify the same namespace as the
existing controller and overwrite at least one (common) label in its replicaSelector.
Examples:
$ kubectl rollingupdate frontend-v1 -f frontend-v2.json
<update pods of frontend-v1 using new controller data in frontend-v2.json>
$ cat frontend-v2.json | kubectl rollingupdate frontend-v1 -f -
<update pods of frontend-v1 using json data passed into stdin>`,
Run: func(cmd *cobra.Command, args []string) {
filename := GetFlagString(cmd, "filename")
if len(filename) == 0 {
usageError(cmd, "Must specify filename for new controller")
}
period := GetFlagDuration(cmd, "update-period")
interval := GetFlagDuration(cmd, "poll-interval")
timeout := GetFlagDuration(cmd, "timeout")
if len(args) != 1 {
usageError(cmd, "Must specify the controller to update")
}
oldName := args[0]
schema, err := f.Validator(cmd)
checkErr(err)
mapping, namespace, newName, data := ResourceFromFile(cmd, filename, f.Typer, f.Mapper, schema)
if mapping.Kind != "ReplicationController" {
usageError(cmd, "%s does not specify a valid ReplicationController", filename)
}
err = CompareNamespaceFromFile(cmd, namespace)
checkErr(err)
client, err := f.ClientBuilder.Client()
checkErr(err)
obj, err := mapping.Codec.Decode(data)
checkErr(err)
newRc := obj.(*api.ReplicationController)
updater := kubectl.NewRollingUpdater(namespace, client)
// fetch rc
oldRc, err := client.ReplicationControllers(namespace).Get(oldName)
checkErr(err)
var hasLabel bool
for key, oldValue := range oldRc.Spec.Selector {
if newValue, ok := newRc.Spec.Selector[key]; ok && newValue != oldValue {
hasLabel = true
break
}
}
if !hasLabel {
usageError(cmd, "%s must specify a matching key with non-equal value in Selector for %s",
filename, oldName)
}
// TODO: handle resizes during rolling update
if newRc.Spec.Replicas == 0 {
newRc.Spec.Replicas = oldRc.Spec.Replicas
}
err = updater.Update(out, oldRc, newRc, period, interval, timeout)
checkErr(err)
fmt.Fprintf(out, "%s\n", newName)
},
}
cmd.Flags().String("update-period", updatePeriod, `Time to wait between updating pods. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
cmd.Flags().String("poll-interval", pollInterval, `Time delay between polling controller status after update. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
cmd.Flags().String("timeout", timeout, `Max time to wait for a controller to update before giving up. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
cmd.Flags().StringP("filename", "f", "", "Filename or URL to file to use to create the new controller")
return cmd
}

View File

@ -34,6 +34,8 @@ import (
var apiVersionToUse = "v1beta1"
const kubectlAnnotationPrefix = "kubectl.kubernetes.io/"
func GetKubeClient(config *client.Config, matchVersion bool) (*client.Client, error) {
// TODO: get the namespace context when kubectl ns is completed
c, err := client.New(config)

View File

@ -0,0 +1,173 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubectl
import (
"fmt"
"io"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)
// RollingUpdater provides methods for updating replicated pods in a predictable,
// fault-tolerant way.
type RollingUpdater struct {
// Client interface for creating and updating controllers
c client.Interface
// Namespace for resources
ns string
}
// NewRollingUpdater creates a RollingUpdater from a client
func NewRollingUpdater(namespace string, c client.Interface) *RollingUpdater {
return &RollingUpdater{
c,
namespace,
}
}
const (
sourceIdAnnotation = kubectlAnnotationPrefix + "update-source-id"
desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas"
)
// Update all pods for a ReplicationController (oldRc) by creating a new controller (newRc)
// with 0 replicas, and synchronously resizing oldRc,newRc by 1 until oldRc has 0 replicas
// and newRc has the original # of desired replicas. oldRc is then deleted.
// If an update from newRc to oldRc is already in progress, we attempt to drive it to completion.
// If an error occurs at any step of the update, the error will be returned.
// 'out' writer for progress output
// 'oldRc' existing controller to be replaced
// 'newRc' controller that will take ownership of updated pods (will be created if needed)
// 'updatePeriod' time to wait between individual pod updates
// 'interval' time to wait between polling controller status after update
// 'timeout' time to wait for controller updates before giving up
//
// TODO: make this handle performing a rollback of a partially completed rollout.
func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error {
oldName := oldRc.ObjectMeta.Name
newName := newRc.ObjectMeta.Name
if newRc.Spec.Replicas <= 0 {
return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec)
}
desired := newRc.Spec.Replicas
sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID)
// look for existing newRc, incase this update was previously started but interrupted
rc, existing, err := r.getExistingNewRc(sourceId, newName)
if existing {
fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName)
if err != nil {
return err
}
replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation]
desired, err = strconv.Atoi(replicas)
if err != nil {
return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
newName, desiredReplicasAnnotation, replicas)
}
newRc = rc
} else {
fmt.Fprintf(out, "Creating %s\n", newName)
if newRc.ObjectMeta.Annotations == nil {
newRc.ObjectMeta.Annotations = map[string]string{}
}
newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired)
newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId
newRc.Spec.Replicas = 0
newRc, err = r.c.ReplicationControllers(r.ns).Create(newRc)
if err != nil {
return err
}
}
// +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas
for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 {
newRc.Spec.Replicas += 1
oldRc.Spec.Replicas -= 1
fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n",
oldName, oldRc.Spec.Replicas,
newName, newRc.Spec.Replicas)
newRc, err = r.updateAndWait(newRc, interval, timeout)
if err != nil {
return err
}
time.Sleep(updatePeriod)
oldRc, err = r.updateAndWait(oldRc, interval, timeout)
if err != nil {
return err
}
}
// delete remaining replicas on oldRc
if oldRc.Spec.Replicas != 0 {
fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n",
oldName, oldRc.Spec.Replicas, 0)
oldRc.Spec.Replicas = 0
oldRc, err = r.updateAndWait(oldRc, interval, timeout)
if err != nil {
return err
}
}
// add remaining replicas on newRc, cleanup annotations
if newRc.Spec.Replicas != desired {
fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n",
newName, newRc.Spec.Replicas, desired)
newRc.Spec.Replicas = desired
}
delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation)
delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation)
newRc, err = r.updateAndWait(newRc, interval, timeout)
if err != nil {
return err
}
// delete old rc
fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName)
return r.c.ReplicationControllers(r.ns).Delete(oldName)
}
func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.ReplicationController, existing bool, err error) {
if rc, err = r.c.ReplicationControllers(r.ns).Get(name); err == nil {
existing = true
annotations := rc.ObjectMeta.Annotations
source := annotations[sourceIdAnnotation]
_, ok := annotations[desiredReplicasAnnotation]
if source != sourceId || !ok {
err = fmt.Errorf("Missing/unexpected annotations for controller %s: %s", name, annotations)
}
return
}
err = nil
return
}
func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) {
rc, err := r.c.ReplicationControllers(r.ns).Update(rc)
if err != nil {
return nil, err
}
if err := wait.Poll(interval, timeout,
client.ControllerHasDesiredReplicas(r.c, rc)); err != nil {
return nil, err
}
return r.c.ReplicationControllers(r.ns).Get(rc.ObjectMeta.Name)
}

View File

@ -0,0 +1,276 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubectl
import (
"bytes"
"fmt"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
)
type customFake struct {
*client.Fake
ctrl client.ReplicationControllerInterface
}
func (c *customFake) ReplicationControllers(namespace string) client.ReplicationControllerInterface {
return c.ctrl
}
func fakeClientFor(namespace string, responses []fakeResponse) client.Interface {
fake := client.Fake{}
return &customFake{
&fake,
&fakeRc{
&client.FakeReplicationControllers{
Fake: &fake,
Namespace: namespace,
},
responses,
},
}
}
type fakeResponse struct {
controller *api.ReplicationController
err error
}
type fakeRc struct {
*client.FakeReplicationControllers
responses []fakeResponse
}
func (c *fakeRc) Get(name string) (*api.ReplicationController, error) {
action := client.FakeAction{Action: "get-controller", Value: name}
if len(c.responses) == 0 {
return nil, fmt.Errorf("Unexpected Action: %s", action)
}
c.Fake.Actions = append(c.Fake.Actions, action)
result := c.responses[0]
c.responses = c.responses[1:]
return result.controller, result.err
}
func (c *fakeRc) Create(controller *api.ReplicationController) (*api.ReplicationController, error) {
c.Fake.Actions = append(c.Fake.Actions, client.FakeAction{Action: "create-controller", Value: controller.ObjectMeta.Name})
return controller, nil
}
func (c *fakeRc) Update(controller *api.ReplicationController) (*api.ReplicationController, error) {
c.Fake.Actions = append(c.Fake.Actions, client.FakeAction{Action: "update-controller", Value: controller.ObjectMeta.Name})
return controller, nil
}
func oldRc(replicas int) *api.ReplicationController {
return &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "foo-v1",
UID: "7764ae47-9092-11e4-8393-42010af018ff",
},
Spec: api.ReplicationControllerSpec{
Replicas: replicas,
Selector: map[string]string{"version": "v1"},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Name: "foo-v1",
Labels: map[string]string{"version": "v1"},
},
},
},
Status: api.ReplicationControllerStatus{
Replicas: replicas,
},
}
}
func newRc(replicas int, desired int) *api.ReplicationController {
rc := oldRc(replicas)
rc.Spec.Template = &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Name: "foo-v2",
Labels: map[string]string{"version": "v2"},
},
}
rc.Spec.Selector = map[string]string{"version": "v2"}
rc.ObjectMeta = api.ObjectMeta{
Name: "foo-v2",
Annotations: map[string]string{
desiredReplicasAnnotation: fmt.Sprintf("%d", desired),
sourceIdAnnotation: "foo-v1:7764ae47-9092-11e4-8393-42010af018ff",
},
}
return rc
}
func TestUpdate(t *testing.T) {
tests := []struct {
oldRc, newRc *api.ReplicationController
responses []fakeResponse
output string
}{
{
oldRc(1), newRc(1, 1),
[]fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// one update round
{newRc(1, 1), nil},
{newRc(1, 1), nil},
{oldRc(0), nil},
{oldRc(0), nil},
// get newRc after final update (to cleanup annotations)
{newRc(1, 1), nil},
{newRc(1, 1), nil},
},
`Creating foo-v2
Updating foo-v1 replicas: 0, foo-v2 replicas: 1
Update succeeded. Deleting foo-v1
`,
}, {
oldRc(2), newRc(2, 2),
[]fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// 2 gets for each update (poll for condition, refetch)
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{oldRc(1), nil},
{oldRc(1), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{oldRc(0), nil},
{oldRc(0), nil},
// get newRc after final update (cleanup annotations)
{newRc(2, 2), nil},
{newRc(2, 2), nil},
},
`Creating foo-v2
Updating foo-v1 replicas: 1, foo-v2 replicas: 1
Updating foo-v1 replicas: 0, foo-v2 replicas: 2
Update succeeded. Deleting foo-v1
`,
}, {
oldRc(2), newRc(7, 7),
[]fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// 2 gets for each update (poll for condition, refetch)
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{oldRc(1), nil},
{oldRc(1), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{oldRc(0), nil},
{oldRc(0), nil},
// final update on newRc (resize + cleanup annotations)
{newRc(7, 7), nil},
{newRc(7, 7), nil},
},
`Creating foo-v2
Updating foo-v1 replicas: 1, foo-v2 replicas: 1
Updating foo-v1 replicas: 0, foo-v2 replicas: 2
Resizing foo-v2 replicas: 2 -> 7
Update succeeded. Deleting foo-v1
`,
}, {
oldRc(7), newRc(2, 2),
[]fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// 2 gets for each update (poll for condition, refetch)
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{oldRc(6), nil},
{oldRc(6), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{oldRc(5), nil},
{oldRc(5), nil},
// stop oldRc
{oldRc(0), nil},
{oldRc(0), nil},
// final update on newRc (cleanup annotations)
{newRc(2, 2), nil},
{newRc(2, 2), nil},
},
`Creating foo-v2
Updating foo-v1 replicas: 6, foo-v2 replicas: 1
Updating foo-v1 replicas: 5, foo-v2 replicas: 2
Stopping foo-v1 replicas: 5 -> 0
Update succeeded. Deleting foo-v1
`,
},
}
for _, test := range tests {
updater := RollingUpdater{
fakeClientFor("default", test.responses),
"default",
}
var buffer bytes.Buffer
if err := updater.Update(&buffer, test.oldRc, test.newRc, 0, 1*time.Millisecond, 1*time.Millisecond); err != nil {
t.Errorf("Update failed: %v", err)
}
if buffer.String() != test.output {
t.Errorf("Bad output. expected:\n%s\ngot:\n%s", test.output, buffer.String())
}
}
}
func TestUpdateRecovery(t *testing.T) {
// Test recovery from interruption
rc := oldRc(2)
rcExisting := newRc(1, 3)
output := `Continuing update with existing controller foo-v2.
Updating foo-v1 replicas: 1, foo-v2 replicas: 2
Updating foo-v1 replicas: 0, foo-v2 replicas: 3
Update succeeded. Deleting foo-v1
`
responses := []fakeResponse{
// Existing newRc
{rcExisting, nil},
// 2 gets for each update (poll for condition, refetch)
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{oldRc(1), nil},
{oldRc(1), nil},
{newRc(3, 3), nil},
{newRc(3, 3), nil},
{oldRc(0), nil},
{oldRc(0), nil},
// get newRc after final update (cleanup annotations)
{newRc(3, 3), nil},
{newRc(3, 3), nil},
}
updater := RollingUpdater{fakeClientFor("default", responses), "default"}
var buffer bytes.Buffer
if err := updater.Update(&buffer, rc, rcExisting, 0, 1*time.Millisecond, 1*time.Millisecond); err != nil {
t.Errorf("Update failed: %v", err)
}
if buffer.String() != output {
t.Errorf("Output was not as expected. Expected:\n%s\nGot:\n%s", output, buffer.String())
}
}