diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 7c6071036d..56308f9e55 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -33,7 +33,7 @@ import ( ) var ( - file = flag.String("config", "", "Path to the config file") + file = flag.String("config", "", "Path to the config file/dir") etcdServers = flag.String("etcd_servers", "", "Url of etcd servers in the cluster") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") diff --git a/cmd/localkube/localkube.go b/cmd/localkube/localkube.go index 5f20116219..42f108fcc1 100644 --- a/cmd/localkube/localkube.go +++ b/cmd/localkube/localkube.go @@ -38,7 +38,7 @@ import ( // kubelet flags var ( - file = flag.String("config", "", "Path to the config file") + file = flag.String("config", "", "Path to the config file/dir") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index fc7236a723..2f1c922c63 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -26,6 +26,7 @@ import ( "net/http" "os" "os/exec" + "path/filepath" "strconv" "strings" "sync" @@ -364,28 +365,43 @@ func (kl *Kubelet) KillContainer(name string) error { return err } -func (kl *Kubelet) extractFromFile(name string, changeChannel chan<- []api.ContainerManifest) error { +func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) { var file *os.File var err error + var manifest api.ContainerManifest + if file, err = os.Open(name); err != nil { - return err + return manifest, err } - return kl.extractFromReader(file, changeChannel) -} - -func (kl *Kubelet) extractFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { - var manifest api.ContainerManifest - data, err := ioutil.ReadAll(reader) + data, err := ioutil.ReadAll(file) if err != nil { - log.Printf("Couldn't read from reader: %v", err) - return err + log.Printf("Couldn't read from file: %v", err) + return manifest, err } if err = kl.ExtractYAMLData(data, &manifest); err != nil { - return err + return manifest, err } - changeChannel <- []api.ContainerManifest{manifest} - return nil + return manifest, nil +} + +func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) { + var manifests []api.ContainerManifest + + files, err := filepath.Glob(filepath.Join(name, "*")) + if err != nil { + return manifests, err + } + + for _, file := range files { + manifest, err := kl.extractFromFile(file) + if err != nil { + log.Printf("Couldn't read from file %s: %v", file, err) + return manifests, err + } + manifests = append(manifests, manifest) + } + return manifests, nil } func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { @@ -407,10 +423,28 @@ func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel cha func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) { for { var err error + time.Sleep(kl.FileCheckFrequency) - err = kl.extractFromFile(file, changeChannel) + + fileInfo, err := os.Stat(file) if err != nil { log.Printf("Error polling file: %#v", err) + continue + } + if fileInfo.IsDir() { + manifests, err := kl.extractFromDir(file) + if err != nil { + log.Printf("Error polling dir: %#v", err) + continue + } + changeChannel <- manifests + } else { + manifest, err := kl.extractFromFile(file) + if err != nil { + log.Printf("Error polling file: %#v", err) + continue + } + changeChannel <- []api.ContainerManifest{manifest} } } } @@ -672,7 +706,7 @@ func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChan for { select { case manifests := <-fileChannel: - log.Printf("Got new configuration from file... %v", manifests) + log.Printf("Got new configuration from file/dir... %v", manifests) lastFile = manifests case manifests := <-etcdChannel: log.Printf("Got new configuration from etcd... %v", manifests) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index f966489e33..b0c7b24bf2 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -729,26 +729,14 @@ func TestMakePortsAndBindings(t *testing.T) { func TestExtractFromNonExistentFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan []api.ContainerManifest) - reader := startReading(changeChannel) - - err := kubelet.extractFromFile("/some/fake/file", changeChannel) - close(changeChannel) - + _, err := kubelet.extractFromFile("/some/fake/file") if err == nil { t.Error("Unexpected non-error.") } - - list := reader.GetList() - if len(list) != 0 { - t.Errorf("Unexpected list: %#v", list) - } } func TestExtractFromBadDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan []api.ContainerManifest) - reader := startReading(changeChannel) badData := []byte{1, 2, 3} file, err := ioutil.TempFile("", "foo") @@ -756,44 +744,68 @@ func TestExtractFromBadDataFile(t *testing.T) { name := file.Name() file.Close() ioutil.WriteFile(name, badData, 0755) - err = kubelet.extractFromFile(name, changeChannel) - close(changeChannel) + _, err = kubelet.extractFromFile(name) if err == nil { t.Error("Unexpected non-error.") } - list := reader.GetList() - if len(list) != 0 { - t.Errorf("Unexpected list: %#v", list) - } } func TestExtractFromValidDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan []api.ContainerManifest) - reader := startReading(changeChannel) - manifests := []api.ContainerManifest{ - {Id: "bar"}, - } - data, err := json.Marshal(manifests[0]) // Right now, files only support a single manifest + manifest := api.ContainerManifest{Id: "bar"} + data, err := json.Marshal(manifest) expectNoError(t, err) file, err := ioutil.TempFile("", "foo") expectNoError(t, err) name := file.Name() expectNoError(t, file.Close()) ioutil.WriteFile(name, data, 0755) - err = kubelet.extractFromFile(name, changeChannel) - close(changeChannel) + read, err := kubelet.extractFromFile(name) expectNoError(t, err) - read := reader.GetList() - if len(read) != 1 { - t.Errorf("Unexpected channel traffic: %#v", read) + if !reflect.DeepEqual(read, manifest) { + t.Errorf("Unexpected difference. Expected %#v, got %#v", manifest, read) } - if !reflect.DeepEqual(read[0], manifests) { - t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read[0]) +} + +func TestExtractFromEmptyDir(t *testing.T) { + kubelet := Kubelet{} + + dirName, err := ioutil.TempDir("", "foo") + expectNoError(t, err) + + _, err = kubelet.extractFromDir(dirName) + expectNoError(t, err) +} + +func TestExtractFromDir(t *testing.T) { + kubelet := Kubelet{} + + manifests := []api.ContainerManifest{ + {Id: "foo"}, + {Id: "bar"}, + } + + dirName, err := ioutil.TempDir("", "foo") + expectNoError(t, err) + + for _, manifest := range manifests { + data, err := json.Marshal(manifest) + expectNoError(t, err) + file, err := ioutil.TempFile(dirName, "kub") + expectNoError(t, err) + name := file.Name() + expectNoError(t, file.Close()) + ioutil.WriteFile(name, data, 0755) + } + + read, err := kubelet.extractFromDir(dirName) + expectNoError(t, err) + if !reflect.DeepEqual(read, manifests) { + t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read) } }