diff --git a/build-tools/kube-dns/Makefile b/build-tools/kube-dns/Makefile index a3e827aaae..f193690c08 100644 --- a/build-tools/kube-dns/Makefile +++ b/build-tools/kube-dns/Makefile @@ -22,7 +22,7 @@ # Default registry, arch and tag. This can be overwritten by arguments to make PLATFORM?=linux ARCH?=amd64 -TAG?=1.8 +TAG?=1.9 REGISTRY?=gcr.io/google_containers GOLANG_VERSION=1.6 diff --git a/cluster/addons/dns/skydns-rc.yaml.base b/cluster/addons/dns/skydns-rc.yaml.base index 78b0a7723a..bce2f1001b 100644 --- a/cluster/addons/dns/skydns-rc.yaml.base +++ b/cluster/addons/dns/skydns-rc.yaml.base @@ -44,7 +44,7 @@ spec: spec: containers: - name: kubedns - image: gcr.io/google_containers/kubedns-amd64:1.8 + image: gcr.io/google_containers/kubedns-amd64:1.9 resources: # TODO: Set memory limits when we've profiled the container for large # clusters, then set request = limit to keep this container in @@ -76,6 +76,7 @@ spec: args: - --domain=__PILLAR__DNS__DOMAIN__. - --dns-port=10053 + - --config-map=kube-dns # This should be set to v=2 only after the new image (cut from 1.5) has # been released, otherwise we will flood the logs. - --v=0 diff --git a/cluster/addons/dns/skydns-rc.yaml.in b/cluster/addons/dns/skydns-rc.yaml.in index 2e9d0ef355..b3a40cf674 100644 --- a/cluster/addons/dns/skydns-rc.yaml.in +++ b/cluster/addons/dns/skydns-rc.yaml.in @@ -44,7 +44,7 @@ spec: spec: containers: - name: kubedns - image: gcr.io/google_containers/kubedns-amd64:1.8 + image: gcr.io/google_containers/kubedns-amd64:1.9 resources: # TODO: Set memory limits when we've profiled the container for large # clusters, then set request = limit to keep this container in @@ -76,6 +76,7 @@ spec: args: - --domain={{ pillar['dns_domain'] }}. - --dns-port=10053 + - --config-map=kube-dns # This should be set to v=2 only after the new image (cut from 1.5) has # been released, otherwise we will flood the logs. - --v=0 diff --git a/cluster/addons/dns/skydns-rc.yaml.sed b/cluster/addons/dns/skydns-rc.yaml.sed index d4e5c39605..049961fb53 100644 --- a/cluster/addons/dns/skydns-rc.yaml.sed +++ b/cluster/addons/dns/skydns-rc.yaml.sed @@ -44,7 +44,7 @@ spec: spec: containers: - name: kubedns - image: gcr.io/google_containers/kubedns-amd64:1.8 + image: gcr.io/google_containers/kubedns-amd64:1.9 resources: # TODO: Set memory limits when we've profiled the container for large # clusters, then set request = limit to keep this container in @@ -76,6 +76,7 @@ spec: args: - --domain=$DNS_DOMAIN. - --dns-port=10053 + - --config-map=kube-dns # This should be set to v=2 only after the new image (cut from 1.5) has # been released, otherwise we will flood the logs. - --v=0 diff --git a/cluster/gce/coreos/kube-manifests/addons/dns/skydns-rc.yaml b/cluster/gce/coreos/kube-manifests/addons/dns/skydns-rc.yaml index 10624c4caa..8a90b80215 100644 --- a/cluster/gce/coreos/kube-manifests/addons/dns/skydns-rc.yaml +++ b/cluster/gce/coreos/kube-manifests/addons/dns/skydns-rc.yaml @@ -24,7 +24,7 @@ spec: spec: containers: - name: kubedns - image: gcr.io/google_containers/kubedns-amd64:1.8 + image: gcr.io/google_containers/kubedns-amd64:1.9 resources: # TODO: Set memory limits when we've profiled the container for large # clusters, then set request = limit to keep this container in diff --git a/cmd/kube-dns/app/BUILD b/cmd/kube-dns/app/BUILD index 3b019704c1..5afe7ea4fc 100644 --- a/cmd/kube-dns/app/BUILD +++ b/cmd/kube-dns/app/BUILD @@ -21,6 +21,7 @@ go_library( "//pkg/client/restclient:go_default_library", "//pkg/client/unversioned/clientcmd:go_default_library", "//pkg/dns:go_default_library", + "//pkg/dns/config:go_default_library", "//vendor:github.com/golang/glog", "//vendor:github.com/skynetservices/skydns/metrics", "//vendor:github.com/skynetservices/skydns/server", diff --git a/cmd/kube-dns/app/options/BUILD b/cmd/kube-dns/app/options/BUILD index ab68d6af54..10b714d107 100644 --- a/cmd/kube-dns/app/options/BUILD +++ b/cmd/kube-dns/app/options/BUILD @@ -15,6 +15,8 @@ go_library( srcs = ["options.go"], tags = ["automanaged"], deps = [ + "//pkg/api:go_default_library", + "//pkg/dns/federation:go_default_library", "//pkg/util/validation:go_default_library", "//vendor:github.com/spf13/pflag", ], diff --git a/cmd/kube-dns/app/options/options.go b/cmd/kube-dns/app/options/options.go index b00997ae6d..c2a1d527ad 100644 --- a/cmd/kube-dns/app/options/options.go +++ b/cmd/kube-dns/app/options/options.go @@ -18,14 +18,15 @@ limitations under the License. package options import ( - "net/url" - "os" - "fmt" _ "net/http/pprof" + "net/url" + "os" "strings" "github.com/spf13/pflag" + "k8s.io/kubernetes/pkg/api" + fed "k8s.io/kubernetes/pkg/dns/federation" "k8s.io/kubernetes/pkg/util/validation" ) @@ -33,22 +34,28 @@ type KubeDNSConfig struct { ClusterDomain string KubeConfigFile string KubeMasterURL string + HealthzPort int DNSBindAddress string DNSPort int - // Federations maps federation names to their registered domain names. + Federations map[string]string + + ConfigMapNs string + ConfigMap string } func NewKubeDNSConfig() *KubeDNSConfig { return &KubeDNSConfig{ ClusterDomain: "cluster.local.", - KubeConfigFile: "", - KubeMasterURL: "", HealthzPort: 8081, DNSBindAddress: "0.0.0.0", DNSPort: 53, - Federations: make(map[string]string), + + Federations: make(map[string]string), + + ConfigMapNs: api.NamespaceSystem, + ConfigMap: "", // default to using command line flags } } @@ -107,25 +114,8 @@ type federationsVar struct { nameDomainMap map[string]string } -// Set deserializes the input string in the format -// "myfederation1=example.com,myfederation2=second.example.com,myfederation3=example.com" -// into a map of key-value pairs of federation names to domain names. func (fv federationsVar) Set(keyVal string) error { - for _, val := range strings.Split(keyVal, ",") { - splits := strings.SplitN(strings.TrimSpace(val), "=", 2) - name := strings.TrimSpace(splits[0]) - domain := strings.TrimSpace(splits[1]) - if errs := validation.IsDNS1123Label(name); len(errs) != 0 { - return fmt.Errorf("%q not a valid federation name: %q", name, errs) - } - // The federation domain name need not strictly be domain names, we - // accept valid dns names with subdomain components. - if errs := validation.IsDNS1123Subdomain(domain); len(errs) != 0 { - return fmt.Errorf("%q not a valid domain name: %q", domain, errs) - } - fv.nameDomainMap[name] = domain - } - return nil + return fed.ParseFederationsFlag(keyVal, fv.nameDomainMap) } func (fv federationsVar) String() string { @@ -141,11 +131,33 @@ func (fv federationsVar) Type() string { } func (s *KubeDNSConfig) AddFlags(fs *pflag.FlagSet) { - fs.Var(clusterDomainVar{&s.ClusterDomain}, "domain", "domain under which to create names") - fs.StringVar(&s.KubeConfigFile, "kubecfg-file", s.KubeConfigFile, "Location of kubecfg file for access to kubernetes master service; --kube-master-url overrides the URL part of this; if neither this nor --kube-master-url are provided, defaults to service account tokens") - fs.Var(kubeMasterURLVar{&s.KubeMasterURL}, "kube-master-url", "URL to reach kubernetes master. Env variables in this flag will be expanded.") - fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, "port on which to serve a kube-dns HTTP readiness probe.") - fs.StringVar(&s.DNSBindAddress, "dns-bind-address", s.DNSBindAddress, "address on which to serve DNS requests.") + fs.Var(clusterDomainVar{&s.ClusterDomain}, "domain", + "domain under which to create names") + + fs.StringVar(&s.KubeConfigFile, "kubecfg-file", s.KubeConfigFile, + "Location of kubecfg file for access to kubernetes master service;"+ + " --kube-master-url overrides the URL part of this; if neither this nor"+ + " --kube-master-url are provided, defaults to service account tokens") + fs.Var(kubeMasterURLVar{&s.KubeMasterURL}, "kube-master-url", + "URL to reach kubernetes master. Env variables in this flag will be expanded.") + + fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, + "port on which to serve a kube-dns HTTP readiness probe.") + fs.StringVar(&s.DNSBindAddress, "dns-bind-address", s.DNSBindAddress, + "address on which to serve DNS requests.") fs.IntVar(&s.DNSPort, "dns-port", s.DNSPort, "port on which to serve DNS requests.") - fs.Var(federationsVar{s.Federations}, "federations", "a comma separated list of the federation names and their corresponding domain names to which this cluster belongs. Example: \"myfederation1=example.com,myfederation2=example2.com,myfederation3=example.com\"") + + fs.Var(federationsVar{s.Federations}, "federations", + "a comma separated list of the federation names and their corresponding"+ + " domain names to which this cluster belongs. Example:"+ + " \"myfederation1=example.com,myfederation2=example2.com,myfederation3=example.com\"."+ + " It is an error to set both the federations and config-map flags.") + fs.MarkDeprecated("federations", "use config-map instead. Will be removed in future version") + + fs.StringVar(&s.ConfigMapNs, "config-map-namespace", s.ConfigMapNs, + "namespace for the config-map") + fs.StringVar(&s.ConfigMap, "config-map", s.ConfigMap, + "config-map name. If empty, then the config-map will not used. Cannot be "+ + " used in conjunction with federations flag. config-map contains "+ + "dynamically adjustable configuration.") } diff --git a/cmd/kube-dns/app/server.go b/cmd/kube-dns/app/server.go index 7f17df9349..a369719e2d 100644 --- a/cmd/kube-dns/app/server.go +++ b/cmd/kube-dns/app/server.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/client/restclient" kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" kdns "k8s.io/kubernetes/pkg/dns" + dnsConfig "k8s.io/kubernetes/pkg/dns/config" ) type KubeDNSServer struct { @@ -52,13 +53,25 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer { if err != nil { glog.Fatalf("Failed to create a kubernetes client: %v", err) } + ks.healthzPort = config.HealthzPort ks.dnsBindAddress = config.DNSBindAddress ks.dnsPort = config.DNSPort - ks.kd, err = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations) - if err != nil { - glog.Fatalf("Failed to start kubeDNS: %v", err) + + var configSync dnsConfig.Sync + if config.ConfigMap == "" { + glog.V(0).Infof("ConfigMap not configured, using values from command line flags") + configSync = dnsConfig.NewNopSync( + &dnsConfig.Config{Federations: config.Federations}) + } else { + glog.V(0).Infof("Using configuration read from ConfigMap: %v:%v", + config.ConfigMapNs, config.ConfigMap) + configSync = dnsConfig.NewSync( + kubeClient, config.ConfigMapNs, config.ConfigMap) } + + ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, configSync) + return &ks } diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 0b2d1cc978..806813e199 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -91,6 +91,8 @@ concurrent-replicaset-syncs concurrent-resource-quota-syncs concurrent-serviceaccount-token-syncs concurrent-service-syncs +config-map +config-map-namespace config-sync-period configure-cloud-routes conntrack-max diff --git a/pkg/dns/BUILD b/pkg/dns/BUILD index 9acaf709e6..e54128a6f1 100644 --- a/pkg/dns/BUILD +++ b/pkg/dns/BUILD @@ -23,6 +23,7 @@ go_library( "//pkg/api/unversioned:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/dns/config:go_default_library", "//pkg/dns/treecache:go_default_library", "//pkg/dns/util:go_default_library", "//pkg/runtime:go_default_library", @@ -47,6 +48,7 @@ go_test( "//pkg/api/unversioned:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/dns/config:go_default_library", "//pkg/dns/treecache:go_default_library", "//pkg/dns/util:go_default_library", "//pkg/util/sets:go_default_library", diff --git a/pkg/dns/config/BUILD b/pkg/dns/config/BUILD new file mode 100644 index 0000000000..8b8da0d165 --- /dev/null +++ b/pkg/dns/config/BUILD @@ -0,0 +1,42 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_binary", + "go_library", + "go_test", + "cgo_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "config.go", + "mocksync.go", + "nopsync.go", + "sync.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/api/unversioned:go_default_library", + "//pkg/client/cache:go_default_library", + "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/dns/federation:go_default_library", + "//pkg/fields:go_default_library", + "//pkg/runtime:go_default_library", + "//pkg/watch:go_default_library", + "//vendor:github.com/golang/glog", + "//vendor:k8s.io/client-go/pkg/util/wait", + ], +) + +go_test( + name = "go_default_test", + srcs = ["config_test.go"], + library = "go_default_library", + tags = ["automanaged"], + deps = ["//vendor:github.com/stretchr/testify/assert"], +) diff --git a/pkg/dns/config/config.go b/pkg/dns/config/config.go new file mode 100644 index 0000000000..075fdbb9b1 --- /dev/null +++ b/pkg/dns/config/config.go @@ -0,0 +1,65 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + types "k8s.io/kubernetes/pkg/api/unversioned" + fed "k8s.io/kubernetes/pkg/dns/federation" +) + +// Config populated either from the configuration source (command +// line flags or via the config map mechanism). +type Config struct { + // The inclusion of TypeMeta is to ensure future compatibility if the + // Config object was populated directly via a Kubernetes API mechanism. + // + // For example, instead of the custom implementation here, the + // configuration could be obtained from an API that unifies + // command-line flags, config-map, etc mechanisms. + types.TypeMeta + + // Map of federation names that the cluster in which this kube-dns + // is running belongs to, to the corresponding domain names. + Federations map[string]string `json:"federations"` +} + +func NewDefaultConfig() *Config { + return &Config{ + Federations: make(map[string]string), + } +} + +// IsValid returns whether or not the configuration is valid. +func (config *Config) Validate() error { + if err := config.validateFederations(); err != nil { + return err + } + + return nil +} + +func (config *Config) validateFederations() error { + for name, domain := range config.Federations { + if err := fed.ValidateName(name); err != nil { + return err + } + if err := fed.ValidateDomain(domain); err != nil { + return err + } + } + return nil +} diff --git a/pkg/dns/config/config_test.go b/pkg/dns/config/config_test.go new file mode 100644 index 0000000000..8c751bb045 --- /dev/null +++ b/pkg/dns/config/config_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestValidate(t *testing.T) { + for _, testCase := range []struct { + config *Config + hasError bool + }{ + { + config: &Config{Federations: map[string]string{}}, + }, + { + config: &Config{ + Federations: map[string]string{ + "abc": "d.e.f", + }, + }, + }, + { + config: &Config{ + Federations: map[string]string{ + "a.b": "cdef", + }, + }, + hasError: true, + }, + } { + err := testCase.config.Validate() + if !testCase.hasError { + assert.Nil(t, err, "should be valid", testCase) + } else { + assert.NotNil(t, err, "should not be valid", testCase) + } + } +} diff --git a/pkg/dns/config/mocksync.go b/pkg/dns/config/mocksync.go new file mode 100644 index 0000000000..e42f4f1292 --- /dev/null +++ b/pkg/dns/config/mocksync.go @@ -0,0 +1,46 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +// MockSync is a testing mock. +type MockSync struct { + // Config that will be returned from Once(). + Config *Config + // Error that will be returned from Once(). + Error error + + // Chan to send new configurations on. + Chan chan *Config +} + +var _ Sync = (*MockSync)(nil) + +func NewMockSync(config *Config, error error) *MockSync { + return &MockSync{ + Config: config, + Error: error, + Chan: make(chan *Config), + } +} + +func (sync *MockSync) Once() (*Config, error) { + return sync.Config, sync.Error +} + +func (sync *MockSync) Periodic() <-chan *Config { + return sync.Chan +} diff --git a/pkg/dns/config/nopsync.go b/pkg/dns/config/nopsync.go new file mode 100644 index 0000000000..4225a3ffd3 --- /dev/null +++ b/pkg/dns/config/nopsync.go @@ -0,0 +1,37 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +// nopSync does no synchronization, used when the DNS server is +// started without a ConfigMap configured. +type nopSync struct { + config *Config +} + +var _ Sync = (*nopSync)(nil) + +func NewNopSync(config *Config) Sync { + return &nopSync{config: config} +} + +func (sync *nopSync) Once() (*Config, error) { + return sync.config, nil +} + +func (sync *nopSync) Periodic() <-chan *Config { + return make(chan *Config) +} diff --git a/pkg/dns/config/sync.go b/pkg/dns/config/sync.go new file mode 100644 index 0000000000..9022051fd9 --- /dev/null +++ b/pkg/dns/config/sync.go @@ -0,0 +1,201 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "k8s.io/client-go/pkg/util/wait" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + fed "k8s.io/kubernetes/pkg/dns/federation" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" + + "time" + + "github.com/golang/glog" +) + +// Sync manages synchronization of the config map. +type Sync interface { + // Once does a blocking synchronization of the config map. If the + // ConfigMap fails to validate, this method will return nil, err. + Once() (*Config, error) + + // Start a periodic synchronization of the configuration map. When a + // successful configuration map update is detected, the + // configuration will be sent to the channel. + // + // It is an error to call this more than once. + Periodic() <-chan *Config +} + +// NewSync for ConfigMap from namespace `ns` and `name`. +func NewSync(client clientset.Interface, ns string, name string) Sync { + sync := &kubeSync{ + ns: ns, + name: name, + client: client, + channel: make(chan *Config), + } + + listWatch := &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + options.FieldSelector = fields.Set{"metadata.name": name}.AsSelector() + return client.Core().ConfigMaps(ns).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + options.FieldSelector = fields.Set{"metadata.name": name}.AsSelector() + return client.Core().ConfigMaps(ns).Watch(options) + }, + } + + store, controller := cache.NewInformer( + listWatch, + &api.ConfigMap{}, + time.Duration(0), + cache.ResourceEventHandlerFuncs{ + AddFunc: sync.onAdd, + DeleteFunc: sync.onDelete, + UpdateFunc: sync.onUpdate, + }) + + sync.store = store + sync.controller = controller + + return sync +} + +// kubeSync implements Sync for the Kubernetes API. +type kubeSync struct { + ns string + name string + + client clientset.Interface + store cache.Store + controller *cache.Controller + + channel chan *Config + + latestVersion string +} + +var _ Sync = (*kubeSync)(nil) + +func (sync *kubeSync) Once() (*Config, error) { + cm, err := sync.client.Core().ConfigMaps(sync.ns).Get(sync.name) + + if err != nil { + glog.Errorf("Error getting ConfigMap %v:%v err: %v", + sync.ns, sync.name, err) + return nil, err + } + + config, _, err := sync.processUpdate(cm) + return config, err +} + +func (sync *kubeSync) Periodic() <-chan *Config { + go sync.controller.Run(wait.NeverStop) + return sync.channel +} + +func (sync *kubeSync) toConfigMap(obj interface{}) *api.ConfigMap { + cm, ok := obj.(*api.ConfigMap) + if !ok { + glog.Fatalf("Expected ConfigMap, got %T", obj) + } + return cm +} + +func (sync *kubeSync) onAdd(obj interface{}) { + cm := sync.toConfigMap(obj) + + glog.V(2).Infof("ConfigMap %s:%s was created", sync.ns, sync.name) + + config, updated, err := sync.processUpdate(cm) + if updated && err == nil { + sync.channel <- config + } +} + +func (sync *kubeSync) onDelete(_ interface{}) { + glog.V(2).Infof("ConfigMap %s:%s was deleted, reverting to default configuration", + sync.ns, sync.name) + + sync.latestVersion = "" + sync.channel <- NewDefaultConfig() +} + +func (sync *kubeSync) onUpdate(_, obj interface{}) { + cm := sync.toConfigMap(obj) + + glog.V(2).Infof("ConfigMap %s:%s was updated", sync.ns, sync.name) + + config, changed, err := sync.processUpdate(cm) + + if changed && err == nil { + sync.channel <- config + } +} + +func (sync *kubeSync) processUpdate(cm *api.ConfigMap) (config *Config, changed bool, err error) { + glog.V(4).Infof("processUpdate ConfigMap %+v", *cm) + + if cm.ObjectMeta.ResourceVersion != sync.latestVersion { + glog.V(3).Infof("Updating config to version %v (was %v)", + cm.ObjectMeta.ResourceVersion, sync.latestVersion) + changed = true + sync.latestVersion = cm.ObjectMeta.ResourceVersion + } else { + glog.V(4).Infof("Config was unchanged (version %v)", sync.latestVersion) + return + } + + config = &Config{} + + if err = sync.updateFederations(cm, config); err != nil { + glog.Errorf("Invalid configuration, ignoring update") + return + } + + if err = config.Validate(); err != nil { + glog.Errorf("Invalid onfiguration: %v (value was %+v), ignoring update", + err, config) + config = nil + return + } + + return +} + +func (sync *kubeSync) updateFederations(cm *api.ConfigMap, config *Config) (err error) { + if flagValue, ok := cm.Data["federations"]; ok { + config.Federations = make(map[string]string) + if err = fed.ParseFederationsFlag(flagValue, config.Federations); err != nil { + glog.Errorf("Invalid federations value: %v (value was %q)", + err, cm.Data["federations"]) + return + } + glog.V(2).Infof("Updated federations to %v", config.Federations) + } else { + glog.V(2).Infof("No federations present") + } + + return +} diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index cc0e53a068..9d52ec7484 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" kcache "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/dns/config" "k8s.io/kubernetes/pkg/dns/treecache" "k8s.io/kubernetes/pkg/dns/util" "k8s.io/kubernetes/pkg/runtime" @@ -43,8 +44,6 @@ import ( ) const ( - kubernetesSvcName = "kubernetes" - // A subdomain added to the user specified domain for all services. serviceSubdomain = "svc" @@ -67,77 +66,78 @@ type KubeDNS struct { // to get Endpoints and Service objects. kubeClient clientset.Interface - // The domain for which this DNS Server is authoritative. + // domain for which this DNS Server is authoritative. domain string + // configMap where kube-dns dynamic configuration is store. If this + // is empty then getting configuration from a configMap will be + // disabled. + configMap string - // A cache that contains all the endpoints in the system. + // endpointsStore that contains all the endpoints in the system. endpointsStore kcache.Store - - // A cache that contains all the services in the system. + // servicesStore that contains all the services in the system. servicesStore kcache.Store + // nodesStore contains some subset of nodes in the system so that we + // can retrieve the cluster zone annotation from the cached node + // instead of getting it from the API server every time. + nodesStore kcache.Store - // stores DNS records for the domain. - // A Records and SRV Records for (regular) services and headless Services. - // CNAME Records for ExternalName Services. + // cache stores DNS records for the domain. A Records and SRV Records for + // (regular) services and headless Services. CNAME Records for + // ExternalName Services. cache treecache.TreeCache - - // TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap. + // TODO(nikhiljindal): Remove this. It can be recreated using + // clusterIPServiceMap. reverseRecordMap map[string]*skymsg.Service - - // Map of cluster IP to service object. Headless services are not part of this map. - // Used to get a service when given its cluster IP. - // Access to this is coordinated using cacheLock. We use the same lock for cache and this map - // to ensure that they don't get out of sync. + // clusterIPServiceMap to service object. Headless services are not + // part of this map. Used to get a service when given its cluster + // IP. Access to this is coordinated using cacheLock. We use the + // same lock for cache and this map to ensure that they don't get + // out of sync. clusterIPServiceMap map[string]*kapi.Service - - // caller is responsible for using the cacheLock before invoking methods on cache - // the cache is not thread-safe, and the caller can guarantee thread safety by using + // cacheLock protecting the cache. caller is responsible for using + // the cacheLock before invoking methods on cache the cache is not + // thread-safe, and the caller can guarantee thread safety by using // the cacheLock cacheLock sync.RWMutex - // The domain for which this DNS Server is authoritative, in array format and reversed. - // e.g. if domain is "cluster.local", domainPath is []string{"local", "cluster"} + // The domain for which this DNS Server is authoritative, in array + // format and reversed. e.g. if domain is "cluster.local", + // domainPath is []string{"local", "cluster"} domainPath []string // endpointsController invokes registered callbacks when endpoints change. endpointsController *kcache.Controller - // serviceController invokes registered callbacks when services change. serviceController *kcache.Controller - // Map of federation names that the cluster in which this kube-dns is running belongs to, to - // the corresponding domain names. - federations map[string]string - - // A TTL cache that contains some subset of nodes in the system so that we can retrieve the - // cluster zone annotation from the cached node instead of getting it from the API server - // every time. - nodesStore kcache.Store + // config set from the dynamic configuration source. + config *config.Config + // configLock protects the config below. + configLock sync.RWMutex + // configSync manages synchronization of the config map + configSync config.Sync } -func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) (*KubeDNS, error) { - // Verify that federation names should not contain dots ('.') - // We can not allow dots since we use that as separator for path segments (svcname.nsname.fedname.svc.domain) - for key := range federations { - if strings.ContainsAny(key, ".") { - return nil, fmt.Errorf("invalid federation name: %s, cannot have '.'", key) - } - } +func NewKubeDNS(client clientset.Interface, clusterDomain string, configSync config.Sync) *KubeDNS { kd := &KubeDNS{ kubeClient: client, - domain: domain, + domain: clusterDomain, cache: treecache.NewTreeCache(), cacheLock: sync.RWMutex{}, nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc), reverseRecordMap: make(map[string]*skymsg.Service), clusterIPServiceMap: make(map[string]*kapi.Service), - domainPath: util.ReverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), - federations: federations, + domainPath: util.ReverseArray(strings.Split(strings.TrimRight(clusterDomain, "."), ".")), + + configLock: sync.RWMutex{}, + configSync: configSync, } + kd.setEndpointsStore() kd.setServicesStore() - return kd, nil + return kd } func (kd *KubeDNS) Start() { @@ -147,25 +147,29 @@ func (kd *KubeDNS) Start() { glog.V(2).Infof("Starting serviceController") go kd.serviceController.Run(wait.NeverStop) - // Wait synchronously for the Kubernetes service and add a DNS - // record for it. This ensures that the Start function returns only - // after having received Service objects from APIServer. + kd.startConfigMapSync() + + // Wait synchronously for the Kubernetes service. This ensures that + // the Start function returns only after having received Service + // objects from APIServer. // // TODO: we might not have to wait for kubernetes service // specifically. We should just wait for a list operation to be // complete from APIServer. - glog.V(2).Infof("Waiting for Kubernetes service") kd.waitForKubernetesService() } -func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) { +func (kd *KubeDNS) waitForKubernetesService() { + glog.V(2).Infof("Waiting for Kubernetes service") + + const kubernetesSvcName = "kubernetes" + const servicePollInterval = 1 * time.Second + name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName) glog.V(2).Infof("Waiting for service: %v", name) - var err error - servicePollInterval := 1 * time.Second for { - svc, err = kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName) + svc, err := kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName) if err != nil || svc == nil { glog.V(3).Infof( "Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", @@ -179,6 +183,30 @@ func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) { return } +func (kd *KubeDNS) startConfigMapSync() { + initialConfig, err := kd.configSync.Once() + if err != nil { + glog.Errorf( + "Error getting initial ConfigMap: %v, starting with default values", err) + kd.config = config.NewDefaultConfig() + } else { + kd.config = initialConfig + } + + go kd.syncConfigMap(kd.configSync.Periodic()) +} + +func (kd *KubeDNS) syncConfigMap(syncChan <-chan *config.Config) { + for { + nextConfig := <-syncChan + + kd.configLock.Lock() + kd.config = nextConfig + glog.V(2).Infof("Configuration updated: %+v", *kd.config) + kd.configLock.Unlock() + } +} + func (kd *KubeDNS) GetCacheAsJSON() (string, error) { kd.cacheLock.RLock() defer kd.cacheLock.RUnlock() @@ -528,6 +556,9 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string, // We know that a headless service has endpoints for sure if a // record was returned for it. The record contains endpoint // IPs. So nothing to check for headless services. + // + // TODO: this access to the cluster IP map does not seem to be + // threadsafe. if !kd.isHeadlessServiceRecord(&val) { ok, err := kd.serviceWithClusterIPHasEndpoints(&val) if err != nil { @@ -740,10 +771,15 @@ func (kd *KubeDNS) isFederationQuery(path []string) bool { return false } } - if _, ok := kd.federations[path[2]]; !ok { - glog.V(4).Infof("Not a federation query: kd.federations[%q] not found", path[2]) + + kd.configLock.RLock() + defer kd.configLock.RUnlock() + + if _, ok := kd.config.Federations[path[2]]; !ok { + glog.V(4).Infof("Not a federation query: label %q not found", path[2]) return false } + return true } @@ -775,7 +811,10 @@ func (kd *KubeDNS) federationRecords(queryPath []string) ([]skymsg.Service, erro // We have already established that the map entry exists for the given federation, // we just need to retrieve the domain name, validate it and append it to the path. - domain := kd.federations[path[2]] + kd.configLock.RLock() + domain := kd.config.Federations[path[2]] + kd.configLock.RUnlock() + // We accept valid subdomains as well, so just let all the valid subdomains. if len(validation.IsDNS1123Subdomain(domain)) != 0 { return nil, fmt.Errorf("%s is not a valid domain name for federation %s", domain, path[2]) diff --git a/pkg/dns/dns_test.go b/pkg/dns/dns_test.go index 019d78f36c..7849c4d016 100644 --- a/pkg/dns/dns_test.go +++ b/pkg/dns/dns_test.go @@ -20,9 +20,11 @@ import ( "encoding/json" "fmt" "net" + "reflect" "strings" "sync" "testing" + "time" etcd "github.com/coreos/etcd/client" "github.com/miekg/dns" @@ -35,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/dns/config" "k8s.io/kubernetes/pkg/dns/treecache" "k8s.io/kubernetes/pkg/dns/util" "k8s.io/kubernetes/pkg/util/sets" @@ -48,25 +51,22 @@ const ( ) func newKubeDNS() *KubeDNS { - kd := &KubeDNS{ - domain: testDomain, - endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc), - servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + return &KubeDNS{ + domain: testDomain, + domainPath: util.ReverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")), + + endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + cache: treecache.NewTreeCache(), reverseRecordMap: make(map[string]*skymsg.Service), clusterIPServiceMap: make(map[string]*kapi.Service), cacheLock: sync.RWMutex{}, - domainPath: util.ReverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")), - nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), - } - return kd -} -func TestNewKubeDNS(t *testing.T) { - // Verify that it returns an error for invalid federation names. - _, err := NewKubeDNS(nil, "domainName", map[string]string{"invalid.name.with.dot": "example.come"}) - if err == nil { - t.Errorf("Expected an error due to invalid federation name") + config: config.NewDefaultConfig(), + configLock: sync.RWMutex{}, + configSync: config.NewNopSync(config.NewDefaultConfig()), } } @@ -384,93 +384,95 @@ func verifyRecord(q, a string, t *testing.T, kd *KubeDNS) { assert.Equal(t, a, records[0].Host) } -// Verifies that quering KubeDNS for a headless federation service returns the DNS hostname when a local service does not exist and returns the endpoint IP when a local service exists. +const federatedServiceFQDN = "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com." + +// Verifies that querying KubeDNS for a headless federation service +// returns the DNS hostname when a local service does not exist and +// returns the endpoint IP when a local service exists. func TestFederationHeadlessService(t *testing.T) { kd := newKubeDNS() - kd.federations = map[string]string{ + kd.config.Federations = map[string]string{ "myfederation": "example.com", } kd.kubeClient = fake.NewSimpleClientset(newNodes()) - // Verify that quering for federation service returns a federation domain name. + // Verify that querying for federation service returns a federation domain name. verifyRecord("testservice.default.myfederation.svc.cluster.local.", - "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", - t, kd) + federatedServiceFQDN, t, kd) // Add a local service without any endpoint. s := newHeadlessService() assert.NoError(t, kd.servicesStore.Add(s)) kd.newService(s) - // Verify that quering for federation service still returns the federation domain name. + // Verify that querying for federation service still returns the federation domain name. verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), - "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", - t, kd) + federatedServiceFQDN, t, kd) // Now add an endpoint. endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1")) assert.NoError(t, kd.endpointsStore.Add(endpoints)) kd.updateService(s, s) - // Verify that quering for federation service returns the local service domain name this time. - verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd) + // Verify that querying for federation service returns the local service domain name this time. + verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), + "testservice.default.svc.cluster.local.", t, kd) // Delete the endpoint. endpoints.Subsets = []kapi.EndpointSubset{} kd.handleEndpointAdd(endpoints) kd.updateService(s, s) - // Verify that quering for federation service returns the federation domain name again. + // Verify that querying for federation service returns the federation domain name again. verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), - "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", - t, kd) + federatedServiceFQDN, t, kd) } -// Verifies that quering KubeDNS for a federation service returns the DNS hostname if no endpoint exists and returns the local cluster IP if endpoints exist. +// Verifies that querying KubeDNS for a federation service returns the +// DNS hostname if no endpoint exists and returns the local cluster IP +// if endpoints exist. func TestFederationService(t *testing.T) { kd := newKubeDNS() - kd.federations = map[string]string{ + kd.config.Federations = map[string]string{ "myfederation": "example.com", } kd.kubeClient = fake.NewSimpleClientset(newNodes()) - // Verify that quering for federation service returns the federation domain name. + // Verify that querying for federation service returns the federation domain name. verifyRecord("testservice.default.myfederation.svc.cluster.local.", - "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", - t, kd) + federatedServiceFQDN, t, kd) // Add a local service without any endpoint. s := newService(testNamespace, testService, "1.2.3.4", "", 80) assert.NoError(t, kd.servicesStore.Add(s)) kd.newService(s) - // Verify that quering for federation service still returns the federation domain name. + // Verify that querying for federation service still returns the federation domain name. verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), - "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", - t, kd) + federatedServiceFQDN, t, kd) // Now add an endpoint. endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1")) assert.NoError(t, kd.endpointsStore.Add(endpoints)) kd.updateService(s, s) - // Verify that quering for federation service returns the local service domain name this time. - verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd) + // Verify that querying for federation service returns the local service domain name this time. + verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), + "testservice.default.svc.cluster.local.", t, kd) // Remove the endpoint. endpoints.Subsets = []kapi.EndpointSubset{} kd.handleEndpointAdd(endpoints) kd.updateService(s, s) - // Verify that quering for federation service returns the federation domain name again. + // Verify that querying for federation service returns the federation domain name again. verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), - "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", - t, kd) + federatedServiceFQDN, t, kd) } func TestFederationQueryWithoutCache(t *testing.T) { kd := newKubeDNS() - kd.federations = map[string]string{ + kd.config.Federations = map[string]string{ "myfederation": "example.com", "secondfederation": "second.example.com", } @@ -482,7 +484,7 @@ func TestFederationQueryWithoutCache(t *testing.T) { func TestFederationQueryWithCache(t *testing.T) { kd := newKubeDNS() - kd.federations = map[string]string{ + kd.config.Federations = map[string]string{ "myfederation": "example.com", "secondfederation": "second.example.com", } @@ -530,12 +532,64 @@ func testInvalidFederationQueries(t *testing.T, kd *KubeDNS) { t.Errorf("expected not found error, got nil") } if etcdErr, ok := err.(etcd.Error); !ok || etcdErr.Code != etcd.ErrorCodeKeyNotFound { - t.Errorf("expected not found error, got %v", etcdErr) + t.Errorf("expected not found error, got %v", err) } assert.Equal(t, 0, len(records)) } } +func checkConfigEqual(t *testing.T, kd *KubeDNS, expected *config.Config) { + const timeout = time.Duration(5) + + start := time.Now() + + ok := false + + for time.Since(start) < timeout*time.Second { + kd.configLock.RLock() + isEqual := reflect.DeepEqual(expected.Federations, kd.config.Federations) + kd.configLock.RUnlock() + + if isEqual { + ok = true + break + } + } + + if !ok { + t.Errorf("Federations should be %v, got %v", + expected.Federations, kd.config.Federations) + } +} + +func TestConfigSync(t *testing.T) { + kd := newKubeDNS() + mockSync := config.NewMockSync( + &config.Config{Federations: make(map[string]string)}, nil) + kd.configSync = mockSync + + kd.startConfigMapSync() + + checkConfigEqual(t, kd, &config.Config{Federations: make(map[string]string)}) + // update + mockSync.Chan <- &config.Config{Federations: map[string]string{"name1": "domain1"}} + checkConfigEqual(t, kd, &config.Config{Federations: map[string]string{"name1": "domain1"}}) + // update + mockSync.Chan <- &config.Config{Federations: map[string]string{"name2": "domain2"}} + checkConfigEqual(t, kd, &config.Config{Federations: map[string]string{"name2": "domain2"}}) +} + +func TestConfigSyncInitialMap(t *testing.T) { + // start with different initial map + kd := newKubeDNS() + mockSync := config.NewMockSync( + &config.Config{Federations: map[string]string{"name3": "domain3"}}, nil) + kd.configSync = mockSync + + kd.startConfigMapSync() + checkConfigEqual(t, kd, &config.Config{Federations: map[string]string{"name3": "domain3"}}) +} + func newNodes() *kapi.NodeList { return &kapi.NodeList{ Items: []kapi.Node{ diff --git a/pkg/dns/federation/BUILD b/pkg/dns/federation/BUILD new file mode 100644 index 0000000000..9502665067 --- /dev/null +++ b/pkg/dns/federation/BUILD @@ -0,0 +1,26 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_binary", + "go_library", + "go_test", + "cgo_library", +) + +go_library( + name = "go_default_library", + srcs = ["federation.go"], + tags = ["automanaged"], + deps = ["//pkg/util/validation:go_default_library"], +) + +go_test( + name = "go_default_test", + srcs = ["federation_test.go"], + library = "go_default_library", + tags = ["automanaged"], + deps = ["//vendor:github.com/stretchr/testify/assert"], +) diff --git a/pkg/dns/federation/federation.go b/pkg/dns/federation/federation.go new file mode 100644 index 0000000000..acbb1c4f26 --- /dev/null +++ b/pkg/dns/federation/federation.go @@ -0,0 +1,75 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Fed contains federation specific DNS code. +package fed + +import ( + "errors" + "fmt" + "strings" + + "k8s.io/kubernetes/pkg/util/validation" +) + +var ErrExpectedKeyEqualsValue = errors.New("invalid format, must be key=value") + +// ParseFederationsFlag parses the federations command line flag. The +// flag is a comma-separated list of zero or more "name=label" pairs, +// e.g. "a=b,c=d". +func ParseFederationsFlag(str string, federations map[string]string) error { + if strings.TrimSpace(str) == "" { + return nil + } + + for _, val := range strings.Split(str, ",") { + splits := strings.SplitN(strings.TrimSpace(val), "=", 2) + if len(splits) != 2 { + return ErrExpectedKeyEqualsValue + } + + name := strings.TrimSpace(splits[0]) + domain := strings.TrimSpace(splits[1]) + + if err := ValidateName(name); err != nil { + return err + } + if err := ValidateDomain(domain); err != nil { + return err + } + federations[name] = domain + } + + return nil +} + +// ValidateName checks the validity of a federation name. +func ValidateName(name string) error { + if errs := validation.IsDNS1123Label(name); len(errs) != 0 { + return fmt.Errorf("%q not a valid federation name: %q", name, errs) + } + return nil +} + +// ValidateDomain checks the validity of a federation label. +func ValidateDomain(name string) error { + // The federation domain name need not strictly be domain names, we + // accept valid dns names with subdomain components. + if errs := validation.IsDNS1123Subdomain(name); len(errs) != 0 { + return fmt.Errorf("%q not a valid domain name: %q", name, errs) + } + return nil +} diff --git a/pkg/dns/federation/federation_test.go b/pkg/dns/federation/federation_test.go new file mode 100644 index 0000000000..c8ce8de443 --- /dev/null +++ b/pkg/dns/federation/federation_test.go @@ -0,0 +1,76 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fed + +import ( + "github.com/stretchr/testify/assert" + + "reflect" + "testing" +) + +func TestParseFederationsFlag(t *testing.T) { + type TestCase struct { + input string + hasError bool + expected map[string]string + } + + for _, testCase := range []TestCase{ + {input: "", expected: make(map[string]string)}, + {input: "a=b", expected: map[string]string{"a": "b"}}, + {input: "a=b,cc=dd", expected: map[string]string{"a": "b", "cc": "dd"}}, + {input: "abc=d.e.f", expected: map[string]string{"abc": "d.e.f"}}, + + {input: "ccdd", hasError: true}, + {input: "a=b,ccdd", hasError: true}, + {input: "-", hasError: true}, + {input: "a.b.c=d.e.f", hasError: true}, + } { + output := make(map[string]string) + err := ParseFederationsFlag(testCase.input, output) + + if !testCase.hasError { + assert.Nil(t, err, "unexpected err", testCase) + assert.True(t, reflect.DeepEqual( + testCase.expected, output), output, testCase) + } else { + assert.NotNil(t, err, testCase) + } + } +} + +func TestValidateName(t *testing.T) { + // More complete testing is done in validation.IsDNS1123Label. These + // tests are to catch issues specific to the implementation of + // kube-dns. + assert.NotNil(t, ValidateName("")) + assert.NotNil(t, ValidateName(".")) + assert.NotNil(t, ValidateName("ab.cd")) + assert.Nil(t, ValidateName("abcd")) +} + +func TestValidateDomain(t *testing.T) { + // More complete testing is done in + // validation.IsDNS1123Subdomain. These tests are to catch issues + // specific to the implementation of kube-dns. + assert.NotNil(t, ValidateDomain("")) + assert.NotNil(t, ValidateDomain(".")) + assert.Nil(t, ValidateDomain("ab.cd")) + assert.Nil(t, ValidateDomain("abcd")) + assert.Nil(t, ValidateDomain("a.b.c.d")) +} diff --git a/pkg/dns/treecache/treecache_test.go b/pkg/dns/treecache/treecache_test.go index e03cb0b21f..b6fcbf885f 100644 --- a/pkg/dns/treecache/treecache_test.go +++ b/pkg/dns/treecache/treecache_test.go @@ -110,7 +110,7 @@ func TestTreeCache(t *testing.T) { {"key3", []string{"p1", "p3"}}, } { if _, ok := tc.GetEntry(testCase.k, testCase.p...); ok { - t.Error() + t.Error("path should not exist") } } } diff --git a/test/e2e/BUILD b/test/e2e/BUILD index a5b4a7f4d6..3b29763f3a 100644 --- a/test/e2e/BUILD +++ b/test/e2e/BUILD @@ -31,6 +31,7 @@ go_library( "disruption.go", "dns.go", "dns_autoscaling.go", + "dns_configmap.go", "e2e.go", "empty.go", "empty_dir_wrapper.go", @@ -148,6 +149,7 @@ go_library( "//pkg/controller/petset:go_default_library", "//pkg/controller/replicaset:go_default_library", "//pkg/controller/replication:go_default_library", + "//pkg/dns/federation:go_default_library", "//pkg/fields:go_default_library", "//pkg/kubectl:go_default_library", "//pkg/kubectl/cmd/util:go_default_library", diff --git a/test/e2e/dns_configmap.go b/test/e2e/dns_configmap.go new file mode 100644 index 0000000000..80e960d8e6 --- /dev/null +++ b/test/e2e/dns_configmap.go @@ -0,0 +1,318 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "strings" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + fed "k8s.io/kubernetes/pkg/dns/federation" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +type dnsConfigMapTest struct { + f *framework.Framework + c clientset.Interface + ns string + name string + labels []string + + cm *api.ConfigMap + isValid bool + + dnsPod *api.Pod + utilPod *api.Pod + utilService *api.Service +} + +var _ = framework.KubeDescribe("DNS config map", func() { + test := &dnsConfigMapTest{ + f: framework.NewDefaultFramework("dns-config-map"), + ns: "kube-system", + name: "kube-dns", + } + + BeforeEach(func() { + test.c = test.f.ClientSet + }) + + It("should be able to change configuration", func() { + test.run() + }) +}) + +func (t *dnsConfigMapTest) init() { + By("Finding a DNS pod") + label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "kube-dns"})) + options := api.ListOptions{LabelSelector: label} + + pods, err := t.f.ClientSet.Core().Pods("kube-system").List(options) + Expect(err).NotTo(HaveOccurred()) + Expect(len(pods.Items)).Should(BeNumerically(">=", 1)) + + t.dnsPod = &pods.Items[0] + framework.Logf("Using DNS pod: %v", t.dnsPod.Name) +} + +func (t *dnsConfigMapTest) run() { + t.init() + + defer t.c.Core().ConfigMaps(t.ns).Delete(t.name, nil) + t.createUtilPod() + defer t.deleteUtilPod() + + t.validate() + + t.labels = []string{"abc", "ghi"} + valid1 := map[string]string{"federations": t.labels[0] + "=def"} + valid2 := map[string]string{"federations": t.labels[1] + "=xyz"} + invalid := map[string]string{"federations": "invalid.map=xyz"} + + By("empty -> valid1") + t.setConfigMap(&api.ConfigMap{Data: valid1}, true) + t.validate() + + By("valid1 -> valid2") + t.setConfigMap(&api.ConfigMap{Data: valid2}, true) + t.validate() + + By("valid2 -> invalid") + t.setConfigMap(&api.ConfigMap{Data: invalid}, false) + t.validate() + + By("invalid -> valid1") + t.setConfigMap(&api.ConfigMap{Data: valid1}, true) + t.validate() + + By("valid1 -> deleted") + t.deleteConfigMap() + t.validate() + + By("deleted -> invalid") + t.setConfigMap(&api.ConfigMap{Data: invalid}, false) + t.validate() +} + +func (t *dnsConfigMapTest) validate() { + t.validateFederation() +} + +func (t *dnsConfigMapTest) validateFederation() { + federations := make(map[string]string) + if t.cm != nil { + err := fed.ParseFederationsFlag(t.cm.Data["federations"], federations) + Expect(err).NotTo(HaveOccurred()) + } + + if len(federations) == 0 { + By(fmt.Sprintf("Validating federation labels %v do not exist", t.labels)) + + for _, label := range t.labels { + var federationDNS = fmt.Sprintf("e2e-dns-configmap.%s.%s.svc.cluster.local.", + t.f.Namespace.Name, label) + predicate := func(actual []string) bool { + return len(actual) == 0 + } + t.checkDNSRecord(federationDNS, predicate, wait.ForeverTestTimeout) + } + } else { + for label := range federations { + var federationDNS = fmt.Sprintf("%s.%s.%s.svc.cluster.local.", + t.utilService.ObjectMeta.Name, t.f.Namespace.Name, label) + var localDNS = fmt.Sprintf("%s.%s.svc.cluster.local.", + t.utilService.ObjectMeta.Name, t.f.Namespace.Name) + // Check local mapping. Checking a remote mapping requires + // creating an arbitrary DNS record which is not possible at the + // moment. + By(fmt.Sprintf("Validating federation record %v", label)) + predicate := func(actual []string) bool { + for _, v := range actual { + if v == localDNS { + return true + } + } + return false + } + t.checkDNSRecord(federationDNS, predicate, wait.ForeverTestTimeout) + } + } +} + +func (t *dnsConfigMapTest) checkDNSRecord(name string, predicate func([]string) bool, timeout time.Duration) { + var actual []string + + err := wait.PollImmediate( + time.Duration(1)*time.Second, + timeout, + func() (bool, error) { + actual = t.runDig(name) + if predicate(actual) { + return true, nil + } + return false, nil + }) + + if err != nil { + framework.Logf("dig result did not match: %#v after %v", + actual, timeout) + } +} + +// runDig querying for `dnsName`. Returns a list of responses. +func (t *dnsConfigMapTest) runDig(dnsName string) []string { + cmd := []string{ + "/usr/bin/dig", + "+short", + "@" + t.dnsPod.Status.PodIP, + "-p", "10053", dnsName, + } + stdout, stderr, err := t.f.ExecWithOptions(framework.ExecOptions{ + Command: cmd, + Namespace: t.f.Namespace.Name, + PodName: t.utilPod.Name, + ContainerName: "util", + CaptureStdout: true, + CaptureStderr: true, + }) + + Expect(err).NotTo(HaveOccurred()) + + framework.Logf("Running dig: %v, stdout: %q, stderr: %q", + cmd, stdout, stderr) + + if stdout == "" { + return []string{} + } else { + return strings.Split(stdout, "\n") + } +} + +func (t *dnsConfigMapTest) setConfigMap(cm *api.ConfigMap, isValid bool) { + if isValid { + t.cm = cm + } + t.isValid = isValid + + cm.ObjectMeta.Namespace = t.ns + cm.ObjectMeta.Name = t.name + + options := api.ListOptions{ + FieldSelector: fields.Set{ + "metadata.namespace": t.ns, + "metadata.name": t.name, + }.AsSelector(), + } + cmList, err := t.c.Core().ConfigMaps(t.ns).List(options) + Expect(err).NotTo(HaveOccurred()) + + if len(cmList.Items) == 0 { + By(fmt.Sprintf("Creating the ConfigMap (%s:%s) %+v", t.ns, t.name, *cm)) + _, err := t.c.Core().ConfigMaps(t.ns).Create(cm) + Expect(err).NotTo(HaveOccurred()) + } else { + By(fmt.Sprintf("Updating the ConfigMap (%s:%s) to %+v", t.ns, t.name, *cm)) + _, err := t.c.Core().ConfigMaps(t.ns).Update(cm) + Expect(err).NotTo(HaveOccurred()) + } +} + +func (t *dnsConfigMapTest) deleteConfigMap() { + By(fmt.Sprintf("Deleting the ConfigMap (%s:%s)", t.ns, t.name)) + + t.cm = nil + t.isValid = false + + err := t.c.Core().ConfigMaps(t.ns).Delete(t.name, nil) + Expect(err).NotTo(HaveOccurred()) +} + +func (t *dnsConfigMapTest) createUtilPod() { + // Actual port # doesn't matter, just need to exist. + const servicePort = 10101 + + t.utilPod = &api.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + }, + ObjectMeta: api.ObjectMeta{ + Namespace: t.f.Namespace.Name, + Labels: map[string]string{"app": "e2e-dns-configmap"}, + GenerateName: "e2e-dns-configmap-", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "util", + Image: "gcr.io/google_containers/dnsutils:e2e", + Command: []string{"sleep", "10000"}, + Ports: []api.ContainerPort{ + {ContainerPort: servicePort, Protocol: "TCP"}, + }, + }, + }, + }, + } + + var err error + t.utilPod, err = t.c.Core().Pods(t.f.Namespace.Name).Create(t.utilPod) + Expect(err).NotTo(HaveOccurred()) + framework.Logf("Created pod %v", t.utilPod) + Expect(t.f.WaitForPodRunning(t.utilPod.Name)).NotTo(HaveOccurred()) + + t.utilService = &api.Service{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Service", + }, + ObjectMeta: api.ObjectMeta{ + Namespace: t.f.Namespace.Name, + Name: "e2e-dns-configmap", + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"app": "e2e-dns-configmap"}, + Ports: []api.ServicePort{ + { + Protocol: "TCP", + Port: servicePort, + TargetPort: intstr.FromInt(servicePort), + }, + }, + }, + } + + t.utilService, err = t.c.Core().Services(t.f.Namespace.Name).Create(t.utilService) + Expect(err).NotTo(HaveOccurred()) + framework.Logf("Created service %v", t.utilService) +} + +func (t *dnsConfigMapTest) deleteUtilPod() { + podClient := t.c.Core().Pods(t.f.Namespace.Name) + if err := podClient.Delete(t.utilPod.Name, api.NewDeleteOptions(0)); err != nil { + framework.Logf("Delete of pod %v:%v failed: %v", + t.utilPod.Namespace, t.utilPod.Name, err) + } +} diff --git a/test/e2e/framework/exec_util.go b/test/e2e/framework/exec_util.go index a6939702e7..2a553bd112 100644 --- a/test/e2e/framework/exec_util.go +++ b/test/e2e/framework/exec_util.go @@ -30,42 +30,82 @@ import ( . "github.com/onsi/gomega" ) -// ExecCommandInContainer execute a command in the specified container. -// Pass in stdin, tty if needed in the future. -func (f *Framework) ExecCommandInContainer(podName, containerName string, cmd ...string) string { - stdout, stderr, err := f.ExecCommandInContainerWithFullOutput(podName, containerName, cmd...) - Logf("Exec stderr: %q", stderr) - Expect(err).NotTo(HaveOccurred(), "fail to execute command") - return stdout +// ExecOptions passed to ExecWithOptions +type ExecOptions struct { + Command []string + + Namespace string + PodName string + ContainerName string + + Stdin io.Reader + CaptureStdout bool + CaptureStderr bool + // If false, whitespace in std{err,out} will be removed. + PreserveWhitespace bool } -// ExecCommandInContainerWithFullOutput executes a command in the specified container and return stdout, stderr and error -func (f *Framework) ExecCommandInContainerWithFullOutput(podName, containerName string, cmd ...string) (string, string, error) { - Logf("Exec running '%s'", strings.Join(cmd, " ")) +// ExecWithOptions executes a command in the specified container, +// returning stdout, stderr and error. `options` allowed for +// additional parameters to be passed. +func (f *Framework) ExecWithOptions(options ExecOptions) (string, string, error) { + Logf("ExecWithOptions %+v", options) + config, err := LoadConfig() Expect(err).NotTo(HaveOccurred(), "failed to load restclient config") - var stdout, stderr bytes.Buffer - var stdin io.Reader - tty := false + + const tty = false + req := f.ClientSet.Core().RESTClient().Post(). Resource("pods"). - Name(podName). - Namespace(f.Namespace.Name). + Name(options.PodName). + Namespace(options.Namespace). SubResource("exec"). - Param("container", containerName) + Param("container", options.ContainerName) req.VersionedParams(&api.PodExecOptions{ - Container: containerName, - Command: cmd, - Stdin: stdin != nil, - Stdout: true, - Stderr: true, + Container: options.ContainerName, + Command: options.Command, + Stdin: options.Stdin != nil, + Stdout: options.CaptureStdout, + Stderr: options.CaptureStderr, TTY: tty, }, api.ParameterCodec) - err = execute("POST", req.URL(), config, stdin, &stdout, &stderr, tty) + var stdout, stderr bytes.Buffer + err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty) + + if options.PreserveWhitespace { + return stdout.String(), stderr.String(), err + } return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), err } +// ExecCommandInContainerWithFullOutput executes a command in the +// specified container and return stdout, stderr and error +func (f *Framework) ExecCommandInContainerWithFullOutput(podName, containerName string, cmd ...string) (string, string, error) { + return f.ExecWithOptions(ExecOptions{ + Command: cmd, + Namespace: f.Namespace.Name, + PodName: podName, + ContainerName: containerName, + + Stdin: nil, + CaptureStdout: true, + CaptureStderr: true, + PreserveWhitespace: false, + }) +} + +// ExecCommandInContainer executes a command in the specified container. +func (f *Framework) ExecCommandInContainer(podName, containerName string, cmd ...string) string { + stdout, stderr, err := f.ExecCommandInContainerWithFullOutput(podName, containerName, cmd...) + Logf("Exec stderr: %q", stderr) + Expect(err).NotTo(HaveOccurred(), + "failed to execute command in pod %v, container %v: %v", + podName, containerName, err) + return stdout +} + func (f *Framework) ExecShellInContainer(podName, containerName string, cmd string) string { return f.ExecCommandInContainer(podName, containerName, "/bin/sh", "-c", cmd) }