From fd8741edf2521d5f5df3a5794b030018f3034dd9 Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Wed, 20 Aug 2014 13:24:51 -0500 Subject: [PATCH] Refactor kubelet to use http.ServeMux --- cmd/integration/integration.go | 4 +- cmd/kubelet/kubelet.go | 2 +- pkg/kubelet/server.go | 234 ++++++++++++++++++++------------- pkg/kubelet/server_test.go | 6 +- 4 files changed, 144 insertions(+), 102 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index c330646119..f0a906842e 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -118,7 +118,7 @@ func startComponents(manifestURL string) (apiServerURL string) { myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1) go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0) go util.Forever(func() { - kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), http.DefaultServeMux, "localhost", 10250) + kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), "localhost", 10250) }, 0) // Kubelet (machine) @@ -129,7 +129,7 @@ func startComponents(manifestURL string) (apiServerURL string) { otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2) go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0) go util.Forever(func() { - kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251) + kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), "localhost", 10251) }, 0) return apiServer.URL diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 7ac463858c..b658121581 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -162,7 +162,7 @@ func main() { // start the kubelet server if *enableServer { go util.Forever(func() { - kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), http.DefaultServeMux, *address, *port) + kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), *address, *port) }, 0) } diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 2b0f93f848..57383b09d0 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -41,16 +41,13 @@ import ( type Server struct { host HostInterface updates chan<- interface{} - handler http.Handler + mux *http.ServeMux } -func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, delegate http.Handler, address string, port uint) { +// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet +func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, address string, port uint) { glog.Infof("Starting to listen on %s:%d", address, port) - handler := Server{ - host: host, - updates: updates, - handler: delegate, - } + handler := NewServer(host, updates) s := &http.Server{ Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)), Handler: &handler, @@ -71,10 +68,143 @@ type HostInterface interface { ServeLogs(w http.ResponseWriter, req *http.Request) } +// NewServer initializes and configures a kubelet.Server object to handle HTTP requests +func NewServer(host HostInterface, updates chan<- interface{}) Server { + server := Server{ + host: host, + updates: updates, + mux: http.NewServeMux(), + } + server.InstallDefaultHandlers() + return server +} + +// InstallDefaultHandlers registers the set of supported HTTP request patterns with the mux +func (s *Server) InstallDefaultHandlers() { + s.mux.HandleFunc("/healthz", s.handleHealth) + s.mux.HandleFunc("/container", s.handleContainer) + s.mux.HandleFunc("/containers", s.handleContainers) + s.mux.HandleFunc("/podInfo", s.handlePodInfo) + s.mux.HandleFunc("/stats/", s.handleStats) + s.mux.HandleFunc("/logs/", s.handleLogs) + s.mux.HandleFunc("/spec/", s.handleSpec) +} + +// error serializes an error object into an HTTP response func (s *Server) error(w http.ResponseWriter, err error) { http.Error(w, fmt.Sprintf("Internal Error: %v", err), http.StatusInternalServerError) } +// handleHealth handles health checking requests against the Kubelet +func (s *Server) handleHealth(w http.ResponseWriter, req *http.Request) { +} + +// handleContainer handles container requests against the Kubelet +func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + data, err := ioutil.ReadAll(req.Body) + if err != nil { + s.error(w, err) + return + } + // This is to provide backward compatibility. It only supports a single manifest + var pod Pod + err = yaml.Unmarshal(data, &pod.Manifest) + if err != nil { + s.error(w, err) + return + } + //TODO: sha1 of manifest? + pod.Name = "1" + s.updates <- PodUpdate{[]Pod{pod}, SET} + +} + +// handleContainers handles containers requests against the Kubelet +func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + data, err := ioutil.ReadAll(req.Body) + if err != nil { + s.error(w, err) + return + } + var manifests []api.ContainerManifest + err = yaml.Unmarshal(data, &manifests) + if err != nil { + s.error(w, err) + return + } + pods := make([]Pod, len(manifests)) + for i := range manifests { + pods[i].Name = fmt.Sprintf("%d", i+1) + pods[i].Manifest = manifests[i] + } + s.updates <- PodUpdate{pods, SET} + +} + +// handlePodInfo handles podInfo requests against the Kubelet +func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) { + u, err := url.ParseRequestURI(req.RequestURI) + if err != nil { + s.error(w, err) + return + } + podID := u.Query().Get("podID") + if len(podID) == 0 { + w.WriteHeader(http.StatusBadRequest) + http.Error(w, "Missing 'podID=' query entry.", http.StatusBadRequest) + return + } + // TODO: backwards compatibility with existing API, needs API change + podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"}) + info, err := s.host.GetPodInfo(podFullName) + if err == ErrNoContainersInPod { + http.Error(w, "Pod does not exist", http.StatusNotFound) + return + } + if err != nil { + s.error(w, err) + return + } + data, err := json.Marshal(info) + if err != nil { + s.error(w, err) + return + } + w.WriteHeader(http.StatusOK) + w.Header().Add("Content-type", "application/json") + w.Write(data) +} + +// handleStats handles stats requests against the Kubelet +func (s *Server) handleStats(w http.ResponseWriter, req *http.Request) { + s.serveStats(w, req) +} + +// handleLogs handles logs requests against the Kubelet +func (s *Server) handleLogs(w http.ResponseWriter, req *http.Request) { + s.host.ServeLogs(w, req) +} + +// handleSpec handles spec requests against the Kubelet +func (s *Server) handleSpec(w http.ResponseWriter, req *http.Request) { + info, err := s.host.GetMachineInfo() + if err != nil { + s.error(w, err) + return + } + data, err := json.Marshal(info) + if err != nil { + s.error(w, err) + return + } + w.Header().Add("Content-type", "application/json") + w.Write(data) + +} + +// ServeHTTP responds to HTTP requests on the Kubelet func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer httplog.MakeLogged(req, &w).StacktraceWhen( httplog.StatusIsNot( @@ -82,96 +212,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { http.StatusNotFound, ), ).Log() - - u, err := url.ParseRequestURI(req.RequestURI) - if err != nil { - s.error(w, err) - return - } - // TODO: use an http.ServeMux instead of a switch. - switch { - case u.Path == "/container" || u.Path == "/containers": - defer req.Body.Close() - data, err := ioutil.ReadAll(req.Body) - if err != nil { - s.error(w, err) - return - } - if u.Path == "/container" { - // This is to provide backward compatibility. It only supports a single manifest - var pod Pod - err = yaml.Unmarshal(data, &pod.Manifest) - if err != nil { - s.error(w, err) - return - } - //TODO: sha1 of manifest? - pod.Name = "1" - s.updates <- PodUpdate{[]Pod{pod}, SET} - } else if u.Path == "/containers" { - var manifests []api.ContainerManifest - err = yaml.Unmarshal(data, &manifests) - if err != nil { - s.error(w, err) - return - } - pods := make([]Pod, len(manifests)) - for i := range manifests { - pods[i].Name = fmt.Sprintf("%d", i+1) - pods[i].Manifest = manifests[i] - } - s.updates <- PodUpdate{pods, SET} - } - case u.Path == "/podInfo": - podID := u.Query().Get("podID") - if len(podID) == 0 { - w.WriteHeader(http.StatusBadRequest) - http.Error(w, "Missing 'podID=' query entry.", http.StatusBadRequest) - return - } - // TODO: backwards compatibility with existing API, needs API change - podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"}) - info, err := s.host.GetPodInfo(podFullName) - if err == ErrNoContainersInPod { - http.Error(w, "Pod does not exist", http.StatusNotFound) - return - } - if err != nil { - s.error(w, err) - return - } - data, err := json.Marshal(info) - if err != nil { - s.error(w, err) - return - } - w.WriteHeader(http.StatusOK) - w.Header().Add("Content-type", "application/json") - w.Write(data) - case strings.HasPrefix(u.Path, "/stats"): - s.serveStats(w, req) - case strings.HasPrefix(u.Path, "/spec"): - info, err := s.host.GetMachineInfo() - if err != nil { - s.error(w, err) - return - } - data, err := json.Marshal(info) - if err != nil { - s.error(w, err) - return - } - w.Header().Add("Content-type", "application/json") - w.Write(data) - case strings.HasPrefix(u.Path, "/logs/"): - s.host.ServeLogs(w, req) - default: - if s.handler != nil { - s.handler.ServeHTTP(w, req) - } - } + s.mux.ServeHTTP(w, req) } +// serveStats implements stats logic func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) { // /stats// components := strings.Split(strings.TrimPrefix(path.Clean(req.URL.Path), "/"), "/") diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index 9ea5f92f20..99a542e00b 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -76,10 +76,8 @@ func makeServerTest() *serverTestFramework { } fw.updateReader = startReading(fw.updateChan) fw.fakeKubelet = &fakeKubelet{} - fw.serverUnderTest = &Server{ - host: fw.fakeKubelet, - updates: fw.updateChan, - } + server := NewServer(fw.fakeKubelet, fw.updateChan) + fw.serverUnderTest = &server fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) return fw }