2014-11-11 21:15:59 +00:00
/ *
2015-05-01 16:19:44 +00:00
Copyright 2014 The Kubernetes Authors All rights reserved .
2014-11-11 21:15:59 +00:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
// kube2sky is a bridge between Kubernetes and SkyDNS. It watches the
// Kubernetes master for changes in Services and manifests them into etcd for
// SkyDNS to serve as DNS records.
package main
import (
"encoding/json"
"fmt"
2015-05-28 22:28:17 +00:00
"hash/fnv"
2015-05-18 17:03:30 +00:00
"net/http"
2015-05-07 18:06:02 +00:00
"net/url"
2014-11-11 21:15:59 +00:00
"os"
2016-03-01 03:40:15 +00:00
"os/signal"
2015-05-18 17:03:30 +00:00
"strings"
"sync"
2016-03-01 03:40:15 +00:00
"syscall"
2015-03-03 14:21:40 +00:00
"time"
2014-11-11 21:15:59 +00:00
2015-08-05 22:05:17 +00:00
etcd "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
skymsg "github.com/skynetservices/skydns/msg"
2015-11-30 00:06:59 +00:00
flag "github.com/spf13/pflag"
2015-08-05 22:03:47 +00:00
kapi "k8s.io/kubernetes/pkg/api"
2016-02-02 18:59:54 +00:00
"k8s.io/kubernetes/pkg/api/endpoints"
2015-11-13 21:20:54 +00:00
"k8s.io/kubernetes/pkg/api/unversioned"
2015-09-03 21:40:58 +00:00
kcache "k8s.io/kubernetes/pkg/client/cache"
2016-02-12 18:58:43 +00:00
"k8s.io/kubernetes/pkg/client/restclient"
2015-09-03 21:43:19 +00:00
kclient "k8s.io/kubernetes/pkg/client/unversioned"
2015-08-13 19:01:50 +00:00
kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
2015-08-05 22:03:47 +00:00
kframework "k8s.io/kubernetes/pkg/controller/framework"
2015-10-16 08:09:09 +00:00
kselector "k8s.io/kubernetes/pkg/fields"
2015-11-23 19:32:50 +00:00
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/util"
2016-02-02 18:59:54 +00:00
"k8s.io/kubernetes/pkg/util/validation"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/util/wait"
2014-11-11 21:15:59 +00:00
)
2016-03-01 03:40:15 +00:00
// kubernetesSvcName is the name of the "master" Kubernetes Service. main
// blocks in waitForKubernetesService until this Service can be fetched from
// the default namespace.
const kubernetesSvcName = "kubernetes"
2014-11-11 21:15:59 +00:00
// Command-line flags. Each variable is a pointer that is populated by
// flag.Parse in main.
var (
2015-05-12 06:00:43 +00:00
// DNS domain under which all records are created; main appends a trailing
// "." when it is missing.
argDomain = flag . String ( "domain" , "cluster.local" , "domain under which to create names" )
2015-11-30 00:06:59 +00:00
// How long mutateEtcdOrDie keeps retrying a failing etcd mutation before
// terminating the process.
argEtcdMutationTimeout = flag . Duration ( "etcd-mutation-timeout" , 10 * time . Second , "crash after retrying etcd mutation for a specified duration" )
2015-05-07 18:06:02 +00:00
// URL of the etcd server that receives the SkyDNS records.
argEtcdServer = flag . String ( "etcd-server" , "http://127.0.0.1:4001" , "URL to etcd server" )
2015-11-30 00:06:59 +00:00
// Optional kubecfg file used by newKubeClient to build the client config.
argKubecfgFile = flag . String ( "kubecfg-file" , "" , "Location of kubecfg file for access to kubernetes master service; --kube-master-url overrides the URL part of this; if neither this nor --kube-master-url are provided, defaults to service account tokens" )
// Optional master URL; environment variables are expanded by
// expandKubeMasterURL.
argKubeMasterURL = flag . String ( "kube-master-url" , "" , "URL to reach kubernetes master. Env variables in this flag will be expanded." )
2016-03-01 03:40:15 +00:00
// Port of the HTTP server started at the end of main.
healthzPort = flag . Int ( "healthz-port" , 8081 , "port on which to serve a kube2sky HTTP readiness probe." )
2014-11-11 21:15:59 +00:00
)
2015-05-07 18:06:02 +00:00
// Tunables and fixed DNS name components used throughout kube2sky.
const (
2015-05-12 21:35:34 +00:00
// Maximum number of attempts to connect to etcd server.
maxConnectAttempts = 12
2015-05-07 18:06:02 +00:00
// Resync period for the kube controller loop.
2015-05-28 22:28:17 +00:00
resyncPeriod = 30 * time . Minute
2015-05-18 17:03:30 +00:00
// A subdomain added to the user specified domain for all services.
serviceSubdomain = "svc"
2015-09-04 14:30:21 +00:00
// A subdomain added to the user specified domain for all pods.
podSubdomain = "pod"
2015-05-07 18:06:02 +00:00
)
2015-05-12 21:35:34 +00:00
// etcdClient is the set of etcd operations kube2sky relies on. main stores the
// *etcd.Client returned by newEtcdClient in a field of this type.
type etcdClient interface {
// Set writes value at path with the given TTL.
Set ( path , value string , ttl uint64 ) ( * etcd . Response , error )
2015-05-18 17:03:30 +00:00
// RawGet fetches key; removeDNS inspects the StatusCode of the raw response
// to detect a missing key.
RawGet ( key string , sort , recursive bool ) ( * etcd . RawResponse , error )
2015-05-12 21:35:34 +00:00
// Delete removes path, recursively when requested.
Delete ( path string , recursive bool ) ( * etcd . Response , error )
}
2015-05-18 17:03:30 +00:00
// nameNamespace pairs an object name with the namespace it lives in.
// NOTE(review): nothing in the visible source references this type — confirm
// whether it is still needed.
type nameNamespace struct {
name string
namespace string
}
2015-05-07 18:06:02 +00:00
// kube2sky holds the state shared by all event handlers: the etcd client the
// DNS records are written to, the DNS domain, and the informer caches that
// main populates via watchEndpoints, watchForServices and watchPods.
type kube2sky struct {
// Etcd client.
2015-05-12 21:35:34 +00:00
etcdClient etcdClient
2015-05-07 18:06:02 +00:00
// DNS domain name.
domain string
// Etcd mutation timeout.
etcdMutationTimeout time . Duration
2015-05-18 17:03:30 +00:00
// A cache that contains all the endpoints in the system.
endpointsStore kcache . Store
2015-11-30 16:21:44 +00:00
// A cache that contains all the services in the system.
2015-05-18 17:03:30 +00:00
servicesStore kcache . Store
2015-11-30 16:21:44 +00:00
// A cache that contains all the pods in the system.
podsStore kcache . Store
2015-05-18 17:03:30 +00:00
// Lock for controlling access to headless services. It is held by
// newHeadlessService and addDNSUsingEndpoints for their whole duration.
mlock sync . Mutex
2015-05-07 18:06:02 +00:00
}
2015-05-18 17:03:30 +00:00
// Removes 'subdomain' from etcd.
func ( ks * kube2sky ) removeDNS ( subdomain string ) error {
glog . V ( 2 ) . Infof ( "Removing %s from DNS" , subdomain )
2015-05-28 22:28:17 +00:00
resp , err := ks . etcdClient . RawGet ( skymsg . Path ( subdomain ) , false , true )
2015-05-18 17:03:30 +00:00
if err != nil {
return err
}
if resp . StatusCode == http . StatusNotFound {
glog . V ( 2 ) . Infof ( "Subdomain %q does not exist in etcd" , subdomain )
return nil
}
_ , err = ks . etcdClient . Delete ( skymsg . Path ( subdomain ) , true )
2014-11-11 21:15:59 +00:00
return err
}
2015-05-18 17:03:30 +00:00
// writeSkyRecord stores 'data' in etcd at the SkyDNS path derived from
// 'subdomain' and returns the error reported by the etcd client, if any.
func (ks *kube2sky) writeSkyRecord(subdomain string, data string) error {
	// Records are set with no TTL, in the hope that kubernetes events are
	// accurate.
	const noTTL uint64 = 0
	if _, err := ks.etcdClient.Set(skymsg.Path(subdomain), data, noTTL); err != nil {
		return err
	}
	return nil
}
// Generates skydns records for a headless service.
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) newHeadlessService ( subdomain string , service * kapi . Service ) error {
2015-05-18 17:03:30 +00:00
// Create an A record for every pod in the service.
// This record must be periodically updated.
// Format is as follows:
// For a service x, with pods a and b create DNS records,
// a.x.ns.domain. and, b.x.ns.domain.
ks . mlock . Lock ( )
defer ks . mlock . Unlock ( )
key , err := kcache . MetaNamespaceKeyFunc ( service )
if err != nil {
return err
}
e , exists , err := ks . endpointsStore . GetByKey ( key )
if err != nil {
return fmt . Errorf ( "failed to get endpoints object from endpoints store - %v" , err )
}
if ! exists {
2015-10-08 02:38:34 +00:00
glog . V ( 1 ) . Infof ( "Could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up." , service . Name , service . Namespace )
2015-03-16 21:36:30 +00:00
return nil
}
2015-05-18 17:03:30 +00:00
if e , ok := e . ( * kapi . Endpoints ) ; ok {
2015-07-22 18:58:11 +00:00
return ks . generateRecordsForHeadlessService ( subdomain , e , service )
2015-05-18 17:03:30 +00:00
}
return nil
}
2015-03-16 21:36:30 +00:00
2015-05-18 17:03:30 +00:00
// getSkyMsg builds the SkyDNS service message for the given host and port,
// using a fixed priority and weight of 10 and a TTL of 30.
func getSkyMsg(ip string, port int) *skymsg.Service {
	msg := new(skymsg.Service)
	msg.Host = ip
	msg.Port = port
	msg.Priority = 10
	msg.Weight = 10
	msg.Ttl = 30
	return msg
}
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) generateRecordsForHeadlessService ( subdomain string , e * kapi . Endpoints , svc * kapi . Service ) error {
2016-02-02 18:59:54 +00:00
glog . V ( 4 ) . Infof ( "Endpoints Annotations: %v" , e . Annotations )
2015-05-18 17:03:30 +00:00
for idx := range e . Subsets {
for subIdx := range e . Subsets [ idx ] . Addresses {
2016-02-02 18:59:54 +00:00
endpointIP := e . Subsets [ idx ] . Addresses [ subIdx ] . IP
b , err := json . Marshal ( getSkyMsg ( endpointIP , 0 ) )
2015-05-18 17:03:30 +00:00
if err != nil {
return err
}
2015-05-28 22:28:17 +00:00
recordValue := string ( b )
recordLabel := getHash ( recordValue )
2016-02-02 18:59:54 +00:00
if serializedPodHostnames := e . Annotations [ endpoints . PodHostnamesAnnotation ] ; len ( serializedPodHostnames ) > 0 {
podHostnames := map [ string ] endpoints . HostRecord { }
err := json . Unmarshal ( [ ] byte ( serializedPodHostnames ) , & podHostnames )
if err != nil {
return err
}
if hostRecord , exists := podHostnames [ string ( endpointIP ) ] ; exists {
if validation . IsDNS1123Label ( hostRecord . HostName ) {
recordLabel = hostRecord . HostName
}
}
}
2015-05-28 22:28:17 +00:00
recordKey := buildDNSNameString ( subdomain , recordLabel )
glog . V ( 2 ) . Infof ( "Setting DNS record: %v -> %q\n" , recordKey , recordValue )
if err := ks . writeSkyRecord ( recordKey , recordValue ) ; err != nil {
2015-05-18 17:03:30 +00:00
return err
}
2015-07-22 18:58:11 +00:00
for portIdx := range e . Subsets [ idx ] . Ports {
endpointPort := & e . Subsets [ idx ] . Ports [ portIdx ]
portSegment := buildPortSegmentString ( endpointPort . Name , endpointPort . Protocol )
if portSegment != "" {
err := ks . generateSRVRecord ( subdomain , portSegment , recordLabel , recordKey , endpointPort . Port )
if err != nil {
return err
2015-05-28 22:28:17 +00:00
}
}
}
2015-03-30 21:01:46 +00:00
}
2015-05-18 17:03:30 +00:00
}
return nil
}
// getServiceFromEndpoints returns the service stored under the same
// namespace/name key as the endpoints object 'e'. It returns (nil, nil) when
// no such service is in the services store, and an error when the store lookup
// fails or the stored object is not a *kapi.Service.
func (ks *kube2sky) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, error) {
	storeKey, err := kcache.MetaNamespaceKeyFunc(e)
	if err != nil {
		return nil, err
	}

	cached, found, err := ks.servicesStore.GetByKey(storeKey)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to get service object from services store - %v", err)
	case !found:
		glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace)
		return nil, nil
	}

	service, isService := cached.(*kapi.Service)
	if !isService {
		return nil, fmt.Errorf("got a non service object in services store %v", cached)
	}
	return service, nil
}
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) addDNSUsingEndpoints ( subdomain string , e * kapi . Endpoints ) error {
2015-05-18 17:03:30 +00:00
ks . mlock . Lock ( )
defer ks . mlock . Unlock ( )
svc , err := ks . getServiceFromEndpoints ( e )
if err != nil {
return err
}
if svc == nil || kapi . IsServiceIPSet ( svc ) {
// No headless service found corresponding to endpoints object.
return nil
}
// Remove existing DNS entry.
if err := ks . removeDNS ( subdomain ) ; err != nil {
return err
}
2015-07-22 18:58:11 +00:00
return ks . generateRecordsForHeadlessService ( subdomain , e , svc )
2015-05-18 17:03:30 +00:00
}
func ( ks * kube2sky ) handleEndpointAdd ( obj interface { } ) {
if e , ok := obj . ( * kapi . Endpoints ) ; ok {
2015-07-22 18:58:11 +00:00
name := buildDNSNameString ( ks . domain , serviceSubdomain , e . Namespace , e . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . addDNSUsingEndpoints ( name , e ) } )
2015-05-18 17:03:30 +00:00
}
}
2015-09-04 14:30:21 +00:00
// handlePodCreate writes the DNS record of a pod. Objects that are not
// *kapi.Pod are ignored, as are pods whose IP is not yet available.
func (ks *kube2sky) handlePodCreate(obj interface{}) {
	pod, isPod := obj.(*kapi.Pod)
	if !isPod || pod.Status.PodIP == "" {
		return
	}
	dnsName := buildDNSNameString(ks.domain, podSubdomain, pod.Namespace, santizeIP(pod.Status.PodIP))
	ks.mutateEtcdOrDie(func() error {
		return ks.generateRecordsForPod(dnsName, pod)
	})
}
// handlePodUpdate reacts to a pod update. When both objects are pods the
// records are only rewritten if the pod IP changed (old record removed, new
// one created). When only one of the two objects is a pod, that pod is
// created (new) or deleted (old) respectively.
func (ks *kube2sky) handlePodUpdate(old interface{}, new interface{}) {
	previous, previousIsPod := old.(*kapi.Pod)
	current, currentIsPod := new.(*kapi.Pod)

	switch {
	case previousIsPod && currentIsPod:
		if previous.Status.PodIP != current.Status.PodIP {
			ks.handlePodDelete(previous)
			ks.handlePodCreate(current)
		}
	case currentIsPod:
		ks.handlePodCreate(current)
	case previousIsPod:
		ks.handlePodDelete(previous)
	}
}
// handlePodDelete removes the DNS entry of a pod. Objects that are not
// *kapi.Pod and pods without an IP are ignored.
func (ks *kube2sky) handlePodDelete(obj interface{}) {
	pod, isPod := obj.(*kapi.Pod)
	if !isPod || pod.Status.PodIP == "" {
		return
	}
	dnsName := buildDNSNameString(ks.domain, podSubdomain, pod.Namespace, santizeIP(pod.Status.PodIP))
	ks.mutateEtcdOrDie(func() error {
		return ks.removeDNS(dnsName)
	})
}
// generateRecordsForPod writes a single record holding the pod's IP. The
// record is stored below 'subdomain' under a label derived from a hash of the
// serialized record. Despite its name, 'service' is the pod.
func (ks *kube2sky) generateRecordsForPod(subdomain string, service *kapi.Pod) error {
	encoded, err := json.Marshal(getSkyMsg(service.Status.PodIP, 0))
	if err != nil {
		return err
	}

	recordValue := string(encoded)
	recordKey := buildDNSNameString(subdomain, getHash(recordValue))
	glog.V(2).Infof("Setting DNS record: %v -> %q, with recordKey: %v\n", subdomain, recordValue, recordKey)
	return ks.writeSkyRecord(recordKey, recordValue)
}
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) generateRecordsForPortalService ( subdomain string , service * kapi . Service ) error {
2015-05-28 22:28:17 +00:00
b , err := json . Marshal ( getSkyMsg ( service . Spec . ClusterIP , 0 ) )
if err != nil {
return err
}
recordValue := string ( b )
2015-07-22 18:58:11 +00:00
recordLabel := getHash ( recordValue )
recordKey := buildDNSNameString ( subdomain , recordLabel )
2015-05-28 22:28:17 +00:00
glog . V ( 2 ) . Infof ( "Setting DNS record: %v -> %q, with recordKey: %v\n" , subdomain , recordValue , recordKey )
if err := ks . writeSkyRecord ( recordKey , recordValue ) ; err != nil {
return err
}
// Generate SRV Records
for i := range service . Spec . Ports {
port := & service . Spec . Ports [ i ]
portSegment := buildPortSegmentString ( port . Name , port . Protocol )
if portSegment != "" {
err = ks . generateSRVRecord ( subdomain , portSegment , recordLabel , subdomain , port . Port )
if err != nil {
return err
}
2015-03-30 21:01:46 +00:00
}
}
return nil
2014-11-11 21:15:59 +00:00
}
2015-09-04 14:30:21 +00:00
// santizeIP turns an IP address into a string usable as a single DNS label by
// replacing every "." with "-".
func santizeIP(ip string) string {
	const (
		dot  = "."
		dash = "-"
	)
	return strings.Replace(ip, dot, dash, -1)
}
2015-05-28 22:28:17 +00:00
// buildPortSegmentString returns the "_<name>._<protocol>" segment used in SRV
// record names, with the protocol lower-cased. It returns "" for unnamed ports
// (no random name is invented) and, after logging an error, for ports without
// a protocol.
func buildPortSegmentString(portName string, portProtocol kapi.Protocol) string {
	switch {
	case portName == "":
		// we don't create a random name
		return ""
	case portProtocol == "":
		glog.Errorf("Port Protocol not set. port segment string cannot be created.")
		return ""
	}
	protocol := strings.ToLower(string(portProtocol))
	return fmt.Sprintf("_%s._%s", portName, protocol)
}
// generateSRVRecord writes an SRV record named
// <recordName>.<portSegment>.<subdomain> whose target host is 'cName' and
// whose port is 'portNumber'.
func (ks *kube2sky) generateSRVRecord(subdomain, portSegment, recordName, cName string, portNumber int) error {
	encoded, err := json.Marshal(getSkyMsg(cName, portNumber))
	if err != nil {
		return err
	}
	srvKey := buildDNSNameString(subdomain, portSegment, recordName)
	return ks.writeSkyRecord(srvKey, string(encoded))
}
2015-07-22 18:58:11 +00:00
func ( ks * kube2sky ) addDNS ( subdomain string , service * kapi . Service ) error {
2015-05-23 20:41:11 +00:00
// if ClusterIP is not set, a DNS entry should not be created
2015-05-18 17:03:30 +00:00
if ! kapi . IsServiceIPSet ( service ) {
2015-07-22 18:58:11 +00:00
return ks . newHeadlessService ( subdomain , service )
2015-05-18 17:03:30 +00:00
}
2016-01-13 18:36:08 +00:00
if len ( service . Spec . Ports ) == 0 {
glog . Info ( "Unexpected service with no ports, this should not have happend: %v" , service )
}
2015-07-22 18:58:11 +00:00
return ks . generateRecordsForPortalService ( subdomain , service )
2015-05-18 17:03:30 +00:00
}
2015-03-03 14:21:40 +00:00
// Implements retry logic for arbitrary mutator. Crashes after retrying for
2015-11-30 00:06:59 +00:00
// etcd-mutation-timeout.
2015-05-07 18:06:02 +00:00
func ( ks * kube2sky ) mutateEtcdOrDie ( mutator func ( ) error ) {
timeout := time . After ( ks . etcdMutationTimeout )
2015-03-03 14:21:40 +00:00
for {
select {
case <- timeout :
2015-05-07 18:06:02 +00:00
glog . Fatalf ( "Failed to mutate etcd for %v using mutator: %v" , ks . etcdMutationTimeout , mutator )
2015-03-03 14:21:40 +00:00
default :
if err := mutator ( ) ; err != nil {
delay := 50 * time . Millisecond
2015-05-07 18:06:02 +00:00
glog . V ( 1 ) . Infof ( "Failed to mutate etcd using mutator: %v due to: %v. Will retry in: %v" , mutator , err , delay )
2015-03-03 14:21:40 +00:00
time . Sleep ( delay )
} else {
return
}
}
}
}
2015-05-18 17:03:30 +00:00
// buildDNSNameString joins 'labels' into a DNS name. Labels are given from the
// most general to the most specific and appear in reverse order in the
// result, e.g. ("local.", "svc", "ns") yields "ns.svc.local.". While the
// accumulated name is still empty the next label simply becomes the name.
func buildDNSNameString(labels ...string) string {
	name := ""
	for i := range labels {
		switch name {
		case "":
			name = labels[i]
		default:
			name = fmt.Sprintf("%s.%s", labels[i], name)
		}
	}
	return name
}
// Returns a cache.ListWatch that gets all changes to services.
func createServiceLW ( kubeClient * kclient . Client ) * kcache . ListWatch {
2015-10-16 08:09:09 +00:00
return kcache . NewListWatchFromClient ( kubeClient , "services" , kapi . NamespaceAll , kselector . Everything ( ) )
2015-05-18 17:03:30 +00:00
}
// Returns a cache.ListWatch that gets all changes to endpoints.
func createEndpointsLW ( kubeClient * kclient . Client ) * kcache . ListWatch {
2015-10-16 08:09:09 +00:00
return kcache . NewListWatchFromClient ( kubeClient , "endpoints" , kapi . NamespaceAll , kselector . Everything ( ) )
2015-05-18 17:03:30 +00:00
}
2015-09-04 14:30:21 +00:00
// Returns a cache.ListWatch that gets all changes to pods.
func createEndpointsPodLW ( kubeClient * kclient . Client ) * kcache . ListWatch {
2015-10-16 08:09:09 +00:00
return kcache . NewListWatchFromClient ( kubeClient , "pods" , kapi . NamespaceAll , kselector . Everything ( ) )
2015-09-04 14:30:21 +00:00
}
2015-05-18 17:03:30 +00:00
func ( ks * kube2sky ) newService ( obj interface { } ) {
if s , ok := obj . ( * kapi . Service ) ; ok {
2015-07-22 18:58:11 +00:00
name := buildDNSNameString ( ks . domain , serviceSubdomain , s . Namespace , s . Name )
ks . mutateEtcdOrDie ( func ( ) error { return ks . addDNS ( name , s ) } )
2015-05-18 17:03:30 +00:00
}
}
func ( ks * kube2sky ) removeService ( obj interface { } ) {
if s , ok := obj . ( * kapi . Service ) ; ok {
2015-07-22 18:58:11 +00:00
name := buildDNSNameString ( ks . domain , serviceSubdomain , s . Namespace , s . Name )
2015-05-18 17:03:30 +00:00
ks . mutateEtcdOrDie ( func ( ) error { return ks . removeDNS ( name ) } )
}
}
2015-05-28 22:28:17 +00:00
// updateService is the informer callback for updated services. It removes
// every record derived from oldObj and then creates the records for newObj.
func ( ks * kube2sky ) updateService ( oldObj , newObj interface { } ) {
2016-03-01 03:40:15 +00:00
// TODO: We shouldn't leave etcd in a state where it doesn't have a
// record for a Service. This removal is needed to completely clean
// the directory of a Service, which has SRV records and A records
// that are hashed according to oldObj. Unfortunately, this is the
// easiest way to purge the directory.
2015-05-28 22:28:17 +00:00
ks . removeService ( oldObj )
ks . newService ( newObj )
}
2015-05-07 18:06:02 +00:00
// newEtcdClient waits for the etcd server at etcdServer to become reachable
// and returns a client for it.
//
// It first probes the server version up to maxConnectAttempts times, sleeping
// 5 seconds between attempts, and fails if the last attempt still errors. It
// then polls once per second, for up to 10 seconds, until a freshly created
// client reports at least one non-empty machine in its cluster.
func newEtcdClient ( etcdServer string ) ( * etcd . Client , error ) {
var (
client * etcd . Client
err error
)
2015-05-12 21:35:34 +00:00
for attempt := 1 ; attempt <= maxConnectAttempts ; attempt ++ {
2015-11-23 19:32:50 +00:00
if _ , err = etcdutil . GetEtcdVersion ( etcdServer ) ; err == nil {
2015-05-07 18:06:02 +00:00
break
}
2015-05-12 21:35:34 +00:00
// Skip the log line and sleep after the final failed attempt; err stays set
// and is reported below.
if attempt == maxConnectAttempts {
2015-03-04 23:41:17 +00:00
break
}
2015-05-12 21:35:34 +00:00
glog . Infof ( "[Attempt: %d] Attempting access to etcd after 5 second sleep" , attempt )
2015-05-07 18:06:02 +00:00
time . Sleep ( 5 * time . Second )
2015-03-04 23:41:17 +00:00
}
2015-05-07 18:06:02 +00:00
if err != nil {
return nil , fmt . Errorf ( "failed to connect to etcd server: %v, error: %v" , etcdServer , err )
}
glog . Infof ( "Etcd server found: %v" , etcdServer )
2015-05-02 07:13:52 +00:00
// loop until we have > 0 machines && machines[0] != ""
2015-05-04 22:45:33 +00:00
poll , timeout := 1 * time . Second , 10 * time . Second
if err := wait . Poll ( poll , timeout , func ( ) ( bool , error ) {
2015-05-07 18:06:02 +00:00
// A new client is created on every poll iteration; the one from the last
// iteration is what gets returned.
if client = etcd . NewClient ( [ ] string { etcdServer } ) ; client == nil {
return false , fmt . Errorf ( "etcd.NewClient returned nil" )
2015-05-02 07:13:52 +00:00
}
// The result of SyncCluster is ignored; readiness is judged from
// GetCluster below.
client . SyncCluster ( )
machines := client . GetCluster ( )
2015-05-04 22:45:33 +00:00
if len ( machines ) == 0 || len ( machines [ 0 ] ) == 0 {
return false , nil
2015-05-02 07:13:52 +00:00
}
2015-05-04 22:45:33 +00:00
return true , nil
} ) ; err != nil {
2015-05-07 18:06:02 +00:00
return nil , fmt . Errorf ( "Timed out after %s waiting for at least 1 synchronized etcd server in the cluster. Error: %v" , timeout , err )
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
return client , nil
}
2015-06-20 03:59:58 +00:00
func expandKubeMasterURL ( ) ( string , error ) {
2015-05-28 18:07:09 +00:00
parsedURL , err := url . Parse ( os . ExpandEnv ( * argKubeMasterURL ) )
2015-05-07 18:06:02 +00:00
if err != nil {
2015-11-30 00:06:59 +00:00
return "" , fmt . Errorf ( "failed to parse --kube-master-url %s - %v" , * argKubeMasterURL , err )
2015-05-07 18:06:02 +00:00
}
2015-05-28 18:07:09 +00:00
if parsedURL . Scheme == "" || parsedURL . Host == "" || parsedURL . Host == ":" {
2015-11-30 00:06:59 +00:00
return "" , fmt . Errorf ( "invalid --kube-master-url specified %s" , * argKubeMasterURL )
2015-05-07 18:06:02 +00:00
}
2015-05-28 18:07:09 +00:00
return parsedURL . String ( ) , nil
2014-11-11 21:15:59 +00:00
}
// newKubeClient builds the Kubernetes API client from the --kube-master-url
// and --kubecfg-file flags.
//
// When only --kube-master-url is given, a minimal config pointing at that URL
// with API version "v1" is used. In every other case the config is loaded
// through clientcmd, with the master URL (if any) overriding the server from
// the kubecfg file.
//
// TODO: evaluate using pkg/client/clientcmd
func newKubeClient ( ) ( * kclient . Client , error ) {
2015-05-22 23:07:58 +00:00
var (
2016-02-12 18:58:43 +00:00
config * restclient . Config
2015-05-22 23:07:58 +00:00
err error
masterURL string
)
2015-11-30 00:06:59 +00:00
// If the user specified --kube-master-url, expand env vars and verify it.
2015-05-28 18:07:09 +00:00
if * argKubeMasterURL != "" {
2015-06-20 03:59:58 +00:00
masterURL , err = expandKubeMasterURL ( )
2015-05-22 23:07:58 +00:00
if err != nil {
return nil , err
}
2015-05-07 18:06:02 +00:00
}
2015-09-04 14:30:21 +00:00
2015-06-20 03:59:58 +00:00
if masterURL != "" && * argKubecfgFile == "" {
2015-11-30 00:06:59 +00:00
// Only --kube-master-url was provided.
2016-02-12 18:58:43 +00:00
config = & restclient . Config {
2015-12-25 23:05:01 +00:00
Host : masterURL ,
2016-02-12 18:58:43 +00:00
ContentConfig : restclient . ContentConfig { GroupVersion : & unversioned . GroupVersion { Version : "v1" } } ,
2015-04-23 23:42:10 +00:00
}
} else {
2015-06-20 03:59:58 +00:00
// We either have:
2015-11-30 00:06:59 +00:00
// 1) --kube-master-url and --kubecfg-file
// 2) just --kubecfg-file
2015-06-20 03:59:58 +00:00
// 3) neither flag
// In any case, the logic is the same. If (3), this will automatically
// fall back on the service account token.
2015-05-22 23:07:58 +00:00
overrides := & kclientcmd . ConfigOverrides { }
2015-06-20 03:59:58 +00:00
overrides . ClusterInfo . Server = masterURL // might be "", but that is OK
rules := & kclientcmd . ClientConfigLoadingRules { ExplicitPath : * argKubecfgFile } // might be "", but that is OK
if config , err = kclientcmd . NewNonInteractiveDeferredLoadingClientConfig ( rules , overrides ) . ClientConfig ( ) ; err != nil {
2015-04-23 23:42:10 +00:00
return nil , err
}
2014-11-11 21:15:59 +00:00
}
2015-06-20 03:59:58 +00:00
2015-05-07 18:06:02 +00:00
glog . Infof ( "Using %s for kubernetes master" , config . Host )
2015-11-13 21:20:54 +00:00
glog . Infof ( "Using kubernetes API %v" , config . GroupVersion )
2014-11-11 21:15:59 +00:00
return kclient . New ( config )
}
2015-05-18 17:03:30 +00:00
func watchForServices ( kubeClient * kclient . Client , ks * kube2sky ) kcache . Store {
serviceStore , serviceController := kframework . NewInformer (
2015-05-12 21:35:34 +00:00
createServiceLW ( kubeClient ) ,
2015-05-07 18:06:02 +00:00
& kapi . Service { } ,
resyncPeriod ,
2015-05-18 17:03:30 +00:00
kframework . ResourceEventHandlerFuncs {
2015-05-07 18:06:02 +00:00
AddFunc : ks . newService ,
DeleteFunc : ks . removeService ,
2015-05-28 22:28:17 +00:00
UpdateFunc : ks . updateService ,
2015-05-07 18:06:02 +00:00
} ,
)
2016-02-02 10:57:06 +00:00
go serviceController . Run ( wait . NeverStop )
2015-05-18 17:03:30 +00:00
return serviceStore
}
func watchEndpoints ( kubeClient * kclient . Client , ks * kube2sky ) kcache . Store {
eStore , eController := kframework . NewInformer (
createEndpointsLW ( kubeClient ) ,
& kapi . Endpoints { } ,
resyncPeriod ,
kframework . ResourceEventHandlerFuncs {
AddFunc : ks . handleEndpointAdd ,
UpdateFunc : func ( oldObj , newObj interface { } ) {
// TODO: Avoid unwanted updates.
ks . handleEndpointAdd ( newObj )
} ,
} ,
)
2016-02-02 10:57:06 +00:00
go eController . Run ( wait . NeverStop )
2015-05-18 17:03:30 +00:00
return eStore
2014-11-11 21:15:59 +00:00
}
2015-09-04 14:30:21 +00:00
func watchPods ( kubeClient * kclient . Client , ks * kube2sky ) kcache . Store {
eStore , eController := kframework . NewInformer (
createEndpointsPodLW ( kubeClient ) ,
& kapi . Pod { } ,
resyncPeriod ,
kframework . ResourceEventHandlerFuncs {
AddFunc : ks . handlePodCreate ,
UpdateFunc : func ( oldObj , newObj interface { } ) {
ks . handlePodUpdate ( oldObj , newObj )
} ,
DeleteFunc : ks . handlePodDelete ,
} ,
)
2016-02-02 10:57:06 +00:00
go eController . Run ( wait . NeverStop )
2015-09-04 14:30:21 +00:00
return eStore
}
2015-05-28 22:28:17 +00:00
// getHash returns the 32-bit FNV-1a hash of 'text' formatted as lower-case
// hexadecimal without padding.
func getHash(text string) string {
	hasher := fnv.New32a()
	hasher.Write([]byte(text))
	sum := hasher.Sum32()
	return fmt.Sprintf("%x", sum)
}
2016-03-01 03:40:15 +00:00
// waitForKubernetesService blocks until the "kubernetes" master service can be
// fetched from the default namespace and returns it, retrying once per second
// for as long as the lookup fails.
// Since the health probe on the kube2sky container is essentially an nslookup
// of this service, we cannot serve any DNS records if it doesn't show up.
// Once the Service is found, we start replying on this container's readiness
// probe endpoint.
func waitForKubernetesService(client *kclient.Client) (svc *kapi.Service) {
	const servicePollInterval = 1 * time.Second

	name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName)
	glog.Infof("Waiting for service: %v", name)

	for {
		found, err := client.Services(kapi.NamespaceDefault).Get(kubernetesSvcName)
		if err == nil && found != nil {
			return found
		}
		glog.Infof("Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", name, err, servicePollInterval)
		time.Sleep(servicePollInterval)
	}
}
// setupSignalHandlers runs a goroutine that waits on SIGINT or SIGTERM and logs it
// before exiting.
func setupSignalHandlers() {
	// The channel must be buffered: signal.Notify does not block when sending,
	// so with an unbuffered channel a signal that arrives before the goroutine
	// below is receiving would be dropped.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// This program should always exit gracefully logging that it received
	// either a SIGINT or SIGTERM. Since kube2sky is run in a container
	// without a liveness probe as part of the kube-dns pod, it shouldn't
	// restart unless the pod is deleted. If it restarts without logging
	// anything it means something is seriously wrong.
	// TODO: Remove once #22290 is fixed.
	go func() {
		glog.Fatalf("Received signal %s", <-sigChan)
	}()
}
// setupHealthzHandlers registers the "/readiness" endpoint for kube2sky on the
// default HTTP mux. The handler always answers "ok"; 'ks' is currently unused.
func setupHealthzHandlers(ks *kube2sky) {
	readiness := func(w http.ResponseWriter, req *http.Request) {
		fmt.Fprint(w, "ok\n")
	}
	http.Handle("/readiness", http.HandlerFunc(readiness))
}
2015-05-07 18:06:02 +00:00
// main wires kube2sky together: it parses the flags, connects to etcd and the
// Kubernetes API, publishes a DNS record for the Kubernetes master service,
// starts the endpoints, services and pods watches, and finally serves the
// readiness endpoint. Any setup failure terminates the process.
func main ( ) {
2015-11-30 00:06:59 +00:00
flag . CommandLine . SetNormalizeFunc ( util . WarnWordSepNormalizeFunc )
2015-05-07 18:06:02 +00:00
flag . Parse ( )
var err error
2016-03-01 03:40:15 +00:00
setupSignalHandlers ( )
2015-05-07 18:06:02 +00:00
// TODO: Validate input flags.
2015-05-18 17:03:30 +00:00
// Make sure the domain ends with a "." before it is used to build names.
domain := * argDomain
if ! strings . HasSuffix ( domain , "." ) {
domain = fmt . Sprintf ( "%s." , domain )
}
2015-05-07 18:06:02 +00:00
ks := kube2sky {
2015-05-18 17:03:30 +00:00
domain : domain ,
2015-05-07 18:06:02 +00:00
etcdMutationTimeout : * argEtcdMutationTimeout ,
2014-11-11 21:15:59 +00:00
}
2015-05-07 18:06:02 +00:00
if ks . etcdClient , err = newEtcdClient ( * argEtcdServer ) ; err != nil {
glog . Fatalf ( "Failed to create etcd client - %v" , err )
2014-11-11 21:15:59 +00:00
}
2015-05-12 21:35:34 +00:00
kubeClient , err := newKubeClient ( )
if err != nil {
2015-05-07 18:06:02 +00:00
glog . Fatalf ( "Failed to create a kubernetes client: %v" , err )
2014-11-11 21:15:59 +00:00
}
2016-03-01 03:40:15 +00:00
// Wait synchronously for the Kubernetes service and add a DNS record for it.
ks . newService ( waitForKubernetesService ( kubeClient ) )
glog . Infof ( "Successfully added DNS record for Kubernetes service." )
2014-11-11 21:15:59 +00:00
2015-05-18 17:03:30 +00:00
// Each watch starts its controller in a background goroutine and returns the
// store it fills.
ks . endpointsStore = watchEndpoints ( kubeClient , & ks )
ks . servicesStore = watchForServices ( kubeClient , & ks )
2015-11-30 16:21:44 +00:00
ks . podsStore = watchPods ( kubeClient , & ks )
2015-05-18 17:03:30 +00:00
2016-03-01 03:40:15 +00:00
// We declare kube2sky ready when:
// 1. It has retrieved the Kubernetes master service from the apiserver. If this
// doesn't happen skydns will fail its liveness probe assuming that it can't
// perform any cluster local DNS lookups.
// 2. It has setup the 3 watches above.
// Once ready this container never flips to not-ready.
setupHealthzHandlers ( & ks )
// ListenAndServe only returns on error, which is then fatal.
glog . Fatal ( http . ListenAndServe ( fmt . Sprintf ( ":%d" , * healthzPort ) , nil ) )
2014-11-11 21:15:59 +00:00
}