Add container garbage collection.

pull/6/head
Brendan Burns 2014-10-27 17:29:55 -07:00
parent 8a60c7e8a1
commit 51bf451932
9 changed files with 407 additions and 10 deletions


@@ -67,6 +67,8 @@ var (
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server")
enableDebuggingHandlers = flag.Bool("enable_debugging_handlers", true, "Enables server endpoints for log collection and local running of containers and commands")
minimumGCAge = flag.Duration("minimum_container_ttl_duration", 0, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'")
maxContainerCount = flag.Int("maximum_dead_containers_per_container", 5, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.")
)
func init() {
@@ -183,7 +185,17 @@ func main() {
*networkContainerImage,
*syncFrequency,
float32(*registryPullQPS),
*registryBurst)
*registryBurst,
*minimumGCAge,
*maxContainerCount)
go func() {
util.Forever(func() {
err := k.GarbageCollectContainers()
if err != nil {
glog.Errorf("Garbage collect failed: %v", err)
}
}, time.Minute*1)
}()
go func() {
defer util.HandleCrash()
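
The loop added above runs GarbageCollectContainers once a minute via util.Forever, logging (but not propagating) any error so a failed pass does not stop later ones. Below is a minimal, self-contained sketch of the same pattern, assuming a plain time.Ticker in place of util.Forever; the gc closure is a stand-in for k.GarbageCollectContainers, not the real call.

package main

import (
    "log"
    "time"
)

func main() {
    // Placeholder for the kubelet's GarbageCollectContainers pass.
    gc := func() error { return nil }

    go func() {
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()
        for range ticker.C {
            if err := gc(); err != nil {
                // Mirrors the glog.Errorf call in the hunk above.
                log.Printf("Garbage collect failed: %v", err)
            }
        }
    }()

    select {} // the real kubelet blocks on its other long-running goroutines
}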


@@ -372,8 +372,10 @@ type ContainerState struct {
type ContainerStatus struct {
// TODO(dchen1107): Should we rename PodStatus to a more generic name or have a separate states
// defined for container?
State ContainerState `json:"state,omitempty" yaml:"state,omitempty"`
RestartCount int `json:"restartCount" yaml:"restartCount"`
State ContainerState `json:"state,omitempty" yaml:"state,omitempty"`
// Note that this is calculated from dead containers. But those containers are subject to
// garbage collection. This value will get capped at 5 by GC.
RestartCount int `json:"restartCount" yaml:"restartCount"`
// TODO(dchen1107): Deprecated this soon once we pull entire PodStatus from node,
// not just PodInfo. Now we need this to remove docker.Container from API
PodIP string `json:"podIP,omitempty" yaml:"podIP,omitempty"`
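
The new comment on RestartCount is worth unpacking: the kubelet derives the value by counting a container's dead instances, and the garbage collector deletes dead instances beyond maximum_dead_containers_per_container, so the reported count saturates at that limit (5 by default). The three similar hunks below apply the same comment to the other API type definitions. A small illustrative sketch of the saturation, where reportedRestartCount and its arguments are hypothetical rather than kubelet code:

package main

import "fmt"

// reportedRestartCount is hypothetical: however many restarts actually happened,
// only the dead instances that GC retains can be counted afterwards.
func reportedRestartCount(actualRestarts, maxDeadPerContainer int) int {
    if actualRestarts > maxDeadPerContainer {
        return maxDeadPerContainer
    }
    return actualRestarts
}

func main() {
    // A container that restarted 12 times reports 5 once GC has trimmed its
    // dead instances to the default limit of 5.
    fmt.Println(reportedRestartCount(12, 5))
}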


@@ -345,8 +345,10 @@ type ContainerState struct {
type ContainerStatus struct {
// TODO(dchen1107): Should we rename PodStatus to a more generic name or have a separate states
// defined for container?
State ContainerState `json:"state,omitempty" yaml:"state,omitempty"`
RestartCount int `json:"restartCount" yaml:"restartCount"`
State ContainerState `json:"state,omitempty" yaml:"state,omitempty"`
// Note that this is calculated from dead containers. But those containers are subject to
// garbage collection. This value will get capped at 5 by GC.
RestartCount int `json:"restartCount" yaml:"restartCount"`
// TODO(dchen1107): Deprecated this soon once we pull entire PodStatus from node,
// not just PodInfo. Now we need this to remove docker.Container from API
PodIP string `json:"podIP,omitempty" yaml:"podIP,omitempty"`


@@ -310,8 +310,10 @@ type ContainerState struct {
type ContainerStatus struct {
// TODO(dchen1107): Should we rename PodStatus to a more generic name or have a separate states
// defined for container?
State ContainerState `json:"state,omitempty" yaml:"state,omitempty"`
RestartCount int `json:"restartCount" yaml:"restartCount"`
State ContainerState `json:"state,omitempty" yaml:"state,omitempty"`
// Note that this is calculated from dead containers. But those containers are subject to
// garbage collection. This value will get capped at 5 by GC.
RestartCount int `json:"restartCount" yaml:"restartCount"`
// TODO(dchen1107): Deprecated this soon once we pull entire PodStatus from node,
// not just PodInfo. Now we need this to remove docker.Container from API
PodIP string `json:"podIP,omitempty" yaml:"podIP,omitempty"`


@@ -409,8 +409,10 @@ type ContainerState struct {
type ContainerStatus struct {
// TODO(dchen1107): Should we rename PodStatus to a more generic name or have a separate states
// defined for container?
State ContainerState `json:"state,omitempty" yaml:"state,omitempty"`
RestartCount int `json:"restartCount" yaml:"restartCount"`
State ContainerState `json:"state,omitempty" yaml:"state,omitempty"`
// Note that this is calculated from dead containers. But those containers are subject to
// garbage collection. This value will get capped at 5 by GC.
RestartCount int `json:"restartCount" yaml:"restartCount"`
// TODO(dchen1107): Introduce our own NetworkSettings struct here?
// TODO(dchen1107): Which image the container is running with?
// TODO(dchen1107): Once we have done with integration with cadvisor, resource


@@ -50,6 +50,7 @@ type DockerInterface interface {
CreateContainer(docker.CreateContainerOptions) (*docker.Container, error)
StartContainer(id string, hostConfig *docker.HostConfig) error
StopContainer(id string, timeout uint) error
RemoveContainer(opts docker.RemoveContainerOptions) error
InspectImage(image string) (*docker.Image, error)
PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error
Logs(opts docker.LogsOptions) error
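
Adding RemoveContainer to DockerInterface lets the kubelet delete exited containers during garbage collection; go-dockerclient's *docker.Client already provides a method with this signature, so the production client keeps satisfying the interface. A minimal sketch of a direct call, with the endpoint and container ID as illustrative placeholders:

package main

import (
    "log"

    docker "github.com/fsouza/go-dockerclient"
)

func main() {
    // "unix:///var/run/docker.sock" and "deadbeef" are placeholders.
    client, err := docker.NewClient("unix:///var/run/docker.sock")
    if err != nil {
        log.Fatalf("connect: %v", err)
    }
    if err := client.RemoveContainer(docker.RemoveContainerOptions{ID: "deadbeef"}); err != nil {
        log.Printf("remove failed: %v", err)
    }
}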


@@ -29,12 +29,14 @@ type FakeDockerClient struct {
sync.Mutex
ContainerList []docker.APIContainers
Container *docker.Container
ContainerMap map[string]*docker.Container
Image *docker.Image
Err error
called []string
Stopped []string
pulled []string
Created []string
Removed []string
VersionInfo docker.Env
}
@@ -70,6 +72,11 @@ func (f *FakeDockerClient) InspectContainer(id string) (*docker.Container, error
f.Lock()
defer f.Unlock()
f.called = append(f.called, "inspect_container")
if f.ContainerMap != nil {
if container, ok := f.ContainerMap[id]; ok {
return container, f.Err
}
}
return f.Container, f.Err
}
@@ -122,6 +129,14 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
return f.Err
}
func (f *FakeDockerClient) RemoveContainer(opts docker.RemoveContainerOptions) error {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "remove")
f.Removed = append(f.Removed, opts.ID)
return f.Err
}
// Logs is a test-spy implementation of DockerInterface.Logs.
// It adds an entry "logs" to the internal method call record.
func (f *FakeDockerClient) Logs(opts docker.LogsOptions) error {
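
The fake gains a per-ID ContainerMap (with the existing Container field kept as a fallback for unknown IDs) and a Removed slice recording which containers were deleted, which is what the garbage-collection tests below assert on. A small sketch of driving the fake directly; the package name and the go-dockerclient import path are assumptions based on this diff, not taken from the tree:

package dockertools

import (
    "testing"

    docker "github.com/fsouza/go-dockerclient"
)

// TestFakeRemoveRecordsID is an illustrative sketch, not part of this commit.
func TestFakeRemoveRecordsID(t *testing.T) {
    fake := &FakeDockerClient{
        ContainerMap: map[string]*docker.Container{
            "abc": {ID: "abc", State: docker.State{Running: false}},
        },
    }
    // InspectContainer resolves through ContainerMap when the ID is present.
    if _, err := fake.InspectContainer("abc"); err != nil {
        t.Fatalf("unexpected inspect error: %v", err)
    }
    // RemoveContainer records the ID in Removed and "remove" in the call log.
    if err := fake.RemoveContainer(docker.RemoveContainerOptions{ID: "abc"}); err != nil {
        t.Fatalf("unexpected remove error: %v", err)
    }
    if len(fake.Removed) != 1 || fake.Removed[0] != "abc" {
        t.Errorf("expected Removed to be [abc], got %v", fake.Removed)
    }
}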


@@ -21,6 +21,7 @@ import (
"io"
"net/http"
"path"
"sort"
"strconv"
"strings"
"sync"
@@ -61,7 +62,9 @@ func NewMainKubelet(
ni string,
ri time.Duration,
pullQPS float32,
pullBurst int) *Kubelet {
pullBurst int,
minimumGCAge time.Duration,
maxContainerCount int) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
@@ -74,6 +77,8 @@ func NewMainKubelet(
httpClient: &http.Client{},
pullQPS: pullQPS,
pullBurst: pullBurst,
minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount,
}
}
@@ -125,6 +130,68 @@ type Kubelet struct {
// Optional, no statistics will be available if omitted
cadvisorClient cadvisorInterface
cadvisorLock sync.RWMutex
// Optional, minimum age required for garbage collection. If zero, no limit.
minimumGCAge time.Duration
maxContainerCount int
}
type ByCreated []*docker.Container
func (a ByCreated) Len() int { return len(a) }
func (a ByCreated) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByCreated) Less(i, j int) bool { return a[i].Created.After(a[j].Created) }
// TODO: these removals are racy, we should make dockerclient threadsafe across List/Inspect transactions.
func (kl *Kubelet) purgeOldest(ids []string) error {
dockerData := []*docker.Container{}
for _, id := range ids {
data, err := kl.dockerClient.InspectContainer(id)
if err != nil {
return err
}
if !data.State.Running && (kl.minimumGCAge == 0 || time.Now().Sub(data.State.FinishedAt) > kl.minimumGCAge) {
dockerData = append(dockerData, data)
}
}
sort.Sort(ByCreated(dockerData))
if len(dockerData) <= kl.maxContainerCount {
return nil
}
dockerData = dockerData[kl.maxContainerCount:]
for _, data := range dockerData {
if err := kl.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: data.ID}); err != nil {
return err
}
}
return nil
}
// TODO: Also enforce a maximum total number of containers.
func (kl *Kubelet) GarbageCollectContainers() error {
if kl.maxContainerCount == 0 {
return nil
}
containers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, true)
if err != nil {
return err
}
uuidToIDMap := map[string][]string{}
for _, container := range containers {
_, uuid, name, _ := dockertools.ParseDockerName(container.ID)
uuidName := uuid + "." + name
uuidToIDMap[uuidName] = append(uuidToIDMap[uuidName], container.ID)
}
for _, list := range uuidToIDMap {
if len(list) <= kl.maxContainerCount {
continue
}
if err := kl.purgeOldest(list); err != nil {
return err
}
}
return nil
}
// SetCadvisorClient sets the cadvisor client in a thread-safe way.
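
To summarize the retention rule implemented above: purgeOldest keeps only candidates that are not running and finished at least minimum_container_ttl_duration ago (or any finished container if that limit is zero), sorts them newest-first (ByCreated orders descending on Created), and removes everything past the first maxContainerCount entries; GarbageCollectContainers applies that rule per uuid.name group, so the cap is per container rather than global, as the TODO notes. A self-contained sketch of the rule, with a hypothetical fakeContainer type standing in for docker.Container:

package main

import (
    "fmt"
    "sort"
    "time"
)

// fakeContainer is a stand-in for docker.Container in this illustration.
type fakeContainer struct {
    ID      string
    Created time.Time
}

func main() {
    const maxContainerCount = 5
    now := time.Now()

    // Seven dead instances of one container, newest first by construction.
    dead := make([]fakeContainer, 0, 7)
    for i := 0; i < 7; i++ {
        dead = append(dead, fakeContainer{
            ID:      fmt.Sprintf("c%d", i),
            Created: now.Add(-time.Duration(i) * time.Minute),
        })
    }

    // Newest first, mirroring ByCreated.Less (Created.After).
    sort.Slice(dead, func(i, j int) bool { return dead[i].Created.After(dead[j].Created) })

    // Keep the newest maxContainerCount; everything after that is removed.
    for _, c := range dead[maxContainerCount:] {
        fmt.Println("would remove", c.ID) // prints c5 and c6, the two oldest
    }
}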


@@ -1193,3 +1193,297 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
}
}
func TestKubeletGarbageCollection(t *testing.T) {
tests := []struct {
containers []docker.APIContainers
containerDetails map[string]*docker.Container
expectedRemoved []string
}{
{
containers: []docker.APIContainers{
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "1876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "2876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "3876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "4876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "5876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "6876",
},
},
containerDetails: map[string]*docker.Container{
"1876": {
State: docker.State{
Running: false,
},
ID: "1876",
Created: time.Now(),
},
},
expectedRemoved: []string{"1876"},
},
{
containers: []docker.APIContainers{
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "1876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "2876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "3876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "4876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "5876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "6876",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "7876",
},
},
containerDetails: map[string]*docker.Container{
"1876": {
State: docker.State{
Running: true,
},
ID: "1876",
Created: time.Now(),
},
"2876": {
State: docker.State{
Running: false,
},
ID: "2876",
Created: time.Now(),
},
},
expectedRemoved: []string{"2876"},
},
{
containers: []docker.APIContainers{
{
// network container
Names: []string{"/k8s_net_foo.new.test_.deadbeef"},
ID: "1876",
},
},
},
}
for _, test := range tests {
kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.maxContainerCount = 5
fakeDocker.ContainerList = test.containers
fakeDocker.ContainerMap = test.containerDetails
fakeDocker.Container = &docker.Container{ID: "error", Created: time.Now()}
err := kubelet.GarbageCollectContainers()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(fakeDocker.Removed, test.expectedRemoved) {
t.Errorf("expected: %v, got: %v", test.expectedRemoved, fakeDocker.Removed)
}
}
}
func TestPurgeOldest(t *testing.T) {
created := time.Now()
tests := []struct {
ids []string
containerDetails map[string]*docker.Container
expectedRemoved []string
}{
{
ids: []string{"1", "2", "3", "4", "5"},
containerDetails: map[string]*docker.Container{
"1": {
State: docker.State{
Running: true,
},
ID: "1",
Created: created,
},
"2": {
State: docker.State{
Running: false,
},
ID: "2",
Created: created.Add(time.Second),
},
"3": {
State: docker.State{
Running: false,
},
ID: "3",
Created: created.Add(time.Second),
},
"4": {
State: docker.State{
Running: false,
},
ID: "4",
Created: created.Add(time.Second),
},
"5": {
State: docker.State{
Running: false,
},
ID: "5",
Created: created.Add(time.Second),
},
},
},
{
ids: []string{"1", "2", "3", "4", "5", "6"},
containerDetails: map[string]*docker.Container{
"1": {
State: docker.State{
Running: false,
},
ID: "1",
Created: created.Add(time.Second),
},
"2": {
State: docker.State{
Running: false,
},
ID: "2",
Created: created.Add(time.Millisecond),
},
"3": {
State: docker.State{
Running: false,
},
ID: "3",
Created: created.Add(time.Second),
},
"4": {
State: docker.State{
Running: false,
},
ID: "4",
Created: created.Add(time.Second),
},
"5": {
State: docker.State{
Running: false,
},
ID: "5",
Created: created.Add(time.Second),
},
"6": {
State: docker.State{
Running: false,
},
ID: "6",
Created: created.Add(time.Second),
},
},
expectedRemoved: []string{"2"},
},
{
ids: []string{"1", "2", "3", "4", "5", "6", "7"},
containerDetails: map[string]*docker.Container{
"1": {
State: docker.State{
Running: false,
},
ID: "1",
Created: created.Add(time.Second),
},
"2": {
State: docker.State{
Running: false,
},
ID: "2",
Created: created.Add(time.Millisecond),
},
"3": {
State: docker.State{
Running: false,
},
ID: "3",
Created: created.Add(time.Second),
},
"4": {
State: docker.State{
Running: false,
},
ID: "4",
Created: created.Add(time.Second),
},
"5": {
State: docker.State{
Running: false,
},
ID: "5",
Created: created.Add(time.Second),
},
"6": {
State: docker.State{
Running: false,
},
ID: "6",
Created: created.Add(time.Microsecond),
},
"7": {
State: docker.State{
Running: false,
},
ID: "7",
Created: created.Add(time.Second),
},
},
expectedRemoved: []string{"2", "6"},
},
}
for _, test := range tests {
kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.maxContainerCount = 5
fakeDocker.ContainerMap = test.containerDetails
kubelet.purgeOldest(test.ids)
if !reflect.DeepEqual(fakeDocker.Removed, test.expectedRemoved) {
t.Errorf("expected: %v, got: %v", test.expectedRemoved, fakeDocker.Removed)
}
}
}