dockertools: Introduce GetPods() for docker runtime.

pull/6/head
Yifan Gu 2015-03-20 14:14:19 -07:00
parent 30c3583900
commit f4c3ccf639
3 changed files with 75 additions and 13 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -816,3 +817,61 @@ type ContainerCommandRunner interface {
ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
PortForward(podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error PortForward(podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error
} }
// Parse the pod full name. TODO(yifan): This is duplicated with kubelet.ParsePodFullName.
func parsePodFullName(podFullName string) (string, string, error) {
parts := strings.Split(podFullName, "_")
if len(parts) != 2 {
return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName)
}
return parts[0], parts[1], nil
}
func GetPods(client DockerInterface, all bool) ([]*container.Pod, error) {
pods := make(map[types.UID]*container.Pod)
var result []*container.Pod
containers, err := GetKubeletDockerContainers(client, all)
if err != nil {
return nil, err
}
// Group containers by pod.
for _, c := range containers {
if len(c.Names) == 0 {
glog.Warningf("Cannog parse empty docker container name: %#v", c.Names)
continue
}
podFullName, podUID, containerName, hash, err := ParseDockerName(c.Names[0])
if err != nil {
glog.Warningf("Parse docker container name %q error: %v", c.Names[0], err)
continue
}
pod, found := pods[podUID]
if !found {
name, namespace, err := parsePodFullName(podFullName)
if err != nil {
glog.Warningf("Parse pod full name %q error: %v", podFullName, err)
continue
}
pod = &container.Pod{
ID: podUID,
Name: name,
Namespace: namespace,
}
pods[podUID] = pod
}
pod.Containers = append(pod.Containers, &container.Container{
ID: types.UID(c.ID),
Name: containerName,
Hash: hash,
Created: c.Created,
})
}
// Convert map to list.
for _, c := range pods {
result = append(result, c)
}
return result, nil
}

View File

@ -19,10 +19,12 @@ package dockertools
import ( import (
"sync" "sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
) )
type DockerCache interface { type DockerCache interface {
RunningContainers() (DockerContainers, error) GetPods() ([]*container.Pod, error)
ForceUpdateIfOlder(time.Time) error ForceUpdateIfOlder(time.Time) error
} }
@ -43,7 +45,7 @@ type dockerCache struct {
// Last time when cache was updated. // Last time when cache was updated.
cacheTime time.Time cacheTime time.Time
// The content of the cache. // The content of the cache.
containers DockerContainers pods []*container.Pod
// Whether the background thread updating the cache is running. // Whether the background thread updating the cache is running.
updatingCache bool updatingCache bool
// Time when the background thread should be stopped. // Time when the background thread should be stopped.
@ -53,15 +55,15 @@ type dockerCache struct {
// Ensure that dockerCache abides by the DockerCache interface. // Ensure that dockerCache abides by the DockerCache interface.
var _ DockerCache = new(dockerCache) var _ DockerCache = new(dockerCache)
func (d *dockerCache) RunningContainers() (DockerContainers, error) { func (d *dockerCache) GetPods() ([]*container.Pod, error) {
d.lock.Lock() d.lock.Lock()
defer d.lock.Unlock() defer d.lock.Unlock()
if time.Since(d.cacheTime) > 2*time.Second { if time.Since(d.cacheTime) > 2*time.Second {
containers, err := GetKubeletDockerContainers(d.client, false) pods, err := GetPods(d.client, false)
if err != nil { if err != nil {
return containers, err return pods, err
} }
d.containers = containers d.pods = pods
d.cacheTime = time.Now() d.cacheTime = time.Now()
} }
// Stop refreshing thread if there were no requests within last 2 seconds. // Stop refreshing thread if there were no requests within last 2 seconds.
@ -70,18 +72,18 @@ func (d *dockerCache) RunningContainers() (DockerContainers, error) {
d.updatingCache = true d.updatingCache = true
go d.startUpdatingCache() go d.startUpdatingCache()
} }
return d.containers, nil return d.pods, nil
} }
func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error { func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
d.lock.Lock() d.lock.Lock()
defer d.lock.Unlock() defer d.lock.Unlock()
if d.cacheTime.Before(minExpectedCacheTime) { if d.cacheTime.Before(minExpectedCacheTime) {
containers, err := GetKubeletDockerContainers(d.client, false) pods, err := GetPods(d.client, false)
if err != nil { if err != nil {
return err return err
} }
d.containers = containers d.pods = pods
d.cacheTime = time.Now() d.cacheTime = time.Now()
} }
return nil return nil
@ -91,7 +93,7 @@ func (d *dockerCache) startUpdatingCache() {
run := true run := true
for run { for run {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
containers, err := GetKubeletDockerContainers(d.client, false) pods, err := GetPods(d.client, false)
cacheTime := time.Now() cacheTime := time.Now()
if err != nil { if err != nil {
continue continue
@ -102,7 +104,7 @@ func (d *dockerCache) startUpdatingCache() {
d.updatingCache = false d.updatingCache = false
run = false run = false
} }
d.containers = containers d.pods = pods
d.cacheTime = cacheTime d.cacheTime = cacheTime
d.lock.Unlock() d.lock.Unlock()
} }

View File

@ -22,6 +22,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
) )
@ -249,8 +250,8 @@ func NewFakeDockerCache(client DockerInterface) DockerCache {
} }
} }
func (f *FakeDockerCache) RunningContainers() (DockerContainers, error) { func (f *FakeDockerCache) GetPods() ([]*container.Pod, error) {
return GetKubeletDockerContainers(f.client, false) return GetPods(f.client, false)
} }
func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error { func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {