Merge pull request #70409 from yue9944882/use-loopback-connection-master-ep-reconciliation

Use versioned loopbacke connection in master endpoint reconciliation
pull/58/head
k8s-ci-robot 2018-11-05 16:32:25 -08:00 committed by GitHub
commit c86944b5b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 651 additions and 624 deletions

View File

@ -50,7 +50,6 @@ go_library(
"//pkg/registry/batch/rest:go_default_library",
"//pkg/registry/certificates/rest:go_default_library",
"//pkg/registry/coordination/rest:go_default_library",
"//pkg/registry/core/endpoint/storage:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/rest:go_default_library",
"//pkg/registry/core/service/ipallocator:go_default_library",

View File

@ -69,7 +69,7 @@ func (h ClientCARegistrationHook) PostStartHook(hookContext genericapiserver.Pos
// tryToWriteClientCAs is here for unit testing with a fake client. This is a wait.ConditionFunc so the bool
// indicates if the condition was met. True when its finished, false when it should retry.
func (h ClientCARegistrationHook) tryToWriteClientCAs(client coreclient.CoreInterface) (bool, error) {
if err := createNamespaceIfNeeded(client, metav1.NamespaceSystem); err != nil {
if err := createNamespaceIfNeededWithInternalClient(client, metav1.NamespaceSystem); err != nil {
utilruntime.HandleError(err)
return false, nil
}

View File

@ -17,13 +17,34 @@ limitations under the License.
package master
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
)
func createNamespaceIfNeeded(c coreclient.NamespacesGetter, ns string) error {
func createNamespaceIfNeeded(c corev1client.NamespacesGetter, ns string) error {
if _, err := c.Namespaces().Get(ns, metav1.GetOptions{}); err == nil {
// the namespace already exists
return nil
}
newNs := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns,
Namespace: "",
},
}
_, err := c.Namespaces().Create(newNs)
if err != nil && errors.IsAlreadyExists(err) {
err = nil
}
return err
}
// TODO(yue9944882): Remove it once we switch ClientCARegistrationHook to external types
func createNamespaceIfNeededWithInternalClient(c coreclient.NamespacesGetter, ns string) error {
if _, err := c.Namespaces().Get(ns, metav1.GetOptions{}); err == nil {
// the namespace already exists
return nil

View File

@ -31,8 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/master/reconcilers"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
@ -49,9 +48,9 @@ const kubernetesServiceName = "kubernetes"
// "default", "kube-system" and "kube-public" namespaces, and provide the IP
// repair check on service IPs
type Controller struct {
ServiceClient coreclient.ServicesGetter
NamespaceClient coreclient.NamespacesGetter
EventClient coreclient.EventsGetter
ServiceClient corev1client.ServicesGetter
NamespaceClient corev1client.NamespacesGetter
EventClient corev1client.EventsGetter
ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPInterval time.Duration
@ -72,8 +71,8 @@ type Controller struct {
// ServiceIP indicates where the kubernetes service will live. It may not be nil.
ServiceIP net.IP
ServicePort int
ExtraServicePorts []api.ServicePort
ExtraEndpointPorts []api.EndpointPort
ExtraServicePorts []corev1.ServicePort
ExtraEndpointPorts []corev1.EndpointPort
PublicServicePort int
KubernetesServiceNodePort int
@ -81,7 +80,7 @@ type Controller struct {
}
// NewBootstrapController returns a controller for watching the core capabilities of the master
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter, eventClient coreclient.EventsGetter) *Controller {
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter) *Controller {
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
if err != nil {
glog.Fatalf("failed to get listener address: %v", err)
@ -230,17 +229,17 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// createPortAndServiceSpec creates an array of service ports.
// If the NodePort value is 0, just the servicePort is used, otherwise, a node port is exposed.
func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort int, servicePortName string, extraServicePorts []api.ServicePort) ([]api.ServicePort, api.ServiceType) {
func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort int, servicePortName string, extraServicePorts []corev1.ServicePort) ([]corev1.ServicePort, corev1.ServiceType) {
//Use the Cluster IP type for the service port if NodePort isn't provided.
//Otherwise, we will be binding the master service to a NodePort.
servicePorts := []api.ServicePort{{Protocol: api.ProtocolTCP,
servicePorts := []corev1.ServicePort{{Protocol: corev1.ProtocolTCP,
Port: int32(servicePort),
Name: servicePortName,
TargetPort: intstr.FromInt(targetServicePort)}}
serviceType := api.ServiceTypeClusterIP
serviceType := corev1.ServiceTypeClusterIP
if nodePort > 0 {
servicePorts[0].NodePort = int32(nodePort)
serviceType = api.ServiceTypeNodePort
serviceType = corev1.ServiceTypeNodePort
}
if extraServicePorts != nil {
servicePorts = append(servicePorts, extraServicePorts...)
@ -249,8 +248,8 @@ func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort i
}
// createEndpointPortSpec creates an array of endpoint ports
func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndpointPorts []api.EndpointPort) []api.EndpointPort {
endpointPorts := []api.EndpointPort{{Protocol: api.ProtocolTCP,
func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndpointPorts []corev1.EndpointPort) []corev1.EndpointPort {
endpointPorts := []corev1.EndpointPort{{Protocol: corev1.ProtocolTCP,
Port: int32(endpointPort),
Name: endpointPortName,
}}
@ -262,7 +261,7 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndp
// CreateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType, reconcile bool) error {
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
if s, err := c.ServiceClient.Services(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}); err == nil {
// The service already exists.
if reconcile {
@ -274,18 +273,18 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
}
return nil
}
svc := &api.Service{
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: metav1.NamespaceDefault,
Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"},
},
Spec: api.ServiceSpec{
Spec: corev1.ServiceSpec{
Ports: servicePorts,
// maintained by this code, not by the pod selector
Selector: nil,
ClusterIP: serviceIP.String(),
SessionAffinity: api.ServiceAffinityNone,
SessionAffinity: corev1.ServiceAffinityNone,
Type: serviceType,
},
}

File diff suppressed because it is too large Load Diff

View File

@ -68,12 +68,10 @@ import (
"k8s.io/client-go/informers"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/reconcilers"
"k8s.io/kubernetes/pkg/master/tunneler"
endpointsstorage "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
"k8s.io/kubernetes/pkg/routes"
"k8s.io/kubernetes/pkg/serviceaccount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
@ -144,10 +142,10 @@ type ExtraConfig struct {
// service because this pkg is linked by out-of-tree projects
// like openshift which want to use the GenericAPIServer but also do
// more stuff.
ExtraServicePorts []api.ServicePort
ExtraServicePorts []apiv1.ServicePort
// Additional ports to be exposed on the GenericAPIServer endpoints
// Port names should align with ports defined in ExtraServicePorts
ExtraEndpointPorts []api.EndpointPort
ExtraEndpointPorts []apiv1.EndpointPort
// If non-zero, the "kubernetes" services uses this port as NodePort.
KubernetesServiceNodePort int
@ -206,7 +204,7 @@ type Master struct {
}
func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler {
endpointClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointClient)
}
@ -215,6 +213,7 @@ func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler {
}
func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
ttl := c.ExtraConfig.MasterEndpointReconcileTTL
config, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("apiServerIPInfo"))
if err != nil {
@ -224,18 +223,8 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
if err != nil {
glog.Fatalf("Error creating storage factory: %v", err)
}
endpointConfig, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("endpoints"))
if err != nil {
glog.Fatalf("Error getting storage config: %v", err)
}
endpointsStorage := endpointsstorage.NewREST(generic.RESTOptions{
StorageConfig: endpointConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 0,
ResourcePrefix: c.ExtraConfig.StorageFactory.ResourcePrefix(api.Resource("endpoints")),
})
masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl)
return reconcilers.NewLeaseEndpointReconciler(endpointsStorage.Store, masterLeases)
return reconcilers.NewLeaseEndpointReconciler(endpointClient, masterLeases)
}
func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
@ -386,7 +375,7 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.
}
controllerName := "bootstrap-controller"
coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

View File

@ -12,15 +12,14 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/master/reconcilers",
visibility = ["//visibility:public"],
deps = [
"//pkg/api/endpoints:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/api/v1/endpoints:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
@ -31,9 +30,9 @@ go_test(
srcs = ["lease_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/registry/registrytest:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)

View File

@ -30,14 +30,14 @@ import (
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api/endpoints"
api "k8s.io/kubernetes/pkg/apis/core"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
)
// Leases is an interface which assists in managing the set of active masters
@ -62,7 +62,7 @@ var _ Leases = &storageLeases{}
// ListLeases retrieves a list of the current master IPs from storage
func (s *storageLeases) ListLeases() ([]string, error) {
ipInfoList := &api.EndpointsList{}
ipInfoList := &corev1.EndpointsList{}
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil {
return nil, err
}
@ -80,12 +80,12 @@ func (s *storageLeases) ListLeases() ([]string, error) {
// UpdateLease resets the TTL on a master IP in storage
func (s *storageLeases) UpdateLease(ip string) error {
key := path.Join(s.baseKey, ip)
return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &api.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
// just make sure we've got the right IP set, and then refresh the TTL
existing := input.(*api.Endpoints)
existing.Subsets = []api.EndpointSubset{
existing := input.(*corev1.Endpoints)
existing.Subsets = []corev1.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: ip}},
Addresses: []corev1.EndpointAddress{{IP: ip}},
},
}
@ -106,7 +106,7 @@ func (s *storageLeases) UpdateLease(ip string) error {
// RemoveLease removes the lease on a master IP in storage
func (s *storageLeases) RemoveLease(ip string) error {
return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &api.Endpoints{}, nil)
return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil)
}
// NewLeases creates a new etcd-based Leases implementation.
@ -119,16 +119,16 @@ func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duratio
}
type leaseEndpointReconciler struct {
endpointStorage rest.StandardStorage
endpointClient corev1client.EndpointsGetter
masterLeases Leases
stopReconcilingCalled bool
reconcilingLock sync.Mutex
}
// NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
func NewLeaseEndpointReconciler(endpointStorage rest.StandardStorage, masterLeases Leases) EndpointReconciler {
func NewLeaseEndpointReconciler(endpointClient corev1client.EndpointsGetter, masterLeases Leases) EndpointReconciler {
return &leaseEndpointReconciler{
endpointStorage: endpointStorage,
endpointClient: endpointClient,
masterLeases: masterLeases,
stopReconcilingCalled: false,
}
@ -141,7 +141,7 @@ func NewLeaseEndpointReconciler(endpointStorage rest.StandardStorage, masterLeas
// expire. ReconcileEndpoints will notice that the endpoints object is
// different from the directory listing, and update the endpoints object
// accordingly.
func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
@ -159,25 +159,21 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.
return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
}
func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
ctx := apirequest.NewDefaultContext()
// Retrieve the current list of endpoints...
var e *api.Endpoints
obj, err := r.endpointStorage.Get(ctx, serviceName, &metav1.GetOptions{})
func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
e, err := r.endpointClient.Endpoints(corev1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
shouldCreate := false
if err != nil {
if !errors.IsNotFound(err) {
return err
}
e = &api.Endpoints{
shouldCreate = true
e = &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: api.NamespaceDefault,
Namespace: corev1.NamespaceDefault,
},
}
} else {
e = obj.(*api.Endpoints)
}
// ... and the list of master IP keys from etcd
@ -201,21 +197,21 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
if !formatCorrect {
// Something is egregiously wrong, just re-make the endpoints record.
e.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{},
e.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{},
Ports: endpointPorts,
}}
}
if !formatCorrect || !ipCorrect {
// repopulate the addresses according to the expected IPs from etcd
e.Subsets[0].Addresses = make([]api.EndpointAddress, len(masterIPs))
e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
for ind, ip := range masterIPs {
e.Subsets[0].Addresses[ind] = api.EndpointAddress{IP: ip}
e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
}
// Lexicographic order is retained by this step.
e.Subsets = endpoints.RepackSubsets(e.Subsets)
e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
}
if !portsCorrect {
@ -224,7 +220,13 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
}
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
_, _, err = r.endpointStorage.Update(ctx, e.Name, rest.DefaultUpdatedObjectInfo(e), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if shouldCreate {
if _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Create(e); errors.IsAlreadyExists(err) {
err = nil
}
} else {
_, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Update(e)
}
return err
}
@ -236,7 +238,7 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
// * ipsCorrect when the addresses in the endpoints match the expected addresses list
// * portsCorrect is true when endpoint ports exactly match provided ports.
// portsCorrect is only evaluated when reconcilePorts is set to true.
func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string, ports []api.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []string, ports []corev1.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
if len(e.Subsets) != 1 {
return false, false, false
}
@ -281,7 +283,7 @@ func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string,
return true, ipsCorrect, portsCorrect
}
func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
r.stopReconcilingCalled = true

View File

@ -26,9 +26,9 @@ import (
"reflect"
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/client-go/kubernetes/fake"
)
type fakeLeases struct {
@ -76,7 +76,7 @@ func (f *fakeLeases) GetUpdatedKeys() []string {
}
func TestLeaseEndpointReconciler(t *testing.T) {
ns := api.NamespaceDefault
ns := corev1.NamespaceDefault
om := func(name string) metav1.ObjectMeta {
return metav1.ObjectMeta{Namespace: ns, Name: name}
}
@ -84,22 +84,22 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName string
serviceName string
ip string
endpointPorts []api.EndpointPort
endpointPorts []corev1.EndpointPort
endpointKeys []string
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
endpoints *corev1.EndpointsList
expectUpdate *corev1.Endpoints // nil means none expected
}{
{
testName: "no existing endpoints",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil,
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -107,13 +107,13 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints satisfy",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
@ -122,14 +122,14 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints satisfy + refresh existing key",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
@ -138,21 +138,21 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints satisfy but too many",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -160,33 +160,33 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints satisfy but too many + extra masters",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -194,33 +194,33 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints satisfy but too many + extra masters + delete first",
serviceName: "foo",
ip: "4.3.2.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -228,27 +228,27 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints current IP missing",
serviceName: "foo",
ip: "4.3.2.2",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"4.3.2.1"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -256,21 +256,21 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints wrong name",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("bar"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -278,21 +278,21 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints wrong IP",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -300,21 +300,21 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints wrong port",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -322,21 +322,21 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints wrong protocol",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -344,21 +344,21 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints wrong port name",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -366,17 +366,17 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints extra service ports satisfy",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{
endpointPorts: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
{Name: "baz", Port: 1010, Protocol: "TCP"},
},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
{Name: "baz", Port: 1010, Protocol: "TCP"},
@ -389,24 +389,24 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints extra service ports missing port",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{
endpointPorts: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
},
@ -417,24 +417,29 @@ func TestLeaseEndpointReconciler(t *testing.T) {
for _, test := range reconcileTests {
fakeLeases := newFakeLeases()
fakeLeases.SetKeys(test.endpointKeys)
registry := &registrytest.EndpointRegistry{
Endpoints: test.endpoints,
clientset := fake.NewSimpleClientset()
if test.endpoints != nil {
for _, ep := range test.endpoints.Items {
if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(&ep); err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
continue
}
}
}
r := NewLeaseEndpointReconciler(registry, fakeLeases)
r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases)
err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(test.serviceName, metav1.GetOptions{})
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(registry.Updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
}
if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip {
t.Errorf("case %q: expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", test.testName, updatedKeys)
}
@ -444,25 +449,25 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName string
serviceName string
ip string
endpointPorts []api.EndpointPort
endpointPorts []corev1.EndpointPort
endpointKeys []string
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
endpoints *corev1.EndpointsList
expectUpdate *corev1.Endpoints // nil means none expected
}{
{
testName: "existing endpoints extra service ports missing port no update",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{
endpointPorts: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
@ -472,24 +477,24 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "existing endpoints extra service ports, wrong ports, wrong IP",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{
endpointPorts: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -497,13 +502,13 @@ func TestLeaseEndpointReconciler(t *testing.T) {
testName: "no existing endpoints",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil,
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -512,24 +517,29 @@ func TestLeaseEndpointReconciler(t *testing.T) {
t.Run(test.testName, func(t *testing.T) {
fakeLeases := newFakeLeases()
fakeLeases.SetKeys(test.endpointKeys)
registry := &registrytest.EndpointRegistry{
Endpoints: test.endpoints,
clientset := fake.NewSimpleClientset()
if test.endpoints != nil {
for _, ep := range test.endpoints.Items {
if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(&ep); err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
continue
}
}
}
r := NewLeaseEndpointReconciler(registry, fakeLeases)
r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases)
err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(test.serviceName, metav1.GetOptions{})
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(registry.Updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
}
if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip {
t.Errorf("case %q: expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", test.testName, updatedKeys)
}
@ -538,7 +548,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
}
func TestLeaseStopReconciling(t *testing.T) {
ns := api.NamespaceDefault
ns := corev1.NamespaceDefault
om := func(name string) metav1.ObjectMeta {
return metav1.ObjectMeta{Namespace: ns, Name: name}
}
@ -546,40 +556,40 @@ func TestLeaseStopReconciling(t *testing.T) {
testName string
serviceName string
ip string
endpointPorts []api.EndpointPort
endpointPorts []corev1.EndpointPort
endpointKeys []string
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
endpoints *corev1.EndpointsList
expectUpdate *corev1.Endpoints // nil means none expected
}{
{
testName: "successful stop reconciling",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -587,19 +597,19 @@ func TestLeaseStopReconciling(t *testing.T) {
testName: "stop reconciling with ip not in endpoint ip list",
serviceName: "foo",
ip: "5.6.7.8",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
endpoints: &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
@ -609,24 +619,27 @@ func TestLeaseStopReconciling(t *testing.T) {
t.Run(test.testName, func(t *testing.T) {
fakeLeases := newFakeLeases()
fakeLeases.SetKeys(test.endpointKeys)
registry := &registrytest.EndpointRegistry{
Endpoints: test.endpoints,
clientset := fake.NewSimpleClientset()
for _, ep := range test.endpoints.Items {
if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(&ep); err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
continue
}
}
r := NewLeaseEndpointReconciler(registry, fakeLeases)
r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases)
err := r.StopReconciling(test.serviceName, net.ParseIP(test.ip), test.endpointPorts)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(test.serviceName, metav1.GetOptions{})
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(registry.Updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
}
for _, key := range fakeLeases.GetUpdatedKeys() {
if key == test.ip {
t.Errorf("case %q: Found ip %s in leases but shouldn't be there", test.testName, key)

View File

@ -22,26 +22,26 @@ import (
"sync"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api/endpoints"
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
)
// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
// masters. masterCountEndpointReconciler implements EndpointReconciler.
type masterCountEndpointReconciler struct {
masterCount int
endpointClient coreclient.EndpointsGetter
endpointClient corev1client.EndpointsGetter
stopReconcilingCalled bool
reconcilingLock sync.Mutex
}
// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
// specified expected number of masters.
func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient.EndpointsGetter) EndpointReconciler {
func NewMasterCountEndpointReconciler(masterCount int, endpointClient corev1client.EndpointsGetter) EndpointReconciler {
return &masterCountEndpointReconciler{
masterCount: masterCount,
endpointClient: endpointClient,
@ -60,7 +60,7 @@ func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient
// * All apiservers MUST know and agree on the number of apiservers expected
// to be running (c.masterCount).
// * ReconcileEndpoints is called periodically from all apiservers.
func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
@ -70,7 +70,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
if err != nil {
e = &api.Endpoints{
e = &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: metav1.NamespaceDefault,
@ -79,8 +79,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
}
if errors.IsNotFound(err) {
// Simply create non-existing endpoints for the service.
e.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: ip.String()}},
e.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
Ports: endpointPorts,
}}
_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Create(e)
@ -92,8 +92,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts)
if !formatCorrect {
// Something is egregiously wrong, just re-make the endpoints record.
e.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: ip.String()}},
e.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
Ports: endpointPorts,
}}
glog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
@ -105,10 +105,10 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
}
if !ipCorrect {
// We *always* add our own IP address.
e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()})
e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, corev1.EndpointAddress{IP: ip.String()})
// Lexicographic order is retained by this step.
e.Subsets = endpoints.RepackSubsets(e.Subsets)
e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
// If too many IP addresses, remove the ones lexicographically after our
// own IP address. Given the requirements stated at the top of
@ -137,7 +137,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
return err
}
func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
r.stopReconcilingCalled = true
@ -152,14 +152,14 @@ func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip n
}
// Remove our IP from the list of addresses
new := []api.EndpointAddress{}
new := []corev1.EndpointAddress{}
for _, addr := range e.Subsets[0].Addresses {
if addr.IP != ip.String() {
new = append(new, addr)
}
}
e.Subsets[0].Addresses = new
e.Subsets = endpoints.RepackSubsets(e.Subsets)
e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
_, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
return err
@ -175,7 +175,7 @@ func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip n
// of addresses is less than or equal to the master count.
// * portsCorrect is true when endpoint ports exactly match provided ports.
// portsCorrect is only evaluated when reconcilePorts is set to true.
func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
func checkEndpointSubsetFormat(e *corev1.Endpoints, ip string, ports []corev1.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
if len(e.Subsets) != 1 {
return false, false, false
}
@ -214,7 +214,7 @@ func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.Endpoint
// * All apiservers MUST use GetMasterServiceUpdateIfNeeded and only
// GetMasterServiceUpdateIfNeeded to manage service attributes
// * updateMasterService is called periodically from all apiservers.
func GetMasterServiceUpdateIfNeeded(svc *api.Service, servicePorts []api.ServicePort, serviceType api.ServiceType) (s *api.Service, updated bool) {
func GetMasterServiceUpdateIfNeeded(svc *corev1.Service, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType) (s *corev1.Service, updated bool) {
// Determine if the service is in the format we expect
// (servicePorts are present and service type matches)
formatCorrect := checkServiceFormat(svc, servicePorts, serviceType)
@ -229,7 +229,7 @@ func GetMasterServiceUpdateIfNeeded(svc *api.Service, servicePorts []api.Service
// Determine if the service is in the correct format
// GetMasterServiceUpdateIfNeeded expects (servicePorts are correct
// and service type matches).
func checkServiceFormat(s *api.Service, ports []api.ServicePort, serviceType api.ServiceType) (formatCorrect bool) {
func checkServiceFormat(s *corev1.Service, ports []corev1.ServicePort, serviceType corev1.ServiceType) (formatCorrect bool) {
if s.Spec.Type != serviceType {
return false
}

View File

@ -18,7 +18,7 @@ limitations under the License.
package reconcilers
import (
api "k8s.io/kubernetes/pkg/apis/core"
corev1 "k8s.io/api/core/v1"
"net"
)
@ -32,11 +32,11 @@ func NewNoneEndpointReconciler() EndpointReconciler {
}
// ReconcileEndpoints noop reconcile
func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
return nil
}
// StopReconciling noop reconcile
func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
return nil
}

View File

@ -18,7 +18,7 @@ limitations under the License.
package reconcilers
import (
api "k8s.io/kubernetes/pkg/apis/core"
corev1 "k8s.io/api/core/v1"
"net"
)
@ -34,8 +34,8 @@ type EndpointReconciler interface {
// * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
// endpoints for their {rw, ro} services.
// * ReconcileEndpoints is called periodically from all apiservers.
ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error
StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error
ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error
StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error
}
// Type the reconciler type

View File

@ -13,8 +13,7 @@ go_library(
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/helper:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/service/ipallocator:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -22,6 +21,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
],
@ -33,9 +33,10 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/registry/core/service/ipallocator:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)

View File

@ -26,12 +26,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
)
@ -53,7 +53,7 @@ import (
// TODO: perform repair?
type Repair struct {
interval time.Duration
serviceClient coreclient.ServicesGetter
serviceClient corev1client.ServicesGetter
network *net.IPNet
alloc rangeallocation.RangeRegistry
leaks map[string]int // counter per leaked IP
@ -66,9 +66,9 @@ const numRepairsBeforeLeakCleanup = 3
// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")})
eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"})
return &Repair{

View File

@ -22,9 +22,10 @@ import (
"strings"
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
)
@ -134,29 +135,29 @@ func TestRepairWithExisting(t *testing.T) {
}
fakeClient := fake.NewSimpleClientset(
&api.Service{
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one"},
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.1"},
},
&api.Service{
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "two", Name: "two"},
Spec: api.ServiceSpec{ClusterIP: "192.168.1.100"},
Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.100"},
},
&api.Service{ // outside CIDR, will be dropped
&corev1.Service{ // outside CIDR, will be dropped
ObjectMeta: metav1.ObjectMeta{Namespace: "three", Name: "three"},
Spec: api.ServiceSpec{ClusterIP: "192.168.0.1"},
Spec: corev1.ServiceSpec{ClusterIP: "192.168.0.1"},
},
&api.Service{ // empty, ignored
&corev1.Service{ // empty, ignored
ObjectMeta: metav1.ObjectMeta{Namespace: "four", Name: "four"},
Spec: api.ServiceSpec{ClusterIP: ""},
Spec: corev1.ServiceSpec{ClusterIP: ""},
},
&api.Service{ // duplicate, dropped
&corev1.Service{ // duplicate, dropped
ObjectMeta: metav1.ObjectMeta{Namespace: "five", Name: "five"},
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.1"},
},
&api.Service{ // headless
&corev1.Service{ // headless
ObjectMeta: metav1.ObjectMeta{Namespace: "six", Name: "six"},
Spec: api.ServiceSpec{ClusterIP: "None"},
Spec: corev1.ServiceSpec{ClusterIP: "None"},
},
)

View File

@ -13,7 +13,6 @@ go_library(
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -22,6 +21,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
],
@ -33,10 +33,11 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)

View File

@ -21,16 +21,17 @@ import (
"time"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
)
@ -38,7 +39,7 @@ import (
// See ipallocator/controller/repair.go; this is a copy for ports.
type Repair struct {
interval time.Duration
serviceClient coreclient.ServicesGetter
serviceClient corev1client.ServicesGetter
portRange net.PortRange
alloc rangeallocation.RangeRegistry
leaks map[int]int // counter per leaked port
@ -51,9 +52,9 @@ const numRepairsBeforeLeakCleanup = 3
// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")})
eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "portallocator-repair-controller"})
return &Repair{
@ -196,7 +197,7 @@ func (c *Repair) runOnce() error {
return nil
}
func collectServiceNodePorts(service *api.Service) []int {
func collectServiceNodePorts(service *corev1.Service) []int {
servicePorts := []int{}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]

View File

@ -21,10 +21,11 @@ import (
"strings"
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/kubernetes/fake"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
)
@ -134,39 +135,39 @@ func TestRepairWithExisting(t *testing.T) {
}
fakeClient := fake.NewSimpleClientset(
&api.Service{
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 111}},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{NodePort: 111}},
},
},
&api.Service{
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "two", Name: "two"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 122}, {NodePort: 133}},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{NodePort: 122}, {NodePort: 133}},
},
},
&api.Service{ // outside range, will be dropped
&corev1.Service{ // outside range, will be dropped
ObjectMeta: metav1.ObjectMeta{Namespace: "three", Name: "three"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 201}},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{NodePort: 201}},
},
},
&api.Service{ // empty, ignored
&corev1.Service{ // empty, ignored
ObjectMeta: metav1.ObjectMeta{Namespace: "four", Name: "four"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{}},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{}},
},
},
&api.Service{ // duplicate, dropped
&corev1.Service{ // duplicate, dropped
ObjectMeta: metav1.ObjectMeta{Namespace: "five", Name: "five"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 111}},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{NodePort: 111}},
},
},
&api.Service{
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "six", Name: "six"},
Spec: api.ServiceSpec{
Spec: corev1.ServiceSpec{
HealthCheckNodePort: 144,
},
},