2014-11-11 21:15:59 +00:00
/*
Copyright 2014 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// kube2sky is a bridge between Kubernetes and SkyDNS. It watches the
// Kubernetes master for changes in Services and manifests them into etcd for
// SkyDNS to serve as DNS records.
package main
import (
"encoding/json"
"flag"
"fmt"
2015-05-28 22:28:17 +00:00
"hash/fnv"
2015-05-18 17:03:30 +00:00
"net/http"
2015-05-07 18:06:02 +00:00
"net/url"
2014-11-11 21:15:59 +00:00
"os"
2015-05-18 17:03:30 +00:00
"strings"
"sync"
2015-03-03 14:21:40 +00:00
"time"
2014-11-11 21:15:59 +00:00
2015-08-05 22:05:17 +00:00
etcd "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
skymsg "github.com/skynetservices/skydns/msg"
2015-08-05 22:03:47 +00:00
kapi "k8s.io/kubernetes/pkg/api"
2015-08-13 19:01:50 +00:00
kclient "k8s.io/kubernetes/pkg/client/unversioned"
kcache "k8s.io/kubernetes/pkg/client/unversioned/cache"
kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
2015-08-05 22:03:47 +00:00
kframework "k8s.io/kubernetes/pkg/controller/framework"
kSelector "k8s.io/kubernetes/pkg/fields"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
2014-11-11 21:15:59 +00:00
)
// Command-line flags accepted by kube2sky.
//
// TODO: switch to pflag and make - and _ equivalent.
var (
	// argEtcdServer is the URL of the etcd server that records are written to.
	argEtcdServer = flag.String("etcd-server", "http://127.0.0.1:4001", "URL to etcd server")
	// argEtcdMutationTimeout bounds how long a single etcd mutation is
	// retried before the process crashes.
	argEtcdMutationTimeout = flag.Duration("etcd_mutation_timeout", 10*time.Second, "crash after retrying etcd mutation for a specified duration")
	// argDomain is the DNS domain under which names are created.
	argDomain = flag.String("domain", "cluster.local", "domain under which to create names")
	// argKubeMasterURL is the URL of the kubernetes master; environment
	// variables in it are expanded before use.
	argKubeMasterURL = flag.String("kube_master_url", "", "URL to reach kubernetes master. Env variables in this flag will be expanded.")
	// argKubecfgFile is the path of an optional kubecfg file.
	argKubecfgFile = flag.String("kubecfg_file", "", "Location of kubecfg file for access to kubernetes master service; --kube_master_url overrides the URL part of this; if neither this nor --kube_master_url are provided, defaults to service account tokens")
)
2015-05-07 18:06:02 +00:00
const (
	// serviceSubdomain is the subdomain added to the user specified domain
	// for all services.
	serviceSubdomain = "svc"

	// resyncPeriod is the resync period for the kube controller loop.
	resyncPeriod = 30 * time.Minute

	// maxConnectAttempts is the maximum number of attempts to connect to the
	// etcd server.
	maxConnectAttempts = 12
)
2015-05-12 21:35:34 +00:00
type etcdClient interface {
Set ( path , value string , ttl uint64 ) ( * etcd . Response , error )
2015-05-18 17:03:30 +00:00
RawGet ( key string , sort , recursive bool ) ( * etcd . RawResponse , error )
2015-05-12 21:35:34 +00:00
Delete ( path string , recursive bool ) ( * etcd . Response , error )
}
2015-05-18 17:03:30 +00:00
// nameNamespace pairs an object name with the namespace it lives in.
type nameNamespace struct {
	name, namespace string
}
2015-05-07 18:06:02 +00:00
type kube2sky struct {
// Etcd client.
2015-05-12 21:35:34 +00:00
etcdClient etcdClient
2015-05-07 18:06:02 +00:00
// DNS domain name.
domain string
// Etcd mutation timeout.
etcdMutationTimeout time . Duration
2015-05-18 17:03:30 +00:00
// A cache that contains all the endpoints in the system.
endpointsStore kcache . Store
// A cache that contains all the servicess in the system.
servicesStore kcache . Store
// Lock for controlling access to headless services.
mlock sync . Mutex
2015-05-07 18:06:02 +00:00
}
2015-05-18 17:03:30 +00:00
// Removes 'subdomain' from etcd.
func ( ks * kube2sky ) removeDNS ( subdomain string ) error {
glog . V ( 2 ) . Infof ( "Removing %s from DNS" , subdomain )
2015-05-28 22:28:17 +00:00
resp , err := ks . etcdClient . RawGet ( skymsg . Path ( subdomain ) , false , true )
2015-05-18 17:03:30 +00:00
if err != nil {
return err
}
if resp . StatusCode == http . StatusNotFound {
glog . V ( 2 ) . Infof ( "Subdomain %q does not exist in etcd" , subdomain )
return nil
}
_ , err = ks . etcdClient . Delete ( skymsg . Path ( subdomain ) , true )
2014-11-11 21:15:59 +00:00
return err
}
2015-05-18 17:03:30 +00:00
// writeSkyRecord stores 'data' in etcd at the SkyDNS path for 'subdomain'.
func (ks *kube2sky) writeSkyRecord(subdomain string, data string) error {
	// Records are set with no TTL; we hope that kubernetes events are
	// accurate enough to keep them up to date.
	const noTTL = uint64(0)
	if _, err := ks.etcdClient.Set(skymsg.Path(subdomain), data, noTTL); err != nil {
		return err
	}
	return nil
}
// Generates skydns records for a headless service.
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) newHeadlessService ( subdomain string , service * kapi . Service ) error {
2015-05-18 17:03:30 +00:00
// Create an A record for every pod in the service.
// This record must be periodically updated.
// Format is as follows:
// For a service x, with pods a and b create DNS records,
// a.x.ns.domain. and, b.x.ns.domain.
ks . mlock . Lock ( )
defer ks . mlock . Unlock ( )
key , err := kcache . MetaNamespaceKeyFunc ( service )
if err != nil {
return err
}
e , exists , err := ks . endpointsStore . GetByKey ( key )
if err != nil {
return fmt . Errorf ( "failed to get endpoints object from endpoints store - %v" , err )
}
if ! exists {
glog . V ( 1 ) . Infof ( "could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up." , service . Name , service . Namespace )
2015-03-16 21:36:30 +00:00
return nil
}
2015-05-18 17:03:30 +00:00
if e , ok := e . ( * kapi . Endpoints ) ; ok {
2015-07-22 18:58:11 +00:00
return ks . generateRecordsForHeadlessService ( subdomain , e , service )
2015-05-18 17:03:30 +00:00
}
return nil
}
2015-03-16 21:36:30 +00:00
2015-05-18 17:03:30 +00:00
// getSkyMsg builds the SkyDNS service message for the given host and port,
// using a fixed priority, weight and TTL.
func getSkyMsg(ip string, port int) *skymsg.Service {
	msg := new(skymsg.Service)
	msg.Host = ip
	msg.Port = port
	msg.Priority = 10
	msg.Weight = 10
	msg.Ttl = 30
	return msg
}
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) generateRecordsForHeadlessService ( subdomain string , e * kapi . Endpoints , svc * kapi . Service ) error {
2015-05-18 17:03:30 +00:00
for idx := range e . Subsets {
for subIdx := range e . Subsets [ idx ] . Addresses {
2015-05-28 22:28:17 +00:00
b , err := json . Marshal ( getSkyMsg ( e . Subsets [ idx ] . Addresses [ subIdx ] . IP , 0 ) )
2015-05-18 17:03:30 +00:00
if err != nil {
return err
}
2015-05-28 22:28:17 +00:00
recordValue := string ( b )
recordLabel := getHash ( recordValue )
recordKey := buildDNSNameString ( subdomain , recordLabel )
glog . V ( 2 ) . Infof ( "Setting DNS record: %v -> %q\n" , recordKey , recordValue )
if err := ks . writeSkyRecord ( recordKey , recordValue ) ; err != nil {
2015-05-18 17:03:30 +00:00
return err
}
2015-07-22 18:58:11 +00:00
for portIdx := range e . Subsets [ idx ] . Ports {
endpointPort := & e . Subsets [ idx ] . Ports [ portIdx ]
portSegment := buildPortSegmentString ( endpointPort . Name , endpointPort . Protocol )
if portSegment != "" {
err := ks . generateSRVRecord ( subdomain , portSegment , recordLabel , recordKey , endpointPort . Port )
if err != nil {
return err
2015-05-28 22:28:17 +00:00
}
}
}
2015-03-30 21:01:46 +00:00
}
2015-05-18 17:03:30 +00:00
}
return nil
}
// getServiceFromEndpoints looks up, in the services store, the service that
// shares the store key of the endpoints object 'e'. It returns (nil, nil)
// when no such service is cached, and an error when the lookup fails or the
// cached object is not a service.
func (ks *kube2sky) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, error) {
	storeKey, err := kcache.MetaNamespaceKeyFunc(e)
	if err != nil {
		return nil, err
	}

	cached, found, err := ks.servicesStore.GetByKey(storeKey)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to get service object from services store - %v", err)
	case !found:
		glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace)
		return nil, nil
	}

	service, isService := cached.(*kapi.Service)
	if !isService {
		return nil, fmt.Errorf("got a non service object in services store %v", cached)
	}
	return service, nil
}
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) addDNSUsingEndpoints ( subdomain string , e * kapi . Endpoints ) error {
2015-05-18 17:03:30 +00:00
ks . mlock . Lock ( )
defer ks . mlock . Unlock ( )
svc , err := ks . getServiceFromEndpoints ( e )
if err != nil {
return err
}
if svc == nil || kapi . IsServiceIPSet ( svc ) {
// No headless service found corresponding to endpoints object.
return nil
}
// Remove existing DNS entry.
if err := ks . removeDNS ( subdomain ) ; err != nil {
return err
}
2015-07-22 18:58:11 +00:00
return ks . generateRecordsForHeadlessService ( subdomain , e , svc )
2015-05-18 17:03:30 +00:00
}
func ( ks * kube2sky ) handleEndpointAdd ( obj interface { } ) {
if e , ok := obj . ( * kapi . Endpoints ) ; ok {
2015-07-22 18:58:11 +00:00
name := buildDNSNameString ( ks . domain , serviceSubdomain , e . Namespace , e . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . addDNSUsingEndpoints ( name , e ) } )
2015-05-18 17:03:30 +00:00
}
}
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) generateRecordsForPortalService ( subdomain string , service * kapi . Service ) error {
2015-05-28 22:28:17 +00:00
b , err := json . Marshal ( getSkyMsg ( service . Spec . ClusterIP , 0 ) )
if err != nil {
return err
}
recordValue := string ( b )
2015-07-22 18:58:11 +00:00
recordLabel := getHash ( recordValue )
recordKey := buildDNSNameString ( subdomain , recordLabel )
2015-05-28 22:28:17 +00:00
glog . V ( 2 ) . Infof ( "Setting DNS record: %v -> %q, with recordKey: %v\n" , subdomain , recordValue , recordKey )
if err := ks . writeSkyRecord ( recordKey , recordValue ) ; err != nil {
return err
}
// Generate SRV Records
for i := range service . Spec . Ports {
port := & service . Spec . Ports [ i ]
portSegment := buildPortSegmentString ( port . Name , port . Protocol )
if portSegment != "" {
err = ks . generateSRVRecord ( subdomain , portSegment , recordLabel , subdomain , port . Port )
if err != nil {
return err
}
2015-03-30 21:01:46 +00:00
}
}
return nil
2014-11-11 21:15:59 +00:00
}
2015-05-28 22:28:17 +00:00
// buildPortSegmentString returns the "_<name>._<protocol>" segment for a
// port, with the protocol lower-cased. It returns "" when the port has no
// name or no protocol; the missing-protocol case is also logged as an error.
func buildPortSegmentString(portName string, portProtocol kapi.Protocol) string {
	switch {
	case portName == "":
		// we don't create a random name
		return ""
	case portProtocol == "":
		glog.Errorf("Port Protocol not set. port segment string cannot be created.")
		return ""
	}
	protocol := strings.ToLower(string(portProtocol))
	return fmt.Sprintf("_%s._%s", portName, protocol)
}
// generateSRVRecord writes an SRV record named
// <recordName>.<portSegment>.<subdomain> whose message carries 'cName' as the
// host and 'portNumber' as the port.
func (ks *kube2sky) generateSRVRecord(subdomain, portSegment, recordName, cName string, portNumber int) error {
	encoded, err := json.Marshal(getSkyMsg(cName, portNumber))
	if err != nil {
		return err
	}
	srvKey := buildDNSNameString(subdomain, portSegment, recordName)
	return ks.writeSkyRecord(srvKey, string(encoded))
}
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) addDNS ( subdomain string , service * kapi . Service ) error {
2015-05-18 17:03:30 +00:00
if len ( service . Spec . Ports ) == 0 {
glog . Fatalf ( "unexpected service with no ports: %v" , service )
}
2015-05-23 20:41:11 +00:00
// if ClusterIP is not set, a DNS entry should not be created
2015-05-18 17:03:30 +00:00
if ! kapi . IsServiceIPSet ( service ) {
2015-07-22 18:58:11 +00:00
return ks . newHeadlessService ( subdomain , service )
2015-05-18 17:03:30 +00:00
}
2015-07-22 18:58:11 +00:00
return ks . generateRecordsForPortalService ( subdomain , service )
2015-05-18 17:03:30 +00:00
}
2015-03-03 14:21:40 +00:00
// Implements retry logic for arbitrary mutator. Crashes after retrying for
// etcd_mutation_timeout.
2015-05-07 18:06:02 +00:00
func ( ks * kube2sky ) mutateEtcdOrDie ( mutator func ( ) error ) {
timeout := time . After ( ks . etcdMutationTimeout )
2015-03-03 14:21:40 +00:00
for {
select {
case <- timeout :
2015-05-07 18:06:02 +00:00
glog . Fatalf ( "Failed to mutate etcd for %v using mutator: %v" , ks . etcdMutationTimeout , mutator )
2015-03-03 14:21:40 +00:00
default :
if err := mutator ( ) ; err != nil {
delay := 50 * time . Millisecond
2015-05-07 18:06:02 +00:00
glog . V ( 1 ) . Infof ( "Failed to mutate etcd using mutator: %v due to: %v. Will retry in: %v" , mutator , err , delay )
2015-03-03 14:21:40 +00:00
time . Sleep ( delay )
} else {
return
}
}
}
}
2015-05-18 17:03:30 +00:00
// buildDNSNameString joins labels into a dotted DNS name in reverse order,
// so the last label given becomes the left-most component. For example
// ("cluster.local.", "svc", "ns", "name") yields "name.ns.svc.cluster.local.".
// While the accumulated name is still empty, a label simply replaces it.
func buildDNSNameString(labels ...string) string {
	name := ""
	for i := range labels {
		switch {
		case name == "":
			name = labels[i]
		default:
			name = fmt.Sprintf("%s.%s", labels[i], name)
		}
	}
	return name
}
// createServiceLW returns a cache.ListWatch that gets all changes to
// services across every namespace.
func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch {
	const resource = "services"
	everything := kSelector.Everything()
	return kcache.NewListWatchFromClient(kubeClient, resource, kapi.NamespaceAll, everything)
}
// createEndpointsLW returns a cache.ListWatch that gets all changes to
// endpoints across every namespace.
func createEndpointsLW(kubeClient *kclient.Client) *kcache.ListWatch {
	const resource = "endpoints"
	everything := kSelector.Everything()
	return kcache.NewListWatchFromClient(kubeClient, resource, kapi.NamespaceAll, everything)
}
func ( ks * kube2sky ) newService ( obj interface { } ) {
if s , ok := obj . ( * kapi . Service ) ; ok {
2015-07-22 18:58:11 +00:00
name := buildDNSNameString ( ks . domain , serviceSubdomain , s . Namespace , s . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . addDNS ( name , s ) } )
2015-05-18 17:03:30 +00:00
}
}
func ( ks * kube2sky ) removeService ( obj interface { } ) {
if s , ok := obj . ( * kapi . Service ) ; ok {
2015-07-22 18:58:11 +00:00
name := buildDNSNameString ( ks . domain , serviceSubdomain , s . Namespace , s . Name )
2015-05-18 17:03:30 +00:00
ks . mutateEtcdOrDie ( func ( ) error { return ks . removeDNS ( name ) } )
}
}
2015-05-28 22:28:17 +00:00
// updateService is the informer callback for updated services. It removes
// the records of the old object and then adds records for the new one.
func (ks *kube2sky) updateService(oldObj, newObj interface{}) {
	// TODO: Avoid unwanted updates.
	ks.removeService(oldObj)
	ks.newService(newObj)
}
2015-05-07 18:06:02 +00:00
func newEtcdClient ( etcdServer string ) ( * etcd . Client , error ) {
var (
client * etcd . Client
err error
)
2015-05-12 21:35:34 +00:00
for attempt := 1 ; attempt <= maxConnectAttempts ; attempt ++ {
2015-07-30 11:27:18 +00:00
if _ , err = etcdstorage . GetEtcdVersion ( etcdServer ) ; err == nil {
2015-05-07 18:06:02 +00:00
break
}
2015-05-12 21:35:34 +00:00
if attempt == maxConnectAttempts {
2015-03-04 23:41:17 +00:00
break
}
2015-05-12 21:35:34 +00:00
glog . Infof ( "[Attempt: %d] Attempting access to etcd after 5 second sleep" , attempt )
2015-05-07 18:06:02 +00:00
time . Sleep ( 5 * time . Second )
2015-03-04 23:41:17 +00:00
}
2015-05-07 18:06:02 +00:00
if err != nil {
return nil , fmt . Errorf ( "failed to connect to etcd server: %v, error: %v" , etcdServer , err )
}
glog . Infof ( "Etcd server found: %v" , etcdServer )
2015-05-02 07:13:52 +00:00
// loop until we have > 0 machines && machines[0] != ""
2015-05-04 22:45:33 +00:00
poll , timeout := 1 * time . Second , 10 * time . Second
if err := wait . Poll ( poll , timeout , func ( ) ( bool , error ) {
2015-05-07 18:06:02 +00:00
if client = etcd . NewClient ( [ ] string { etcdServer } ) ; client == nil {
return false , fmt . Errorf ( "etcd.NewClient returned nil" )
2015-05-02 07:13:52 +00:00
}
client . SyncCluster ( )
machines := client . GetCluster ( )
2015-05-04 22:45:33 +00:00
if len ( machines ) == 0 || len ( machines [ 0 ] ) == 0 {
return false , nil
2015-05-02 07:13:52 +00:00
}
2015-05-04 22:45:33 +00:00
return true , nil
} ) ; err != nil {
2015-05-07 18:06:02 +00:00
return nil , fmt . Errorf ( "Timed out after %s waiting for at least 1 synchronized etcd server in the cluster. Error: %v" , timeout , err )
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
return client , nil
}
2015-06-20 03:59:58 +00:00
func expandKubeMasterURL ( ) ( string , error ) {
2015-05-28 18:07:09 +00:00
parsedURL , err := url . Parse ( os . ExpandEnv ( * argKubeMasterURL ) )
2015-05-07 18:06:02 +00:00
if err != nil {
2015-05-28 18:07:09 +00:00
return "" , fmt . Errorf ( "failed to parse --kube_master_url %s - %v" , * argKubeMasterURL , err )
2015-05-07 18:06:02 +00:00
}
2015-05-28 18:07:09 +00:00
if parsedURL . Scheme == "" || parsedURL . Host == "" || parsedURL . Host == ":" {
return "" , fmt . Errorf ( "invalid --kube_master_url specified %s" , * argKubeMasterURL )
2015-05-07 18:06:02 +00:00
}
2015-05-28 18:07:09 +00:00
return parsedURL . String ( ) , nil
2014-11-11 21:15:59 +00:00
}
// TODO: evaluate using pkg/client/clientcmd
func newKubeClient ( ) ( * kclient . Client , error ) {
2015-05-22 23:07:58 +00:00
var (
config * kclient . Config
err error
masterURL string
)
2015-06-20 03:59:58 +00:00
// If the user specified --kube_master_url, expand env vars and verify it.
2015-05-28 18:07:09 +00:00
if * argKubeMasterURL != "" {
2015-06-20 03:59:58 +00:00
masterURL , err = expandKubeMasterURL ( )
2015-05-22 23:07:58 +00:00
if err != nil {
return nil , err
}
2015-05-07 18:06:02 +00:00
}
2015-06-20 03:59:58 +00:00
if masterURL != "" && * argKubecfgFile == "" {
// Only --kube_master_url was provided.
2015-04-23 23:42:10 +00:00
config = & kclient . Config {
2015-05-22 23:07:58 +00:00
Host : masterURL ,
2015-06-20 03:59:58 +00:00
Version : "v1" ,
2015-04-23 23:42:10 +00:00
}
} else {
2015-06-20 03:59:58 +00:00
// We either have:
// 1) --kube_master_url and --kubecfg_file
// 2) just --kubecfg_file
// 3) neither flag
// In any case, the logic is the same. If (3), this will automatically
// fall back on the service account token.
2015-05-22 23:07:58 +00:00
overrides := & kclientcmd . ConfigOverrides { }
2015-06-20 03:59:58 +00:00
overrides . ClusterInfo . Server = masterURL // might be "", but that is OK
rules := & kclientcmd . ClientConfigLoadingRules { ExplicitPath : * argKubecfgFile } // might be "", but that is OK
if config , err = kclientcmd . NewNonInteractiveDeferredLoadingClientConfig ( rules , overrides ) . ClientConfig ( ) ; err != nil {
2015-04-23 23:42:10 +00:00
return nil , err
}
2014-11-11 21:15:59 +00:00
}
2015-06-20 03:59:58 +00:00
2015-05-07 18:06:02 +00:00
glog . Infof ( "Using %s for kubernetes master" , config . Host )
glog . Infof ( "Using kubernetes API %s" , config . Version )
2014-11-11 21:15:59 +00:00
return kclient . New ( config )
}
2015-05-18 17:03:30 +00:00
func watchForServices ( kubeClient * kclient . Client , ks * kube2sky ) kcache . Store {
serviceStore , serviceController := kframework . NewInformer (
2015-05-12 21:35:34 +00:00
createServiceLW ( kubeClient ) ,
2015-05-07 18:06:02 +00:00
& kapi . Service { } ,
resyncPeriod ,
2015-05-18 17:03:30 +00:00
kframework . ResourceEventHandlerFuncs {
2015-05-07 18:06:02 +00:00
AddFunc : ks . newService ,
DeleteFunc : ks . removeService ,
2015-05-28 22:28:17 +00:00
UpdateFunc : ks . updateService ,
2015-05-07 18:06:02 +00:00
} ,
)
2015-05-18 17:03:30 +00:00
go serviceController . Run ( util . NeverStop )
return serviceStore
}
func watchEndpoints ( kubeClient * kclient . Client , ks * kube2sky ) kcache . Store {
eStore , eController := kframework . NewInformer (
createEndpointsLW ( kubeClient ) ,
& kapi . Endpoints { } ,
resyncPeriod ,
kframework . ResourceEventHandlerFuncs {
AddFunc : ks . handleEndpointAdd ,
UpdateFunc : func ( oldObj , newObj interface { } ) {
// TODO: Avoid unwanted updates.
ks . handleEndpointAdd ( newObj )
} ,
} ,
)
go eController . Run ( util . NeverStop )
return eStore
2014-11-11 21:15:59 +00:00
}
2015-05-28 22:28:17 +00:00
// getHash returns the 32-bit FNV-1a hash of text as lower-case hexadecimal
// without leading zeros.
func getHash(text string) string {
	hasher := fnv.New32a()
	hasher.Write([]byte(text))
	sum := hasher.Sum32()
	return fmt.Sprintf("%x", sum)
}
2015-05-07 18:06:02 +00:00
func main ( ) {
flag . Parse ( )
var err error
// TODO: Validate input flags.
2015-05-18 17:03:30 +00:00
domain := * argDomain
if ! strings . HasSuffix ( domain , "." ) {
domain = fmt . Sprintf ( "%s." , domain )
}
2015-05-07 18:06:02 +00:00
ks := kube2sky {
2015-05-18 17:03:30 +00:00
domain : domain ,
2015-05-07 18:06:02 +00:00
etcdMutationTimeout : * argEtcdMutationTimeout ,
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
if ks . etcdClient , err = newEtcdClient ( * argEtcdServer ) ; err != nil {
glog . Fatalf ( "Failed to create etcd client - %v" , err )
2014-11-11 21:15:59 +00:00
}
2015-05-12 21:35:34 +00:00
kubeClient , err := newKubeClient ( )
if err != nil {
2015-05-07 18:06:02 +00:00
glog . Fatalf ( "Failed to create a kubernetes client: %v" , err )
2014-11-11 21:15:59 +00:00
}
2015-05-18 17:03:30 +00:00
ks . endpointsStore = watchEndpoints ( kubeClient , & ks )
ks . servicesStore = watchForServices ( kubeClient , & ks )
select { }
2014-11-11 21:15:59 +00:00
}