Merge pull request #4980 from rsokolowski/skydns-resilient-to-restart

Crash kube2sky after repeated etcd mutation failures.
pull/6/head
Tim Hockin 2015-03-05 09:22:16 -08:00
commit c021719e0a
2 changed files with 29 additions and 10 deletions

View File

@ -20,3 +20,6 @@ example, if this is set to `kubernetes.io`, then a service named "nifty" in the
"nifty.default.kubernetes.io". "nifty.default.kubernetes.io".
`-verbose`: Log additional information. `-verbose`: Log additional information.
'-etcd_mutation_timeout': For how long the application will keep retrying etcd
mutation (insertion or removal of a dns entry) before giving up and crashing.

View File

@ -25,6 +25,7 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"time"
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@ -35,8 +36,9 @@ import (
) )
var ( var (
domain = flag.String("domain", "kubernetes.local", "domain under which to create names") domain = flag.String("domain", "kubernetes.local", "domain under which to create names")
verbose = flag.Bool("verbose", false, "log extra information") verbose = flag.Bool("verbose", false, "log extra information")
etcd_mutation_timeout = flag.Duration("etcd_mutation_timeout", 10*time.Second, "crash after retrying etcd mutation for a specified duration")
) )
func removeDNS(record string, etcdClient *etcd.Client) error { func removeDNS(record string, etcdClient *etcd.Client) error {
@ -64,6 +66,26 @@ func addDNS(record string, service *kapi.Service, etcdClient *etcd.Client) error
return err return err
} }
// Implements retry logic for arbitrary mutator. Crashes after retrying for
// etcd_mutation_timeout.
func mutateEtcdOrDie(mutator func() error) {
timeout := time.After(*etcd_mutation_timeout)
for {
select {
case <-timeout:
log.Fatalf("Failed to mutate etcd for %v using mutator: %v", *etcd_mutation_timeout, mutator)
default:
if err := mutator(); err != nil {
delay := 50 * time.Millisecond
log.Printf("Failed to mutate etcd using mutator: %v due to: %v. Will retry in: %v", mutator, err, delay)
time.Sleep(delay)
} else {
return
}
}
}
}
func newEtcdClient() (client *etcd.Client) { func newEtcdClient() (client *etcd.Client) {
// TODO: take a flag for etcd server(s). // TODO: take a flag for etcd server(s).
client = etcd.NewClient([]string{"http://127.0.0.1:4001"}) client = etcd.NewClient([]string{"http://127.0.0.1:4001"})
@ -116,19 +138,13 @@ func watchOnce(etcdClient *etcd.Client, kubeClient *kclient.Client) {
for i := range ev.Services { for i := range ev.Services {
s := &ev.Services[i] s := &ev.Services[i]
name := buildNameString(s.Name, s.Namespace, *domain) name := buildNameString(s.Name, s.Namespace, *domain)
err := addDNS(name, s, etcdClient) mutateEtcdOrDie(func() error { return addDNS(name, s, etcdClient) })
if err != nil {
log.Printf("Failed to add DNS for %s: %v", name, err)
}
} }
case RemoveService: case RemoveService:
for i := range ev.Services { for i := range ev.Services {
s := &ev.Services[i] s := &ev.Services[i]
name := buildNameString(s.Name, s.Namespace, *domain) name := buildNameString(s.Name, s.Namespace, *domain)
err := removeDNS(name, etcdClient) mutateEtcdOrDie(func() error { return removeDNS(name, etcdClient) })
if err != nil {
log.Printf("Failed to remove DNS for %s: %v", name, err)
}
} }
} }
} }