Merge pull request #3015 from brendandburns/fix

Remove HTTP Server support for pushing pods onto the kubelet.
pull/6/head
Daniel Smith 2014-12-17 17:30:14 -08:00
commit 8379966ac5
3 changed files with 9 additions and 226 deletions

View File

@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
@ -36,22 +35,20 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/ghodss/yaml"
"github.com/golang/glog"
"github.com/google/cadvisor/info"
)
// Server is a http.Handler which exposes kubelet functionality over HTTP.
type Server struct {
host HostInterface
updates chan<- interface{}
mux *http.ServeMux
host HostInterface
mux *http.ServeMux
}
// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, address net.IP, port uint, enableDebuggingHandlers bool) {
func ListenAndServeKubeletServer(host HostInterface, address net.IP, port uint, enableDebuggingHandlers bool) {
glog.V(1).Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, updates, enableDebuggingHandlers)
handler := NewServer(host, enableDebuggingHandlers)
s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
@ -59,7 +56,6 @@ func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{},
WriteTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 20,
}
updates <- PodUpdate{[]api.BoundPod{}, SET, ServerSource}
glog.Fatal(s.ListenAndServe())
}
@ -77,11 +73,10 @@ type HostInterface interface {
}
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
func NewServer(host HostInterface, updates chan<- interface{}, enableDebuggingHandlers bool) Server {
func NewServer(host HostInterface, enableDebuggingHandlers bool) Server {
server := Server{
host: host,
updates: updates,
mux: http.NewServeMux(),
host: host,
mux: http.NewServeMux(),
}
server.InstallDefaultHandlers()
if enableDebuggingHandlers {
@ -102,9 +97,6 @@ func (s *Server) InstallDefaultHandlers() {
// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers() {
// ToDo: /container, /run, and /containers aren't debugging options, should probably be handled separately
s.mux.HandleFunc("/container", s.handleContainer)
s.mux.HandleFunc("/containers", s.handleContainers)
s.mux.HandleFunc("/run/", s.handleRun)
s.mux.HandleFunc("/logs/", s.handleLogs)
@ -116,60 +108,6 @@ func (s *Server) error(w http.ResponseWriter, err error) {
http.Error(w, fmt.Sprintf("Internal Error: %v", err), http.StatusInternalServerError)
}
// handleContainer handles container requests against the Kubelet.
func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, err := ioutil.ReadAll(req.Body)
if err != nil {
s.error(w, err)
return
}
// This is to provide backward compatibility. It only supports a single manifest
var pod api.BoundPod
var containerManifest api.ContainerManifest
err = yaml.Unmarshal(data, &containerManifest)
if err != nil {
s.error(w, err)
return
}
pod.Name = containerManifest.ID
pod.UID = containerManifest.UUID
pod.Spec.Containers = containerManifest.Containers
pod.Spec.Volumes = containerManifest.Volumes
pod.Spec.RestartPolicy = containerManifest.RestartPolicy
//TODO: sha1 of manifest?
if pod.Name == "" {
pod.Name = "1"
}
if pod.UID == "" {
pod.UID = "1"
}
s.updates <- PodUpdate{[]api.BoundPod{pod}, SET, ServerSource}
}
// handleContainers handles containers requests against the Kubelet.
func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, err := ioutil.ReadAll(req.Body)
if err != nil {
s.error(w, err)
return
}
var specs []api.PodSpec
err = yaml.Unmarshal(data, &specs)
if err != nil {
s.error(w, err)
return
}
pods := make([]api.BoundPod, len(specs))
for i := range specs {
pods[i].Name = fmt.Sprintf("%d", i+1)
pods[i].Spec = specs[i]
}
s.updates <- PodUpdate{pods, SET, ServerSource}
}
// handleContainerLogs handles containerLogs request against the Kubelet
func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

View File

@ -17,7 +17,6 @@ limitations under the License.
package kubelet
import (
"bytes"
"encoding/json"
"fmt"
"io"
@ -90,7 +89,7 @@ func newServerTest() *serverTestFramework {
}
fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{}
server := NewServer(fw.fakeKubelet, fw.updateChan, true)
server := NewServer(fw.fakeKubelet, true)
fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw
@ -111,160 +110,6 @@ func readResp(resp *http.Response) (string, error) {
return string(body), err
}
func TestContainer(t *testing.T) {
fw := newServerTest()
expected := []api.ContainerManifest{
{
ID: "test_manifest",
UUID: "value",
Containers: []api.Container{
{
Name: "container",
},
},
Volumes: []api.Volume{
{
Name: "test",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
}
body := bytes.NewBuffer([]byte(encodeJSON(expected[0]))) // Only send a single ContainerManifest
resp, err := http.Post(fw.testHTTPServer.URL+"/container", "application/json", body)
if err != nil {
t.Errorf("Post returned: %v", err)
}
resp.Body.Close()
close(fw.updateChan)
received := fw.updateReader.GetList()
if len(received) != 1 {
t.Errorf("Expected 1 manifest, but got %v", len(received))
}
expectedPods := []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{
Name: "test_manifest",
UID: "value",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "container",
},
},
Volumes: []api.Volume{
{
Name: "test",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
},
}
if !reflect.DeepEqual(expectedPods, received[0]) {
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
}
}
func TestContainers(t *testing.T) {
fw := newServerTest()
expected := []api.ContainerManifest{
{
ID: "test_manifest_1",
Containers: []api.Container{
{
Name: "container",
},
},
Volumes: []api.Volume{
{
Name: "test",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
{
ID: "test_manifest_2",
Containers: []api.Container{
{
Name: "container2",
},
},
Volumes: []api.Volume{
{
Name: "test2",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
}
body := bytes.NewBuffer([]byte(encodeJSON(expected)))
resp, err := http.Post(fw.testHTTPServer.URL+"/containers", "application/json", body)
if err != nil {
t.Errorf("Post returned: %v", err)
}
resp.Body.Close()
close(fw.updateChan)
received := fw.updateReader.GetList()
if len(received) != 1 {
t.Errorf("Expected 1 update, but got %v", len(received))
}
expectedPods := []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{
Name: "1",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "container",
},
},
Volumes: []api.Volume{
{
Name: "test",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "2",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "container2",
},
},
Volumes: []api.Volume{
{
Name: "test2",
},
},
RestartPolicy: api.RestartPolicy{
Never: &api.RestartPolicyNever{},
},
},
},
}
if !reflect.DeepEqual(expectedPods, received[0]) {
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
}
}
func TestPodInfo(t *testing.T) {
fw := newServerTest()
expected := api.PodInfo{

View File

@ -194,7 +194,7 @@ func startKubelet(k *kubelet.Kubelet, cfg *config.PodConfig, kc *KubeletConfig)
// start the kubelet server
if kc.EnableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel(kubelet.ServerSource), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
kubelet.ListenAndServeKubeletServer(k, net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
}, 0)
}
}