2014-11-11 21:15:59 +00:00
/ *
2015-05-01 16:19:44 +00:00
Copyright 2014 The Kubernetes Authors All rights reserved .
2014-11-11 21:15:59 +00:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
// kube2sky is a bridge between Kubernetes and SkyDNS. It watches the
// Kubernetes master for changes in Services and manifests them into etcd for
// SkyDNS to serve as DNS records.
package main
import (
"encoding/json"
"flag"
"fmt"
2015-05-07 18:06:02 +00:00
"net/url"
2014-11-11 21:15:59 +00:00
"os"
2015-03-03 14:21:40 +00:00
"time"
2014-11-11 21:15:59 +00:00
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
2015-05-07 18:06:02 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
2015-04-23 23:42:10 +00:00
kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
kclientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
2015-05-07 18:06:02 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
kcontrollerFramework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
kSelector "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
2015-03-04 23:41:17 +00:00
tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
2015-05-07 18:06:02 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
2015-05-04 22:45:33 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
2014-11-11 21:15:59 +00:00
etcd "github.com/coreos/go-etcd/etcd"
2015-05-07 18:06:02 +00:00
"github.com/golang/glog"
2014-11-11 21:15:59 +00:00
skymsg "github.com/skynetservices/skydns/msg"
)
// Command-line flags. Each variable is a pointer that is filled in by
// flag.Parse, which main calls before reading any of them.
var (
	argDomain              = flag.String("domain", "kubernetes.local", "domain under which to create names")
	argEtcdMutationTimeout = flag.Duration("etcd_mutation_timeout", 10*time.Second, "crash after retrying etcd mutation for a specified duration")
	argEtcdServer          = flag.String("etcd-server", "http://127.0.0.1:4001", "URL to etcd server")
	argKubecfgFile         = flag.String("kubecfg_file", "", "Location of kubecfg file for access to kubernetes service")
	argKubeMasterUrl       = flag.String("kube_master_url", "http://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}", "Url to reach kubernetes master. Env variables in this flag will be expanded.")
)
2015-05-07 18:06:02 +00:00
// maxConnectRetries caps how many times newEtcdClient probes the etcd server
// before giving up.
const maxConnectRetries = 12

// resyncPeriod is the resync interval handed to the service informer.
const resyncPeriod = 5 * time.Second
// kube2sky holds the settings and client handles shared by the service
// event handlers.
type kube2sky struct {
	// domain is the DNS domain appended to every generated record name.
	domain string
	// etcdMutationTimeout bounds how long a failing etcd mutation is retried.
	etcdMutationTimeout time.Duration

	// etcdClient is the connection used to write and delete SkyDNS records.
	etcdClient *etcd.Client
	// kubeClient is the connection used to list and watch services.
	kubeClient *kclient.Client
}
func ( ks * kube2sky ) removeDNS ( record string ) error {
glog . V ( 2 ) . Infof ( "Removing %s from DNS" , record )
_ , err := ks . etcdClient . Delete ( skymsg . Path ( record ) , true )
2014-11-11 21:15:59 +00:00
return err
}
2015-05-07 18:06:02 +00:00
func ( ks * kube2sky ) addDNS ( record string , service * kapi . Service ) error {
2015-03-16 21:36:30 +00:00
// if PortalIP is not set, a DNS entry should not be created
if ! kapi . IsServiceIPSet ( service ) {
2015-05-07 18:06:02 +00:00
glog . V ( 1 ) . Infof ( "Skipping dns record for headless service: %s\n" , service . Name )
2015-03-16 21:36:30 +00:00
return nil
}
2015-03-30 21:01:46 +00:00
for i := range service . Spec . Ports {
svc := skymsg . Service {
Host : service . Spec . PortalIP ,
Port : service . Spec . Ports [ i ] . Port ,
Priority : 10 ,
Weight : 10 ,
Ttl : 30 ,
}
b , err := json . Marshal ( svc )
if err != nil {
return err
}
// Set with no TTL, and hope that kubernetes events are accurate.
2014-11-11 21:15:59 +00:00
2015-05-07 18:06:02 +00:00
glog . V ( 2 ) . Infof ( "Setting DNS record: %v -> %s:%d\n" , record , service . Spec . PortalIP , service . Spec . Ports [ i ] . Port )
_ , err = ks . etcdClient . Set ( skymsg . Path ( record ) , string ( b ) , uint64 ( 0 ) )
2015-03-30 21:01:46 +00:00
if err != nil {
return err
}
}
return nil
2014-11-11 21:15:59 +00:00
}
2015-03-03 14:21:40 +00:00
// Implements retry logic for arbitrary mutator. Crashes after retrying for
// etcd_mutation_timeout.
2015-05-07 18:06:02 +00:00
func ( ks * kube2sky ) mutateEtcdOrDie ( mutator func ( ) error ) {
timeout := time . After ( ks . etcdMutationTimeout )
2015-03-03 14:21:40 +00:00
for {
select {
case <- timeout :
2015-05-07 18:06:02 +00:00
glog . Fatalf ( "Failed to mutate etcd for %v using mutator: %v" , ks . etcdMutationTimeout , mutator )
2015-03-03 14:21:40 +00:00
default :
if err := mutator ( ) ; err != nil {
delay := 50 * time . Millisecond
2015-05-07 18:06:02 +00:00
glog . V ( 1 ) . Infof ( "Failed to mutate etcd using mutator: %v due to: %v. Will retry in: %v" , mutator , err , delay )
2015-03-03 14:21:40 +00:00
time . Sleep ( delay )
} else {
return
}
}
}
}
2015-05-07 18:06:02 +00:00
func newEtcdClient ( etcdServer string ) ( * etcd . Client , error ) {
var (
client * etcd . Client
err error
)
retries := maxConnectRetries
for retries > 0 {
if _ , err = tools . GetEtcdVersion ( etcdServer ) ; err == nil {
break
}
if maxConnectRetries == 1 {
2015-03-04 23:41:17 +00:00
break
}
2015-05-07 18:06:02 +00:00
glog . Info ( "[Attempt: %d] Retrying request after 5 second sleep" , retries )
time . Sleep ( 5 * time . Second )
retries --
2015-03-04 23:41:17 +00:00
}
2015-05-07 18:06:02 +00:00
if err != nil {
return nil , fmt . Errorf ( "failed to connect to etcd server: %v, error: %v" , etcdServer , err )
}
glog . Infof ( "Etcd server found: %v" , etcdServer )
2015-05-02 07:13:52 +00:00
// loop until we have > 0 machines && machines[0] != ""
2015-05-04 22:45:33 +00:00
poll , timeout := 1 * time . Second , 10 * time . Second
if err := wait . Poll ( poll , timeout , func ( ) ( bool , error ) {
2015-05-07 18:06:02 +00:00
if client = etcd . NewClient ( [ ] string { etcdServer } ) ; client == nil {
return false , fmt . Errorf ( "etcd.NewClient returned nil" )
2015-05-02 07:13:52 +00:00
}
client . SyncCluster ( )
machines := client . GetCluster ( )
2015-05-04 22:45:33 +00:00
if len ( machines ) == 0 || len ( machines [ 0 ] ) == 0 {
return false , nil
2015-05-02 07:13:52 +00:00
}
2015-05-04 22:45:33 +00:00
return true , nil
} ) ; err != nil {
2015-05-07 18:06:02 +00:00
return nil , fmt . Errorf ( "Timed out after %s waiting for at least 1 synchronized etcd server in the cluster. Error: %v" , timeout , err )
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
return client , nil
}
func getKubeMasterUrl ( ) ( string , error ) {
if * argKubeMasterUrl == "" {
return "" , fmt . Errorf ( "no --kube_master_url specified" )
}
parsedUrl , err := url . Parse ( os . ExpandEnv ( * argKubeMasterUrl ) )
if err != nil {
return "" , fmt . Errorf ( "failed to parse --kube_master_url %s - %v" , * argKubeMasterUrl , err )
}
if parsedUrl . Scheme == "" || parsedUrl . Host == "" || parsedUrl . Host == ":" {
return "" , fmt . Errorf ( "invalid --kube_master_url specified %s" , * argKubeMasterUrl )
}
return parsedUrl . String ( ) , nil
2014-11-11 21:15:59 +00:00
}
// TODO: evaluate using pkg/client/clientcmd
func newKubeClient ( ) ( * kclient . Client , error ) {
2015-04-23 23:42:10 +00:00
var config * kclient . Config
2015-05-07 18:06:02 +00:00
masterUrl , err := getKubeMasterUrl ( )
if err != nil {
return nil , err
}
if * argKubecfgFile == "" {
2015-04-23 23:42:10 +00:00
config = & kclient . Config {
2015-05-07 18:06:02 +00:00
Host : masterUrl ,
Version : "v1beta3" ,
2015-04-23 23:42:10 +00:00
}
} else {
var err error
if config , err = kclientcmd . NewNonInteractiveDeferredLoadingClientConfig (
2015-05-07 18:06:02 +00:00
& kclientcmd . ClientConfigLoadingRules { ExplicitPath : * argKubecfgFile } ,
& kclientcmd . ConfigOverrides { ClusterInfo : kclientcmdapi . Cluster { Server : masterUrl } } ) . ClientConfig ( ) ; err != nil {
2015-04-23 23:42:10 +00:00
return nil , err
}
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
glog . Infof ( "Using %s for kubernetes master" , config . Host )
glog . Infof ( "Using kubernetes API %s" , config . Version )
2014-11-11 21:15:59 +00:00
return kclient . New ( config )
}
2015-05-07 18:06:02 +00:00
func ( ks * kube2sky ) buildNameString ( service , namespace , domain string ) string {
2014-11-11 21:15:59 +00:00
return fmt . Sprintf ( "%s.%s.%s." , service , namespace , domain )
}
2015-05-07 18:06:02 +00:00
// Returns a cache.ListWatch that gets all changes to services.
func ( ks * kube2sky ) createServiceLW ( ) * cache . ListWatch {
return cache . NewListWatchFromClient ( ks . kubeClient , "services" , kapi . NamespaceAll , kSelector . Everything ( ) )
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
func ( ks * kube2sky ) newService ( obj interface { } ) {
if s , ok := obj . ( * kapi . Service ) ; ok {
name := ks . buildNameString ( s . Name , s . Namespace , ks . domain )
ks . mutateEtcdOrDie ( func ( ) error { return ks . addDNS ( name , s ) } )
2014-11-11 21:15:59 +00:00
}
}
2015-05-07 18:06:02 +00:00
func ( ks * kube2sky ) removeService ( obj interface { } ) {
if s , ok := obj . ( * kapi . Service ) ; ok {
name := ks . buildNameString ( s . Name , s . Namespace , ks . domain )
ks . mutateEtcdOrDie ( func ( ) error { return ks . removeDNS ( name ) } )
}
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
func ( ks * kube2sky ) watchForServices ( ) {
var serviceController * kcontrollerFramework . Controller
_ , serviceController = framework . NewInformer (
ks . createServiceLW ( ) ,
& kapi . Service { } ,
resyncPeriod ,
framework . ResourceEventHandlerFuncs {
AddFunc : ks . newService ,
DeleteFunc : ks . removeService ,
UpdateFunc : func ( oldObj , newObj interface { } ) {
ks . newService ( newObj )
} ,
} ,
)
serviceController . Run ( util . NeverStop )
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
func main ( ) {
flag . Parse ( )
var err error
// TODO: Validate input flags.
ks := kube2sky {
domain : * argDomain ,
etcdMutationTimeout : * argEtcdMutationTimeout ,
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
if ks . etcdClient , err = newEtcdClient ( * argEtcdServer ) ; err != nil {
glog . Fatalf ( "Failed to create etcd client - %v" , err )
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
if ks . kubeClient , err = newKubeClient ( ) ; err != nil {
glog . Fatalf ( "Failed to create a kubernetes client: %v" , err )
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
ks . watchForServices ( )
2014-11-11 21:15:59 +00:00
}