Fix srv record lookup

pull/6/head
Prashanth Balasubramanian 2016-05-26 17:20:34 -07:00
parent 642049652b
commit 2439930592
5 changed files with 253 additions and 28 deletions

View File

@ -469,4 +469,3 @@ watch-only
whitelist-override-label
windows-line-endings
www-prefix

View File

@ -26,7 +26,7 @@ import (
"time"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"github.com/miekg/dns"
skymsg "github.com/skynetservices/skydns/msg"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
@ -38,6 +38,8 @@ import (
"k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
const (
@ -61,6 +63,15 @@ const (
// never change. So we expire the cache and retrieve a node once every 180 seconds.
// The value is chosen to be neither too long nor too short.
nodeCacheTTL = 180 * time.Second
// default priority used for service records
defaultPriority = 10
// default weight used for service records
defaultWeight = 10
// default TTL used for service records
defaultTTL = 30
)
type KubeDNS struct {
@ -213,6 +224,7 @@ func assertIsService(obj interface{}) (*kapi.Service, bool) {
func (kd *KubeDNS) newService(obj interface{}) {
if service, ok := assertIsService(obj); ok {
glog.V(4).Infof("Add/Updated for service %v", service.Name)
// if ClusterIP is not set, a DNS entry should not be created
if !kapi.IsServiceIPSet(service) {
kd.newHeadlessService(service)
@ -276,17 +288,26 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
return nil, fmt.Errorf("got a non service object in services store %v", obj)
}
// fqdn constructs the fqdn for the given service. subpaths is a list of path
// elements rooted at the given service, ending at a service record.
func (kd *KubeDNS) fqdn(service *kapi.Service, subpaths ...string) string {
domainLabels := append(append(kd.domainPath, serviceSubdomain, service.Namespace, service.Name), subpaths...)
return dns.Fqdn(strings.Join(reverseArray(domainLabels), "."))
}
func (kd *KubeDNS) newPortalService(service *kapi.Service) {
subCache := NewTreeCache()
recordValue, recordLabel := getSkyMsg(service.Spec.ClusterIP, 0)
subCache.setEntry(recordLabel, recordValue)
subCache.setEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel))
// Generate SRV Records
for i := range service.Spec.Ports {
port := &service.Spec.Ports[i]
if port.Name != "" && port.Protocol != "" {
srvValue := kd.generateSRVRecordValue(service, int(port.Port))
subCache.setEntry(recordLabel, srvValue, "_"+strings.ToLower(string(port.Protocol)), "_"+port.Name)
l := []string{"_" + strings.ToLower(string(port.Protocol)), "_" + port.Name}
subCache.setEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...)
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
@ -315,12 +336,14 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
if hostLabel, exists := getHostname(address, podHostnames); exists {
endpointName = hostLabel
}
subCache.setEntry(endpointName, recordValue)
subCache.setEntry(endpointName, recordValue, kd.fqdn(svc, endpointName))
for portIdx := range e.Subsets[idx].Ports {
endpointPort := &e.Subsets[idx].Ports[portIdx]
if endpointPort.Name != "" && endpointPort.Protocol != "" {
srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName)
subCache.setEntry(endpointName, srvValue, "_"+strings.ToLower(string(endpointPort.Protocol)), "_"+endpointPort.Name)
l := []string{"_" + strings.ToLower(string(endpointPort.Protocol)), "_" + endpointPort.Name}
subCache.setEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...)
}
}
}
@ -390,7 +413,11 @@ func (kd *KubeDNS) newHeadlessService(service *kapi.Service) error {
return nil
}
func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
// Records responds with DNS records that match the given name, in a format
// understood by the skydns server. If "exact" is true, a single record
// matching the given name is returned, otherwise all records stored under
// the subtree matching the name are returned.
func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
glog.Infof("Received DNS Request:%s, exact:%v", name, exact)
trimmed := strings.TrimRight(name, ".")
segments := strings.Split(trimmed, ".")
@ -420,9 +447,8 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
records := kd.cache.getValuesForPathWithWildcards(path...)
retval := []skymsg.Service{}
for _, val := range records {
retval = append(retval, *(val.(*skymsg.Service)))
retval = append(retval, *val)
}
glog.Infof("records:%v, retval:%v, path:%v", records, retval, path)
if len(retval) > 0 {
@ -438,6 +464,7 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
// ReverseRecords performs a reverse lookup for the given name.
func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
glog.Infof("Received ReverseRecord Request:%s", name)
@ -494,21 +521,29 @@ func (kd *KubeDNS) getPodIP(path []string) (string, error) {
return "", fmt.Errorf("Invalid IP Address %v", ip)
}
// Returns record in a format that SkyDNS understands.
// Also return the hash of the record.
func getSkyMsg(ip string, port int) (*skymsg.Service, string) {
msg := &skymsg.Service{
Host: ip,
Port: port,
Priority: 10,
Weight: 10,
Ttl: 30,
}
func hashServiceRecord(msg *skymsg.Service) string {
s := fmt.Sprintf("%v", msg)
h := fnv.New32a()
h.Write([]byte(s))
hash := fmt.Sprintf("%x", h.Sum32())
glog.Infof("DNS Record:%s, hash:%s", s, hash)
return fmt.Sprintf("%x", h.Sum32())
}
func newServiceRecord(ip string, port int) *skymsg.Service {
return &skymsg.Service{
Host: ip,
Port: port,
Priority: defaultPriority,
Weight: defaultWeight,
Ttl: defaultTTL,
}
}
// Returns record in a format that SkyDNS understands.
// Also return the hash of the record.
func getSkyMsg(ip string, port int) (*skymsg.Service, string) {
msg := newServiceRecord(ip, port)
hash := hashServiceRecord(msg)
glog.Infof("DNS Record:%s, hash:%s", fmt.Sprintf("%v", msg), hash)
return msg, fmt.Sprintf("%x", hash)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package dns
import (
"encoding/json"
"fmt"
"net"
"strings"
@ -24,13 +25,17 @@ import (
"testing"
etcd "github.com/coreos/etcd/client"
"github.com/miekg/dns"
skymsg "github.com/skynetservices/skydns/msg"
skyServer "github.com/skynetservices/skydns/server"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
kapi "k8s.io/kubernetes/pkg/api"
endpointsapi "k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/util/sets"
)
const (
@ -106,7 +111,145 @@ func TestNamedSinglePortService(t *testing.T) {
assertNoSRVForNamedPort(t, kd, s, portName2)
}
func TestHeadlessService(t *testing.T) {
func assertARecordsMatchIPs(t *testing.T, records []dns.RR, ips ...string) {
expectedEndpoints := sets.NewString(ips...)
gotEndpoints := sets.NewString()
for _, r := range records {
if a, ok := r.(*dns.A); !ok {
t.Errorf("Expected A record, got %+v", a)
} else {
gotEndpoints.Insert(a.A.String())
}
}
if !gotEndpoints.Equal(expectedEndpoints) {
t.Errorf("Expected %v got %v", expectedEndpoints, gotEndpoints)
}
}
func assertSRVRecordsMatchTarget(t *testing.T, records []dns.RR, targets ...string) {
expectedTargets := sets.NewString(targets...)
gotTargets := sets.NewString()
for _, r := range records {
if srv, ok := r.(*dns.SRV); !ok {
t.Errorf("Expected SRV record, got %+v", srv)
} else {
gotTargets.Insert(srv.Target)
}
}
if !gotTargets.Equal(expectedTargets) {
t.Errorf("Expected %v got %v", expectedTargets, gotTargets)
}
}
func assertSRVRecordsMatchPort(t *testing.T, records []dns.RR, port ...int) {
expectedPorts := sets.NewInt(port...)
gotPorts := sets.NewInt()
for _, r := range records {
if srv, ok := r.(*dns.SRV); !ok {
t.Errorf("Expected SRV record, got %+v", srv)
} else {
gotPorts.Insert(int(srv.Port))
t.Logf("got %+v", srv)
}
}
if !gotPorts.Equal(expectedPorts) {
t.Errorf("Expected %v got %v", expectedPorts, gotPorts)
}
}
func TestSkySimpleSRVLookup(t *testing.T) {
kd := newKubeDNS()
skydnsConfig := &skyServer.Config{Domain: testDomain, DnsAddr: "0.0.0.0:53"}
skyServer.SetDefaults(skydnsConfig)
s := skyServer.New(kd, skydnsConfig)
service := newHeadlessService()
endpointIPs := []string{"10.0.0.1", "10.0.0.2"}
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, endpointIPs...))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(service)
name := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
question := dns.Question{Name: name, Qtype: dns.TypeSRV, Qclass: dns.ClassINET}
rec, extra, err := s.SRVRecords(question, name, 512, false)
if err != nil {
t.Fatalf("Failed srv record lookup on service with fqdn %v", name)
}
assertARecordsMatchIPs(t, extra, endpointIPs...)
targets := []string{}
for _, eip := range endpointIPs {
// A portal service is always created with a port of '0'
targets = append(targets, fmt.Sprintf("%v.%v", fmt.Sprintf("%x", hashServiceRecord(newServiceRecord(eip, 0))), name))
}
assertSRVRecordsMatchTarget(t, rec, targets...)
}
func TestSkyPodHostnameSRVLookup(t *testing.T) {
kd := newKubeDNS()
skydnsConfig := &skyServer.Config{Domain: testDomain, DnsAddr: "0.0.0.0:53"}
skyServer.SetDefaults(skydnsConfig)
s := skyServer.New(kd, skydnsConfig)
service := newHeadlessService()
endpointIPs := []string{"10.0.0.1", "10.0.0.2"}
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, endpointIPs...))
// The format of thes annotations is:
// endpoints.beta.kubernetes.io/hostnames-map: '{"ep-ip":{"HostName":"pod request hostname"}}'
epRecords := map[string]endpointsapi.HostRecord{}
for i, ep := range endpointIPs {
epRecords[ep] = endpointsapi.HostRecord{HostName: fmt.Sprintf("ep-%d", i)}
}
b, err := json.Marshal(epRecords)
if err != nil {
t.Fatalf("%v", err)
}
endpoints.Annotations = map[string]string{
endpointsapi.PodHostnamesAnnotation: string(b),
}
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(service)
name := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
question := dns.Question{Name: name, Qtype: dns.TypeSRV, Qclass: dns.ClassINET}
rec, _, err := s.SRVRecords(question, name, 512, false)
if err != nil {
t.Fatalf("Failed srv record lookup on service with fqdn %v", name)
}
targets := []string{}
for i := range endpointIPs {
targets = append(targets, fmt.Sprintf("%v.%v", fmt.Sprintf("ep-%d", i), name))
}
assertSRVRecordsMatchTarget(t, rec, targets...)
}
func TestSkyNamedPortSRVLookup(t *testing.T) {
kd := newKubeDNS()
skydnsConfig := &skyServer.Config{Domain: testDomain, DnsAddr: "0.0.0.0:53"}
skyServer.SetDefaults(skydnsConfig)
s := skyServer.New(kd, skydnsConfig)
service := newHeadlessService()
eip := "10.0.0.1"
endpoints := newEndpoints(service, newSubsetWithOnePort("http", 8081, eip))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(service)
name := strings.Join([]string{"_http", "_tcp", testService, testNamespace, "svc", testDomain}, ".")
question := dns.Question{Name: name, Qtype: dns.TypeSRV, Qclass: dns.ClassINET}
rec, extra, err := s.SRVRecords(question, name, 512, false)
if err != nil {
t.Fatalf("Failed srv record lookup on service with fqdn %v", name)
}
svcDomain := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
assertARecordsMatchIPs(t, extra, eip)
assertSRVRecordsMatchTarget(t, rec, fmt.Sprintf("%v.%v", fmt.Sprintf("%x", hashServiceRecord(newServiceRecord(eip, 0))), svcDomain))
assertSRVRecordsMatchPort(t, rec, 8081)
}
func TestSimpleHeadlessService(t *testing.T) {
kd := newKubeDNS()
s := newHeadlessService()
assert.NoError(t, kd.servicesStore.Add(s))

23
pkg/dns/doc.go Normal file
View File

@ -0,0 +1,23 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package DNS provides a backend for the skydns DNS server started by the
// kubedns cluster addon. It exposes the 2 interface method: Records and
// ReverseRecord, which skydns invokes according to the DNS queries it
// receives. It serves these records by consulting an in memory tree
// populated with Kubernetes Services and Endpoints received from the Kubernetes
// API server.
package dns

View File

@ -19,6 +19,7 @@ package dns
import (
"bytes"
"encoding/json"
skymsg "github.com/skynetservices/skydns/msg"
"strings"
)
@ -49,8 +50,30 @@ func (cache *TreeCache) Serialize() (string, error) {
return string(prettyJSON.Bytes()), nil
}
func (cache *TreeCache) setEntry(key string, val interface{}, path ...string) {
// setEntry creates the entire path if it doesn't already exist in the cache,
// then sets the given service record under the given key. The path this entry
// would have occupied in an etcd datastore is computed from the given fqdn and
// stored as the "Key" of the skydns service; this is only required because
// skydns expects the service record to contain a key in a specific format
// (presumably for legacy compatibility). Note that the fqnd string typically
// contains both the key and all elements in the path.
func (cache *TreeCache) setEntry(key string, val *skymsg.Service, fqdn string, path ...string) {
// TODO: Consolidate setEntry and setSubCache into a single method with a
// type switch.
// TODO: Insted of passing the fqdn as an argument, we can reconstruct
// it from the path, provided callers always pass the full path to the
// object. This is currently *not* the case, since callers first create
// a new, empty node, populate it, then parent it under the right path.
// So we don't know the full key till the final parenting operation.
node := cache.ensureChildNode(path...)
// This key is used to construct the "target" for SRV record lookups.
// For normal service/endpoint lookups, this will result in a key like:
// /skydns/local/cluster/svc/svcNS/svcName/record-hash
// but for headless services that govern pods requesting a specific
// hostname (as used by petset), this will end up being:
// /skydns/local/cluster/svc/svcNS/svcName/pod-hostname
val.Key = skymsg.Path(fqdn)
node.Entries[key] = val
}
@ -65,6 +88,9 @@ func (cache *TreeCache) getSubCache(path ...string) *TreeCache {
return childCache
}
// setSubCache inserts the given subtree under the given path:key. Usually the
// key is the name of a Kubernetes Service, and the path maps to the cluster
// subdomains matching the Service.
func (cache *TreeCache) setSubCache(key string, subCache *TreeCache, path ...string) {
node := cache.ensureChildNode(path...)
node.ChildNodes[key] = subCache
@ -76,8 +102,8 @@ func (cache *TreeCache) getEntry(key string, path ...string) (interface{}, bool)
return val, ok
}
func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []interface{} {
retval := []interface{}{}
func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []*skymsg.Service {
retval := []*skymsg.Service{}
nodesToExplore := []*TreeCache{cache}
for idx, subpath := range path {
nextNodesToExplore := []*TreeCache{}
@ -88,7 +114,7 @@ func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []interfac
nextNodesToExplore = append(nextNodesToExplore, node)
} else {
if val, ok := node.Entries[subpath]; ok {
retval = append(retval, val)
retval = append(retval, val.(*skymsg.Service))
} else {
childNode := node.ChildNodes[subpath]
if childNode != nil {
@ -122,10 +148,9 @@ func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []interfac
for _, node := range nodesToExplore {
for _, val := range node.Entries {
retval = append(retval, val)
retval = append(retval, val.(*skymsg.Service))
}
}
return retval
}