diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 9903535998..08a7934fd4 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -42,6 +42,7 @@ import ( namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" + servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns" synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync" "k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/version" @@ -136,9 +137,9 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err clustercontroller.StartClusterController(restClientCfg, stopChan, s.ClusterMonitorPeriod.Duration) if controllerEnabled(s.Controllers, serverResources, servicecontroller.ControllerName, servicecontroller.RequiredResources, true) { - if controllerEnabled(s.Controllers, serverResources, servicecontroller.DNSControllerName, servicecontroller.RequiredResources, true) { - serviceDNScontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.DNSUserAgentName)) - serviceDNSController, err := servicecontroller.NewServiceDNSController(serviceDNScontrollerClientset, s.DnsProvider, s.DnsConfigFile, s.FederationName, s.ServiceDnsSuffix, s.ZoneName, s.ZoneID) + if controllerEnabled(s.Controllers, serverResources, servicednscontroller.ControllerName, servicecontroller.RequiredResources, true) { + serviceDNScontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicednscontroller.UserAgentName)) + serviceDNSController, err := servicednscontroller.NewServiceDNSController(serviceDNScontrollerClientset, s.DnsProvider, s.DnsConfigFile, s.FederationName, s.ServiceDnsSuffix, s.ZoneName, s.ZoneID) if err != nil { glog.Fatalf("Failed to start service dns controller: %v", err) } else { diff --git a/federation/pkg/federation-controller/service/dns.go b/federation/pkg/federation-controller/service/dns/dns.go similarity index 98% rename from federation/pkg/federation-controller/service/dns.go rename to federation/pkg/federation-controller/service/dns/dns.go index 69b9e663dd..5d92989b3c 100644 --- a/federation/pkg/federation-controller/service/dns.go +++ b/federation/pkg/federation-controller/service/dns/dns.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package service +package dns import ( "fmt" @@ -35,18 +35,21 @@ import ( fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/pkg/dnsprovider" "k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype" + "k8s.io/kubernetes/federation/pkg/federation-controller/service/ingress" "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/pkg/api/v1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" ) const ( - DNSControllerName = "service-dns" + ControllerName = "service-dns" - DNSUserAgentName = "federation-service-dns-controller" + UserAgentName = "federation-service-dns-controller" // minDNSTTL is the minimum safe DNS TTL value to use (in seconds). We use this as the TTL for all DNS records. minDNSTTL = 180 + + serviceSyncPeriod = 30 * time.Second ) type ServiceDNSController struct { @@ -149,7 +152,7 @@ func (s *ServiceDNSController) workerFunction() bool { return false } - ingress, err := ParseFederatedServiceIngress(service) + ingress, err := ingress.ParseFederatedServiceIngress(service) if err != nil { runtime.HandleError(fmt.Errorf("Error in parsing lb ingress for service %s/%s: %v", service.Namespace, service.Name, err)) return false @@ -237,7 +240,7 @@ func (s *ServiceDNSController) getHealthyEndpoints(clusterName string, service * return zoneEndpoints, regionEndpoints, globalEndpoints, nil } - serviceIngress, err := ParseFederatedServiceIngress(service) + serviceIngress, err := ingress.ParseFederatedServiceIngress(service) if err != nil { return nil, nil, nil, err } diff --git a/federation/pkg/federation-controller/service/dns_test.go b/federation/pkg/federation-controller/service/dns/dns_test.go similarity index 89% rename from federation/pkg/federation-controller/service/dns_test.go rename to federation/pkg/federation-controller/service/dns/dns_test.go index 6b9e40890a..d79678822d 100644 --- a/federation/pkg/federation-controller/service/dns_test.go +++ b/federation/pkg/federation-controller/service/dns/dns_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package service +package dns import ( "fmt" @@ -27,10 +27,19 @@ import ( "k8s.io/kubernetes/federation/apis/federation/v1beta1" fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes. + "k8s.io/kubernetes/federation/pkg/federation-controller/service/ingress" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" "k8s.io/kubernetes/pkg/api/v1" ) +// NewClusterWithRegionZone builds a new cluster object with given region and zone attributes. +func NewClusterWithRegionZone(name string, readyStatus v1.ConditionStatus, region, zone string) *v1beta1.Cluster { + cluster := NewCluster(name, readyStatus) + cluster.Status.Zones = []string{zone} + cluster.Status.Region = region + return cluster +} + func TestServiceController_ensureDnsRecords(t *testing.T) { cluster1Name := "c1" cluster2Name := "c2" @@ -51,7 +60,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { name: "ServiceWithSingleLBIngress", service: v1.Service{ ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + ingress.FederatedServiceIngressAnnotation: ingress.NewFederatedServiceIngress(). AddEndpoints(cluster1Name, []string{"198.51.100.1"}). AddEndpoints(cluster2Name, []string{}). String()}, @@ -84,7 +93,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { name: "ServiceWithNoLBIngress", service: v1.Service{ ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + ingress.FederatedServiceIngressAnnotation: ingress.NewFederatedServiceIngress(). AddEndpoints(cluster1Name, []string{}). AddEndpoints(cluster2Name, []string{}). String()}, @@ -101,7 +110,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { name: "ServiceWithMultipleLBIngress", service: v1.Service{ ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + ingress.FederatedServiceIngressAnnotation: ingress.NewFederatedServiceIngress(). AddEndpoints(cluster1Name, []string{"198.51.100.1"}). AddEndpoints(cluster2Name, []string{"198.51.200.1"}). String()}, @@ -119,7 +128,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { name: "ServiceWithLBIngressAndServiceDeleted", service: v1.Service{ ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + ingress.FederatedServiceIngressAnnotation: ingress.NewFederatedServiceIngress(). AddEndpoints(cluster1Name, []string{"198.51.100.1"}). AddEndpoints(cluster2Name, []string{"198.51.200.1"}). String()}, @@ -138,7 +147,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { name: "ServiceWithMultipleLBIngressAndOneLBIngressGettingRemoved", service: v1.Service{ ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + ingress.FederatedServiceIngressAnnotation: ingress.NewFederatedServiceIngress(). AddEndpoints(cluster1Name, []string{"198.51.100.1"}). AddEndpoints(cluster2Name, []string{"198.51.200.1"}). RemoveEndpoint(cluster2Name, "198.51.200.1"). @@ -157,7 +166,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { name: "ServiceWithMultipleLBIngressAndAllLBIngressGettingRemoved", service: v1.Service{ ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - FederatedServiceIngressAnnotation: NewFederatedServiceIngress(). + ingress.FederatedServiceIngressAnnotation: ingress.NewFederatedServiceIngress(). AddEndpoints(cluster1Name, []string{"198.51.100.1"}). AddEndpoints(cluster2Name, []string{"198.51.200.1"}). RemoveEndpoint(cluster1Name, "198.51.100.1"). @@ -198,7 +207,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) { test.service.Name = "servicename" test.service.Namespace = "servicenamespace" - ingress, err := ParseFederatedServiceIngress(&test.service) + ingress, err := ingress.ParseFederatedServiceIngress(&test.service) if err != nil { t.Errorf("Error in parsing lb ingress for service %s/%s: %v", test.service.Namespace, test.service.Name, err) return diff --git a/federation/pkg/federation-controller/service/ingress.go b/federation/pkg/federation-controller/service/ingress/ingress.go similarity index 99% rename from federation/pkg/federation-controller/service/ingress.go rename to federation/pkg/federation-controller/service/ingress/ingress.go index 0fdf910340..d78f7d4835 100644 --- a/federation/pkg/federation-controller/service/ingress.go +++ b/federation/pkg/federation-controller/service/ingress/ingress.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package service +package ingress import ( "encoding/json" diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 10b6c2a2b8..2e89f15095 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -41,6 +41,7 @@ import ( fedapi "k8s.io/kubernetes/federation/apis/federation" v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + "k8s.io/kubernetes/federation/pkg/federation-controller/service/ingress" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/clusterselector" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" @@ -52,7 +53,7 @@ import ( ) const ( - serviceSyncPeriod = 10 * time.Minute + serviceSyncPeriod = 30 * time.Second UserAgentName = "federation-service-controller" @@ -472,7 +473,7 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus { } newLBStatus := newLoadbalancerStatus() - newServiceIngress := NewFederatedServiceIngress() + newServiceIngress := ingress.NewFederatedServiceIngress() operations := make([]fedutil.FederatedOperation, 0) for _, cluster := range clusters { // Aggregate all operations to perform on all federated clusters @@ -660,7 +661,7 @@ func (s *ServiceController) getServiceEndpointsInCluster(cluster *v1beta1.Cluste // updateFederatedService updates the federated service with aggregated lbStatus and serviceIngresses // and also updates the dns records as needed -func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLBStatus *loadbalancerStatus, newServiceIngress *FederatedServiceIngress) error { +func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLBStatus *loadbalancerStatus, newServiceIngress *ingress.FederatedServiceIngress) error { key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String() needUpdate := false @@ -672,7 +673,7 @@ func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLB needUpdate = true } - existingServiceIngress, err := ParseFederatedServiceIngress(fedService) + existingServiceIngress, err := ingress.ParseFederatedServiceIngress(fedService) if err != nil { runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err)) return err @@ -695,7 +696,7 @@ func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLB // Update federated service status and/or ingress annotations if changed sort.Sort(newServiceIngress) if !reflect.DeepEqual(existingServiceIngress.Items, newServiceIngress.Items) { - fedService = UpdateIngressAnnotation(fedService, newServiceIngress) + fedService = ingress.UpdateIngressAnnotation(fedService, newServiceIngress) glog.V(3).Infof("Federated service loadbalancer ingress updated for %s: existing: %#v, desired: %#v", key, existingServiceIngress, newServiceIngress) needUpdate = true } diff --git a/federation/pkg/federation-controller/service/servicecontroller_test.go b/federation/pkg/federation-controller/service/servicecontroller_test.go index a98ae0936c..8e6b196719 100644 --- a/federation/pkg/federation-controller/service/servicecontroller_test.go +++ b/federation/pkg/federation-controller/service/servicecontroller_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/federation/apis/federation/v1beta1" fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" + "k8s.io/kubernetes/federation/pkg/federation-controller/service/ingress" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" "k8s.io/kubernetes/pkg/api/v1" @@ -58,8 +59,8 @@ var awfulError error = errors.NewGone("Something bad happened") func TestServiceController(t *testing.T) { glog.Infof("Creating fake infrastructure") fedClient := &fakefedclientset.Clientset{} - cluster1 := NewClusterWithRegionZone("cluster1", v1.ConditionTrue, "region1", "zone1") - cluster2 := NewClusterWithRegionZone("cluster2", v1.ConditionTrue, "region2", "zone2") + cluster1 := NewCluster("cluster1", v1.ConditionTrue) + cluster2 := NewCluster("cluster2", v1.ConditionTrue) RegisterFakeClusterGet(&fedClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}}) RegisterFakeList(clusters, &fedClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}}) @@ -151,10 +152,10 @@ func TestServiceController(t *testing.T) { key, desiredService, serviceStatusCompare, wait.ForeverTestTimeout)) glog.Infof("Test federation service is updated when cluster1 endpoint for the service is created") - desiredIngressAnnotation := NewFederatedServiceIngress(). + desiredIngressAnnotation := ingress.NewFederatedServiceIngress(). AddEndpoints("cluster1", []string{lbIngress1}). String() - desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{FederatedServiceIngressAnnotation: desiredIngressAnnotation}}} + desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ingress.FederatedServiceIngressAnnotation: desiredIngressAnnotation}}} c1EndpointWatch.Add(NewEndpoint("test-service-1", serviceEndpoint1)) require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout)) @@ -175,20 +176,20 @@ func TestServiceController(t *testing.T) { key, desiredService, serviceStatusCompare, wait.ForeverTestTimeout)) glog.Infof("Test federation service is updated when cluster2 endpoint for the service is created") - desiredIngressAnnotation = NewFederatedServiceIngress(). + desiredIngressAnnotation = ingress.NewFederatedServiceIngress(). AddEndpoints("cluster1", []string{lbIngress1}). AddEndpoints("cluster2", []string{lbIngress2}). String() - desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{FederatedServiceIngressAnnotation: desiredIngressAnnotation}}} + desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ingress.FederatedServiceIngressAnnotation: desiredIngressAnnotation}}} c2EndpointWatch.Add(NewEndpoint("test-service-1", serviceEndpoint2)) require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout)) glog.Infof("Test federation service is updated when cluster1 endpoint for the service is deleted") - desiredIngressAnnotation = NewFederatedServiceIngress(). + desiredIngressAnnotation = ingress.NewFederatedServiceIngress(). AddEndpoints("cluster2", []string{lbIngress2}). String() - desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{FederatedServiceIngressAnnotation: desiredIngressAnnotation}}} + desiredService = &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ingress.FederatedServiceIngressAnnotation: desiredIngressAnnotation}}} c1EndpointWatch.Delete(NewEndpoint("test-service-1", serviceEndpoint1)) require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout)) @@ -211,7 +212,7 @@ func TestServiceController(t *testing.T) { func TestGetOperationsToPerformOnCluster(t *testing.T) { obj := NewService("test-service-1", 80) - cluster1 := NewClusterWithRegionZone("cluster1", v1.ConditionTrue, "region1", "zone1") + cluster1 := NewCluster("cluster1", v1.ConditionTrue) fedClient := &fakefedclientset.Clientset{} sc := New(fedClient) @@ -287,14 +288,6 @@ func NewEndpoint(name, ip string) *v1.Endpoints { } } -// NewClusterWithRegionZone builds a new cluster object with given region and zone attributes. -func NewClusterWithRegionZone(name string, readyStatus v1.ConditionStatus, region, zone string) *v1beta1.Cluster { - cluster := NewCluster(name, readyStatus) - cluster.Status.Zones = []string{zone} - cluster.Status.Region = region - return cluster -} - // WaitForClusterService waits for the cluster service to be created matching the desiredService. func WaitForClusterService(t *testing.T, store fedutil.FederatedReadOnlyStore, clusterName, key string, desiredService *v1.Service, timeout time.Duration) error { err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { @@ -325,11 +318,11 @@ func serviceStatusCompare(current, desired *v1.Service) bool { } func serviceIngressCompare(current, desired *v1.Service) bool { - if strings.Compare(current.Annotations[FederatedServiceIngressAnnotation], desired.Annotations[FederatedServiceIngressAnnotation]) != 0 { - glog.V(5).Infof("Waiting for loadbalancer ingress, Current: %v, Desired: %v", current.Annotations[FederatedServiceIngressAnnotation], desired.Annotations[FederatedServiceIngressAnnotation]) + if strings.Compare(current.Annotations[ingress.FederatedServiceIngressAnnotation], desired.Annotations[ingress.FederatedServiceIngressAnnotation]) != 0 { + glog.V(5).Infof("Waiting for loadbalancer ingress, Current: %v, Desired: %v", current.Annotations[ingress.FederatedServiceIngressAnnotation], desired.Annotations[ingress.FederatedServiceIngressAnnotation]) return false } - glog.V(5).Infof("Loadbalancer ingress match: %v", current.Annotations[FederatedServiceIngressAnnotation]) + glog.V(5).Infof("Loadbalancer ingress match: %v", current.Annotations[ingress.FederatedServiceIngressAnnotation]) return true }