From 61f43c59246b5485049b595b72d761bc812e487f Mon Sep 17 00:00:00 2001 From: nikhiljindal Date: Mon, 20 Jun 2016 00:40:24 -0700 Subject: [PATCH] Updating KubeDNS to try a local service first for federation query --- cmd/kube-dns/app/server.go | 5 +- pkg/dns/dns.go | 154 ++++++++++++++++++++++++++++++++----- pkg/dns/dns_test.go | 126 +++++++++++++++++++++++++++--- 3 files changed, 253 insertions(+), 32 deletions(-) diff --git a/cmd/kube-dns/app/server.go b/cmd/kube-dns/app/server.go index e16439ff9d..11c76f6ce9 100644 --- a/cmd/kube-dns/app/server.go +++ b/cmd/kube-dns/app/server.go @@ -53,7 +53,10 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer { } ks.healthzPort = config.HealthzPort ks.dnsPort = config.DNSPort - ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations) + ks.kd, err = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations) + if err != nil { + glog.Fatalf("Failed to start kubeDNS: %v", err) + } return &ks } diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 10aa0363ac..08ce75fc65 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -92,8 +92,15 @@ type KubeDNS struct { // A Records and SRV Records for (regular) services and headless Services. cache *TreeCache + // TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap. reverseRecordMap map[string]*skymsg.Service + // Map of cluster IP to service object. Headless services are not part of this map. + // Used to get a service when given its cluster IP. + // Access to this is coordinated using cacheLock. We use the same lock for cache and this map + // to ensure that they dont get out of sync. 
+ clusterIPServiceMap map[string]*kapi.Service + // caller is responsible for using the cacheLock before invoking methods on cache // the cache is not thread-safe, and the caller can guarantee thread safety by using // the cacheLock @@ -119,20 +126,28 @@ type KubeDNS struct { nodesStore kcache.Store } -func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) *KubeDNS { +func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) (*KubeDNS, error) { + // Verify that federation names should not contain dots ('.') + // We can not allow dots since we use that as separator for path segments (svcname.nsname.fedname.svc.domain) + for key := range federations { + if strings.ContainsAny(key, ".") { + return nil, fmt.Errorf("invalid federation name: %s, cannot have '.'", key) + } + } kd := &KubeDNS{ - kubeClient: client, - domain: domain, - cache: NewTreeCache(), - cacheLock: sync.RWMutex{}, - nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc), - reverseRecordMap: make(map[string]*skymsg.Service), - domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), - federations: federations, + kubeClient: client, + domain: domain, + cache: NewTreeCache(), + cacheLock: sync.RWMutex{}, + nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc), + reverseRecordMap: make(map[string]*skymsg.Service), + clusterIPServiceMap: make(map[string]*kapi.Service), + domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), + federations: federations, } kd.setEndpointsStore() kd.setServicesStore() - return kd + return kd, nil } func (kd *KubeDNS) Start() { @@ -245,6 +260,7 @@ func (kd *KubeDNS) removeService(obj interface{}) { defer kd.cacheLock.Unlock() kd.cache.deletePath(subCachePath...) 
+ delete(kd.reverseRecordMap, s.Spec.ClusterIP) + delete(kd.clusterIPServiceMap, s.Spec.ClusterIP) } } @@ -319,6 +335,7 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) { defer kd.cacheLock.Unlock() kd.cache.setSubCache(service.Name, subCache, subCachePath...) kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord + kd.clusterIPServiceMap[service.Spec.ClusterIP] = service } func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kapi.Service) error { @@ -422,7 +439,74 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er glog.Infof("Received DNS Request:%s, exact:%v", name, exact) trimmed := strings.TrimRight(name, ".") segments := strings.Split(trimmed, ".") + isFederationQuery := false + federationSegments := []string{} + if !exact && kd.isFederationQuery(segments) { + glog.Infof("federation service query: Received federation query. Going to try to find local service first") + // Try querying the non-federation (local) service first. + // Will try the federation one later, if this fails. + isFederationQuery = true + federationSegments = append(federationSegments, segments...) + // To try local service, remove federation name from segments. + // Federation name is 3rd in the segment (after service name and namespace). + segments = append(segments[:2], segments[3:]...) + } path := reverseArray(segments) + records, err := kd.getRecordsForPath(path, exact) + if err != nil { + return nil, err + } + if !isFederationQuery { + if len(records) > 0 { + return records, nil + } + return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} + } + + // For federation query, verify that the local service has endpoints. + validRecord := false + for _, val := range records { + // We know that a headless service has endpoints for sure if a record was returned for it. + // The record contains endpoint IPs. So nothing to check for headless services. 
+ if !kd.isHeadlessServiceRecord(&val) { + ok, err := kd.serviceWithClusterIPHasEndpoints(&val) + if err != nil { + glog.Infof("federation service query: unexpected error while trying to find if service has endpoint: %v") + continue + } + if !ok { + glog.Infof("federation service query: skipping record since service has no endpoint: %v", val) + continue + } + } + validRecord = true + break + } + if validRecord { + // There is a local service with valid endpoints, return its CNAME. + name := strings.Join(reverseArray(path), ".") + // Ensure that this name that we are returning as a CNAME response is a fully qualified + // domain name so that the client's resolver library doesn't have to go through its + // search list all over again. + if !strings.HasSuffix(name, ".") { + name = name + "." + } + glog.Infof("federation service query: Returning CNAME for local service : %s", name) + return []skymsg.Service{{Host: name}}, nil + } + + // If the name query is not an exact query and does not match any records in the local store, + // attempt to send a federation redirect (CNAME) response. + if !exact { + glog.Infof("federation service query: Did not find a local service. Trying federation redirect (CNAME) response") + return kd.federationRecords(reverseArray(federationSegments)) + } + + return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} +} + +func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) { + retval := []skymsg.Service{} if kd.isPodRecord(path) { ip, err := kd.getPodIP(path) if err == nil { @@ -448,21 +532,50 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er kd.cacheLock.RLock() defer kd.cacheLock.RUnlock() records := kd.cache.getValuesForPathWithWildcards(path...) 
+ glog.V(2).Infof("Received %d records from cache", len(records)) for _, val := range records { retval = append(retval, *val) } glog.Infof("records:%v, retval:%v, path:%v", records, retval, path) - if len(retval) > 0 { - return retval, nil - } + return retval, nil +} - // If the name query is not an exact query and does not match any records in the local store, - // attempt to send a federation redirect (CNAME) response. - if !exact { - return kd.federationRecords(path) - } +// Returns true if the given record corresponds to a headless service. +// Important: Assumes that we already have the cacheLock. It is the caller's responsibility to acquire it. +// This is because sync.RWMutex is not reentrant: trying to acquire it again while already holding it can deadlock. +func (kd *KubeDNS) isHeadlessServiceRecord(msg *skymsg.Service) bool { + // If it is not a headless service, then msg.Host will be the cluster IP. + // So we can check if msg.Host exists in our clusterIPServiceMap. + _, ok := kd.clusterIPServiceMap[msg.Host] + // It is a headless service if no record was found. + return !ok +} - return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} +// Returns true if the service corresponding to the given message has endpoints. +// Note: Works only for services with ClusterIP. Will return an error for headless service (service without a clusterIP). +// Important: Assumes that we already have the cacheLock. It is the caller's responsibility to acquire it. +// This is because sync.RWMutex is not reentrant: trying to acquire it again while already holding it can deadlock. +func (kd *KubeDNS) serviceWithClusterIPHasEndpoints(msg *skymsg.Service) (bool, error) { + svc, ok := kd.clusterIPServiceMap[msg.Host] + if !ok { + // It is a headless service. 
+ return false, fmt.Errorf("method not expected to be called for headless service") + } + key, err := kcache.MetaNamespaceKeyFunc(svc) + if err != nil { + return false, err + } + e, exists, err := kd.endpointsStore.GetByKey(key) + if err != nil { + return false, fmt.Errorf("failed to get endpoints object from endpoints store - %v", err) + } + if !exists { + return false, nil + } + if e, ok := e.(*kapi.Endpoints); ok { + return len(e.Subsets) > 0, nil + } + return false, fmt.Errorf("unexpected: found non-endpoint object in endpoint store: %v", e) } // ReverseRecords performs a reverse lookup for the given name. @@ -558,6 +671,9 @@ func getSkyMsg(ip string, port int) (*skymsg.Service, string) { // 5. Fourth segment is exactly "svc" // 6. The remaining segments match kd.domainPath. // 7. And federation must be one of the listed federations in the config. +// Note: Because of the above conditions, this method will treat wildcard queries such as +// *.mysvc.myns.myfederation.svc.domain.path as non-federation queries. +// We can add support for wildcard queries later, if needed. 
func (kd *KubeDNS) isFederationQuery(path []string) bool { if len(path) == 4+len(kd.domainPath) && len(validation.IsDNS952Label(path[0])) == 0 && diff --git a/pkg/dns/dns_test.go b/pkg/dns/dns_test.go index 9d7a1bb7d5..0d9e11dfa5 100644 --- a/pkg/dns/dns_test.go +++ b/pkg/dns/dns_test.go @@ -46,18 +46,27 @@ const ( func newKubeDNS() *KubeDNS { kd := &KubeDNS{ - domain: testDomain, - endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc), - servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), - cache: NewTreeCache(), - reverseRecordMap: make(map[string]*skymsg.Service), - cacheLock: sync.RWMutex{}, - domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")), - nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + domain: testDomain, + endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + cache: NewTreeCache(), + reverseRecordMap: make(map[string]*skymsg.Service), + clusterIPServiceMap: make(map[string]*kapi.Service), + cacheLock: sync.RWMutex{}, + domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")), + nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), } return kd } +func TestNewKubeDNS(t *testing.T) { + // Verify that it returns an error for invalid federation names. + _, err := NewKubeDNS(nil, "domainName", map[string]string{"invalid.name.with.dot": "example.come"}) + if err == nil { + t.Errorf("Expected an error due to invalid federation name") + } +} + func TestPodDns(t *testing.T) { const ( testPodIP = "1.2.3.4" @@ -350,6 +359,98 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) { assertNoDNSForHeadlessService(t, kd, service) } +// Verifies that a single record with host "a" is returned for query "q". 
+func verifyRecord(q, a string, t *testing.T, kd *KubeDNS) { + records, err := kd.Records(q, false) + require.NoError(t, err) + assert.Equal(t, 1, len(records)) + assert.Equal(t, a, records[0].Host) +} + +// Verifies that querying KubeDNS for a headless federation service returns the federation DNS hostname when no local service with endpoints exists and returns the local service domain name when a local service with endpoints exists. +func TestFederationHeadlessService(t *testing.T) { + kd := newKubeDNS() + kd.federations = map[string]string{ + "myfederation": "example.com", + } + kd.kubeClient = fake.NewSimpleClientset(newNodes()) + + // Verify that querying for federation service returns a federation domain name. + verifyRecord("testservice.default.myfederation.svc.cluster.local.", + "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", + t, kd) + + // Add a local service without any endpoint. + s := newHeadlessService() + assert.NoError(t, kd.servicesStore.Add(s)) + kd.newService(s) + + // Verify that querying for federation service still returns the federation domain name. + verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), + "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", + t, kd) + + // Now add an endpoint. + endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1")) + assert.NoError(t, kd.endpointsStore.Add(endpoints)) + kd.updateService(s, s) + + // Verify that querying for federation service returns the local service domain name this time. + verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd) + + // Delete the endpoint. + endpoints.Subsets = []kapi.EndpointSubset{} + kd.handleEndpointAdd(endpoints) + kd.updateService(s, s) + + // Verify that querying for federation service returns the federation domain name again. 
+ verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), + "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", + t, kd) +} + +// Verifies that querying KubeDNS for a federation service returns the federation DNS hostname if no endpoint exists and returns the local service domain name if endpoints exist. +func TestFederationService(t *testing.T) { + kd := newKubeDNS() + kd.federations = map[string]string{ + "myfederation": "example.com", + } + kd.kubeClient = fake.NewSimpleClientset(newNodes()) + + // Verify that querying for federation service returns the federation domain name. + verifyRecord("testservice.default.myfederation.svc.cluster.local.", + "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", + t, kd) + + // Add a local service without any endpoint. + s := newService(testNamespace, testService, "1.2.3.4", "", 80) + assert.NoError(t, kd.servicesStore.Add(s)) + kd.newService(s) + + // Verify that querying for federation service still returns the federation domain name. + verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), + "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", + t, kd) + + // Now add an endpoint. + endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1")) + assert.NoError(t, kd.endpointsStore.Add(endpoints)) + kd.updateService(s, s) + + // Verify that querying for federation service returns the local service domain name this time. + verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd) + + // Remove the endpoint. + endpoints.Subsets = []kapi.EndpointSubset{} + kd.handleEndpointAdd(endpoints) + kd.updateService(s, s) + + // Verify that querying for federation service returns the federation domain name again. 
+ verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), + "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", + t, kd) +} + func TestFederationQueryWithoutCache(t *testing.T) { kd := newKubeDNS() kd.federations = map[string]string{ @@ -397,10 +498,7 @@ func testValidFederationQueries(t *testing.T, kd *KubeDNS) { } for _, query := range queries { - records, err := kd.Records(query.q, false) - require.NoError(t, err) - assert.Equal(t, 1, len(records)) - assert.Equal(t, query.a, records[0].Host) + verifyRecord(query.q, query.a, t, kd) } } @@ -630,6 +728,10 @@ func getEquivalentQueries(serviceFQDN, namespace string) []string { } } +func getFederationServiceFQDN(kd *KubeDNS, s *kapi.Service, federationName string) string { + return fmt.Sprintf("%s.%s.%s.svc.%s", s.Name, s.Namespace, federationName, kd.domain) +} + func getServiceFQDN(kd *KubeDNS, s *kapi.Service) string { return fmt.Sprintf("%s.%s.svc.%s", s.Name, s.Namespace, kd.domain) }