Merge pull request #7382 from vmarmol/rkt-deps

Move Docker-specific log handling to DockerManager.
pull/6/head
Dawn Chen 2015-04-27 15:10:14 -07:00
commit aa487b7cab
10 changed files with 143 additions and 106 deletions

View File

@ -46,6 +46,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
@ -231,14 +232,14 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
testRootDir := makeTempDirOrDie("kubelet_integ_1.", "")
configFilePath := makeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubelet.FakeOS{})
kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubecontainer.FakeOS{})
kubeletapp.RunKubelet(kcfg, nil)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.", "")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubelet.FakeOS{})
kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.FakeOS{})
kubeletapp.RunKubelet(kcfg, nil)
return apiServer.URL, configFilePath
}

View File

@ -40,6 +40,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
@ -375,7 +376,7 @@ func SimpleKubelet(client *client.Client,
cadvisorInterface cadvisor.Interface,
configFilePath string,
cloud cloudprovider.Interface,
osInterface kubelet.OSInterface) *KubeletConfig {
osInterface kubecontainer.OSInterface) *KubeletConfig {
imageGCPolicy := kubelet.ImageGCPolicy{
HighThresholdPercent: 90,
@ -436,7 +437,7 @@ func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) {
builder = createAndInitKubelet
}
if kcfg.OSInterface == nil {
kcfg.OSInterface = kubelet.RealOS{}
kcfg.OSInterface = kubecontainer.RealOS{}
}
k, podCfg, err := builder(kcfg)
if err != nil {
@ -534,7 +535,7 @@ type KubeletConfig struct {
Cloud cloudprovider.Interface
NodeStatusUpdateFrequency time.Duration
ResourceContainer string
OSInterface kubelet.OSInterface
OSInterface kubecontainer.OSInterface
}
func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {

View File

@ -37,8 +37,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
@ -159,7 +159,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP
if err != nil {
glog.Fatalf("Failed to create cAdvisor: %v", err)
}
kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubelet.RealOS{})
kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.RealOS{})
kubeletapp.RunKubelet(kcfg, nil)
}

View File

@ -0,0 +1,55 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"os"
)
// OSInterface collects system level operations that need to be mocked out
// during tests.
type OSInterface interface {
Mkdir(path string, perm os.FileMode) error
Symlink(oldname string, newname string) error
}
// RealOS is used to dispatch the real system level operaitons.
type RealOS struct{}
// MkDir will will call os.Mkdir to create a directory.
func (RealOS) Mkdir(path string, perm os.FileMode) error {
return os.Mkdir(path, perm)
}
// Symlink will call os.Symlink to create a symbolic link.
func (RealOS) Symlink(oldname string, newname string) error {
return os.Symlink(oldname, newname)
}
// FakeOS mocks out certain OS calls to avoid perturbing the filesystem
// on the test machine.
type FakeOS struct{}
// MkDir is a fake call that just returns nil.
func (FakeOS) Mkdir(path string, perm os.FileMode) error {
return nil
}
// Symlink is a fake call that just returns nil.
func (FakeOS) Symlink(oldname string, newname string) error {
return nil
}

View File

@ -392,7 +392,7 @@ func TestIsImagePresent(t *testing.T) {
func TestGetRunningContainers(t *testing.T) {
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
fakeRecorder := &record.FakeRecorder{}
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0)
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
tests := []struct {
containers map[string]*docker.Container
inputIDs []string
@ -657,7 +657,7 @@ func TestFindContainersByPod(t *testing.T) {
},
}
fakeClient := &FakeDockerClient{}
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0)
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
for i, test := range tests {
fakeClient.ContainerList = test.containerList
fakeClient.ExitedContainerList = test.exitedContainerList

View File

@ -56,6 +56,7 @@ type DockerManager struct {
recorder record.EventRecorder
readinessManager *kubecontainer.ReadinessManager
containerRefManager *kubecontainer.RefManager
os kubecontainer.OSInterface
// TODO(yifan): PodInfraContainerImage can be unexported once
// we move createPodInfraContainer into dockertools.
@ -74,6 +75,12 @@ type DockerManager struct {
// use the concrete type so that we can record the pull failure and eliminate
// the image checking in GetPodStatus().
Puller DockerPuller
// Root of the Docker runtime.
dockerRoot string
// Directory of container logs.
containerLogsDir string
}
func NewDockerManager(
@ -83,16 +90,51 @@ func NewDockerManager(
containerRefManager *kubecontainer.RefManager,
podInfraContainerImage string,
qps float32,
burst int) *DockerManager {
burst int,
containerLogsDir string,
osInterface kubecontainer.OSInterface) *DockerManager {
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems.
dockerRoot := "/var/lib/docker"
dockerInfo, err := client.Info()
if err != nil {
glog.Errorf("Failed to execute Info() call to the Docker client: %v", err)
glog.Warningf("Using fallback default of /var/lib/docker for location of Docker runtime")
} else {
driverStatus := dockerInfo.Get("DriverStatus")
// The DriverStatus is a*string* which represents a list of list of strings (pairs) e.g.
// DriverStatus=[["Root Dir","/var/lib/docker/aufs"],["Backing Filesystem","extfs"],["Dirs","279"]]
// Strip out the square brakcets and quotes.
s := strings.Replace(driverStatus, "[", "", -1)
s = strings.Replace(s, "]", "", -1)
s = strings.Replace(s, `"`, "", -1)
// Separate by commas.
ss := strings.Split(s, ",")
// Search for the Root Dir string
for i, k := range ss {
if k == "Root Dir" && i+1 < len(ss) {
// Discard the /aufs suffix.
dockerRoot, _ = path.Split(ss[i+1])
// Trim the last slash.
dockerRoot = strings.TrimSuffix(dockerRoot, "/")
glog.Infof("Setting dockerRoot to %s", dockerRoot)
}
}
}
reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)}
return &DockerManager{
client: client,
recorder: recorder,
readinessManager: readinessManager,
containerRefManager: containerRefManager,
client: client,
recorder: recorder,
readinessManager: readinessManager,
containerRefManager: containerRefManager,
os: osInterface,
PodInfraContainerImage: podInfraContainerImage,
reasonCache: reasonCache,
Puller: newDockerPuller(client, qps, burst),
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
}
}
@ -936,6 +978,17 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge
return DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}
// Create a symbolic link to the Docker container log file using a name which captures the
// full pod name, the container name and the Docker container ID. Cluster level logging will
// capture these symbolic filenames which can be used for search terms in Elasticsearch or for
// labels for Cloud Logging.
podFullName := kubecontainer.GetPodFullName(pod)
containerLogFile := path.Join(dm.dockerRoot, "containers", id, fmt.Sprintf("%s-json.log", id))
symlinkFile := path.Join(dm.containerLogsDir, fmt.Sprintf("%s-%s-%s.log", podFullName, container.Name, id))
if err = dm.os.Symlink(containerLogFile, symlinkFile); err != nil {
glog.Errorf("Failed to create symbolic link to the log file of pod %q container %q: %v", podFullName, container.Name, err)
}
return DockerID(id), err
}

View File

@ -70,6 +70,9 @@ const (
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
// Location of container logs.
containerLogsDir = "/var/log/containers"
)
var (
@ -96,40 +99,6 @@ type SourcesReadyFn func() bool
type volumeMap map[string]volume.Volume
// OSInterface collects system level operations that need to be mocked out
// during tests.
type OSInterface interface {
Mkdir(path string, perm os.FileMode) error
Symlink(oldname string, newname string) error
}
// RealOS is used to dispatch the real system level operaitons.
type RealOS struct{}
// MkDir will will call os.Mkdir to create a directory.
func (RealOS) Mkdir(path string, perm os.FileMode) error {
return os.Mkdir(path, perm)
}
// Symlink will call os.Symlink to create a symbolic link.
func (RealOS) Symlink(oldname string, newname string) error {
return os.Symlink(oldname, newname)
}
// FakeOS mocks out certain OS calls to avoid perturbing the filesystem
// on the test machine.
type FakeOS struct{}
// MkDir is a fake call that just returns nil.
func (FakeOS) Mkdir(path string, perm os.FileMode) error {
return nil
}
// Symlink is a fake call that just returns nil.
func (FakeOS) Symlink(oldname string, newname string) error {
return nil
}
// New creates a new Kubelet for use in main
func NewMainKubelet(
hostname string,
@ -155,7 +124,7 @@ func NewMainKubelet(
cloud cloudprovider.Interface,
nodeStatusUpdateFrequency time.Duration,
resourceContainer string,
osInterface OSInterface) (*Kubelet, error) {
osInterface kubecontainer.OSInterface) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -179,35 +148,6 @@ func NewMainKubelet(
if !dockerUp {
return nil, fmt.Errorf("timed out waiting for Docker to come up")
}
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems.
dockerRoot := "/var/lib/docker"
dockerInfo, err := dockerClient.Info()
if err != nil {
glog.Errorf("Failed to execute Info() call to the Docker client: %v", err)
glog.Warningf("Using fallback default of /var/lib/docker for location of Docker runtime")
} else {
driverStatus := dockerInfo.Get("DriverStatus")
// The DriverStatus is a*string* which represents a list of list of strings (pairs) e.g.
// DriverStatus=[["Root Dir","/var/lib/docker/aufs"],["Backing Filesystem","extfs"],["Dirs","279"]]
// Strip out the square brakcets and quotes.
s := strings.Replace(driverStatus, "[", "", -1)
s = strings.Replace(s, "]", "", -1)
s = strings.Replace(s, `"`, "", -1)
// Separate by commas.
ss := strings.Split(s, ",")
// Search for the Root Dir string
for i, k := range ss {
if k == "Root Dir" && i+1 < len(ss) {
// Discard the /aufs suffix.
dockerRoot, _ = path.Split(ss[i+1])
// Trim the last slash.
dockerRoot = strings.TrimSuffix(dockerRoot, "/")
glog.Infof("Setting dockerRoot to %s", dockerRoot)
}
}
}
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
@ -270,7 +210,9 @@ func NewMainKubelet(
containerRefManager,
podInfraContainerImage,
pullQPS,
pullBurst)
pullBurst,
containerLogsDir,
osInterface)
volumeManager := newVolumeManager()
@ -303,7 +245,6 @@ func NewMainKubelet(
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
resourceContainer: resourceContainer,
os: osInterface,
dockerRoot: dockerRoot,
}
klet.podManager = newBasicPodManager(klet.kubeClient)
@ -331,10 +272,10 @@ func NewMainKubelet(
} else {
klet.networkPlugin = plug
}
// If the /var/log/containers directory does not exist, create it.
if _, err := os.Stat("/var/log/containers"); err != nil {
if err := osInterface.Mkdir("/var/log/containers", 0755); err != nil {
glog.Errorf("Failed to create directory /var/log/containers: %v", err)
// If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil {
if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
}
}
@ -454,8 +395,7 @@ type Kubelet struct {
// Name must be absolute.
resourceContainer string
os OSInterface
dockerRoot string
os kubecontainer.OSInterface
}
// getRootDir returns the full path to the directory under which kubelet can
@ -1048,21 +988,6 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain
glog.Errorf("Error running pod %q container %q: %v", podFullName, container.Name, err)
return "", err
}
// Create a symbolic link to the Docker container log file using a name which captures the
// full pod name, the container name and the Docker container ID. Cluster level logging will
// capture these symbolic filenames which can be used for search terms in Elasticsearch or for
// labels for Cloud Logging.
// If for any reason kl.dockerRoot is not set, default to /var/lib/docker
dockerRoot := kl.dockerRoot
if kl.dockerRoot == "" {
dockerRoot = "/var/lib/docker"
glog.Errorf("dockerRoot field not set in the Kubelet configuration")
}
containerLogFile := fmt.Sprintf("%s/containers/%s/%s-json.log", dockerRoot, containerID, containerID)
symlinkFile := fmt.Sprintf("/var/log/containers/%s-%s-%s.log", podFullName, container.Name, containerID)
if err = kl.os.Symlink(containerLogFile, symlinkFile); err != nil {
glog.Errorf("Failed to create symbolic link to the log file of pod %q container %q: %v", podFullName, container.Name, err)
}
return containerID, nil
}

View File

@ -77,7 +77,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet := &Kubelet{}
kubelet.dockerClient = fakeDocker
kubelet.kubeClient = fakeKubeClient
kubelet.os = FakeOS{}
kubelet.os = kubecontainer.FakeOS{}
kubelet.hostname = "testnode"
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
@ -105,7 +105,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
podManager, fakeMirrorClient := newFakePodManager()
kubelet.podManager = podManager
kubelet.containerRefManager = kubecontainer.NewRefManager()
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0)
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os)
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
kubelet.podWorkers = newPodWorkers(
kubelet.runtimeCache,

View File

@ -40,7 +40,7 @@ func newPod(uid, name string) *api.Pod {
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
fakeDocker := &dockertools.FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{}
dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0)
dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
lock := sync.Mutex{}

View File

@ -86,7 +86,7 @@ func TestRunOnce(t *testing.T) {
containerRefManager: kubecontainer.NewRefManager(),
readinessManager: kubecontainer.NewReadinessManager(),
podManager: podManager,
os: FakeOS{},
os: kubecontainer.FakeOS{},
volumeManager: newVolumeManager(),
}
@ -154,7 +154,9 @@ func TestRunOnce(t *testing.T) {
kb.containerRefManager,
dockertools.PodInfraContainerImage,
0,
0)
0,
"",
kubecontainer.FakeOS{})
kb.containerManager.Puller = &dockertools.FakeDockerPuller{}
pods := []*api.Pod{