diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a1c7dac82d..fc7236a723 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -17,7 +17,6 @@ limitations under the License. package kubelet import ( - "bytes" "encoding/json" "fmt" "io" @@ -79,12 +78,14 @@ type Kubelet struct { // Starts background goroutines. If file, manifest_url, or address are empty, // they are not watched. Never returns. func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) { - fileChannel := make(chan api.ContainerManifest) + fileChannel := make(chan []api.ContainerManifest) etcdChannel := make(chan []api.ContainerManifest) - httpChannel := make(chan api.ContainerManifest) - serverChannel := make(chan api.ContainerManifest) + httpChannel := make(chan []api.ContainerManifest) + serverChannel := make(chan []api.ContainerManifest) - go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) + if file != "" { + go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) + } if manifest_url != "" { go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second) } @@ -363,72 +364,78 @@ func (kl *Kubelet) KillContainer(name string) error { return err } -func (kl *Kubelet) extractFromFile(lastData []byte, name string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { +func (kl *Kubelet) extractFromFile(name string, changeChannel chan<- []api.ContainerManifest) error { var file *os.File var err error if file, err = os.Open(name); err != nil { - return lastData, err + return err } - return kl.extractFromReader(lastData, file, changeChannel) + return kl.extractFromReader(file, changeChannel) } -func (kl *Kubelet) extractFromReader(lastData []byte, reader io.Reader, changeChannel chan<- api.ContainerManifest) ([]byte, error) { +func (kl *Kubelet) extractFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { var manifest api.ContainerManifest data, err := ioutil.ReadAll(reader) if err != nil { - log.Printf("Couldn't read file: %v", err) - return lastData, err + log.Printf("Couldn't read from reader: %v", err) + return err } if err = kl.ExtractYAMLData(data, &manifest); err != nil { - return lastData, err + return err } - if !bytes.Equal(lastData, data) { - lastData = data - // Ok, we have a valid configuration, send to channel for - // rejiggering. - changeChannel <- manifest - return data, nil + changeChannel <- []api.ContainerManifest{manifest} + return nil +} + +func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { + var manifests []api.ContainerManifest + data, err := ioutil.ReadAll(reader) + if err != nil { + log.Printf("Couldn't read from reader: %v", err) + return err } - return lastData, nil + if err = kl.ExtractYAMLData(data, &manifests); err != nil { + return err + } + changeChannel <- manifests + return nil } // Watch a file for changes to the set of pods that should run on this Kubelet // This function loops forever and is intended to be run as a goroutine -func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) { - var lastData []byte +func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) { for { var err error time.Sleep(kl.FileCheckFrequency) - lastData, err = kl.extractFromFile(lastData, file, changeChannel) + err = kl.extractFromFile(file, changeChannel) if err != nil { log.Printf("Error polling file: %#v", err) } } } -func (kl *Kubelet) extractFromHTTP(lastData []byte, url string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { +func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.ContainerManifest) error { client := &http.Client{} request, err := http.NewRequest("GET", url, nil) if err != nil { - return lastData, err + return err } response, err := client.Do(request) if err != nil { - return lastData, err + return err } defer response.Body.Close() - return kl.extractFromReader(lastData, response.Body, changeChannel) + return kl.extractMultipleFromReader(response.Body, changeChannel) } // Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet // This function runs forever and is intended to be run as a goroutine -func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) { - var lastData []byte +func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- []api.ContainerManifest) { for { var err error time.Sleep(kl.HTTPCheckFrequency) - lastData, err = kl.extractFromHTTP(lastData, url, changeChannel) + err = kl.extractFromHTTP(url, changeChannel) if err != nil { log.Printf("Error syncing http: %#v", err) } @@ -655,27 +662,27 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { } // runSyncLoop is the main loop for processing changes. It watches for changes from -// four channels (file, etcd, server, and http) and creates a union of the two. For +// four channels (file, etcd, server, and http) and creates a union of them. For // any new change seen, will run a sync against desired state and running state. If // no changes are seen to the configuration, will synchronize the last known desired // state every sync_frequency seconds. // Never returns. -func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileChannel, serverChannel, httpChannel <-chan api.ContainerManifest, handler SyncHandler) { +func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel <-chan []api.ContainerManifest, handler SyncHandler) { var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest for { select { - case manifest := <-fileChannel: - log.Printf("Got new manifest from file... %v", manifest) - lastFile = []api.ContainerManifest{manifest} + case manifests := <-fileChannel: + log.Printf("Got new configuration from file... %v", manifests) + lastFile = manifests case manifests := <-etcdChannel: log.Printf("Got new configuration from etcd... %v", manifests) lastEtcd = manifests - case manifest := <-httpChannel: - log.Printf("Got new manifest from external http... %v", manifest) - lastHttp = []api.ContainerManifest{manifest} - case manifest := <-serverChannel: - log.Printf("Got new manifest from our server... %v", manifest) - lastServer = []api.ContainerManifest{manifest} + case manifests := <-httpChannel: + log.Printf("Got new configuration from external http... %v", manifests) + lastHttp = manifests + case manifests := <-serverChannel: + log.Printf("Got new configuration from our server... %v", manifests) + lastServer = manifests case <-time.After(kl.SyncFrequency): } @@ -683,6 +690,7 @@ func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileC manifests = append(manifests, lastEtcd...) manifests = append(manifests, lastHttp...) manifests = append(manifests, lastServer...) + err := handler.SyncManifests(manifests) if err != nil { log.Printf("Couldn't sync containers : %#v", err) diff --git a/pkg/kubelet/kubelet_server.go b/pkg/kubelet/kubelet_server.go index 04899cee38..34ae0c9bc4 100644 --- a/pkg/kubelet/kubelet_server.go +++ b/pkg/kubelet/kubelet_server.go @@ -29,7 +29,7 @@ import ( type KubeletServer struct { Kubelet kubeletInterface - UpdateChannel chan api.ContainerManifest + UpdateChannel chan []api.ContainerManifest } // kubeletInterface contains all the kubelet methods required by the server. @@ -52,20 +52,31 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } switch { - case u.Path == "/container": + case u.Path == "/container" || u.Path == "/containers": defer req.Body.Close() data, err := ioutil.ReadAll(req.Body) if err != nil { s.error(w, err) return } - var manifest api.ContainerManifest - err = yaml.Unmarshal(data, &manifest) - if err != nil { - s.error(w, err) - return + if u.Path == "/container" { + // This is to provide backward compatibility. It only supports a single manifest + var manifest api.ContainerManifest + err = yaml.Unmarshal(data, &manifest) + if err != nil { + s.error(w, err) + return + } + s.UpdateChannel <- []api.ContainerManifest{manifest} + } else if u.Path == "/containers" { + var manifests []api.ContainerManifest + err = yaml.Unmarshal(data, &manifests) + if err != nil { + s.error(w, err) + return + } + s.UpdateChannel <- manifests } - s.UpdateChannel <- manifest case u.Path == "/containerStats": container := u.Query().Get("container") if len(container) == 0 { diff --git a/pkg/kubelet/kubelet_server_test.go b/pkg/kubelet/kubelet_server_test.go index ba2a3531f4..c8c026d131 100644 --- a/pkg/kubelet/kubelet_server_test.go +++ b/pkg/kubelet/kubelet_server_test.go @@ -8,7 +8,6 @@ import ( "net/http" "net/http/httptest" "reflect" - "sync" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -33,37 +32,9 @@ func (fk *fakeKubelet) GetContainerStats(name string) (*api.ContainerStats, erro return fk.statsFunc(name) } -// If we made everything distribute a list of ContainerManifests, we could just use -// channelReader. -type channelReaderSingle struct { - list []api.ContainerManifest - wg sync.WaitGroup -} - -func startReadingSingle(channel <-chan api.ContainerManifest) *channelReaderSingle { - cr := &channelReaderSingle{} - cr.wg.Add(1) - go func() { - for { - manifest, ok := <-channel - if !ok { - break - } - cr.list = append(cr.list, manifest) - } - cr.wg.Done() - }() - return cr -} - -func (cr *channelReaderSingle) GetList() []api.ContainerManifest { - cr.wg.Wait() - return cr.list -} - type serverTestFramework struct { - updateChan chan api.ContainerManifest - updateReader *channelReaderSingle + updateChan chan []api.ContainerManifest + updateReader *channelReader serverUnderTest *KubeletServer fakeKubelet *fakeKubelet testHttpServer *httptest.Server @@ -71,9 +42,9 @@ type serverTestFramework struct { func makeServerTest() *serverTestFramework { fw := &serverTestFramework{ - updateChan: make(chan api.ContainerManifest), + updateChan: make(chan []api.ContainerManifest), } - fw.updateReader = startReadingSingle(fw.updateChan) + fw.updateReader = startReading(fw.updateChan) fw.fakeKubelet = &fakeKubelet{} fw.serverUnderTest = &KubeletServer{ Kubelet: fw.fakeKubelet, @@ -91,8 +62,10 @@ func readResp(resp *http.Response) (string, error) { func TestContainer(t *testing.T) { fw := makeServerTest() - expected := api.ContainerManifest{Id: "test_manifest"} - body := bytes.NewBuffer([]byte(util.MakeJSONString(expected))) + expected := []api.ContainerManifest{ + {Id: "test_manifest"}, + } + body := bytes.NewBuffer([]byte(util.MakeJSONString(expected[0]))) // Only send a single ContainerManifest resp, err := http.Post(fw.testHttpServer.URL+"/container", "application/json", body) if err != nil { t.Errorf("Post returned: %v", err) @@ -108,6 +81,28 @@ func TestContainer(t *testing.T) { } } +func TestContainers(t *testing.T) { + fw := makeServerTest() + expected := []api.ContainerManifest{ + {Id: "test_manifest_1"}, + {Id: "test_manifest_2"}, + } + body := bytes.NewBuffer([]byte(util.MakeJSONString(expected))) + resp, err := http.Post(fw.testHttpServer.URL+"/containers", "application/json", body) + if err != nil { + t.Errorf("Post returned: %v", err) + } + resp.Body.Close() + close(fw.updateChan) + received := fw.updateReader.GetList() + if len(received) != 1 { + t.Errorf("Expected 1 update, but got %v", len(received)) + } + if !reflect.DeepEqual(expected, received[0]) { + t.Errorf("Expected %#v, but got %#v", expected, received[0]) + } +} + func TestContainerInfo(t *testing.T) { fw := makeServerTest() expected := "good container info string" diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 9116d9ce89..f966489e33 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -16,7 +16,6 @@ limitations under the License. package kubelet import ( - "bytes" "encoding/json" "fmt" "io/ioutil" @@ -730,164 +729,120 @@ func TestMakePortsAndBindings(t *testing.T) { func TestExtractFromNonExistentFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - lastData := []byte{1, 2, 3} - data, err := kubelet.extractFromFile(lastData, "/some/fake/file", changeChannel) + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) + + err := kubelet.extractFromFile("/some/fake/file", changeChannel) + close(changeChannel) + if err == nil { t.Error("Unexpected non-error.") } - if !bytes.Equal(data, lastData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + + list := reader.GetList() + if len(list) != 0 { + t.Errorf("Unexpected list: %#v", list) } } func TestExtractFromBadDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - lastData := []byte{1, 2, 3} + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) + + badData := []byte{1, 2, 3} file, err := ioutil.TempFile("", "foo") expectNoError(t, err) name := file.Name() file.Close() - ioutil.WriteFile(name, lastData, 0755) - data, err := kubelet.extractFromFile(lastData, name, changeChannel) + ioutil.WriteFile(name, badData, 0755) + err = kubelet.extractFromFile(name, changeChannel) + close(changeChannel) if err == nil { t.Error("Unexpected non-error.") } - if !bytes.Equal(data, lastData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + + list := reader.GetList() + if len(list) != 0 { + t.Errorf("Unexpected list: %#v", list) } } -func TestExtractFromSameDataFile(t *testing.T) { +func TestExtractFromValidDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - manifest := api.ContainerManifest{ - Id: "foo", + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) + + manifests := []api.ContainerManifest{ + {Id: "bar"}, } - lastData, err := json.Marshal(manifest) + data, err := json.Marshal(manifests[0]) // Right now, files only support a single manifest expectNoError(t, err) file, err := ioutil.TempFile("", "foo") expectNoError(t, err) name := file.Name() expectNoError(t, file.Close()) - ioutil.WriteFile(name, lastData, 0755) - data, err := kubelet.extractFromFile(lastData, name, changeChannel) - - expectNoError(t, err) - if !bytes.Equal(data, lastData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) - } -} - -func TestExtractFromChangedDataFile(t *testing.T) { - kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - reader := startReadingSingle(changeChannel) - oldManifest := api.ContainerManifest{ - Id: "foo", - } - newManifest := api.ContainerManifest{ - Id: "bar", - } - lastData, err := json.Marshal(oldManifest) - expectNoError(t, err) - newData, err := json.Marshal(newManifest) - expectNoError(t, err) - file, err := ioutil.TempFile("", "foo") - expectNoError(t, err) - name := file.Name() - expectNoError(t, file.Close()) - ioutil.WriteFile(name, newData, 0755) - data, err := kubelet.extractFromFile(lastData, name, changeChannel) + ioutil.WriteFile(name, data, 0755) + err = kubelet.extractFromFile(name, changeChannel) close(changeChannel) expectNoError(t, err) - if !bytes.Equal(data, newData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) - } read := reader.GetList() if len(read) != 1 { t.Errorf("Unexpected channel traffic: %#v", read) } - if !reflect.DeepEqual(read[0], newManifest) { - t.Errorf("Unexpected difference. Expected %#v, got %#v", newManifest, read[0]) + if !reflect.DeepEqual(read[0], manifests) { + t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read[0]) } } func TestExtractFromHttpBadness(t *testing.T) { kubelet := Kubelet{} - lastData := []byte{1, 2, 3} - changeChannel := make(chan api.ContainerManifest) - data, err := kubelet.extractFromHTTP(lastData, "http://localhost:12345", changeChannel) + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) + + err := kubelet.extractFromHTTP("http://localhost:12345", changeChannel) if err == nil { t.Error("Unexpected non-error.") } - if !bytes.Equal(lastData, data) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) + close(changeChannel) + list := reader.GetList() + + if len(list) != 0 { + t.Errorf("Unexpected list: %#v", list) } } -func TestExtractFromHttpNoChange(t *testing.T) { +func TestExtractFromHttp(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) - manifest := api.ContainerManifest{ - Id: "foo", + manifests := []api.ContainerManifest{ + {Id: "foo"}, } - lastData, err := json.Marshal(manifest) + data, err := json.Marshal(manifests) fakeHandler := util.FakeHandler{ StatusCode: 200, - ResponseBody: string(lastData), + ResponseBody: string(data), } testServer := httptest.NewServer(&fakeHandler) - data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel) + err = kubelet.extractFromHTTP(testServer.URL, changeChannel) if err != nil { t.Errorf("Unexpected error: %#v", err) } - if !bytes.Equal(lastData, data) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) - } -} - -func TestExtractFromHttpChanges(t *testing.T) { - kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - reader := startReadingSingle(changeChannel) - - manifest := api.ContainerManifest{ - Id: "foo", - } - newManifest := api.ContainerManifest{ - Id: "bar", - } - lastData, _ := json.Marshal(manifest) - newData, _ := json.Marshal(newManifest) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(newData), - } - testServer := httptest.NewServer(&fakeHandler) - - data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel) close(changeChannel) read := reader.GetList() - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } if len(read) != 1 { t.Errorf("Unexpected list: %#v", read) } - if !bytes.Equal(newData, data) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) - } - if !reflect.DeepEqual(newManifest, read[0]) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", newManifest, read[0]) + if !reflect.DeepEqual(manifests, read[0]) { + t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0]) } }