Patch service instead of update in service controller

Co-authored-by: Josh Horwitz <horwitzja@gmail.com>
k3s-v1.15.3
Zihong Zheng 2019-05-15 21:40:04 -07:00
parent aaec77a94b
commit a1f07049e5
4 changed files with 179 additions and 43 deletions

View File

@ -10,6 +10,7 @@ go_library(
name = "go_default_library",
srcs = [
"doc.go",
"patch.go",
"service_controller.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/service",
@ -20,8 +21,10 @@ go_library(
"//pkg/util/metrics:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
@ -39,7 +42,10 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["service_controller_test.go"],
srcs = [
"patch_test.go",
"service_controller_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/api/testapi:go_default_library",

View File

@ -0,0 +1,63 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"encoding/json"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
)
// patch patches service's Status or ObjectMeta given the origin and
// updated ones. Change to spec will be ignored.
func patch(c v1core.CoreV1Interface, oldSvc *v1.Service, newSvc *v1.Service) (*v1.Service, error) {
// Reset spec to make sure only patch for Status or ObjectMeta.
newSvc.Spec = oldSvc.Spec
patchBytes, err := getPatchBytes(oldSvc, newSvc)
if err != nil {
return nil, err
}
updatedSvc, err := c.Services(oldSvc.Namespace).Patch(oldSvc.Name, types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, fmt.Errorf("failed to patch %q for svc %s/%s: %v", patchBytes, oldSvc.Namespace, oldSvc.Name, err)
}
return updatedSvc, nil
}
func getPatchBytes(oldSvc *v1.Service, newSvc *v1.Service) ([]byte, error) {
oldData, err := json.Marshal(oldSvc)
if err != nil {
return nil, fmt.Errorf("failed to Marshal oldData for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err)
}
newData, err := json.Marshal(newSvc)
if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for svc %s/%s: %v", newSvc.Namespace, newSvc.Name, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Service{})
if err != nil {
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err)
}
return patchBytes, nil
}

View File

@ -0,0 +1,101 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)
func addAnnotations(svc *v1.Service) {
svc.Annotations["foo"] = "bar"
}
func TestPatch(t *testing.T) {
svcOrigin := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-patch",
Annotations: map[string]string{},
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
},
}
fakeCs := fake.NewSimpleClientset(svcOrigin)
// Issue a separate update and verify patch doesn't fail after this.
svcToUpdate := svcOrigin.DeepCopy()
addAnnotations(svcToUpdate)
if _, err := fakeCs.CoreV1().Services(svcOrigin.Namespace).Update(svcToUpdate); err != nil {
t.Fatalf("Failed to update service: %v", err)
}
// Attempt to patch based the original service.
svcToPatch := svcOrigin.DeepCopy()
svcToPatch.Finalizers = []string{"foo"}
svcToPatch.Spec.ClusterIP = "10.0.0.2"
svcToPatch.Status = v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{
{IP: "8.8.8.8"},
},
},
}
svcPatched, err := patch(fakeCs.CoreV1(), svcOrigin, svcToPatch)
if err != nil {
t.Fatalf("Failed to patch service: %v", err)
}
// Service returned by patch will contain latest content (e.g from
// the separate update).
addAnnotations(svcToPatch)
if !reflect.DeepEqual(svcPatched, svcToPatch) {
t.Errorf("PatchStatus() = %+v, want %+v", svcPatched, svcToPatch)
}
// Explicitly validate if spec is unchanged from origin.
if !reflect.DeepEqual(svcPatched.Spec, svcOrigin.Spec) {
t.Errorf("Got spec = %+v, want %+v", svcPatched.Spec, svcOrigin.Spec)
}
}
func TestGetPatchBytes(t *testing.T) {
origin := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-patch-bytes",
Finalizers: []string{"foo"},
},
}
updated := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-patch-bytes",
Finalizers: []string{"foo", "bar"},
},
}
b, err := getPatchBytes(origin, updated)
if err != nil {
t.Fatal(err)
}
expected := `{"metadata":{"$setElementOrder/finalizers":["foo","bar"],"finalizers":["bar"]}}`
if string(b) != expected {
t.Errorf("getPatchBytes(%+v, %+v) = %s ; want %s", origin, updated, string(b), expected)
}
}

View File

@ -24,7 +24,7 @@ import (
"reflect"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@ -309,57 +309,23 @@ func (s *ServiceController) syncLoadBalancerIfNeeded(key string, service *v1.Ser
s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
}
// Write the state if changed
// TODO: Be careful here ... what if there were other changes to the service?
// If there are any changes to the status then patch the service.
if !v1helper.LoadBalancerStatusEqual(previousState, newState) {
// Make a copy so we don't mutate the shared informer cache
service = service.DeepCopy()
updated := service.DeepCopy()
updated.Status.LoadBalancer = *newState
// Update the status on the copy
service.Status.LoadBalancer = *newState
if err := s.persistUpdate(service); err != nil {
// TODO: This logic needs to be revisited. We might want to retry on all the errors, not just conflicts.
if errors.IsConflict(err) {
return fmt.Errorf("not persisting update to service '%s/%s' that has been changed since we received it: %v", service.Namespace, service.Name, err)
}
runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err))
return nil
if _, err := patch(s.kubeClient.CoreV1(), service, updated); err != nil {
return fmt.Errorf("failed to patch status for service %s: %v", key, err)
}
klog.V(4).Infof("Successfully patched status for service %s", key)
} else {
klog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
klog.V(4).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
}
return nil
}
func (s *ServiceController) persistUpdate(service *v1.Service) error {
var err error
for i := 0; i < clientRetryCount; i++ {
_, err = s.kubeClient.CoreV1().Services(service.Namespace).UpdateStatus(service)
if err == nil {
return nil
}
// If the object no longer exists, we don't want to recreate it. Just bail
// out so that we can process the delete, which we should soon be receiving
// if we haven't already.
if errors.IsNotFound(err) {
klog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
service.Namespace, service.Name, err)
return nil
}
// TODO: Try to resolve the conflict if the change was unrelated to load
// balancer status. For now, just pass it up the stack.
if errors.IsConflict(err) {
return err
}
klog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
service.Namespace, service.Name, err)
time.Sleep(clientRetryInterval)
}
return err
}
func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
nodes, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
if err != nil {