2014-11-11 21:15:59 +00:00
/ *
2015-05-01 16:19:44 +00:00
Copyright 2014 The Kubernetes Authors All rights reserved .
2014-11-11 21:15:59 +00:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
// kube2sky is a bridge between Kubernetes and SkyDNS. It watches the
// Kubernetes master for changes in Services and manifests them into etcd for
// SkyDNS to serve as DNS records.
package main
import (
"encoding/json"
"flag"
"fmt"
2015-05-18 17:03:30 +00:00
"net/http"
2015-05-07 18:06:02 +00:00
"net/url"
2014-11-11 21:15:59 +00:00
"os"
2015-05-18 17:03:30 +00:00
"strings"
"sync"
2015-03-03 14:21:40 +00:00
"time"
2014-11-11 21:15:59 +00:00
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
2015-05-18 17:03:30 +00:00
kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
2015-04-23 23:42:10 +00:00
kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
2015-05-18 17:03:30 +00:00
kframework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
2015-05-07 18:06:02 +00:00
kSelector "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
2015-03-04 23:41:17 +00:00
tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
2015-05-07 18:06:02 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
2015-05-04 22:45:33 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
2014-11-11 21:15:59 +00:00
etcd "github.com/coreos/go-etcd/etcd"
2015-05-07 18:06:02 +00:00
"github.com/golang/glog"
2014-11-11 21:15:59 +00:00
skymsg "github.com/skynetservices/skydns/msg"
)
var (
2015-05-12 06:00:43 +00:00
argDomain = flag . String ( "domain" , "cluster.local" , "domain under which to create names" )
2015-05-07 18:06:02 +00:00
argEtcdMutationTimeout = flag . Duration ( "etcd_mutation_timeout" , 10 * time . Second , "crash after retrying etcd mutation for a specified duration" )
argEtcdServer = flag . String ( "etcd-server" , "http://127.0.0.1:4001" , "URL to etcd server" )
argKubecfgFile = flag . String ( "kubecfg_file" , "" , "Location of kubecfg file for access to kubernetes service" )
2015-05-28 18:07:09 +00:00
argKubeMasterURL = flag . String ( "kube_master_url" , "" , "URL to reach kubernetes master. Env variables in this flag will be expanded." )
2014-11-11 21:15:59 +00:00
)
2015-05-07 18:06:02 +00:00
const (
2015-05-12 21:35:34 +00:00
// Maximum number of attempts to connect to etcd server.
maxConnectAttempts = 12
2015-05-07 18:06:02 +00:00
// Resync period for the kube controller loop.
resyncPeriod = 5 * time . Second
2015-05-18 17:03:30 +00:00
// A subdomain added to the user specified domain for all services.
serviceSubdomain = "svc"
2015-05-07 18:06:02 +00:00
)
2015-05-12 21:35:34 +00:00
type etcdClient interface {
Set ( path , value string , ttl uint64 ) ( * etcd . Response , error )
2015-05-18 17:03:30 +00:00
RawGet ( key string , sort , recursive bool ) ( * etcd . RawResponse , error )
2015-05-12 21:35:34 +00:00
Delete ( path string , recursive bool ) ( * etcd . Response , error )
}
2015-05-18 17:03:30 +00:00
type nameNamespace struct {
name string
namespace string
}
2015-05-07 18:06:02 +00:00
type kube2sky struct {
// Etcd client.
2015-05-12 21:35:34 +00:00
etcdClient etcdClient
2015-05-07 18:06:02 +00:00
// DNS domain name.
domain string
// Etcd mutation timeout.
etcdMutationTimeout time . Duration
2015-05-18 17:03:30 +00:00
// A cache that contains all the endpoints in the system.
endpointsStore kcache . Store
// A cache that contains all the servicess in the system.
servicesStore kcache . Store
// Lock for controlling access to headless services.
mlock sync . Mutex
2015-05-07 18:06:02 +00:00
}
2015-05-18 17:03:30 +00:00
// Removes 'subdomain' from etcd.
func ( ks * kube2sky ) removeDNS ( subdomain string ) error {
glog . V ( 2 ) . Infof ( "Removing %s from DNS" , subdomain )
resp , err := ks . etcdClient . RawGet ( skymsg . Path ( subdomain ) , false , false )
if err != nil {
return err
}
if resp . StatusCode == http . StatusNotFound {
glog . V ( 2 ) . Infof ( "Subdomain %q does not exist in etcd" , subdomain )
return nil
}
_ , err = ks . etcdClient . Delete ( skymsg . Path ( subdomain ) , true )
2014-11-11 21:15:59 +00:00
return err
}
2015-05-18 17:03:30 +00:00
func ( ks * kube2sky ) writeSkyRecord ( subdomain string , data string ) error {
// Set with no TTL, and hope that kubernetes events are accurate.
_ , err := ks . etcdClient . Set ( skymsg . Path ( subdomain ) , data , uint64 ( 0 ) )
return err
}
// Generates skydns records for a headless service.
func ( ks * kube2sky ) newHeadlessService ( subdomain string , service * kapi . Service ) error {
// Create an A record for every pod in the service.
// This record must be periodically updated.
// Format is as follows:
// For a service x, with pods a and b create DNS records,
// a.x.ns.domain. and, b.x.ns.domain.
// TODO: Handle multi-port services.
ks . mlock . Lock ( )
defer ks . mlock . Unlock ( )
key , err := kcache . MetaNamespaceKeyFunc ( service )
if err != nil {
return err
}
e , exists , err := ks . endpointsStore . GetByKey ( key )
if err != nil {
return fmt . Errorf ( "failed to get endpoints object from endpoints store - %v" , err )
}
if ! exists {
glog . V ( 1 ) . Infof ( "could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up." , service . Name , service . Namespace )
2015-03-16 21:36:30 +00:00
return nil
}
2015-05-18 17:03:30 +00:00
if e , ok := e . ( * kapi . Endpoints ) ; ok {
return ks . generateRecordsForHeadlessService ( subdomain , e , service )
}
return nil
}
2015-03-16 21:36:30 +00:00
2015-05-18 17:03:30 +00:00
func getSkyMsg ( ip string , port int ) * skymsg . Service {
return & skymsg . Service {
Host : ip ,
Port : port ,
Priority : 10 ,
Weight : 10 ,
Ttl : 30 ,
}
}
func ( ks * kube2sky ) generateRecordsForHeadlessService ( subdomain string , e * kapi . Endpoints , svc * kapi . Service ) error {
for idx := range e . Subsets {
for subIdx := range e . Subsets [ idx ] . Addresses {
subdomain := buildDNSNameString ( subdomain , fmt . Sprintf ( "%d%d" , idx , subIdx ) )
b , err := json . Marshal ( getSkyMsg ( e . Subsets [ idx ] . Addresses [ subIdx ] . IP , svc . Spec . Ports [ 0 ] . Port ) )
if err != nil {
return err
}
glog . V ( 2 ) . Infof ( "Setting DNS record: %v -> %q\n" , subdomain , string ( b ) )
if err := ks . writeSkyRecord ( subdomain , string ( b ) ) ; err != nil {
return err
}
2015-03-30 21:01:46 +00:00
}
2015-05-18 17:03:30 +00:00
}
return nil
}
func ( ks * kube2sky ) getServiceFromEndpoints ( e * kapi . Endpoints ) ( * kapi . Service , error ) {
key , err := kcache . MetaNamespaceKeyFunc ( e )
if err != nil {
return nil , err
}
obj , exists , err := ks . servicesStore . GetByKey ( key )
if err != nil {
return nil , fmt . Errorf ( "failed to get service object from services store - %v" , err )
}
if ! exists {
glog . V ( 1 ) . Infof ( "could not find service for endpoint %q in namespace %q" , e . Name , e . Namespace )
return nil , nil
}
if svc , ok := obj . ( * kapi . Service ) ; ok {
return svc , nil
}
return nil , fmt . Errorf ( "got a non service object in services store %v" , obj )
}
func ( ks * kube2sky ) addDNSUsingEndpoints ( subdomain string , e * kapi . Endpoints ) error {
ks . mlock . Lock ( )
defer ks . mlock . Unlock ( )
svc , err := ks . getServiceFromEndpoints ( e )
if err != nil {
return err
}
if svc == nil || kapi . IsServiceIPSet ( svc ) {
// No headless service found corresponding to endpoints object.
return nil
}
// Remove existing DNS entry.
if err := ks . removeDNS ( subdomain ) ; err != nil {
return err
}
return ks . generateRecordsForHeadlessService ( subdomain , e , svc )
}
func ( ks * kube2sky ) handleEndpointAdd ( obj interface { } ) {
if e , ok := obj . ( * kapi . Endpoints ) ; ok {
name := buildDNSNameString ( ks . domain , e . Namespace , e . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . addDNSUsingEndpoints ( name , e ) } )
name = buildDNSNameString ( ks . domain , serviceSubdomain , e . Namespace , e . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . addDNSUsingEndpoints ( name , e ) } )
}
}
func ( ks * kube2sky ) generateRecordsForPortalService ( subdomain string , service * kapi . Service ) error {
for i := range service . Spec . Ports {
2015-05-23 20:41:11 +00:00
b , err := json . Marshal ( getSkyMsg ( service . Spec . ClusterIP , service . Spec . Ports [ i ] . Port ) )
2015-03-30 21:01:46 +00:00
if err != nil {
return err
}
2015-05-18 17:03:30 +00:00
glog . V ( 2 ) . Infof ( "Setting DNS record: %v -> %q\n" , subdomain , string ( b ) )
if err := ks . writeSkyRecord ( subdomain , string ( b ) ) ; err != nil {
2015-03-30 21:01:46 +00:00
return err
}
}
return nil
2014-11-11 21:15:59 +00:00
}
2015-05-18 17:03:30 +00:00
func ( ks * kube2sky ) addDNS ( subdomain string , service * kapi . Service ) error {
if len ( service . Spec . Ports ) == 0 {
glog . Fatalf ( "unexpected service with no ports: %v" , service )
}
2015-05-23 20:41:11 +00:00
// if ClusterIP is not set, a DNS entry should not be created
2015-05-18 17:03:30 +00:00
if ! kapi . IsServiceIPSet ( service ) {
return ks . newHeadlessService ( subdomain , service )
}
return ks . generateRecordsForPortalService ( subdomain , service )
}
2015-03-03 14:21:40 +00:00
// Implements retry logic for arbitrary mutator. Crashes after retrying for
// etcd_mutation_timeout.
2015-05-07 18:06:02 +00:00
func ( ks * kube2sky ) mutateEtcdOrDie ( mutator func ( ) error ) {
timeout := time . After ( ks . etcdMutationTimeout )
2015-03-03 14:21:40 +00:00
for {
select {
case <- timeout :
2015-05-07 18:06:02 +00:00
glog . Fatalf ( "Failed to mutate etcd for %v using mutator: %v" , ks . etcdMutationTimeout , mutator )
2015-03-03 14:21:40 +00:00
default :
if err := mutator ( ) ; err != nil {
delay := 50 * time . Millisecond
2015-05-07 18:06:02 +00:00
glog . V ( 1 ) . Infof ( "Failed to mutate etcd using mutator: %v due to: %v. Will retry in: %v" , mutator , err , delay )
2015-03-03 14:21:40 +00:00
time . Sleep ( delay )
} else {
return
}
}
}
}
2015-05-18 17:03:30 +00:00
func buildDNSNameString ( labels ... string ) string {
var res string
for _ , label := range labels {
if res == "" {
res = label
} else {
res = fmt . Sprintf ( "%s.%s" , label , res )
}
}
return res
}
// Returns a cache.ListWatch that gets all changes to services.
func createServiceLW ( kubeClient * kclient . Client ) * kcache . ListWatch {
return kcache . NewListWatchFromClient ( kubeClient , "services" , kapi . NamespaceAll , kSelector . Everything ( ) )
}
// Returns a cache.ListWatch that gets all changes to endpoints.
func createEndpointsLW ( kubeClient * kclient . Client ) * kcache . ListWatch {
return kcache . NewListWatchFromClient ( kubeClient , "endpoints" , kapi . NamespaceAll , kSelector . Everything ( ) )
}
func ( ks * kube2sky ) newService ( obj interface { } ) {
if s , ok := obj . ( * kapi . Service ) ; ok {
//TODO(artfulcoder) stop adding and deleting old-format string for service
name := buildDNSNameString ( ks . domain , s . Namespace , s . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . addDNS ( name , s ) } )
name = buildDNSNameString ( ks . domain , serviceSubdomain , s . Namespace , s . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . addDNS ( name , s ) } )
}
}
func ( ks * kube2sky ) removeService ( obj interface { } ) {
if s , ok := obj . ( * kapi . Service ) ; ok {
name := buildDNSNameString ( ks . domain , s . Namespace , s . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . removeDNS ( name ) } )
name = buildDNSNameString ( ks . domain , serviceSubdomain , s . Namespace , s . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . removeDNS ( name ) } )
}
}
2015-05-07 18:06:02 +00:00
func newEtcdClient ( etcdServer string ) ( * etcd . Client , error ) {
var (
client * etcd . Client
err error
)
2015-05-12 21:35:34 +00:00
for attempt := 1 ; attempt <= maxConnectAttempts ; attempt ++ {
2015-05-07 18:06:02 +00:00
if _ , err = tools . GetEtcdVersion ( etcdServer ) ; err == nil {
break
}
2015-05-12 21:35:34 +00:00
if attempt == maxConnectAttempts {
2015-03-04 23:41:17 +00:00
break
}
2015-05-12 21:35:34 +00:00
glog . Infof ( "[Attempt: %d] Attempting access to etcd after 5 second sleep" , attempt )
2015-05-07 18:06:02 +00:00
time . Sleep ( 5 * time . Second )
2015-03-04 23:41:17 +00:00
}
2015-05-07 18:06:02 +00:00
if err != nil {
return nil , fmt . Errorf ( "failed to connect to etcd server: %v, error: %v" , etcdServer , err )
}
glog . Infof ( "Etcd server found: %v" , etcdServer )
2015-05-02 07:13:52 +00:00
// loop until we have > 0 machines && machines[0] != ""
2015-05-04 22:45:33 +00:00
poll , timeout := 1 * time . Second , 10 * time . Second
if err := wait . Poll ( poll , timeout , func ( ) ( bool , error ) {
2015-05-07 18:06:02 +00:00
if client = etcd . NewClient ( [ ] string { etcdServer } ) ; client == nil {
return false , fmt . Errorf ( "etcd.NewClient returned nil" )
2015-05-02 07:13:52 +00:00
}
client . SyncCluster ( )
machines := client . GetCluster ( )
2015-05-04 22:45:33 +00:00
if len ( machines ) == 0 || len ( machines [ 0 ] ) == 0 {
return false , nil
2015-05-02 07:13:52 +00:00
}
2015-05-04 22:45:33 +00:00
return true , nil
} ) ; err != nil {
2015-05-07 18:06:02 +00:00
return nil , fmt . Errorf ( "Timed out after %s waiting for at least 1 synchronized etcd server in the cluster. Error: %v" , timeout , err )
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
return client , nil
}
2015-05-28 18:07:09 +00:00
func getKubeMasterURL ( ) ( string , error ) {
parsedURL , err := url . Parse ( os . ExpandEnv ( * argKubeMasterURL ) )
2015-05-07 18:06:02 +00:00
if err != nil {
2015-05-28 18:07:09 +00:00
return "" , fmt . Errorf ( "failed to parse --kube_master_url %s - %v" , * argKubeMasterURL , err )
2015-05-07 18:06:02 +00:00
}
2015-05-28 18:07:09 +00:00
if parsedURL . Scheme == "" || parsedURL . Host == "" || parsedURL . Host == ":" {
return "" , fmt . Errorf ( "invalid --kube_master_url specified %s" , * argKubeMasterURL )
2015-05-07 18:06:02 +00:00
}
2015-05-28 18:07:09 +00:00
return parsedURL . String ( ) , nil
2014-11-11 21:15:59 +00:00
}
// TODO: evaluate using pkg/client/clientcmd
func newKubeClient ( ) ( * kclient . Client , error ) {
2015-05-22 23:07:58 +00:00
var (
config * kclient . Config
err error
masterURL string
)
2015-05-28 18:07:09 +00:00
if * argKubeMasterURL != "" {
masterURL , err = getKubeMasterURL ( )
2015-05-22 23:07:58 +00:00
if err != nil {
return nil , err
}
2015-05-07 18:06:02 +00:00
}
if * argKubecfgFile == "" {
2015-05-22 23:07:58 +00:00
if masterURL == "" {
return nil , fmt . Errorf ( "--kube_master_url must be set when --kubecfg_file is not set" )
}
2015-04-23 23:42:10 +00:00
config = & kclient . Config {
2015-05-22 23:07:58 +00:00
Host : masterURL ,
2015-05-07 18:06:02 +00:00
Version : "v1beta3" ,
2015-04-23 23:42:10 +00:00
}
} else {
2015-05-22 23:07:58 +00:00
overrides := & kclientcmd . ConfigOverrides { }
if masterURL != "" {
overrides . ClusterInfo . Server = masterURL
}
2015-04-23 23:42:10 +00:00
if config , err = kclientcmd . NewNonInteractiveDeferredLoadingClientConfig (
2015-05-07 18:06:02 +00:00
& kclientcmd . ClientConfigLoadingRules { ExplicitPath : * argKubecfgFile } ,
2015-05-22 23:07:58 +00:00
overrides ) . ClientConfig ( ) ; err != nil {
2015-04-23 23:42:10 +00:00
return nil , err
}
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
glog . Infof ( "Using %s for kubernetes master" , config . Host )
glog . Infof ( "Using kubernetes API %s" , config . Version )
2014-11-11 21:15:59 +00:00
return kclient . New ( config )
}
2015-05-18 17:03:30 +00:00
func watchForServices ( kubeClient * kclient . Client , ks * kube2sky ) kcache . Store {
serviceStore , serviceController := kframework . NewInformer (
2015-05-12 21:35:34 +00:00
createServiceLW ( kubeClient ) ,
2015-05-07 18:06:02 +00:00
& kapi . Service { } ,
resyncPeriod ,
2015-05-18 17:03:30 +00:00
kframework . ResourceEventHandlerFuncs {
2015-05-07 18:06:02 +00:00
AddFunc : ks . newService ,
DeleteFunc : ks . removeService ,
UpdateFunc : func ( oldObj , newObj interface { } ) {
2015-05-18 17:03:30 +00:00
// TODO: Avoid unwanted updates.
2015-05-07 18:06:02 +00:00
ks . newService ( newObj )
} ,
} ,
)
2015-05-18 17:03:30 +00:00
go serviceController . Run ( util . NeverStop )
return serviceStore
}
func watchEndpoints ( kubeClient * kclient . Client , ks * kube2sky ) kcache . Store {
eStore , eController := kframework . NewInformer (
createEndpointsLW ( kubeClient ) ,
& kapi . Endpoints { } ,
resyncPeriod ,
kframework . ResourceEventHandlerFuncs {
AddFunc : ks . handleEndpointAdd ,
UpdateFunc : func ( oldObj , newObj interface { } ) {
// TODO: Avoid unwanted updates.
ks . handleEndpointAdd ( newObj )
} ,
} ,
)
go eController . Run ( util . NeverStop )
return eStore
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
func main ( ) {
flag . Parse ( )
var err error
// TODO: Validate input flags.
2015-05-18 17:03:30 +00:00
domain := * argDomain
if ! strings . HasSuffix ( domain , "." ) {
domain = fmt . Sprintf ( "%s." , domain )
}
2015-05-07 18:06:02 +00:00
ks := kube2sky {
2015-05-18 17:03:30 +00:00
domain : domain ,
2015-05-07 18:06:02 +00:00
etcdMutationTimeout : * argEtcdMutationTimeout ,
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
if ks . etcdClient , err = newEtcdClient ( * argEtcdServer ) ; err != nil {
glog . Fatalf ( "Failed to create etcd client - %v" , err )
2014-11-11 21:15:59 +00:00
}
2015-05-12 21:35:34 +00:00
kubeClient , err := newKubeClient ( )
if err != nil {
2015-05-07 18:06:02 +00:00
glog . Fatalf ( "Failed to create a kubernetes client: %v" , err )
2014-11-11 21:15:59 +00:00
}
2015-05-18 17:03:30 +00:00
ks . endpointsStore = watchEndpoints ( kubeClient , & ks )
ks . servicesStore = watchForServices ( kubeClient , & ks )
select { }
2014-11-11 21:15:59 +00:00
}