mirror of https://github.com/k3s-io/k3s
Wire in the pod cache. Just used for List for now.
parent
6cf4585ae8
commit
302ec0f37b
|
@ -44,7 +44,7 @@ func main() {
|
||||||
reg := registry.MakeEtcdRegistry(etcdClient, machineList)
|
reg := registry.MakeEtcdRegistry(etcdClient, machineList)
|
||||||
|
|
||||||
apiserver := apiserver.New(map[string]apiserver.RESTStorage{
|
apiserver := apiserver.New(map[string]apiserver.RESTStorage{
|
||||||
"pods": registry.MakePodRegistryStorage(reg, &client.FakeContainerInfo{}, registry.MakeRoundRobinScheduler(machineList), nil),
|
"pods": registry.MakePodRegistryStorage(reg, &client.FakeContainerInfo{}, registry.MakeRoundRobinScheduler(machineList), nil, nil),
|
||||||
"replicationControllers": registry.MakeControllerRegistryStorage(reg),
|
"replicationControllers": registry.MakeControllerRegistryStorage(reg),
|
||||||
}, "/api/v1beta1")
|
}, "/api/v1beta1")
|
||||||
server := httptest.NewServer(apiserver)
|
server := httptest.NewServer(apiserver)
|
||||||
|
|
|
@ -71,8 +71,10 @@ func (m *Master) init(minions []string, cloud cloudprovider.Interface) {
|
||||||
|
|
||||||
m.minions = minions
|
m.minions = minions
|
||||||
m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
|
m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
|
||||||
|
podCache := NewPodCache(containerInfo, m.podRegistry, time.Second*30)
|
||||||
|
go podCache.Loop()
|
||||||
m.storage = map[string]apiserver.RESTStorage{
|
m.storage = map[string]apiserver.RESTStorage{
|
||||||
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minions, m.podRegistry, m.random), cloud),
|
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minions, m.podRegistry, m.random), cloud, podCache),
|
||||||
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry),
|
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry),
|
||||||
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minions),
|
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minions),
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||||
package master
|
package master
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -54,7 +53,7 @@ func (p *PodCache) GetContainerInfo(host, id string) (interface{}, error) {
|
||||||
defer p.podLock.Unlock()
|
defer p.podLock.Unlock()
|
||||||
value, ok := p.podInfo[id]
|
value, ok := p.podInfo[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("Couldn't find any information for %s", id)
|
return nil, nil
|
||||||
} else {
|
} else {
|
||||||
return value, nil
|
return value, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,9 +58,12 @@ func TestPodCacheGet(t *testing.T) {
|
||||||
func TestPodCacheGetMissing(t *testing.T) {
|
func TestPodCacheGetMissing(t *testing.T) {
|
||||||
cache := NewPodCache(nil, nil, time.Second*1)
|
cache := NewPodCache(nil, nil, time.Second*1)
|
||||||
|
|
||||||
_, err := cache.GetContainerInfo("host", "foo")
|
info, err := cache.GetContainerInfo("host", "foo")
|
||||||
if err == nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected non-error")
|
t.Errorf("Unexpected error: %#v", err)
|
||||||
|
}
|
||||||
|
if info != nil {
|
||||||
|
t.Errorf("Unexpected info: %#v", info)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,16 +32,25 @@ import (
|
||||||
type PodRegistryStorage struct {
|
type PodRegistryStorage struct {
|
||||||
registry PodRegistry
|
registry PodRegistry
|
||||||
containerInfo client.ContainerInfo
|
containerInfo client.ContainerInfo
|
||||||
|
podCache client.ContainerInfo
|
||||||
scheduler Scheduler
|
scheduler Scheduler
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
}
|
}
|
||||||
|
|
||||||
func MakePodRegistryStorage(registry PodRegistry, containerInfo client.ContainerInfo, scheduler Scheduler, cloud cloudprovider.Interface) apiserver.RESTStorage {
|
// MakePodRegistryStorage makes a RESTStorage object for a pod registry.
|
||||||
|
// Parameters:
|
||||||
|
// registry The pod registry
|
||||||
|
// containerInfo Source of fresh container info
|
||||||
|
// scheduler The scheduler for assigning pods to machines
|
||||||
|
// cloud Interface to a cloud provider (may be null)
|
||||||
|
// podCache Source of cached container info
|
||||||
|
func MakePodRegistryStorage(registry PodRegistry, containerInfo client.ContainerInfo, scheduler Scheduler, cloud cloudprovider.Interface, podCache client.ContainerInfo) apiserver.RESTStorage {
|
||||||
return &PodRegistryStorage{
|
return &PodRegistryStorage{
|
||||||
registry: registry,
|
registry: registry,
|
||||||
containerInfo: containerInfo,
|
containerInfo: containerInfo,
|
||||||
scheduler: scheduler,
|
scheduler: scheduler,
|
||||||
cloud: cloud,
|
cloud: cloud,
|
||||||
|
podCache: podCache,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +59,20 @@ func (storage *PodRegistryStorage) List(query labels.Query) (interface{}, error)
|
||||||
pods, err := storage.registry.ListPods(query)
|
pods, err := storage.registry.ListPods(query)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
result.Items = pods
|
result.Items = pods
|
||||||
|
// Get cached info for the list currently.
|
||||||
|
// TODO: Optionally use fresh info
|
||||||
|
if storage.podCache != nil {
|
||||||
|
for ix, pod := range pods {
|
||||||
|
info, err := storage.podCache.GetContainerInfo(pod.CurrentState.Host, pod.ID)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error getting container info: %#v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
result.Items[ix].CurrentState.Info = info
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result.Kind = "cluster#podList"
|
result.Kind = "cluster#podList"
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue