diff --git a/pkg/kubectl/BUILD b/pkg/kubectl/BUILD index ae10e5f14f..c0180bf03c 100644 --- a/pkg/kubectl/BUILD +++ b/pkg/kubectl/BUILD @@ -56,6 +56,7 @@ go_test( "//pkg/printers:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", "//vendor/k8s.io/api/apps/v1beta1:go_default_library", + "//vendor/k8s.io/api/apps/v1beta2:go_default_library", "//vendor/k8s.io/api/autoscaling/v1:go_default_library", "//vendor/k8s.io/api/batch/v1:go_default_library", "//vendor/k8s.io/api/batch/v1beta1:go_default_library", @@ -68,18 +69,24 @@ go_test( "//vendor/k8s.io/api/scheduling/v1alpha1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/discovery:go_default_library", + "//vendor/k8s.io/client-go/discovery/fake:go_default_library", + "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest/fake:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library", ], @@ -183,6 +190,7 @@ go_library( "//vendor/k8s.io/client-go/kubernetes/typed/apps/v1beta1:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/extensions/v1beta1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/util/integer:go_default_library", "//vendor/k8s.io/client-go/util/jsonpath:go_default_library", "//vendor/k8s.io/client-go/util/retry:go_default_library", diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index da6d4fbb79..1d4165f962 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -21,6 +21,7 @@ import ( "strconv" "time" + autoscalingapi "k8s.io/api/autoscaling/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -31,6 +32,8 @@ import ( "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/extensions" + + scaleclient "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion" batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" @@ -516,3 +519,93 @@ func (scaler *DeploymentScaler) Scale(namespace, name string, newSize uint, prec } return nil } + +// validateGeneric ensures that the preconditions match. Returns nil if they are valid, otherwise an error +// TODO(p0lyn0mial): when the work on GenericScaler is done, rename validateGeneric to validate +func (precondition *ScalePrecondition) validateGeneric(scale *autoscalingapi.Scale) error { + if precondition.Size != -1 && int(scale.Spec.Replicas) != precondition.Size { + return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(scale.Spec.Replicas))} + } + if len(precondition.ResourceVersion) > 0 && scale.ResourceVersion != precondition.ResourceVersion { + return PreconditionError{"resource version", precondition.ResourceVersion, scale.ResourceVersion} + } + return nil +} + +// GenericScaler can update scales for resources in a particular namespace +// TODO(o0lyn0mial): when the work on GenericScaler is done, don't +// export the GenericScaler. Instead use ScalerFor method for getting the Scaler +// also update the UTs +type GenericScaler struct { + scaleNamespacer scaleclient.ScalesGetter + targetGR schema.GroupResource +} + +var _ Scaler = &GenericScaler{} + +// ScaleSimple updates a scale of a given resource. It returns the resourceVersion of the scale if the update was successful. +func (s *GenericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) { + scale, err := s.scaleNamespacer.Scales(namespace).Get(s.targetGR, name) + if err != nil { + return "", ScaleError{ScaleGetFailure, "", err} + } + if preconditions != nil { + if err := preconditions.validateGeneric(scale); err != nil { + return "", err + } + } + + scale.Spec.Replicas = int32(newSize) + updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(s.targetGR, scale) + if err != nil { + if errors.IsConflict(err) { + return "", ScaleError{ScaleUpdateConflictFailure, scale.ResourceVersion, err} + } + return "", ScaleError{ScaleUpdateFailure, scale.ResourceVersion, err} + } + return updatedScale.ResourceVersion, nil +} + +// Scale updates a scale of a given resource to a new size, with optional precondition check (if preconditions is not nil), +// optional retries (if retry is not nil), and then optionally waits for the status to reach desired count. +func (s *GenericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { + if preconditions == nil { + preconditions = &ScalePrecondition{-1, ""} + } + if retry == nil { + // make it try only once, immediately + retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} + } + cond := ScaleCondition(s, preconditions, namespace, resourceName, newSize, nil) + if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { + return err + } + if waitForReplicas != nil { + err := wait.PollImmediate( + waitForReplicas.Interval, + waitForReplicas.Timeout, + scaleHasDesiredReplicas(s.scaleNamespacer, s.targetGR, resourceName, namespace, int32(newSize))) + if err == wait.ErrWaitTimeout { + return fmt.Errorf("timed out waiting for %q to be synced", resourceName) + } + return err + } + return nil +} + +// scaleHasDesiredReplicas returns a condition that will be true if and only if the desired replica +// count for a scale (Spec) equals its updated replicas count (Status) +func scaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupResource, resourceName string, namespace string, desiredReplicas int32) wait.ConditionFunc { + return func() (bool, error) { + actualScale, err := sClient.Scales(namespace).Get(gr, resourceName) + if err != nil { + return false, err + } + // this means the desired scale target has been reset by something else + if actualScale.Spec.Replicas != desiredReplicas { + return true, nil + } + return actualScale.Spec.Replicas == actualScale.Status.Replicas && + desiredReplicas == actualScale.Status.Replicas, nil + } +} diff --git a/pkg/kubectl/scale_test.go b/pkg/kubectl/scale_test.go index ba177982f2..1d9b5119ea 100644 --- a/pkg/kubectl/scale_test.go +++ b/pkg/kubectl/scale_test.go @@ -17,11 +17,28 @@ limitations under the License. package kubectl import ( + "bytes" + "encoding/json" "errors" + "fmt" + "io" + "io/ioutil" + "net/http" "testing" + "time" + appsv1beta2 "k8s.io/api/apps/v1beta2" kerrors "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/discovery" + fakedisco "k8s.io/client-go/discovery/fake" + "k8s.io/client-go/dynamic" + fakerest "k8s.io/client-go/rest/fake" + "k8s.io/client-go/scale" testcore "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" @@ -1310,3 +1327,283 @@ func TestValidateReplicaSets(t *testing.T) { } } } + +// TestGenericScaleSimple exercises GenericScaler.ScaleSimple method +func TestGenericScaleSimple(t *testing.T) { + // test data + discoveryResources := []*metav1.APIResourceList{ + { + GroupVersion: appsv1beta2.SchemeGroupVersion.String(), + APIResources: []metav1.APIResource{ + {Name: "deployments", Namespaced: true, Kind: "Deployment"}, + {Name: "deployments/scale", Namespaced: true, Kind: "Scale", Group: "apps", Version: "v1beta2"}, + }, + }, + } + appsV1beta2Scale := &appsv1beta2.Scale{ + TypeMeta: metav1.TypeMeta{ + Kind: "Scale", + APIVersion: appsv1beta2.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "abc", + }, + Spec: appsv1beta2.ScaleSpec{Replicas: 10}, + Status: appsv1beta2.ScaleStatus{ + Replicas: 10, + }, + } + pathsResources := map[string]runtime.Object{ + "/apis/apps/v1beta2/namespaces/default/deployments/abc/scale": appsV1beta2Scale, + } + + scaleClient, err := fakeScaleClient(discoveryResources, pathsResources) + if err != nil { + t.Fatal(err) + } + + // test scenarios + scenarios := []struct { + name string + precondition ScalePrecondition + newSize int + targetGR schema.GroupResource + resName string + scaleGetter scale.ScalesGetter + expectError bool + }{ + // scenario 1: scale up the "abc" deployment + { + name: "scale up the \"abc\" deployment", + precondition: ScalePrecondition{10, ""}, + newSize: 20, + targetGR: schema.GroupResource{Group: "apps", Resource: "deployment"}, + resName: "abc", + scaleGetter: scaleClient, + }, + // scenario 2: scale down the "abc" deployment + { + name: "scale down the \"abs\" deplyment", + precondition: ScalePrecondition{20, ""}, + newSize: 5, + targetGR: schema.GroupResource{Group: "apps", Resource: "deployment"}, + resName: "abc", + scaleGetter: scaleClient, + }, + // scenario 3: precondition error, expected size is 1, + // note that the previous scenario (2) set the size to 5 + { + name: "precondition error, expected size is 1", + precondition: ScalePrecondition{1, ""}, + newSize: 5, + targetGR: schema.GroupResource{Group: "apps", Resource: "deployment"}, + resName: "abc", + scaleGetter: scaleClient, + expectError: true, + }, + // scenario 4: precondition is not validated when the precondition size is set to -1 + { + name: "precondition is not validated when the size is set to -1", + precondition: ScalePrecondition{-1, ""}, + newSize: 5, + targetGR: schema.GroupResource{Group: "apps", Resource: "deployment"}, + resName: "abc", + scaleGetter: scaleClient, + }, + // scenario 5: precondition error, resource version mismatch + { + name: "precondition error, resource version mismatch", + precondition: ScalePrecondition{5, "v1"}, + newSize: 5, + targetGR: schema.GroupResource{Group: "apps", Resource: "deployment"}, + resName: "abc", + scaleGetter: scaleClient, + expectError: true, + }, + } + + // act + for index, scenario := range scenarios { + t.Run(fmt.Sprintf("running scenario %d: %s", index+1, scenario.name), func(t *testing.T) { + target := GenericScaler{scenario.scaleGetter, scenario.targetGR} + + resVersion, err := target.ScaleSimple("default", scenario.resName, &scenario.precondition, uint(scenario.newSize)) + + if scenario.expectError && err == nil { + t.Fatal("expeced an error but was not returned") + } + if !scenario.expectError && err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resVersion != "" { + t.Fatalf("unexpected resource version returned = %s, wanted = %s", resVersion, "") + } + }) + } +} + +// TestGenericScale exercises GenericScaler.Scale method +func TestGenericScale(t *testing.T) { + // test data + discoveryResources := []*metav1.APIResourceList{ + { + GroupVersion: appsv1beta2.SchemeGroupVersion.String(), + APIResources: []metav1.APIResource{ + {Name: "deployments", Namespaced: true, Kind: "Deployment"}, + {Name: "deployments/scale", Namespaced: true, Kind: "Scale", Group: "apps", Version: "v1beta2"}, + }, + }, + } + appsV1beta2Scale := &appsv1beta2.Scale{ + TypeMeta: metav1.TypeMeta{ + Kind: "Scale", + APIVersion: appsv1beta2.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "abc", + }, + Spec: appsv1beta2.ScaleSpec{Replicas: 10}, + Status: appsv1beta2.ScaleStatus{ + Replicas: 10, + }, + } + pathsResources := map[string]runtime.Object{ + "/apis/apps/v1beta2/namespaces/default/deployments/abc/scale": appsV1beta2Scale, + } + + scaleClient, err := fakeScaleClient(discoveryResources, pathsResources) + if err != nil { + t.Fatal(err) + } + + // test scenarios + scenarios := []struct { + name string + precondition ScalePrecondition + newSize int + targetGR schema.GroupResource + resName string + scaleGetter scale.ScalesGetter + waitForReplicas *RetryParams + expectError bool + }{ + // scenario 1: scale up the "abc" deployment + { + name: "scale up the \"abc\" deployment", + precondition: ScalePrecondition{10, ""}, + newSize: 20, + targetGR: schema.GroupResource{Group: "apps", Resource: "deployment"}, + resName: "abc", + scaleGetter: scaleClient, + }, + // scenario 2: a resource name cannot be empty + { + name: "a resource name cannot be empty", + precondition: ScalePrecondition{10, ""}, + newSize: 20, + targetGR: schema.GroupResource{Group: "apps", Resource: "deployment"}, + resName: "", + scaleGetter: scaleClient, + expectError: true, + }, + // scenario 3: wait for replicas error due to status.Replicas != spec.Replicas + { + name: "wait for replicas error due to status.Replicas != spec.Replicas", + precondition: ScalePrecondition{10, ""}, + newSize: 20, + targetGR: schema.GroupResource{Group: "apps", Resource: "deployment"}, + resName: "abc", + scaleGetter: scaleClient, + waitForReplicas: &RetryParams{time.Duration(5 * time.Second), time.Duration(5 * time.Second)}, + expectError: true, + }, + } + + // act + for index, scenario := range scenarios { + t.Run(fmt.Sprintf("running scenario %d: %s", index+1, scenario.name), func(t *testing.T) { + target := GenericScaler{scenario.scaleGetter, scenario.targetGR} + + err := target.Scale("default", scenario.resName, uint(scenario.newSize), &scenario.precondition, nil, scenario.waitForReplicas) + + if scenario.expectError && err == nil { + t.Fatal("expeced an error but was not returned") + } + if !scenario.expectError && err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + } +} + +func fakeScaleClient(discoveryResources []*metav1.APIResourceList, pathsResources map[string]runtime.Object) (scale.ScalesGetter, error) { + fakeDiscoveryClient := &fakedisco.FakeDiscovery{Fake: &testcore.Fake{}} + fakeDiscoveryClient.Resources = discoveryResources + restMapperRes, err := discovery.GetAPIGroupResources(fakeDiscoveryClient) + if err != nil { + return nil, err + } + restMapper := discovery.NewRESTMapper(restMapperRes, apimeta.InterfacesForUnstructured) + codecs := serializer.NewCodecFactory(scale.NewScaleConverter().Scheme()) + fakeReqHandler := func(req *http.Request) (*http.Response, error) { + path := req.URL.Path + scale, isScalePath := pathsResources[path] + if !isScalePath { + return nil, fmt.Errorf("unexpected request for URL %q with method %q", req.URL.String(), req.Method) + } + + switch req.Method { + case "GET": + res, err := json.Marshal(scale) + if err != nil { + return nil, err + } + return &http.Response{StatusCode: 200, Header: defaultHeaders(), Body: bytesBody(res)}, nil + case "PUT": + decoder := codecs.UniversalDeserializer() + body, err := ioutil.ReadAll(req.Body) + if err != nil { + return nil, err + } + newScale, newScaleGVK, err := decoder.Decode(body, nil, nil) + if err != nil { + return nil, fmt.Errorf("unexpected request body: %v", err) + } + if *newScaleGVK != scale.GetObjectKind().GroupVersionKind() { + return nil, fmt.Errorf("unexpected scale API version %s (expected %s)", newScaleGVK.String(), scale.GetObjectKind().GroupVersionKind().String()) + } + res, err := json.Marshal(newScale) + if err != nil { + return nil, err + } + + pathsResources[path] = newScale + return &http.Response{StatusCode: 200, Header: defaultHeaders(), Body: bytesBody(res)}, nil + default: + return nil, fmt.Errorf("unexpected request for URL %q with method %q", req.URL.String(), req.Method) + } + } + + fakeClient := &fakerest.RESTClient{ + Client: fakerest.CreateHTTPClient(fakeReqHandler), + NegotiatedSerializer: serializer.DirectCodecFactory{ + CodecFactory: serializer.NewCodecFactory(scale.NewScaleConverter().Scheme()), + }, + GroupVersion: schema.GroupVersion{}, + VersionedAPIPath: "/not/a/real/path", + } + + resolver := scale.NewDiscoveryScaleKindResolver(fakeDiscoveryClient) + client := scale.New(fakeClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver) + return client, nil +} + +func bytesBody(bodyBytes []byte) io.ReadCloser { + return ioutil.NopCloser(bytes.NewReader(bodyBytes)) +} + +func defaultHeaders() http.Header { + header := http.Header{} + header.Set("Content-Type", runtime.ContentTypeJSON) + return header +}