mirror of https://github.com/k3s-io/k3s
Switch kube-dns to use external versioned API instead of the internal version.
parent
26028bce52
commit
3ee2b76554
|
@ -28,8 +28,8 @@ import (
|
|||
"github.com/skynetservices/skydns/server"
|
||||
"k8s.io/kubernetes/cmd/kube-dns/app/options"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
kclient "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
kdns "k8s.io/kubernetes/pkg/dns"
|
||||
)
|
||||
|
@ -56,7 +56,7 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
|
|||
}
|
||||
|
||||
// TODO: evaluate using pkg/client/clientcmd
|
||||
func newKubeClient(dnsConfig *options.KubeDNSConfig) (*kclient.Client, error) {
|
||||
func newKubeClient(dnsConfig *options.KubeDNSConfig) (clientset.Interface, error) {
|
||||
var (
|
||||
config *restclient.Config
|
||||
err error
|
||||
|
@ -85,7 +85,7 @@ func newKubeClient(dnsConfig *options.KubeDNSConfig) (*kclient.Client, error) {
|
|||
|
||||
glog.Infof("Using %s for kubernetes master", config.Host)
|
||||
glog.Infof("Using kubernetes API %v", config.GroupVersion)
|
||||
return kclient.New(config)
|
||||
return clientset.NewForConfig(config)
|
||||
}
|
||||
|
||||
func (server *KubeDNSServer) Run() {
|
||||
|
|
|
@ -30,13 +30,15 @@ import (
|
|||
skymsg "github.com/skynetservices/skydns/msg"
|
||||
kapi "k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/endpoints"
|
||||
kunversioned "k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
v1 "k8s.io/kubernetes/pkg/api/v1"
|
||||
kcache "k8s.io/kubernetes/pkg/client/cache"
|
||||
kclient "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
|
||||
kframework "k8s.io/kubernetes/pkg/controller/framework"
|
||||
kselector "k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/validation"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -62,7 +64,7 @@ const (
|
|||
type KubeDNS struct {
|
||||
// kubeClient makes calls to API Server and registers calls with API Server
|
||||
// to get Endpoints and Service objects.
|
||||
kubeClient *kclient.Client
|
||||
kubeClient clientset.Interface
|
||||
|
||||
// The domain for which this DNS Server is authoritative.
|
||||
domain string
|
||||
|
@ -102,7 +104,7 @@ type KubeDNS struct {
|
|||
nodesStore kcache.Store
|
||||
}
|
||||
|
||||
func NewKubeDNS(client *kclient.Client, domain string, federations map[string]string) *KubeDNS {
|
||||
func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) *KubeDNS {
|
||||
kd := &KubeDNS{
|
||||
kubeClient: client,
|
||||
domain: domain,
|
||||
|
@ -127,13 +129,13 @@ func (kd *KubeDNS) Start() {
|
|||
kd.waitForKubernetesService()
|
||||
}
|
||||
|
||||
func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) {
|
||||
func (kd *KubeDNS) waitForKubernetesService() (svc *v1.Service) {
|
||||
name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName)
|
||||
glog.Infof("Waiting for service: %v", name)
|
||||
var err error
|
||||
servicePollInterval := 1 * time.Second
|
||||
for {
|
||||
svc, err = kd.kubeClient.Services(kapi.NamespaceDefault).Get(kubernetesSvcName)
|
||||
svc, err = kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName)
|
||||
if err != nil || svc == nil {
|
||||
glog.Infof("Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", name, err, servicePollInterval)
|
||||
time.Sleep(servicePollInterval)
|
||||
|
@ -153,10 +155,16 @@ func (kd *KubeDNS) GetCacheAsJSON() (string, error) {
|
|||
|
||||
func (kd *KubeDNS) setServicesStore() {
|
||||
// Returns a cache.ListWatch that gets all changes to services.
|
||||
serviceWatch := kcache.NewListWatchFromClient(kd.kubeClient, "services", kapi.NamespaceAll, kselector.Everything())
|
||||
kd.servicesStore, kd.serviceController = kframework.NewInformer(
|
||||
serviceWatch,
|
||||
&kapi.Service{},
|
||||
&kcache.ListWatch{
|
||||
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
|
||||
return kd.kubeClient.Core().Services(v1.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
|
||||
return kd.kubeClient.Core().Services(v1.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&v1.Service{},
|
||||
resyncPeriod,
|
||||
kframework.ResourceEventHandlerFuncs{
|
||||
AddFunc: kd.newService,
|
||||
|
@ -168,10 +176,16 @@ func (kd *KubeDNS) setServicesStore() {
|
|||
|
||||
func (kd *KubeDNS) setEndpointsStore() {
|
||||
// Returns a cache.ListWatch that gets all changes to endpoints.
|
||||
endpointsWatch := kcache.NewListWatchFromClient(kd.kubeClient, "endpoints", kapi.NamespaceAll, kselector.Everything())
|
||||
kd.endpointsStore, kd.endpointsController = kframework.NewInformer(
|
||||
endpointsWatch,
|
||||
&kapi.Endpoints{},
|
||||
&kcache.ListWatch{
|
||||
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
|
||||
return kd.kubeClient.Core().Endpoints(v1.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
|
||||
return kd.kubeClient.Core().Endpoints(v1.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&v1.Endpoints{},
|
||||
resyncPeriod,
|
||||
kframework.ResourceEventHandlerFuncs{
|
||||
AddFunc: kd.handleEndpointAdd,
|
||||
|
@ -546,12 +560,12 @@ func (kd *KubeDNS) federationRecords(queryPath []string) ([]skymsg.Service, erro
|
|||
// simpler approach here.
|
||||
// Also note that zone here means the zone in cloud provider terminology, not the DNS zone.
|
||||
func (kd *KubeDNS) getClusterZone() (string, error) {
|
||||
var node *kapi.Node
|
||||
var node *v1.Node
|
||||
|
||||
objs := kd.nodesStore.List()
|
||||
if len(objs) > 0 {
|
||||
var ok bool
|
||||
if node, ok = objs[0].(*kapi.Node); !ok {
|
||||
if node, ok = objs[0].(*v1.Node); !ok {
|
||||
return "", fmt.Errorf("expected node object, got: %T", objs[0])
|
||||
}
|
||||
} else {
|
||||
|
@ -559,7 +573,7 @@ func (kd *KubeDNS) getClusterZone() (string, error) {
|
|||
// wasteful in case of non-federated independent Kubernetes clusters. So carefully
|
||||
// proceeding here.
|
||||
// TODO(madhusudancs): Move this to external/v1 API.
|
||||
nodeList, err := kd.kubeClient.Nodes().List(kapi.ListOptions{})
|
||||
nodeList, err := kd.kubeClient.Core().Nodes().List(kapi.ListOptions{})
|
||||
if err != nil || len(nodeList.Items) == 0 {
|
||||
return "", fmt.Errorf("failed to retrieve the cluster nodes: %v", err)
|
||||
}
|
||||
|
@ -570,7 +584,7 @@ func (kd *KubeDNS) getClusterZone() (string, error) {
|
|||
}
|
||||
}
|
||||
|
||||
zone, ok := node.Annotations[kunversioned.LabelZoneFailureDomain]
|
||||
zone, ok := node.Annotations[unversioned.LabelZoneFailureDomain]
|
||||
if !ok || zone == "" {
|
||||
return "", fmt.Errorf("unknown cluster zone")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue