mirror of https://github.com/k3s-io/k3s
Merge pull request #7620 from brendandburns/fix
Support recovery from in the middle of a rename.pull/6/head
commit
7b1e9adb47
|
@ -17,12 +17,15 @@ limitations under the License.
|
|||
package api
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
|
@ -130,3 +133,11 @@ func AddToNodeAddresses(addresses *[]NodeAddress, addAddresses ...NodeAddress) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func HashObject(obj runtime.Object, codec runtime.Codec) (string, error) {
|
||||
data, err := codec.Encode(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf("%x", md5.Sum(data)), nil
|
||||
}
|
||||
|
|
|
@ -18,23 +18,15 @@ package cmd
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
|
||||
cmdutil "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/cmd/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/resource"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
|
@ -131,16 +123,26 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
|||
return err
|
||||
}
|
||||
|
||||
updaterClient := kubectl.NewRollingUpdaterClient(client)
|
||||
|
||||
var newRc *api.ReplicationController
|
||||
// fetch rc
|
||||
oldRc, err := client.ReplicationControllers(cmdNamespace).Get(oldName)
|
||||
if err != nil {
|
||||
return err
|
||||
if !errors.IsNotFound(err) || len(image) == 0 || len(args) > 1 {
|
||||
return err
|
||||
}
|
||||
// We're in the middle of a rename, look for an RC with a source annotation of oldName
|
||||
newRc, err := kubectl.FindSourceController(updaterClient, cmdNamespace, oldName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return kubectl.Rename(kubectl.NewRollingUpdaterClient(client), newRc, oldName)
|
||||
}
|
||||
|
||||
keepOldName := false
|
||||
var keepOldName bool
|
||||
|
||||
mapper, typer := f.Object()
|
||||
var newRc *api.ReplicationController
|
||||
|
||||
if len(filename) != 0 {
|
||||
obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()).
|
||||
|
@ -161,73 +163,36 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
|||
// than the old rc. This selector is the hash of the rc, which will differ because the new rc has a
|
||||
// different image.
|
||||
if len(image) != 0 {
|
||||
var newName string
|
||||
var err error
|
||||
|
||||
if len(args) >= 2 {
|
||||
newName = args[1]
|
||||
keepOldName = len(args) == 1
|
||||
newName := findNewName(args, oldRc)
|
||||
if newRc, err = kubectl.LoadExistingNextReplicationController(client, cmdNamespace, newName); err != nil {
|
||||
return err
|
||||
}
|
||||
if newRc != nil {
|
||||
fmt.Fprintf(out, "Found existing update in progress (%s), resuming.\n", newRc.Name)
|
||||
} else {
|
||||
newName, _ = kubectl.GetNextControllerAnnotation(oldRc)
|
||||
}
|
||||
|
||||
if len(newName) > 0 {
|
||||
newRc, err = client.ReplicationControllers(cmdNamespace).Get(newName)
|
||||
if err != nil {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
return err
|
||||
} else {
|
||||
newRc = nil
|
||||
}
|
||||
} else {
|
||||
fmt.Fprint(out, "Found existing update in progress (%s), resuming.\n", newName)
|
||||
}
|
||||
}
|
||||
if newRc == nil {
|
||||
// load the old RC into the "new" RC
|
||||
if newRc, err = client.ReplicationControllers(cmdNamespace).Get(oldName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(newRc.Spec.Template.Spec.Containers) > 1 {
|
||||
// TODO: support multi-container image update.
|
||||
return errors.New("Image update is not supported for multi-container pods")
|
||||
}
|
||||
if len(newRc.Spec.Template.Spec.Containers) == 0 {
|
||||
return cmdutil.UsageError(cmd, "Pod has no containers! (%v)", newRc)
|
||||
}
|
||||
newRc.Spec.Template.Spec.Containers[0].Image = image
|
||||
|
||||
newHash, err := hashObject(newRc, client.Codec)
|
||||
newRc, err = kubectl.CreateNewControllerFromCurrentController(client, cmdNamespace, oldName, newName, image, deploymentKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(newName) == 0 {
|
||||
keepOldName = true
|
||||
newName = fmt.Sprintf("%s-%s", newRc.Name, newHash)
|
||||
}
|
||||
newRc.Name = newName
|
||||
|
||||
newRc.Spec.Selector[deploymentKey] = newHash
|
||||
newRc.Spec.Template.Labels[deploymentKey] = newHash
|
||||
// Clear resource version after hashing so that identical updates get different hashes.
|
||||
newRc.ResourceVersion = ""
|
||||
|
||||
kubectl.SetNextControllerAnnotation(oldRc, newName)
|
||||
if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
|
||||
if oldRc, err = addDeploymentKeyToReplicationController(oldRc, client, deploymentKey, cmdNamespace, out); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
// Update the existing replication controller with pointers to the 'next' controller
|
||||
// and adding the <deploymentKey> label if necessary to distinguish it from the 'next' controller.
|
||||
oldHash, err := api.HashObject(oldRc, client.Codec)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
oldRc, err = kubectl.UpdateExistingReplicationController(client, oldRc, cmdNamespace, newRc.Name, deploymentKey, oldHash, out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
newName := newRc.Name
|
||||
if oldName == newName {
|
||||
if oldName == newRc.Name {
|
||||
return cmdutil.UsageError(cmd, "%s cannot have the same name as the existing ReplicationController %s",
|
||||
filename, oldName)
|
||||
}
|
||||
|
||||
updater := kubectl.NewRollingUpdater(newRc.Namespace, kubectl.NewRollingUpdaterClient(client))
|
||||
updater := kubectl.NewRollingUpdater(newRc.Namespace, updaterClient)
|
||||
|
||||
// To successfully pull off a rolling update the new and old rc have to differ
|
||||
// by at least one selector. Every new pod should have the selector and every
|
||||
|
@ -284,124 +249,18 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
|||
if keepOldName {
|
||||
fmt.Fprintf(out, "%s\n", oldName)
|
||||
} else {
|
||||
fmt.Fprintf(out, "%s\n", newName)
|
||||
fmt.Fprintf(out, "%s\n", newRc.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func hashObject(obj runtime.Object, codec runtime.Codec) (string, error) {
|
||||
data, err := codec.Encode(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
func findNewName(args []string, oldRc *api.ReplicationController) string {
|
||||
if len(args) >= 2 {
|
||||
return args[1]
|
||||
}
|
||||
return fmt.Sprintf("%x", md5.Sum(data)), nil
|
||||
}
|
||||
|
||||
const MaxRetries = 3
|
||||
|
||||
type updateFunc func(controller *api.ReplicationController)
|
||||
|
||||
// updateWithRetries updates applies the given rc as an update.
|
||||
func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
|
||||
// Each update could take ~100ms, so give it 0.5 second
|
||||
var err error
|
||||
oldRc := rc
|
||||
err = wait.Poll(10*time.Millisecond, 500*time.Millisecond, func() (bool, error) {
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(rc)
|
||||
if rc, err = rcClient.Update(rc); err == nil {
|
||||
// rc contains the latest controller post update
|
||||
return true, nil
|
||||
}
|
||||
// Update the controller with the latest resource version, if the update failed we
|
||||
// can't trust rc so use oldRc.Name.
|
||||
if rc, err = rcClient.Get(oldRc.Name); err != nil {
|
||||
// The Get failed: Value in rc cannot be trusted.
|
||||
rc = oldRc
|
||||
}
|
||||
// The Get passed: rc contains the latest controller, expect a poll for the update.
|
||||
return false, nil
|
||||
})
|
||||
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
|
||||
// controller contains the applied update.
|
||||
return rc, err
|
||||
}
|
||||
|
||||
func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client *client.Client, deploymentKey, namespace string, out io.Writer) (*api.ReplicationController, error) {
|
||||
oldHash, err := hashObject(oldRc, client.Codec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// First, update the template label. This ensures that any newly created pods will have the new label
|
||||
if oldRc.Spec.Template.Labels == nil {
|
||||
oldRc.Spec.Template.Labels = map[string]string{}
|
||||
}
|
||||
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
|
||||
rc.Spec.Template.Labels[deploymentKey] = oldHash
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Update all pods managed by the rc to have the new hash label, so they are correctly adopted
|
||||
// TODO: extract the code from the label command and re-use it here.
|
||||
podList, err := client.Pods(namespace).List(labels.SelectorFromSet(oldRc.Spec.Selector), fields.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for ix := range podList.Items {
|
||||
pod := &podList.Items[ix]
|
||||
if pod.Labels == nil {
|
||||
pod.Labels = map[string]string{
|
||||
deploymentKey: oldHash,
|
||||
}
|
||||
} else {
|
||||
pod.Labels[deploymentKey] = oldHash
|
||||
}
|
||||
err = nil
|
||||
delay := 3
|
||||
for i := 0; i < MaxRetries; i++ {
|
||||
_, err = client.Pods(namespace).Update(pod)
|
||||
if err != nil {
|
||||
fmt.Fprint(out, "Error updating pod (%v), retrying after %d seconds", err, delay)
|
||||
time.Sleep(time.Second * time.Duration(delay))
|
||||
delay *= delay
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if oldRc.Spec.Selector == nil {
|
||||
oldRc.Spec.Selector = map[string]string{}
|
||||
}
|
||||
// Copy the old selector, so that we can scrub out any orphaned pods
|
||||
selectorCopy := map[string]string{}
|
||||
for k, v := range oldRc.Spec.Selector {
|
||||
selectorCopy[k] = v
|
||||
}
|
||||
|
||||
// Update the selector of the rc so it manages all the pods we updated above
|
||||
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
|
||||
rc.Spec.Selector[deploymentKey] = oldHash
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Clean up any orphaned pods that don't have the new label, this can happen if the rc manager
|
||||
// doesn't see the update to its pod template and creates a new pod with the old labels after
|
||||
// we've finished re-adopting existing pods to the rc.
|
||||
podList, err = client.Pods(namespace).List(labels.SelectorFromSet(selectorCopy), fields.Everything())
|
||||
for ix := range podList.Items {
|
||||
pod := &podList.Items[ix]
|
||||
if value, found := pod.Labels[deploymentKey]; !found || value != oldHash {
|
||||
if err := client.Pods(namespace).Delete(pod.Name, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return oldRc, nil
|
||||
if oldRc != nil {
|
||||
newName, _ := kubectl.GetNextControllerAnnotation(oldRc)
|
||||
return newName
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
|
|
@ -18,16 +18,7 @@ package cmd
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
func TestValidateArgs(t *testing.T) {
|
||||
|
@ -98,192 +89,3 @@ func TestValidateArgs(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddDeploymentHash(t *testing.T) {
|
||||
buf := &bytes.Buffer{}
|
||||
f, tf, codec := NewAPIFactory()
|
||||
rc := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{Name: "rc"},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Selector: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
podList := &api.PodList{
|
||||
Items: []api.Pod{
|
||||
{ObjectMeta: api.ObjectMeta{Name: "foo"}},
|
||||
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
|
||||
{ObjectMeta: api.ObjectMeta{Name: "baz"}},
|
||||
},
|
||||
}
|
||||
|
||||
seen := util.StringSet{}
|
||||
updatedRc := false
|
||||
tf.Client = &client.FakeRESTClient{
|
||||
Codec: codec,
|
||||
Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
switch p, m := req.URL.Path, req.Method; {
|
||||
case p == "/api/v1beta3/namespaces/default/pods" && m == "GET":
|
||||
if req.URL.RawQuery != "labelSelector=foo%3Dbar" {
|
||||
t.Errorf("Unexpected query string: %s", req.URL.RawQuery)
|
||||
}
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, podList)}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/pods/foo" && m == "PUT":
|
||||
seen.Insert("foo")
|
||||
obj := readOrDie(t, req, codec)
|
||||
podList.Items[0] = *(obj.(*api.Pod))
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[0])}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/pods/bar" && m == "PUT":
|
||||
seen.Insert("bar")
|
||||
obj := readOrDie(t, req, codec)
|
||||
podList.Items[1] = *(obj.(*api.Pod))
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[1])}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/pods/baz" && m == "PUT":
|
||||
seen.Insert("baz")
|
||||
obj := readOrDie(t, req, codec)
|
||||
podList.Items[2] = *(obj.(*api.Pod))
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[2])}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT":
|
||||
updatedRc = true
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, rc)}, nil
|
||||
default:
|
||||
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
}
|
||||
tf.ClientConfig = &client.Config{Version: latest.Version}
|
||||
tf.Namespace = "test"
|
||||
|
||||
client, err := f.Client()
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
t.Fail()
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := addDeploymentKeyToReplicationController(rc, client, "hash", api.NamespaceDefault, buf); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
for _, pod := range podList.Items {
|
||||
if !seen.Has(pod.Name) {
|
||||
t.Errorf("Missing update for pod: %s", pod.Name)
|
||||
}
|
||||
}
|
||||
if !updatedRc {
|
||||
t.Errorf("Failed to update replication controller with new labels")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateWithRetries(t *testing.T) {
|
||||
f, tf, codec := NewAPIFactory()
|
||||
rc := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{Name: "rc",
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Selector: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
RestartPolicy: api.RestartPolicyAlways,
|
||||
DNSPolicy: api.DNSClusterFirst,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Test end to end updating of the rc with retries. Essentially make sure the update handler
|
||||
// sees the right updates, failures in update/get are handled properly, and that the updated
|
||||
// rc with new resource version is returned to the caller. Without any of these rollingupdate
|
||||
// will fail cryptically.
|
||||
newRc := *rc
|
||||
newRc.ResourceVersion = "2"
|
||||
newRc.Spec.Selector["baz"] = "foobar"
|
||||
updates := []*http.Response{
|
||||
{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 200, Body: objBody(codec, &newRc)},
|
||||
}
|
||||
gets := []*http.Response{
|
||||
{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 200, Body: objBody(codec, rc)},
|
||||
}
|
||||
tf.Client = &client.FakeRESTClient{
|
||||
Codec: codec,
|
||||
Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
switch p, m := req.URL.Path, req.Method; {
|
||||
case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT":
|
||||
update := updates[0]
|
||||
updates = updates[1:]
|
||||
// We should always get an update with a valid rc even when the get fails. The rc should always
|
||||
// contain the update.
|
||||
if c, ok := readOrDie(t, req, codec).(*api.ReplicationController); !ok || !reflect.DeepEqual(rc, c) {
|
||||
t.Errorf("Unexpected update body, got %+v expected %+v", c, rc)
|
||||
} else if sel, ok := c.Spec.Selector["baz"]; !ok || sel != "foobar" {
|
||||
t.Errorf("Expected selector label update, got %+v", c.Spec.Selector)
|
||||
} else {
|
||||
delete(c.Spec.Selector, "baz")
|
||||
}
|
||||
return update, nil
|
||||
case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "GET":
|
||||
get := gets[0]
|
||||
gets = gets[1:]
|
||||
return get, nil
|
||||
default:
|
||||
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
}
|
||||
tf.ClientConfig = &client.Config{Version: latest.Version}
|
||||
client, err := f.Client()
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
t.Fail()
|
||||
return
|
||||
}
|
||||
|
||||
if rc, err := updateWithRetries(
|
||||
client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) {
|
||||
c.Spec.Selector["baz"] = "foobar"
|
||||
}); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
} else if sel, ok := rc.Spec.Selector["baz"]; !ok || sel != "foobar" || rc.ResourceVersion != "2" {
|
||||
t.Errorf("Expected updated rc, got %+v", rc)
|
||||
}
|
||||
if len(updates) != 0 || len(gets) != 0 {
|
||||
t.Errorf("Remaining updates %+v gets %+v", updates, gets)
|
||||
}
|
||||
}
|
||||
|
||||
func readOrDie(t *testing.T, req *http.Request, codec runtime.Codec) runtime.Object {
|
||||
data, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
t.Errorf("Error reading: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
obj, err := codec.Decode(data)
|
||||
if err != nil {
|
||||
t.Errorf("error decoding: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
return obj
|
||||
}
|
||||
|
|
|
@ -17,13 +17,18 @@ limitations under the License.
|
|||
package kubectl
|
||||
|
||||
import (
|
||||
goerrors "errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
|
@ -71,6 +76,50 @@ const (
|
|||
RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename"
|
||||
)
|
||||
|
||||
func LoadExistingNextReplicationController(c *client.Client, namespace, newName string) (*api.ReplicationController, error) {
|
||||
if len(newName) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
newRc, err := c.ReplicationControllers(namespace).Get(newName)
|
||||
if err != nil && errors.IsNotFound(err) {
|
||||
return nil, nil
|
||||
}
|
||||
return newRc, err
|
||||
}
|
||||
|
||||
func CreateNewControllerFromCurrentController(c *client.Client, namespace, oldName, newName, image, deploymentKey string) (*api.ReplicationController, error) {
|
||||
// load the old RC into the "new" RC
|
||||
newRc, err := c.ReplicationControllers(namespace).Get(oldName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(newRc.Spec.Template.Spec.Containers) > 1 {
|
||||
// TODO: support multi-container image update.
|
||||
return nil, goerrors.New("Image update is not supported for multi-container pods")
|
||||
}
|
||||
if len(newRc.Spec.Template.Spec.Containers) == 0 {
|
||||
return nil, goerrors.New(fmt.Sprintf("Pod has no containers! (%v)", newRc))
|
||||
}
|
||||
newRc.Spec.Template.Spec.Containers[0].Image = image
|
||||
|
||||
newHash, err := api.HashObject(newRc, c.Codec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(newName) == 0 {
|
||||
newName = fmt.Sprintf("%s-%s", newRc.Name, newHash)
|
||||
}
|
||||
newRc.Name = newName
|
||||
|
||||
newRc.Spec.Selector[deploymentKey] = newHash
|
||||
newRc.Spec.Template.Labels[deploymentKey] = newHash
|
||||
// Clear resource version after hashing so that identical updates get different hashes.
|
||||
newRc.ResourceVersion = ""
|
||||
return newRc, nil
|
||||
}
|
||||
|
||||
// NewRollingUpdater creates a RollingUpdater from a client
|
||||
func NewRollingUpdater(namespace string, c RollingUpdaterClient) *RollingUpdater {
|
||||
return &RollingUpdater{
|
||||
|
@ -115,6 +164,138 @@ func SetNextControllerAnnotation(rc *api.ReplicationController, name string) {
|
|||
rc.Annotations[nextControllerAnnotation] = name
|
||||
}
|
||||
|
||||
func UpdateExistingReplicationController(c client.Interface, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) {
|
||||
SetNextControllerAnnotation(oldRc, newName)
|
||||
if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
|
||||
return AddDeploymentKeyToReplicationController(oldRc, c, deploymentKey, deploymentValue, namespace, out)
|
||||
} else {
|
||||
// If we didn't need to update the controller for the deployment key, we still need to write
|
||||
// the "next" controller.
|
||||
return c.ReplicationControllers(namespace).Update(oldRc)
|
||||
}
|
||||
}
|
||||
|
||||
const MaxRetries = 3
|
||||
|
||||
func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
|
||||
var err error
|
||||
// First, update the template label. This ensures that any newly created pods will have the new label
|
||||
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
|
||||
if rc.Spec.Template.Labels == nil {
|
||||
rc.Spec.Template.Labels = map[string]string{}
|
||||
}
|
||||
rc.Spec.Template.Labels[deploymentKey] = deploymentValue
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Update all pods managed by the rc to have the new hash label, so they are correctly adopted
|
||||
// TODO: extract the code from the label command and re-use it here.
|
||||
podList, err := client.Pods(namespace).List(labels.SelectorFromSet(oldRc.Spec.Selector), fields.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for ix := range podList.Items {
|
||||
pod := &podList.Items[ix]
|
||||
if pod.Labels == nil {
|
||||
pod.Labels = map[string]string{
|
||||
deploymentKey: deploymentValue,
|
||||
}
|
||||
} else {
|
||||
pod.Labels[deploymentKey] = deploymentValue
|
||||
}
|
||||
err = nil
|
||||
delay := 3
|
||||
for i := 0; i < MaxRetries; i++ {
|
||||
_, err = client.Pods(namespace).Update(pod)
|
||||
if err != nil {
|
||||
fmt.Fprint(out, "Error updating pod (%v), retrying after %d seconds", err, delay)
|
||||
time.Sleep(time.Second * time.Duration(delay))
|
||||
delay *= delay
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if oldRc.Spec.Selector == nil {
|
||||
oldRc.Spec.Selector = map[string]string{}
|
||||
}
|
||||
// Copy the old selector, so that we can scrub out any orphaned pods
|
||||
selectorCopy := map[string]string{}
|
||||
for k, v := range oldRc.Spec.Selector {
|
||||
selectorCopy[k] = v
|
||||
}
|
||||
oldRc.Spec.Selector[deploymentKey] = deploymentValue
|
||||
|
||||
// Update the selector of the rc so it manages all the pods we updated above
|
||||
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
|
||||
rc.Spec.Selector[deploymentKey] = deploymentValue
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Clean up any orphaned pods that don't have the new label, this can happen if the rc manager
|
||||
// doesn't see the update to its pod template and creates a new pod with the old labels after
|
||||
// we've finished re-adopting existing pods to the rc.
|
||||
podList, err = client.Pods(namespace).List(labels.SelectorFromSet(selectorCopy), fields.Everything())
|
||||
for ix := range podList.Items {
|
||||
pod := &podList.Items[ix]
|
||||
if value, found := pod.Labels[deploymentKey]; !found || value != deploymentValue {
|
||||
if err := client.Pods(namespace).Delete(pod.Name, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return oldRc, nil
|
||||
}
|
||||
|
||||
type updateFunc func(controller *api.ReplicationController)
|
||||
|
||||
// updateWithRetries updates applies the given rc as an update.
|
||||
func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
|
||||
// Each update could take ~100ms, so give it 0.5 second
|
||||
var err error
|
||||
oldRc := rc
|
||||
err = wait.Poll(10*time.Millisecond, 500*time.Millisecond, func() (bool, error) {
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(rc)
|
||||
if rc, err = rcClient.Update(rc); err == nil {
|
||||
// rc contains the latest controller post update
|
||||
return true, nil
|
||||
}
|
||||
// Update the controller with the latest resource version, if the update failed we
|
||||
// can't trust rc so use oldRc.Name.
|
||||
if rc, err = rcClient.Get(oldRc.Name); err != nil {
|
||||
// The Get failed: Value in rc cannot be trusted.
|
||||
rc = oldRc
|
||||
}
|
||||
// The Get passed: rc contains the latest controller, expect a poll for the update.
|
||||
return false, nil
|
||||
})
|
||||
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
|
||||
// controller contains the applied update.
|
||||
return rc, err
|
||||
}
|
||||
|
||||
func FindSourceController(r RollingUpdaterClient, namespace, name string) (*api.ReplicationController, error) {
|
||||
list, err := r.ListReplicationControllers(namespace, labels.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for ix := range list.Items {
|
||||
rc := &list.Items[ix]
|
||||
if rc.Annotations != nil && strings.HasPrefix(rc.Annotations[sourceIdAnnotation], name) {
|
||||
return rc, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name)
|
||||
}
|
||||
|
||||
// Update all pods for a ReplicationController (oldRc) by creating a new
|
||||
// controller (newRc) with 0 replicas, and synchronously resizing oldRc,newRc
|
||||
// by 1 until oldRc has 0 replicas and newRc has the original # of desired
|
||||
|
@ -286,19 +467,28 @@ func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval,
|
|||
}
|
||||
|
||||
func (r *RollingUpdater) rename(rc *api.ReplicationController, newName string) error {
|
||||
return Rename(r.c, rc, newName)
|
||||
}
|
||||
|
||||
func Rename(c RollingUpdaterClient, rc *api.ReplicationController, newName string) error {
|
||||
oldName := rc.Name
|
||||
rc.Name = newName
|
||||
rc.ResourceVersion = ""
|
||||
|
||||
_, err := r.c.CreateReplicationController(rc.Namespace, rc)
|
||||
_, err := c.CreateReplicationController(rc.Namespace, rc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.c.DeleteReplicationController(rc.Namespace, oldName)
|
||||
err = c.DeleteReplicationController(rc.Namespace, oldName)
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RollingUpdaterClient abstracts access to ReplicationControllers.
|
||||
type RollingUpdaterClient interface {
|
||||
ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error)
|
||||
GetReplicationController(namespace, name string) (*api.ReplicationController, error)
|
||||
UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
|
||||
CreateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
|
||||
|
@ -315,6 +505,10 @@ type realRollingUpdaterClient struct {
|
|||
client client.Interface
|
||||
}
|
||||
|
||||
func (c *realRollingUpdaterClient) ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) {
|
||||
return c.client.ReplicationControllers(namespace).List(selector)
|
||||
}
|
||||
|
||||
func (c *realRollingUpdaterClient) GetReplicationController(namespace, name string) (*api.ReplicationController, error) {
|
||||
return c.client.ReplicationControllers(namespace).Get(name)
|
||||
}
|
||||
|
|
|
@ -19,13 +19,21 @@ package kubectl
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
|
@ -380,8 +388,430 @@ func TestRollingUpdater_preserveCleanup(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestRename(t *testing.T) {
|
||||
tests := []struct {
|
||||
namespace string
|
||||
newName string
|
||||
oldName string
|
||||
err error
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
namespace: "default",
|
||||
newName: "bar",
|
||||
oldName: "foo",
|
||||
},
|
||||
{
|
||||
namespace: "default",
|
||||
newName: "bar",
|
||||
oldName: "foo",
|
||||
err: fmt.Errorf("Test Error"),
|
||||
expectError: true,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
fakeClient := &rollingUpdaterClientImpl{
|
||||
CreateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
|
||||
if namespace != test.namespace {
|
||||
t.Errorf("unexepected namespace: %s, expected %s", namespace, test.namespace)
|
||||
}
|
||||
if rc.Name != test.newName {
|
||||
t.Errorf("unexepected name: %s, expected %s", rc.Name, test.newName)
|
||||
}
|
||||
return rc, test.err
|
||||
},
|
||||
DeleteReplicationControllerFn: func(namespace, name string) error {
|
||||
if namespace != test.namespace {
|
||||
t.Errorf("unexepected namespace: %s, expected %s", namespace, test.namespace)
|
||||
}
|
||||
if name != test.oldName {
|
||||
t.Errorf("unexepected name: %s, expected %s", name, test.oldName)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
err := Rename(fakeClient, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: test.namespace, Name: test.oldName}}, test.newName)
|
||||
if err != nil && !test.expectError {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if err == nil && test.expectError {
|
||||
t.Errorf("unexpected non-error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindSourceController(t *testing.T) {
|
||||
ctrl1 := api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Annotations: map[string]string{
|
||||
sourceIdAnnotation: "bar:1234",
|
||||
},
|
||||
},
|
||||
}
|
||||
ctrl2 := api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "bar",
|
||||
Annotations: map[string]string{
|
||||
sourceIdAnnotation: "foo:12345",
|
||||
},
|
||||
},
|
||||
}
|
||||
ctrl3 := api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
sourceIdAnnotation: "baz:45667",
|
||||
},
|
||||
},
|
||||
}
|
||||
tests := []struct {
|
||||
list *api.ReplicationControllerList
|
||||
expectedController *api.ReplicationController
|
||||
err error
|
||||
name string
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
list: &api.ReplicationControllerList{},
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
list: &api.ReplicationControllerList{
|
||||
Items: []api.ReplicationController{ctrl1},
|
||||
},
|
||||
name: "foo",
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
list: &api.ReplicationControllerList{
|
||||
Items: []api.ReplicationController{ctrl1},
|
||||
},
|
||||
name: "bar",
|
||||
expectedController: &ctrl1,
|
||||
},
|
||||
{
|
||||
list: &api.ReplicationControllerList{
|
||||
Items: []api.ReplicationController{ctrl1, ctrl2},
|
||||
},
|
||||
name: "bar",
|
||||
expectedController: &ctrl1,
|
||||
},
|
||||
{
|
||||
list: &api.ReplicationControllerList{
|
||||
Items: []api.ReplicationController{ctrl1, ctrl2},
|
||||
},
|
||||
name: "foo",
|
||||
expectedController: &ctrl2,
|
||||
},
|
||||
{
|
||||
list: &api.ReplicationControllerList{
|
||||
Items: []api.ReplicationController{ctrl1, ctrl2, ctrl3},
|
||||
},
|
||||
name: "baz",
|
||||
expectedController: &ctrl3,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
fakeClient := rollingUpdaterClientImpl{
|
||||
ListReplicationControllersFn: func(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) {
|
||||
return test.list, test.err
|
||||
},
|
||||
}
|
||||
ctrl, err := FindSourceController(&fakeClient, "default", test.name)
|
||||
if test.expectError && err == nil {
|
||||
t.Errorf("unexpected non-error")
|
||||
}
|
||||
if !test.expectError && err != nil {
|
||||
t.Errorf("unexpected error")
|
||||
}
|
||||
if !reflect.DeepEqual(ctrl, test.expectedController) {
|
||||
t.Errorf("expected:\n%v\ngot:\n%v\n", test.expectedController, ctrl)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateExistingReplicationController(t *testing.T) {
|
||||
tests := []struct {
|
||||
rc *api.ReplicationController
|
||||
name string
|
||||
deploymentKey string
|
||||
deploymentValue string
|
||||
|
||||
expectedRc *api.ReplicationController
|
||||
expectErr bool
|
||||
}{
|
||||
{
|
||||
rc: &api.ReplicationController{
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Template: &api.PodTemplateSpec{},
|
||||
},
|
||||
},
|
||||
name: "foo",
|
||||
deploymentKey: "dk",
|
||||
deploymentValue: "some-hash",
|
||||
|
||||
expectedRc: &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
"kubectl.kubernetes.io/next-controller-id": "foo",
|
||||
},
|
||||
},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Selector: map[string]string{
|
||||
"dk": "some-hash",
|
||||
},
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"dk": "some-hash",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
rc: &api.ReplicationController{
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"dk": "some-other-hash",
|
||||
},
|
||||
},
|
||||
},
|
||||
Selector: map[string]string{
|
||||
"dk": "some-other-hash",
|
||||
},
|
||||
},
|
||||
},
|
||||
name: "foo",
|
||||
deploymentKey: "dk",
|
||||
deploymentValue: "some-hash",
|
||||
|
||||
expectedRc: &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
"kubectl.kubernetes.io/next-controller-id": "foo",
|
||||
},
|
||||
},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Selector: map[string]string{
|
||||
"dk": "some-other-hash",
|
||||
},
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"dk": "some-other-hash",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
buffer := &bytes.Buffer{}
|
||||
fakeClient := fakeClientFor("default", []fakeResponse{})
|
||||
rc, err := UpdateExistingReplicationController(fakeClient, test.rc, "default", test.name, test.deploymentKey, test.deploymentValue, buffer)
|
||||
if !reflect.DeepEqual(rc, test.expectedRc) {
|
||||
t.Errorf("expected:\n%#v\ngot:\n%#v\n", test.expectedRc, rc)
|
||||
}
|
||||
if test.expectErr && err == nil {
|
||||
t.Errorf("unexpected non-error")
|
||||
}
|
||||
if !test.expectErr && err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateWithRetries(t *testing.T) {
|
||||
codec := testapi.Codec()
|
||||
rc := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{Name: "rc",
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Selector: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
RestartPolicy: api.RestartPolicyAlways,
|
||||
DNSPolicy: api.DNSClusterFirst,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Test end to end updating of the rc with retries. Essentially make sure the update handler
|
||||
// sees the right updates, failures in update/get are handled properly, and that the updated
|
||||
// rc with new resource version is returned to the caller. Without any of these rollingupdate
|
||||
// will fail cryptically.
|
||||
newRc := *rc
|
||||
newRc.ResourceVersion = "2"
|
||||
newRc.Spec.Selector["baz"] = "foobar"
|
||||
updates := []*http.Response{
|
||||
{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 200, Body: objBody(codec, &newRc)},
|
||||
}
|
||||
gets := []*http.Response{
|
||||
{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 200, Body: objBody(codec, rc)},
|
||||
}
|
||||
fakeClient := &client.FakeRESTClient{
|
||||
Codec: codec,
|
||||
Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
switch p, m := req.URL.Path, req.Method; {
|
||||
case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT":
|
||||
update := updates[0]
|
||||
updates = updates[1:]
|
||||
// We should always get an update with a valid rc even when the get fails. The rc should always
|
||||
// contain the update.
|
||||
if c, ok := readOrDie(t, req, codec).(*api.ReplicationController); !ok || !reflect.DeepEqual(rc, c) {
|
||||
t.Errorf("Unexpected update body, got %+v expected %+v", c, rc)
|
||||
} else if sel, ok := c.Spec.Selector["baz"]; !ok || sel != "foobar" {
|
||||
t.Errorf("Expected selector label update, got %+v", c.Spec.Selector)
|
||||
} else {
|
||||
delete(c.Spec.Selector, "baz")
|
||||
}
|
||||
return update, nil
|
||||
case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "GET":
|
||||
get := gets[0]
|
||||
gets = gets[1:]
|
||||
return get, nil
|
||||
default:
|
||||
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
}
|
||||
clientConfig := &client.Config{Version: latest.Version}
|
||||
client := client.NewOrDie(clientConfig)
|
||||
client.Client = fakeClient.Client
|
||||
|
||||
if rc, err := updateWithRetries(
|
||||
client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) {
|
||||
c.Spec.Selector["baz"] = "foobar"
|
||||
}); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
} else if sel, ok := rc.Spec.Selector["baz"]; !ok || sel != "foobar" || rc.ResourceVersion != "2" {
|
||||
t.Errorf("Expected updated rc, got %+v", rc)
|
||||
}
|
||||
if len(updates) != 0 || len(gets) != 0 {
|
||||
t.Errorf("Remaining updates %+v gets %+v", updates, gets)
|
||||
}
|
||||
}
|
||||
|
||||
func readOrDie(t *testing.T, req *http.Request, codec runtime.Codec) runtime.Object {
|
||||
data, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
t.Errorf("Error reading: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
obj, err := codec.Decode(data)
|
||||
if err != nil {
|
||||
t.Errorf("error decoding: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
return obj
|
||||
}
|
||||
|
||||
func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser {
|
||||
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj))))
|
||||
}
|
||||
|
||||
func TestAddDeploymentHash(t *testing.T) {
|
||||
buf := &bytes.Buffer{}
|
||||
codec := testapi.Codec()
|
||||
rc := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{Name: "rc"},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Selector: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
podList := &api.PodList{
|
||||
Items: []api.Pod{
|
||||
{ObjectMeta: api.ObjectMeta{Name: "foo"}},
|
||||
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
|
||||
{ObjectMeta: api.ObjectMeta{Name: "baz"}},
|
||||
},
|
||||
}
|
||||
|
||||
seen := util.StringSet{}
|
||||
updatedRc := false
|
||||
fakeClient := &client.FakeRESTClient{
|
||||
Codec: codec,
|
||||
Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
switch p, m := req.URL.Path, req.Method; {
|
||||
case p == "/api/v1beta3/namespaces/default/pods" && m == "GET":
|
||||
if req.URL.RawQuery != "labelSelector=foo%3Dbar" {
|
||||
t.Errorf("Unexpected query string: %s", req.URL.RawQuery)
|
||||
}
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, podList)}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/pods/foo" && m == "PUT":
|
||||
seen.Insert("foo")
|
||||
obj := readOrDie(t, req, codec)
|
||||
podList.Items[0] = *(obj.(*api.Pod))
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[0])}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/pods/bar" && m == "PUT":
|
||||
seen.Insert("bar")
|
||||
obj := readOrDie(t, req, codec)
|
||||
podList.Items[1] = *(obj.(*api.Pod))
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[1])}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/pods/baz" && m == "PUT":
|
||||
seen.Insert("baz")
|
||||
obj := readOrDie(t, req, codec)
|
||||
podList.Items[2] = *(obj.(*api.Pod))
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[2])}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT":
|
||||
updatedRc = true
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, rc)}, nil
|
||||
default:
|
||||
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
}
|
||||
clientConfig := &client.Config{Version: latest.Version}
|
||||
client := client.NewOrDie(clientConfig)
|
||||
client.Client = fakeClient.Client
|
||||
|
||||
if _, err := AddDeploymentKeyToReplicationController(rc, client, "dk", "hash", api.NamespaceDefault, buf); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
for _, pod := range podList.Items {
|
||||
if !seen.Has(pod.Name) {
|
||||
t.Errorf("Missing update for pod: %s", pod.Name)
|
||||
}
|
||||
}
|
||||
if !updatedRc {
|
||||
t.Errorf("Failed to update replication controller with new labels")
|
||||
}
|
||||
}
|
||||
|
||||
// rollingUpdaterClientImpl is a dynamic RollingUpdaterClient.
|
||||
type rollingUpdaterClientImpl struct {
|
||||
ListReplicationControllersFn func(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error)
|
||||
GetReplicationControllerFn func(namespace, name string) (*api.ReplicationController, error)
|
||||
UpdateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
|
||||
CreateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
|
||||
|
@ -389,6 +819,10 @@ type rollingUpdaterClientImpl struct {
|
|||
ControllerHasDesiredReplicasFn func(rc *api.ReplicationController) wait.ConditionFunc
|
||||
}
|
||||
|
||||
func (c *rollingUpdaterClientImpl) ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) {
|
||||
return c.ListReplicationControllersFn(namespace, selector)
|
||||
}
|
||||
|
||||
func (c *rollingUpdaterClientImpl) GetReplicationController(namespace, name string) (*api.ReplicationController, error) {
|
||||
return c.GetReplicationControllerFn(namespace, name)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue