mirror of https://github.com/k3s-io/k3s
Merge pull request #26226 from ArtfulCoder/reversedns
Automatic merge from submit-queue Added DNS Reverse Record logic for service IPspull/6/head
commit
693dae70a8
|
@ -49,6 +49,9 @@ const (
|
|||
// A subdomain added to the user specified dmoain for all pods.
|
||||
podSubdomain = "pod"
|
||||
|
||||
// arpaSuffix is the standard suffix for PTR IP reverse lookups.
|
||||
arpaSuffix = ".in-addr.arpa."
|
||||
|
||||
// Resync period for the kube controller loop.
|
||||
resyncPeriod = 5 * time.Minute
|
||||
|
||||
|
@ -78,6 +81,8 @@ type KubeDNS struct {
|
|||
// A Records and SRV Records for (regular) services and headless Services.
|
||||
cache *TreeCache
|
||||
|
||||
reverseRecordMap map[string]*skymsg.Service
|
||||
|
||||
// caller is responsible for using the cacheLock before invoking methods on cache
|
||||
// the cache is not thread-safe, and the caller can guarantee thread safety by using
|
||||
// the cacheLock
|
||||
|
@ -105,12 +110,13 @@ type KubeDNS struct {
|
|||
|
||||
func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) *KubeDNS {
|
||||
kd := &KubeDNS{
|
||||
kubeClient: client,
|
||||
domain: domain,
|
||||
cache: NewTreeCache(),
|
||||
cacheLock: sync.RWMutex{},
|
||||
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
|
||||
federations: federations,
|
||||
kubeClient: client,
|
||||
domain: domain,
|
||||
cache: NewTreeCache(),
|
||||
cacheLock: sync.RWMutex{},
|
||||
reverseRecordMap: make(map[string]*skymsg.Service),
|
||||
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
|
||||
federations: federations,
|
||||
}
|
||||
kd.setEndpointsStore()
|
||||
kd.setServicesStore()
|
||||
|
@ -225,6 +231,7 @@ func (kd *KubeDNS) removeService(obj interface{}) {
|
|||
kd.cacheLock.Lock()
|
||||
defer kd.cacheLock.Unlock()
|
||||
kd.cache.deletePath(subCachePath...)
|
||||
delete(kd.reverseRecordMap, s.Spec.ClusterIP)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -283,9 +290,13 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
|
|||
}
|
||||
}
|
||||
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
|
||||
host := kd.getServiceFQDN(service)
|
||||
reverseRecord, _ := getSkyMsg(host, 0)
|
||||
|
||||
kd.cacheLock.Lock()
|
||||
defer kd.cacheLock.Unlock()
|
||||
kd.cache.setSubCache(service.Name, subCache, subCachePath...)
|
||||
kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord
|
||||
}
|
||||
|
||||
func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kapi.Service) error {
|
||||
|
@ -430,17 +441,34 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
|
|||
func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
|
||||
glog.Infof("Received ReverseRecord Request:%s", name)
|
||||
|
||||
segments := strings.Split(strings.TrimRight(name, "."), ".")
|
||||
// if portalIP is not a valid IP, the reverseRecordMap lookup will fail
|
||||
portalIP, ok := extractIP(name)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("does not support reverse lookup for %s", name)
|
||||
}
|
||||
|
||||
for _, k := range segments {
|
||||
if k == "*" {
|
||||
return nil, fmt.Errorf("reverse can not contain wildcards")
|
||||
}
|
||||
kd.cacheLock.RLock()
|
||||
defer kd.cacheLock.RUnlock()
|
||||
if reverseRecord, ok := kd.reverseRecordMap[portalIP]; ok {
|
||||
return reverseRecord, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("must be exactly one service record")
|
||||
}
|
||||
|
||||
// extractIP turns a standard PTR reverse record lookup name
|
||||
// into an IP address
|
||||
func extractIP(reverseName string) (string, bool) {
|
||||
if !strings.HasSuffix(reverseName, arpaSuffix) {
|
||||
return "", false
|
||||
}
|
||||
search := strings.TrimSuffix(reverseName, arpaSuffix)
|
||||
|
||||
// reverse the segments and then combine them
|
||||
segments := reverseArray(strings.Split(search, "."))
|
||||
return strings.Join(segments, "."), true
|
||||
}
|
||||
|
||||
// e.g {"local", "cluster", "pod", "default", "10-0-0-1"}
|
||||
func (kd *KubeDNS) isPodRecord(path []string) bool {
|
||||
if len(path) != len(kd.domainPath)+3 {
|
||||
|
@ -602,6 +630,10 @@ func (kd *KubeDNS) getClusterZone() (string, error) {
|
|||
return zone, nil
|
||||
}
|
||||
|
||||
func (kd *KubeDNS) getServiceFQDN(service *kapi.Service) string {
|
||||
return strings.Join([]string{service.Name, service.Namespace, serviceSubdomain, kd.domain}, ".")
|
||||
}
|
||||
|
||||
func reverseArray(arr []string) []string {
|
||||
for i := 0; i < len(arr)/2; i++ {
|
||||
j := len(arr) - i - 1
|
||||
|
|
|
@ -41,13 +41,14 @@ const (
|
|||
|
||||
func newKubeDNS() *KubeDNS {
|
||||
kd := &KubeDNS{
|
||||
domain: testDomain,
|
||||
endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
cache: NewTreeCache(),
|
||||
cacheLock: sync.RWMutex{},
|
||||
domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
|
||||
nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
domain: testDomain,
|
||||
endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
cache: NewTreeCache(),
|
||||
reverseRecordMap: make(map[string]*skymsg.Service),
|
||||
cacheLock: sync.RWMutex{},
|
||||
domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
|
||||
nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
}
|
||||
return kd
|
||||
}
|
||||
|
@ -71,9 +72,11 @@ func TestUnnamedSinglePortService(t *testing.T) {
|
|||
// Add the service
|
||||
kd.newService(s)
|
||||
assertDNSForClusterIP(t, kd, s)
|
||||
assertReverseRecord(t, kd, s)
|
||||
// Delete the service
|
||||
kd.removeService(s)
|
||||
assertNoDNSForClusterIP(t, kd, s)
|
||||
assertNoReverseRecord(t, kd, s)
|
||||
}
|
||||
|
||||
func TestNamedSinglePortService(t *testing.T) {
|
||||
|
@ -457,6 +460,22 @@ func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
|
|||
}
|
||||
}
|
||||
|
||||
func assertReverseRecord(t *testing.T, kd *KubeDNS, s *kapi.Service) {
|
||||
segments := reverseArray(strings.Split(s.Spec.ClusterIP, "."))
|
||||
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), arpaSuffix)
|
||||
reverseRecord, err := kd.ReverseRecord(reverseLookup)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, kd.getServiceFQDN(s), reverseRecord.Host)
|
||||
}
|
||||
|
||||
func assertNoReverseRecord(t *testing.T, kd *KubeDNS, s *kapi.Service) {
|
||||
segments := reverseArray(strings.Split(s.Spec.ClusterIP, "."))
|
||||
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), arpaSuffix)
|
||||
reverseRecord, err := kd.ReverseRecord(reverseLookup)
|
||||
require.Error(t, err)
|
||||
require.Nil(t, reverseRecord)
|
||||
}
|
||||
|
||||
func getEquivalentQueries(serviceFQDN, namespace string) []string {
|
||||
return []string{
|
||||
serviceFQDN,
|
||||
|
|
|
@ -117,7 +117,7 @@ func createDNSPod(namespace, wheezyProbeCmd, jessieProbeCmd string, useAnnotatio
|
|||
return dnsPod
|
||||
}
|
||||
|
||||
func createProbeCommand(namesToResolve []string, hostEntries []string, fileNamePrefix, namespace string) (string, []string) {
|
||||
func createProbeCommand(namesToResolve []string, hostEntries []string, ptrLookupIP string, fileNamePrefix, namespace string) (string, []string) {
|
||||
fileNames := make([]string, 0, len(namesToResolve)*2)
|
||||
probeCmd := "for i in `seq 1 600`; do "
|
||||
for _, name := range namesToResolve {
|
||||
|
@ -150,6 +150,16 @@ func createProbeCommand(namesToResolve []string, hostEntries []string, fileNameP
|
|||
fileNames = append(fileNames, podARecByUDPFileName)
|
||||
fileNames = append(fileNames, podARecByTCPFileName)
|
||||
|
||||
if len(ptrLookupIP) > 0 {
|
||||
ptrLookup := fmt.Sprintf("%s.in-addr.arpa.", strings.Join(reverseArray(strings.Split(ptrLookupIP, ".")), "."))
|
||||
ptrRecByUDPFileName := fmt.Sprintf("%s_udp@PTR", ptrLookupIP)
|
||||
ptrRecByTCPFileName := fmt.Sprintf("%s_tcp@PTR", ptrLookupIP)
|
||||
probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search %s PTR)" && echo OK > /results/%s;`, ptrLookup, ptrRecByUDPFileName)
|
||||
probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s PTR)" && echo OK > /results/%s;`, ptrLookup, ptrRecByTCPFileName)
|
||||
fileNames = append(fileNames, ptrRecByUDPFileName)
|
||||
fileNames = append(fileNames, ptrRecByTCPFileName)
|
||||
}
|
||||
|
||||
probeCmd += "sleep 1; done"
|
||||
return probeCmd, fileNames
|
||||
}
|
||||
|
@ -256,6 +266,14 @@ func createServiceSpec(serviceName string, isHeadless bool, selector map[string]
|
|||
return headlessService
|
||||
}
|
||||
|
||||
func reverseArray(arr []string) []string {
|
||||
for i := 0; i < len(arr)/2; i++ {
|
||||
j := len(arr) - i - 1
|
||||
arr[i], arr[j] = arr[j], arr[i]
|
||||
}
|
||||
return arr
|
||||
}
|
||||
|
||||
var _ = framework.KubeDescribe("DNS", func() {
|
||||
f := framework.NewDefaultFramework("dns")
|
||||
|
||||
|
@ -274,8 +292,8 @@ var _ = framework.KubeDescribe("DNS", func() {
|
|||
}
|
||||
hostFQDN := fmt.Sprintf("%s.%s.%s.svc.cluster.local", dnsTestPodHostName, dnsTestServiceName, f.Namespace.Name)
|
||||
hostEntries := []string{hostFQDN, dnsTestPodHostName}
|
||||
wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, hostEntries, "wheezy", f.Namespace.Name)
|
||||
jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, hostEntries, "jessie", f.Namespace.Name)
|
||||
wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, hostEntries, "", "wheezy", f.Namespace.Name)
|
||||
jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, hostEntries, "", "jessie", f.Namespace.Name)
|
||||
By("Running these commands on wheezy:" + wheezyProbeCmd + "\n")
|
||||
By("Running these commands on jessie:" + jessieProbeCmd + "\n")
|
||||
|
||||
|
@ -301,7 +319,7 @@ var _ = framework.KubeDescribe("DNS", func() {
|
|||
}()
|
||||
|
||||
regularService := createServiceSpec("test-service-2", false, testServiceSelector)
|
||||
_, err = f.Client.Services(f.Namespace.Name).Create(regularService)
|
||||
regularService, err = f.Client.Services(f.Namespace.Name).Create(regularService)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer func() {
|
||||
By("deleting the test service")
|
||||
|
@ -320,8 +338,8 @@ var _ = framework.KubeDescribe("DNS", func() {
|
|||
fmt.Sprintf("_http._tcp.%s.%s.svc", regularService.Name, f.Namespace.Name),
|
||||
}
|
||||
|
||||
wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, nil, "wheezy", f.Namespace.Name)
|
||||
jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, nil, "jessie", f.Namespace.Name)
|
||||
wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, nil, regularService.Spec.ClusterIP, "wheezy", f.Namespace.Name)
|
||||
jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, nil, regularService.Spec.ClusterIP, "jessie", f.Namespace.Name)
|
||||
By("Running these commands on wheezy:" + wheezyProbeCmd + "\n")
|
||||
By("Running these commands on jessie:" + jessieProbeCmd + "\n")
|
||||
|
||||
|
@ -353,8 +371,8 @@ var _ = framework.KubeDescribe("DNS", func() {
|
|||
hostFQDN := fmt.Sprintf("%s.%s.%s.svc.cluster.local", podHostname, serviceName, f.Namespace.Name)
|
||||
hostNames := []string{hostFQDN, podHostname}
|
||||
namesToResolve := []string{hostFQDN}
|
||||
wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, hostNames, "wheezy", f.Namespace.Name)
|
||||
jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, hostNames, "jessie", f.Namespace.Name)
|
||||
wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, hostNames, "", "wheezy", f.Namespace.Name)
|
||||
jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, hostNames, "", "jessie", f.Namespace.Name)
|
||||
By("Running these commands on wheezy:" + wheezyProbeCmd + "\n")
|
||||
By("Running these commands on jessie:" + jessieProbeCmd + "\n")
|
||||
|
||||
|
|
Loading…
Reference in New Issue