Merge pull request #3270 from smarterclayton/kubelet_fixes

Cleanup to client initialization in Kubelet
pull/6/head
Eric Tune 2015-01-07 15:55:38 -08:00
commit 0d4d1e28b2
13 changed files with 139 additions and 111 deletions

1
.gitignore vendored
View File

@ -24,6 +24,7 @@ Session.vim
# Go test binaries # Go test binaries
*.test *.test
/hack/.test-cmd-auth
# Mercurial files # Mercurial files
**/.hg **/.hg

View File

@ -191,11 +191,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
minionController.Run(10 * time.Second) minionController.Run(10 * time.Second)
// Kubelet (localhost) // Kubelet (localhost)
standalone.SimpleRunKubelet(etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250) standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250)
// Kubelet (machine) // Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both // Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule. // have a place they can schedule.
standalone.SimpleRunKubelet(etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251) standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251)
return apiServer.URL return apiServer.URL
} }

View File

@ -100,10 +100,13 @@ func main() {
glog.Info(err) glog.Info(err)
} }
client, err := standalone.GetAPIServerClient(*authPath, apiServerList)
if err != nil && len(apiServerList) > 0 {
glog.Warningf("No API client: %v", err)
}
kcfg := standalone.KubeletConfig{ kcfg := standalone.KubeletConfig{
Address: address, Address: address,
AuthPath: *authPath,
ApiServerList: apiServerList,
AllowPrivileged: *allowPrivileged, AllowPrivileged: *allowPrivileged,
HostnameOverride: *hostnameOverride, HostnameOverride: *hostnameOverride,
RootDirectory: *rootDirectory, RootDirectory: *rootDirectory,
@ -125,6 +128,7 @@ func main() {
EnableServer: *enableServer, EnableServer: *enableServer,
EnableDebuggingHandlers: *enableDebuggingHandlers, EnableDebuggingHandlers: *enableDebuggingHandlers,
DockerClient: util.ConnectToDockerOrDie(*dockerEndpoint), DockerClient: util.ConnectToDockerOrDie(*dockerEndpoint),
KubeClient: client,
EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile), EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile),
} }

View File

@ -52,7 +52,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string
standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)
dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint) dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint)
standalone.SimpleRunKubelet(etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250) standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250)
} }
func newApiClient(addr string, port int) *client.Client { func newApiClient(addr string, port int) *client.Client {

View File

@ -113,6 +113,8 @@ sudo "${GO_OUT}/kubelet" \
--etcd_servers="http://127.0.0.1:4001" \ --etcd_servers="http://127.0.0.1:4001" \
--hostname_override="127.0.0.1" \ --hostname_override="127.0.0.1" \
--address="127.0.0.1" \ --address="127.0.0.1" \
--api_servers="${API_HOST}:${API_PORT}" \
--auth_path="${KUBE_ROOT}/hack/.test-cmd-auth" \
--port="$KUBELET_PORT" >"${KUBELET_LOG}" 2>&1 & --port="$KUBELET_PORT" >"${KUBELET_LOG}" 2>&1 &
KUBELET_PID=$! KUBELET_PID=$!

View File

@ -58,6 +58,8 @@ kube::log::status "Starting kubelet"
--etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \ --etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \
--hostname_override="127.0.0.1" \ --hostname_override="127.0.0.1" \
--address="127.0.0.1" \ --address="127.0.0.1" \
--api_servers="${API_HOST}:${API_PORT}" \
--auth_path="${KUBE_ROOT}/hack/.test-cmd-auth" \
--port="$KUBELET_PORT" 1>&2 & --port="$KUBELET_PORT" 1>&2 &
KUBELET_PID=$! KUBELET_PID=$!

View File

@ -54,17 +54,21 @@ func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request)
return h.selfLinker.SetSelfLink(obj, newURL.String()) return h.selfLinker.SetSelfLink(obj, newURL.String())
} }
func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string) { func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string, err error) {
if s, err := labels.ParseSelector(query.Get("labels")); err != nil { s, perr := labels.ParseSelector(query.Get("labels"))
label = labels.Everything() if perr != nil {
} else { err = perr
label = s return
} }
if s, err := labels.ParseSelector(query.Get("fields")); err != nil { label = s
field = labels.Everything()
} else { s, perr = labels.ParseSelector(query.Get("fields"))
field = s if perr != nil {
err = perr
return
} }
field = s
resourceVersion = query.Get("resourceVersion") resourceVersion = query.Get("resourceVersion")
return return
} }
@ -95,7 +99,11 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
if watcher, ok := storage.(ResourceWatcher); ok { if watcher, ok := storage.(ResourceWatcher); ok {
label, field, resourceVersion := getWatchParams(req.URL.Query()) label, field, resourceVersion, err := getWatchParams(req.URL.Query())
if err != nil {
errorJSON(err, h.codec, w)
return
}
watching, err := watcher.Watch(ctx, label, field, resourceVersion) watching, err := watcher.Watch(ctx, label, field, resourceVersion)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)

View File

@ -1132,6 +1132,8 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
// GetKubeletContainerLogs returns logs from the container // GetKubeletContainerLogs returns logs from the container
// The second parameter of GetPodInfo and FindPodContainer methods represents pod UUID, which is allowed to be blank // The second parameter of GetPodInfo and FindPodContainer methods represents pod UUID, which is allowed to be blank
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error { func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error {
_, err := kl.GetPodInfo(podFullName, "") _, err := kl.GetPodInfo(podFullName, "")
if err == dockertools.ErrNoContainersInPod { if err == dockertools.ErrNoContainersInPod {
@ -1153,6 +1155,18 @@ func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) {
return kl.pods, nil return kl.pods, nil
} }
// GetPodFullName provides the first pod that matches namespace and name, or false
// if no such pod can be found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
for i := range kl.pods {
pod := &kl.pods[i]
if pod.Namespace == namespace && pod.Name == name {
return pod, true
}
}
return nil, false
}
// GetPodInfo returns information from Docker about the containers in a pod // GetPodInfo returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) { func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) {
var manifest api.PodSpec var manifest api.PodSpec

View File

@ -33,7 +33,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/google/cadvisor/info" "github.com/google/cadvisor/info"
@ -66,6 +65,7 @@ type HostInterface interface {
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetMachineInfo() (*info.MachineInfo, error) GetMachineInfo() (*info.MachineInfo, error)
GetBoundPods() ([]api.BoundPod, error) GetBoundPods() ([]api.BoundPod, error)
GetPodByName(namespace, name string) (*api.BoundPod, bool)
GetPodInfo(name, uuid string) (api.PodInfo, error) GetPodInfo(name, uuid string) (api.PodInfo, error)
RunInContainer(name, uuid, container string, cmd []string) ([]byte, error) RunInContainer(name, uuid, container string, cmd []string) ([]byte, error)
GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
@ -146,13 +146,11 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
follow, _ := strconv.ParseBool(uriValues.Get("follow")) follow, _ := strconv.ParseBool(uriValues.Get("follow"))
tail := uriValues.Get("tail") tail := uriValues.Get("tail")
podFullName := GetPodFullName(&api.BoundPod{ pod, ok := s.host.GetPodByName(podNamespace, podID)
ObjectMeta: api.ObjectMeta{ if !ok {
Name: podID, http.Error(w, "Pod does not exist", http.StatusNotFound)
Namespace: podNamespace, return
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, }
},
})
fw := FlushWriter{writer: w} fw := FlushWriter{writer: w}
if flusher, ok := fw.writer.(http.Flusher); ok { if flusher, ok := fw.writer.(http.Flusher); ok {
@ -162,7 +160,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
} }
w.Header().Set("Transfer-Encoding", "chunked") w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
err = s.host.GetKubeletContainerLogs(podFullName, containerName, tail, follow, &fw, &fw) err = s.host.GetKubeletContainerLogs(GetPodFullName(pod), containerName, tail, follow, &fw, &fw)
if err != nil { if err != nil {
s.error(w, err) s.error(w, err)
return return
@ -217,19 +215,12 @@ func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request, version
http.Error(w, "Missing 'podNamespace=' query entry.", http.StatusBadRequest) http.Error(w, "Missing 'podNamespace=' query entry.", http.StatusBadRequest)
return return
} }
// TODO: backwards compatibility with existing API, needs API change pod, ok := s.host.GetPodByName(podNamespace, podID)
podFullName := GetPodFullName(&api.BoundPod{ if !ok {
ObjectMeta: api.ObjectMeta{ http.Error(w, "Pod does not exist", http.StatusNotFound)
Name: podID,
Namespace: podNamespace,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
info, err := s.host.GetPodInfo(podFullName, podUUID)
if err == dockertools.ErrNoContainersInPod {
http.Error(w, "api.BoundPod does not exist", http.StatusNotFound)
return return
} }
info, err := s.host.GetPodInfo(GetPodFullName(pod), podUUID)
if err != nil { if err != nil {
s.error(w, err) s.error(w, err)
return return
@ -293,15 +284,13 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
http.Error(w, "Unexpected path for command running", http.StatusBadRequest) http.Error(w, "Unexpected path for command running", http.StatusBadRequest)
return return
} }
podFullName := GetPodFullName(&api.BoundPod{ pod, ok := s.host.GetPodByName(podNamespace, podID)
ObjectMeta: api.ObjectMeta{ if !ok {
Name: podID, http.Error(w, "Pod does not exist", http.StatusNotFound)
Namespace: podNamespace, return
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, }
},
})
command := strings.Split(u.Query().Get("cmd"), " ") command := strings.Split(u.Query().Get("cmd"), " ")
data, err := s.host.RunInContainer(podFullName, uuid, container, command) data, err := s.host.RunInContainer(GetPodFullName(pod), uuid, container, command)
if err != nil { if err != nil {
s.error(w, err) s.error(w, err)
return return
@ -344,24 +333,20 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
// TODO(monnand) Implement this // TODO(monnand) Implement this
errors.New("pod level status currently unimplemented") errors.New("pod level status currently unimplemented")
case 3: case 3:
// Backward compatibility without uuid information // Backward compatibility without uuid information, does not support namespace
podFullName := GetPodFullName(&api.BoundPod{ pod, ok := s.host.GetPodByName(api.NamespaceDefault, components[1])
ObjectMeta: api.ObjectMeta{ if !ok {
Name: components[1], http.Error(w, "Pod does not exist", http.StatusNotFound)
Namespace: api.NamespaceDefault, return
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, }
}, stats, err = s.host.GetContainerInfo(GetPodFullName(pod), "", components[2], &query)
})
stats, err = s.host.GetContainerInfo(podFullName, "", components[2], &query)
case 5: case 5:
podFullName := GetPodFullName(&api.BoundPod{ pod, ok := s.host.GetPodByName(components[1], components[2])
ObjectMeta: api.ObjectMeta{ if !ok {
Name: components[2], http.Error(w, "Pod does not exist", http.StatusNotFound)
Namespace: components[1], return
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, }
}, stats, err = s.host.GetContainerInfo(GetPodFullName(pod), components[3], components[4], &query)
})
stats, err = s.host.GetContainerInfo(podFullName, components[3], components[4], &query)
default: default:
http.Error(w, "unknown resource.", http.StatusNotFound) http.Error(w, "unknown resource.", http.StatusNotFound)
return return

View File

@ -33,6 +33,7 @@ import (
) )
type fakeKubelet struct { type fakeKubelet struct {
podByNameFunc func(namespace, name string) (*api.BoundPod, bool)
infoFunc func(name string) (api.PodInfo, error) infoFunc func(name string) (api.PodInfo, error)
containerInfoFunc func(podFullName, uid, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) containerInfoFunc func(podFullName, uid, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error) rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
@ -43,6 +44,10 @@ type fakeKubelet struct {
containerLogsFunc func(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error containerLogsFunc func(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
} }
func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
return fk.podByNameFunc(namespace, name)
}
func (fk *fakeKubelet) GetPodInfo(name, uuid string) (api.PodInfo, error) { func (fk *fakeKubelet) GetPodInfo(name, uuid string) (api.PodInfo, error) {
return fk.infoFunc(name) return fk.infoFunc(name)
} }
@ -88,7 +93,19 @@ func newServerTest() *serverTestFramework {
updateChan: make(chan interface{}), updateChan: make(chan interface{}),
} }
fw.updateReader = startReading(fw.updateChan) fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{} fw.fakeKubelet = &fakeKubelet{
podByNameFunc: func(namespace, name string) (*api.BoundPod, bool) {
return &api.BoundPod{
ObjectMeta: api.ObjectMeta{
Namespace: namespace,
Name: name,
Annotations: map[string]string{
ConfigSourceAnnotationKey: "etcd",
},
},
}, true
},
}
server := NewServer(fw.fakeKubelet, true) server := NewServer(fw.fakeKubelet, true)
fw.serverUnderTest = &server fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)

View File

@ -17,7 +17,6 @@ limitations under the License.
package kubelet package kubelet
import ( import (
"fmt"
"net/http" "net/http"
"os" "os"
"path" "path"
@ -27,7 +26,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
@ -97,47 +95,12 @@ func SetupLogging() {
record.StartLogging(glog.Infof) record.StartLogging(glog.Infof)
} }
// TODO: move this into pkg/client func SetupEventSending(client *client.Client) {
func getApiserverClient(authPath string, apiServerList util.StringList) (*client.Client, error) { glog.Infof("Sending events to api server.")
authInfo, err := clientauth.LoadFromFile(authPath) hostname := util.GetHostname("")
if err != nil { record.StartRecording(client.Events(""),
return nil, err api.EventSource{
} Component: "kubelet",
clientConfig, err := authInfo.MergeWithConfig(client.Config{}) Host: hostname,
if err != nil { })
return nil, err
}
if len(apiServerList) < 1 {
return nil, fmt.Errorf("no apiservers specified.")
}
// TODO: adapt Kube client to support LB over several servers
if len(apiServerList) > 1 {
glog.Infof("Mulitple api servers specified. Picking first one")
}
clientConfig.Host = apiServerList[0]
if c, err := client.New(&clientConfig); err != nil {
return nil, err
} else {
return c, nil
}
}
func SetupEventSending(authPath string, apiServerList util.StringList) {
// Make an API client if possible.
if len(apiServerList) < 1 {
glog.Info("No api servers specified.")
} else {
if apiClient, err := getApiserverClient(authPath, apiServerList); err != nil {
glog.Errorf("Unable to make apiserver client: %v", err)
} else {
// Send events to APIserver if there is a client.
hostname := util.GetHostname("")
glog.Infof("Sending events to APIserver.")
record.StartRecording(apiClient.Events(""),
api.EventSource{
Component: "kubelet",
Host: hostname,
})
}
}
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
@ -55,6 +56,31 @@ func (h *delegateHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
} }
// TODO: replace this with clientcmd
func GetAPIServerClient(authPath string, apiServerList util.StringList) (*client.Client, error) {
authInfo, err := clientauth.LoadFromFile(authPath)
if err != nil {
return nil, err
}
clientConfig, err := authInfo.MergeWithConfig(client.Config{})
if err != nil {
return nil, err
}
if len(apiServerList) < 1 {
return nil, fmt.Errorf("no api servers specified.")
}
// TODO: adapt Kube client to support LB over several servers
if len(apiServerList) > 1 {
glog.Infof("Mulitple api servers specified. Picking first one")
}
clientConfig.Host = apiServerList[0]
c, err := client.New(&clientConfig)
if err != nil {
return nil, err
}
return c, nil
}
// RunApiServer starts an API server in a go routine. // RunApiServer starts an API server in a go routine.
func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, port int) { func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, port int) {
handler := delegateHandler{} handler := delegateHandler{}
@ -129,8 +155,9 @@ func RunControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient. // SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient.
// Under the hood it calls RunKubelet (below) // Under the hood it calls RunKubelet (below)
func SimpleRunKubelet(etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface, hostname, rootDir, manifestURL, address string, port uint) { func SimpleRunKubelet(client *client.Client, etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface, hostname, rootDir, manifestURL, address string, port uint) {
kcfg := KubeletConfig{ kcfg := KubeletConfig{
KubeClient: client,
EtcdClient: etcdClient, EtcdClient: etcdClient,
DockerClient: dockerClient, DockerClient: dockerClient,
HostnameOverride: hostname, HostnameOverride: hostname,
@ -152,7 +179,11 @@ func SimpleRunKubelet(etcdClient tools.EtcdClient, dockerClient dockertools.Dock
// 3 Standalone 'kubernetes' binary // 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3 // Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig) { func RunKubelet(kcfg *KubeletConfig) {
kubelet.SetupEventSending(kcfg.AuthPath, kcfg.ApiServerList) if kcfg.KubeClient != nil {
kubelet.SetupEventSending(kcfg.KubeClient)
} else {
glog.Infof("No api server defined - no events will be sent.")
}
kubelet.SetupLogging() kubelet.SetupLogging()
kubelet.SetupCapabilities(kcfg.AllowPrivileged) kubelet.SetupCapabilities(kcfg.AllowPrivileged)
@ -210,11 +241,10 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
type KubeletConfig struct { type KubeletConfig struct {
EtcdClient tools.EtcdClient EtcdClient tools.EtcdClient
KubeClient *client.Client
DockerClient dockertools.DockerInterface DockerClient dockertools.DockerInterface
CAdvisorPort uint CAdvisorPort uint
Address util.IP Address util.IP
AuthPath string
ApiServerList util.StringList
AllowPrivileged bool AllowPrivileged bool
HostnameOverride string HostnameOverride string
RootDirectory string RootDirectory string

View File

@ -131,7 +131,7 @@ func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response,
} }
return &etcd.Response{}, EtcdErrorNotFound return &etcd.Response{}, EtcdErrorNotFound
} }
f.t.Logf("returning %v: %v %#v", key, result.R, result.E) f.t.Logf("returning %v: %#v %#v", key, result.R, result.E)
return result.R, result.E return result.R, result.E
} }
@ -262,6 +262,7 @@ func (f *FakeEtcdClient) WaitForWatchCompletion() {
} }
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
f.Mutex.Lock()
if f.WatchImmediateError != nil { if f.WatchImmediateError != nil {
return nil, f.WatchImmediateError return nil, f.WatchImmediateError
} }
@ -273,6 +274,7 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool,
defer close(injectedError) defer close(injectedError)
f.WatchInjectError = injectedError f.WatchInjectError = injectedError
f.Mutex.Unlock()
if receiver == nil { if receiver == nil {
return f.Get(prefix, false, recursive) return f.Get(prefix, false, recursive)
} else { } else {