Add config dir support to kubelet

pull/6/head
Justin Huff 2014-06-19 13:06:52 -07:00
parent f49b9c2429
commit d204f76484
4 changed files with 95 additions and 49 deletions

View File

@ -33,7 +33,7 @@ import (
) )
var ( var (
file = flag.String("config", "", "Path to the config file") file = flag.String("config", "", "Path to the config file/dir")
etcdServers = flag.String("etcd_servers", "", "Url of etcd servers in the cluster") etcdServers = flag.String("etcd_servers", "", "Url of etcd servers in the cluster")
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data")

View File

@ -38,7 +38,7 @@ import (
// kubelet flags // kubelet flags
var ( var (
file = flag.String("config", "", "Path to the config file") file = flag.String("config", "", "Path to the config file/dir")
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data")
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")

View File

@ -26,6 +26,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -364,28 +365,43 @@ func (kl *Kubelet) KillContainer(name string) error {
return err return err
} }
func (kl *Kubelet) extractFromFile(name string, changeChannel chan<- []api.ContainerManifest) error { func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) {
var file *os.File var file *os.File
var err error var err error
var manifest api.ContainerManifest
if file, err = os.Open(name); err != nil { if file, err = os.Open(name); err != nil {
return err return manifest, err
} }
return kl.extractFromReader(file, changeChannel) data, err := ioutil.ReadAll(file)
}
func (kl *Kubelet) extractFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error {
var manifest api.ContainerManifest
data, err := ioutil.ReadAll(reader)
if err != nil { if err != nil {
log.Printf("Couldn't read from reader: %v", err) log.Printf("Couldn't read from file: %v", err)
return err return manifest, err
} }
if err = kl.ExtractYAMLData(data, &manifest); err != nil { if err = kl.ExtractYAMLData(data, &manifest); err != nil {
return err return manifest, err
} }
changeChannel <- []api.ContainerManifest{manifest} return manifest, nil
return nil }
func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) {
var manifests []api.ContainerManifest
files, err := filepath.Glob(filepath.Join(name, "*"))
if err != nil {
return manifests, err
}
for _, file := range files {
manifest, err := kl.extractFromFile(file)
if err != nil {
log.Printf("Couldn't read from file %s: %v", file, err)
return manifests, err
}
manifests = append(manifests, manifest)
}
return manifests, nil
} }
func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error {
@ -407,10 +423,28 @@ func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel cha
func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) { func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) {
for { for {
var err error var err error
time.Sleep(kl.FileCheckFrequency) time.Sleep(kl.FileCheckFrequency)
err = kl.extractFromFile(file, changeChannel)
fileInfo, err := os.Stat(file)
if err != nil { if err != nil {
log.Printf("Error polling file: %#v", err) log.Printf("Error polling file: %#v", err)
continue
}
if fileInfo.IsDir() {
manifests, err := kl.extractFromDir(file)
if err != nil {
log.Printf("Error polling dir: %#v", err)
continue
}
changeChannel <- manifests
} else {
manifest, err := kl.extractFromFile(file)
if err != nil {
log.Printf("Error polling file: %#v", err)
continue
}
changeChannel <- []api.ContainerManifest{manifest}
} }
} }
} }
@ -672,7 +706,7 @@ func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChan
for { for {
select { select {
case manifests := <-fileChannel: case manifests := <-fileChannel:
log.Printf("Got new configuration from file... %v", manifests) log.Printf("Got new configuration from file/dir... %v", manifests)
lastFile = manifests lastFile = manifests
case manifests := <-etcdChannel: case manifests := <-etcdChannel:
log.Printf("Got new configuration from etcd... %v", manifests) log.Printf("Got new configuration from etcd... %v", manifests)

View File

@ -729,26 +729,14 @@ func TestMakePortsAndBindings(t *testing.T) {
func TestExtractFromNonExistentFile(t *testing.T) { func TestExtractFromNonExistentFile(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan []api.ContainerManifest) _, err := kubelet.extractFromFile("/some/fake/file")
reader := startReading(changeChannel)
err := kubelet.extractFromFile("/some/fake/file", changeChannel)
close(changeChannel)
if err == nil { if err == nil {
t.Error("Unexpected non-error.") t.Error("Unexpected non-error.")
} }
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
} }
func TestExtractFromBadDataFile(t *testing.T) { func TestExtractFromBadDataFile(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan []api.ContainerManifest)
reader := startReading(changeChannel)
badData := []byte{1, 2, 3} badData := []byte{1, 2, 3}
file, err := ioutil.TempFile("", "foo") file, err := ioutil.TempFile("", "foo")
@ -756,44 +744,68 @@ func TestExtractFromBadDataFile(t *testing.T) {
name := file.Name() name := file.Name()
file.Close() file.Close()
ioutil.WriteFile(name, badData, 0755) ioutil.WriteFile(name, badData, 0755)
err = kubelet.extractFromFile(name, changeChannel) _, err = kubelet.extractFromFile(name)
close(changeChannel)
if err == nil { if err == nil {
t.Error("Unexpected non-error.") t.Error("Unexpected non-error.")
} }
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
} }
func TestExtractFromValidDataFile(t *testing.T) { func TestExtractFromValidDataFile(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan []api.ContainerManifest)
reader := startReading(changeChannel)
manifests := []api.ContainerManifest{ manifest := api.ContainerManifest{Id: "bar"}
{Id: "bar"}, data, err := json.Marshal(manifest)
}
data, err := json.Marshal(manifests[0]) // Right now, files only support a single manifest
expectNoError(t, err) expectNoError(t, err)
file, err := ioutil.TempFile("", "foo") file, err := ioutil.TempFile("", "foo")
expectNoError(t, err) expectNoError(t, err)
name := file.Name() name := file.Name()
expectNoError(t, file.Close()) expectNoError(t, file.Close())
ioutil.WriteFile(name, data, 0755) ioutil.WriteFile(name, data, 0755)
err = kubelet.extractFromFile(name, changeChannel)
close(changeChannel)
read, err := kubelet.extractFromFile(name)
expectNoError(t, err) expectNoError(t, err)
read := reader.GetList() if !reflect.DeepEqual(read, manifest) {
if len(read) != 1 { t.Errorf("Unexpected difference. Expected %#v, got %#v", manifest, read)
t.Errorf("Unexpected channel traffic: %#v", read)
} }
if !reflect.DeepEqual(read[0], manifests) { }
t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read[0])
func TestExtractFromEmptyDir(t *testing.T) {
kubelet := Kubelet{}
dirName, err := ioutil.TempDir("", "foo")
expectNoError(t, err)
_, err = kubelet.extractFromDir(dirName)
expectNoError(t, err)
}
func TestExtractFromDir(t *testing.T) {
kubelet := Kubelet{}
manifests := []api.ContainerManifest{
{Id: "foo"},
{Id: "bar"},
}
dirName, err := ioutil.TempDir("", "foo")
expectNoError(t, err)
for _, manifest := range manifests {
data, err := json.Marshal(manifest)
expectNoError(t, err)
file, err := ioutil.TempFile(dirName, "kub")
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, data, 0755)
}
read, err := kubelet.extractFromDir(dirName)
expectNoError(t, err)
if !reflect.DeepEqual(read, manifests) {
t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read)
} }
} }