mirror of https://github.com/k3s-io/k3s
Updating KubeDNS to try a local service first for federation query
parent
38182e91c9
commit
61f43c5924
|
@ -53,7 +53,10 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
|
|||
}
|
||||
ks.healthzPort = config.HealthzPort
|
||||
ks.dnsPort = config.DNSPort
|
||||
ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations)
|
||||
ks.kd, err = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations)
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to start kubeDNS: %v", err)
|
||||
}
|
||||
return &ks
|
||||
}
|
||||
|
||||
|
|
136
pkg/dns/dns.go
136
pkg/dns/dns.go
|
@ -92,8 +92,15 @@ type KubeDNS struct {
|
|||
// A Records and SRV Records for (regular) services and headless Services.
|
||||
cache *TreeCache
|
||||
|
||||
// TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap.
|
||||
reverseRecordMap map[string]*skymsg.Service
|
||||
|
||||
// Map of cluster IP to service object. Headless services are not part of this map.
|
||||
// Used to get a service when given its cluster IP.
|
||||
// Access to this is coordinated using cacheLock. We use the same lock for cache and this map
|
||||
// to ensure that they dont get out of sync.
|
||||
clusterIPServiceMap map[string]*kapi.Service
|
||||
|
||||
// caller is responsible for using the cacheLock before invoking methods on cache
|
||||
// the cache is not thread-safe, and the caller can guarantee thread safety by using
|
||||
// the cacheLock
|
||||
|
@ -119,7 +126,14 @@ type KubeDNS struct {
|
|||
nodesStore kcache.Store
|
||||
}
|
||||
|
||||
func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) *KubeDNS {
|
||||
func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) (*KubeDNS, error) {
|
||||
// Verify that federation names should not contain dots ('.')
|
||||
// We can not allow dots since we use that as separator for path segments (svcname.nsname.fedname.svc.domain)
|
||||
for key := range federations {
|
||||
if strings.ContainsAny(key, ".") {
|
||||
return nil, fmt.Errorf("invalid federation name: %s, cannot have '.'", key)
|
||||
}
|
||||
}
|
||||
kd := &KubeDNS{
|
||||
kubeClient: client,
|
||||
domain: domain,
|
||||
|
@ -127,12 +141,13 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin
|
|||
cacheLock: sync.RWMutex{},
|
||||
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
|
||||
reverseRecordMap: make(map[string]*skymsg.Service),
|
||||
clusterIPServiceMap: make(map[string]*kapi.Service),
|
||||
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
|
||||
federations: federations,
|
||||
}
|
||||
kd.setEndpointsStore()
|
||||
kd.setServicesStore()
|
||||
return kd
|
||||
return kd, nil
|
||||
}
|
||||
|
||||
func (kd *KubeDNS) Start() {
|
||||
|
@ -245,6 +260,7 @@ func (kd *KubeDNS) removeService(obj interface{}) {
|
|||
defer kd.cacheLock.Unlock()
|
||||
kd.cache.deletePath(subCachePath...)
|
||||
delete(kd.reverseRecordMap, s.Spec.ClusterIP)
|
||||
delete(kd.clusterIPServiceMap, s.Spec.ClusterIP)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -319,6 +335,7 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
|
|||
defer kd.cacheLock.Unlock()
|
||||
kd.cache.setSubCache(service.Name, subCache, subCachePath...)
|
||||
kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord
|
||||
kd.clusterIPServiceMap[service.Spec.ClusterIP] = service
|
||||
}
|
||||
|
||||
func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kapi.Service) error {
|
||||
|
@ -422,7 +439,74 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
|
|||
glog.Infof("Received DNS Request:%s, exact:%v", name, exact)
|
||||
trimmed := strings.TrimRight(name, ".")
|
||||
segments := strings.Split(trimmed, ".")
|
||||
isFederationQuery := false
|
||||
federationSegments := []string{}
|
||||
if !exact && kd.isFederationQuery(segments) {
|
||||
glog.Infof("federation service query: Received federation query. Going to try to find local service first")
|
||||
// Try quering the non-federation (local) service first.
|
||||
// Will try the federation one later, if this fails.
|
||||
isFederationQuery = true
|
||||
federationSegments = append(federationSegments, segments...)
|
||||
// To try local service, remove federation name from segments.
|
||||
// Federation name is 3rd in the segment (after service name and namespace).
|
||||
segments = append(segments[:2], segments[3:]...)
|
||||
}
|
||||
path := reverseArray(segments)
|
||||
records, err := kd.getRecordsForPath(path, exact)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !isFederationQuery {
|
||||
if len(records) > 0 {
|
||||
return records, nil
|
||||
}
|
||||
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
|
||||
}
|
||||
|
||||
// For federation query, verify that the local service has endpoints.
|
||||
validRecord := false
|
||||
for _, val := range records {
|
||||
// We know that a headless service has endpoints for sure if a record was returned for it.
|
||||
// The record contains endpoint IPs. So nothing to check for headless services.
|
||||
if !kd.isHeadlessServiceRecord(&val) {
|
||||
ok, err := kd.serviceWithClusterIPHasEndpoints(&val)
|
||||
if err != nil {
|
||||
glog.Infof("federation service query: unexpected error while trying to find if service has endpoint: %v")
|
||||
continue
|
||||
}
|
||||
if !ok {
|
||||
glog.Infof("federation service query: skipping record since service has no endpoint: %v", val)
|
||||
continue
|
||||
}
|
||||
}
|
||||
validRecord = true
|
||||
break
|
||||
}
|
||||
if validRecord {
|
||||
// There is a local service with valid endpoints, return its CNAME.
|
||||
name := strings.Join(reverseArray(path), ".")
|
||||
// Ensure that this name that we are returning as a CNAME response is a fully qualified
|
||||
// domain name so that the client's resolver library doesn't have to go through its
|
||||
// search list all over again.
|
||||
if !strings.HasSuffix(name, ".") {
|
||||
name = name + "."
|
||||
}
|
||||
glog.Infof("federation service query: Returning CNAME for local service : %s", name)
|
||||
return []skymsg.Service{{Host: name}}, nil
|
||||
}
|
||||
|
||||
// If the name query is not an exact query and does not match any records in the local store,
|
||||
// attempt to send a federation redirect (CNAME) response.
|
||||
if !exact {
|
||||
glog.Infof("federation service query: Did not find a local service. Trying federation redirect (CNAME) response")
|
||||
return kd.federationRecords(reverseArray(federationSegments))
|
||||
}
|
||||
|
||||
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
|
||||
}
|
||||
|
||||
func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) {
|
||||
retval := []skymsg.Service{}
|
||||
if kd.isPodRecord(path) {
|
||||
ip, err := kd.getPodIP(path)
|
||||
if err == nil {
|
||||
|
@ -448,21 +532,50 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
|
|||
kd.cacheLock.RLock()
|
||||
defer kd.cacheLock.RUnlock()
|
||||
records := kd.cache.getValuesForPathWithWildcards(path...)
|
||||
glog.V(2).Infof("Received %d records from cache", len(records))
|
||||
for _, val := range records {
|
||||
retval = append(retval, *val)
|
||||
}
|
||||
glog.Infof("records:%v, retval:%v, path:%v", records, retval, path)
|
||||
if len(retval) > 0 {
|
||||
return retval, nil
|
||||
}
|
||||
}
|
||||
|
||||
// If the name query is not an exact query and does not match any records in the local store,
|
||||
// attempt to send a federation redirect (CNAME) response.
|
||||
if !exact {
|
||||
return kd.federationRecords(path)
|
||||
}
|
||||
// Returns true if the given record corresponds to a headless service.
|
||||
// Important: Assumes that we already have the cacheLock. Callers responsibility to acquire it.
|
||||
// This is because the code will panic, if we try to acquire it again if we already have it.
|
||||
func (kd *KubeDNS) isHeadlessServiceRecord(msg *skymsg.Service) bool {
|
||||
// If it is not a headless service, then msg.Host will be the cluster IP.
|
||||
// So we can check if msg.host exists in our clusterIPServiceMap.
|
||||
_, ok := kd.clusterIPServiceMap[msg.Host]
|
||||
// It is headless service if no record was found.
|
||||
return !ok
|
||||
}
|
||||
|
||||
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
|
||||
// Returns true if the service corresponding to the given message has endpoints.
|
||||
// Note: Works only for services with ClusterIP. Will return an error for headless service (service without a clusterIP).
|
||||
// Important: Assumes that we already have the cacheLock. Callers responsibility to acquire it.
|
||||
// This is because the code will panic, if we try to acquire it again if we already have it.
|
||||
func (kd *KubeDNS) serviceWithClusterIPHasEndpoints(msg *skymsg.Service) (bool, error) {
|
||||
svc, ok := kd.clusterIPServiceMap[msg.Host]
|
||||
if !ok {
|
||||
// It is a headless service.
|
||||
return false, fmt.Errorf("method not expected to be called for headless service")
|
||||
}
|
||||
key, err := kcache.MetaNamespaceKeyFunc(svc)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
e, exists, err := kd.endpointsStore.GetByKey(key)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
|
||||
}
|
||||
if !exists {
|
||||
return false, nil
|
||||
}
|
||||
if e, ok := e.(*kapi.Endpoints); ok {
|
||||
return len(e.Subsets) > 0, nil
|
||||
}
|
||||
return false, fmt.Errorf("unexpected: found non-endpoint object in endpoint store: %v", e)
|
||||
}
|
||||
|
||||
// ReverseRecords performs a reverse lookup for the given name.
|
||||
|
@ -558,6 +671,9 @@ func getSkyMsg(ip string, port int) (*skymsg.Service, string) {
|
|||
// 5. Fourth segment is exactly "svc"
|
||||
// 6. The remaining segments match kd.domainPath.
|
||||
// 7. And federation must be one of the listed federations in the config.
|
||||
// Note: Because of the above conditions, this method will treat wildcard queries such as
|
||||
// *.mysvc.myns.myfederation.svc.domain.path as non-federation queries.
|
||||
// We can add support for wildcard queries later, if needed.
|
||||
func (kd *KubeDNS) isFederationQuery(path []string) bool {
|
||||
if len(path) == 4+len(kd.domainPath) &&
|
||||
len(validation.IsDNS952Label(path[0])) == 0 &&
|
||||
|
|
|
@ -51,6 +51,7 @@ func newKubeDNS() *KubeDNS {
|
|||
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
cache: NewTreeCache(),
|
||||
reverseRecordMap: make(map[string]*skymsg.Service),
|
||||
clusterIPServiceMap: make(map[string]*kapi.Service),
|
||||
cacheLock: sync.RWMutex{},
|
||||
domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
|
||||
nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
|
@ -58,6 +59,14 @@ func newKubeDNS() *KubeDNS {
|
|||
return kd
|
||||
}
|
||||
|
||||
func TestNewKubeDNS(t *testing.T) {
|
||||
// Verify that it returns an error for invalid federation names.
|
||||
_, err := NewKubeDNS(nil, "domainName", map[string]string{"invalid.name.with.dot": "example.come"})
|
||||
if err == nil {
|
||||
t.Errorf("Expected an error due to invalid federation name")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodDns(t *testing.T) {
|
||||
const (
|
||||
testPodIP = "1.2.3.4"
|
||||
|
@ -350,6 +359,98 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
|
|||
assertNoDNSForHeadlessService(t, kd, service)
|
||||
}
|
||||
|
||||
// Verifies that a single record with host "a" is returned for query "q".
|
||||
func verifyRecord(q, a string, t *testing.T, kd *KubeDNS) {
|
||||
records, err := kd.Records(q, false)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, len(records))
|
||||
assert.Equal(t, a, records[0].Host)
|
||||
}
|
||||
|
||||
// Verifies that quering KubeDNS for a headless federation service returns the DNS hostname when a local service does not exist and returns the endpoint IP when a local service exists.
|
||||
func TestFederationHeadlessService(t *testing.T) {
|
||||
kd := newKubeDNS()
|
||||
kd.federations = map[string]string{
|
||||
"myfederation": "example.com",
|
||||
}
|
||||
kd.kubeClient = fake.NewSimpleClientset(newNodes())
|
||||
|
||||
// Verify that quering for federation service returns a federation domain name.
|
||||
verifyRecord("testservice.default.myfederation.svc.cluster.local.",
|
||||
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
|
||||
t, kd)
|
||||
|
||||
// Add a local service without any endpoint.
|
||||
s := newHeadlessService()
|
||||
assert.NoError(t, kd.servicesStore.Add(s))
|
||||
kd.newService(s)
|
||||
|
||||
// Verify that quering for federation service still returns the federation domain name.
|
||||
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
|
||||
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
|
||||
t, kd)
|
||||
|
||||
// Now add an endpoint.
|
||||
endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1"))
|
||||
assert.NoError(t, kd.endpointsStore.Add(endpoints))
|
||||
kd.updateService(s, s)
|
||||
|
||||
// Verify that quering for federation service returns the local service domain name this time.
|
||||
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd)
|
||||
|
||||
// Delete the endpoint.
|
||||
endpoints.Subsets = []kapi.EndpointSubset{}
|
||||
kd.handleEndpointAdd(endpoints)
|
||||
kd.updateService(s, s)
|
||||
|
||||
// Verify that quering for federation service returns the federation domain name again.
|
||||
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
|
||||
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
|
||||
t, kd)
|
||||
}
|
||||
|
||||
// Verifies that quering KubeDNS for a federation service returns the DNS hostname if no endpoint exists and returns the local cluster IP if endpoints exist.
|
||||
func TestFederationService(t *testing.T) {
|
||||
kd := newKubeDNS()
|
||||
kd.federations = map[string]string{
|
||||
"myfederation": "example.com",
|
||||
}
|
||||
kd.kubeClient = fake.NewSimpleClientset(newNodes())
|
||||
|
||||
// Verify that quering for federation service returns the federation domain name.
|
||||
verifyRecord("testservice.default.myfederation.svc.cluster.local.",
|
||||
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
|
||||
t, kd)
|
||||
|
||||
// Add a local service without any endpoint.
|
||||
s := newService(testNamespace, testService, "1.2.3.4", "", 80)
|
||||
assert.NoError(t, kd.servicesStore.Add(s))
|
||||
kd.newService(s)
|
||||
|
||||
// Verify that quering for federation service still returns the federation domain name.
|
||||
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
|
||||
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
|
||||
t, kd)
|
||||
|
||||
// Now add an endpoint.
|
||||
endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1"))
|
||||
assert.NoError(t, kd.endpointsStore.Add(endpoints))
|
||||
kd.updateService(s, s)
|
||||
|
||||
// Verify that quering for federation service returns the local service domain name this time.
|
||||
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd)
|
||||
|
||||
// Remove the endpoint.
|
||||
endpoints.Subsets = []kapi.EndpointSubset{}
|
||||
kd.handleEndpointAdd(endpoints)
|
||||
kd.updateService(s, s)
|
||||
|
||||
// Verify that quering for federation service returns the federation domain name again.
|
||||
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
|
||||
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
|
||||
t, kd)
|
||||
}
|
||||
|
||||
func TestFederationQueryWithoutCache(t *testing.T) {
|
||||
kd := newKubeDNS()
|
||||
kd.federations = map[string]string{
|
||||
|
@ -397,10 +498,7 @@ func testValidFederationQueries(t *testing.T, kd *KubeDNS) {
|
|||
}
|
||||
|
||||
for _, query := range queries {
|
||||
records, err := kd.Records(query.q, false)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, len(records))
|
||||
assert.Equal(t, query.a, records[0].Host)
|
||||
verifyRecord(query.q, query.a, t, kd)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -630,6 +728,10 @@ func getEquivalentQueries(serviceFQDN, namespace string) []string {
|
|||
}
|
||||
}
|
||||
|
||||
func getFederationServiceFQDN(kd *KubeDNS, s *kapi.Service, federationName string) string {
|
||||
return fmt.Sprintf("%s.%s.%s.svc.%s", s.Name, s.Namespace, federationName, kd.domain)
|
||||
}
|
||||
|
||||
func getServiceFQDN(kd *KubeDNS, s *kapi.Service) string {
|
||||
return fmt.Sprintf("%s.%s.svc.%s", s.Name, s.Namespace, kd.domain)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue