Scheduler uses TTLStore for assumed pods

pull/6/head
Prashanth Balasubramanian 2015-03-30 11:17:16 -07:00
parent 43949b41d4
commit a7864aa230
10 changed files with 772 additions and 145 deletions

pkg/client/cache/expiration_cache.go

@@ -0,0 +1,189 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"time"
)
// ExpirationCache implements the Store interface
// 1. All entries are automatically time stamped on insert
// a. The key is computed based off the original item/keyFunc
// b. The value inserted under that key is the timestamped item
// 2. Expiration happens lazily on read based on the expiration policy
// 3. Time-stamps are stripped off unexpired entries before return
type ExpirationCache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
clock util.Clock
expirationPolicy ExpirationPolicy
}
// ExpirationPolicy dictates when an object expires. Currently only abstracted out
// so unit tests don't rely on the system clock.
type ExpirationPolicy interface {
IsExpired(obj *timestampedEntry) bool
}
// TTLPolicy implements a ttl based ExpirationPolicy.
type TTLPolicy struct {
// >0: Expire entries with an age > ttl
// <=0: Don't expire any entry
Ttl time.Duration
// Clock used to calculate ttl expiration
Clock util.Clock
}
// IsExpired returns true if the given object is older than the ttl; a
// non-positive ttl means entries never expire.
func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool {
return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl
}
// timestampedEntry is the only type allowed in an ExpirationCache.
type timestampedEntry struct {
obj interface{}
timestamp time.Time
}
// getTimestampedEntry returns the timestampedEntry stored under the given key.
func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
item, _ := c.cacheStorage.Get(key)
// TODO: Check the cast instead
if tsEntry, ok := item.(*timestampedEntry); ok {
return tsEntry, true
}
return nil, false
}
// getOrExpire retrieves the object from the timestampedEntry iff it hasn't
// already expired. It kicks off a goroutine to delete expired entries from
// the store and sets exists=false.
func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
timestampedItem, exists := c.getTimestampedEntry(key)
if !exists {
return nil, false
}
if c.expirationPolicy.IsExpired(timestampedItem) {
// Since expiration happens lazily on read, don't hold up
// the reader trying to acquire a write lock for the delete.
// The next reader will retry the delete even if this one
// fails; as long as we only return un-expired entries a
// reader doesn't need to wait for the result of the delete.
go func() {
defer util.HandleCrash()
c.cacheStorage.Delete(key)
}()
return nil, false
}
return timestampedItem.obj, true
}
// GetByKey returns the item stored under the key, or sets exists=false.
func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error) {
obj, exists := c.getOrExpire(key)
return obj, exists, nil
}
// Get returns the item stored under obj's key if it hasn't expired. It
// purges the cache of expired entries in the process.
func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error) {
key, err := c.keyFunc(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
obj, exists := c.getOrExpire(key)
return obj, exists, nil
}
// List retrieves a list of unexpired items. It purges the cache of expired
// items in the process.
func (c *ExpirationCache) List() []interface{} {
items := c.cacheStorage.List()
list := make([]interface{}, 0, len(items))
for _, item := range items {
obj := item.(*timestampedEntry).obj
if key, err := c.keyFunc(obj); err != nil {
// Without a key we can't check expiration, so keep the item.
list = append(list, obj)
} else if obj, exists := c.getOrExpire(key); exists {
list = append(list, obj)
}
}
return list
}
// ListKeys returns a list of all keys in the expiration cache.
func (c *ExpirationCache) ListKeys() []string {
return c.cacheStorage.ListKeys()
}
// Add timestamps an item and inserts it into the cache, overwriting entries
// that might exist under the same key.
func (c *ExpirationCache) Add(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Add(key, &timestampedEntry{obj, c.clock.Now()})
return nil
}
// Update has not been implemented yet for lack of a use case, so this method
// simply calls `Add`. This effectively refreshes the timestamp.
func (c *ExpirationCache) Update(obj interface{}) error {
return c.Add(obj)
}
// Delete removes an item from the cache.
func (c *ExpirationCache) Delete(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Delete(key)
return nil
}
// Replace will convert all items in the given list to timestampedEntries
// before attempting the replace operation. The replace operation deletes
// the existing contents of the ExpirationCache `c`.
func (c *ExpirationCache) Replace(list []interface{}) error {
items := map[string]interface{}{}
ts := c.clock.Now()
for _, item := range list {
key, err := c.keyFunc(item)
if err != nil {
return KeyError{item, err}
}
items[key] = &timestampedEntry{item, ts}
}
c.cacheStorage.Replace(items)
return nil
}
// NewTTLStore creates and returns an ExpirationCache with a TTLPolicy.
func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
return &ExpirationCache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
clock: util.RealClock{},
expirationPolicy: &TTLPolicy{ttl, util.RealClock{}},
}
}
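
A minimal usage sketch of the TTL store above (the widget type and key func are hypothetical, made up for illustration; NewTTLStore, Add, and Get come from this file):

package main

import (
	"fmt"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
)

type widget struct{ name string }

// widgetKeyFunc is a hypothetical KeyFunc for this sketch.
func widgetKeyFunc(obj interface{}) (string, error) {
	return obj.(widget).name, nil
}

func main() {
	// Entries older than 50ms expire lazily on the next read.
	store := cache.NewTTLStore(widgetKeyFunc, 50*time.Millisecond)
	store.Add(widget{name: "foo"}) // timestamped with the real clock on insert

	if _, exists, _ := store.Get(widget{name: "foo"}); exists {
		fmt.Println("fresh entries come back with the timestamp stripped")
	}

	time.Sleep(100 * time.Millisecond)
	if _, exists, _ := store.Get(widget{name: "foo"}); !exists {
		fmt.Println("expired entries read as absent; deletion runs in a goroutine")
	}
}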


@@ -0,0 +1,52 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type fakeThreadSafeMap struct {
ThreadSafeStore
deletedKeys chan<- string
}
// Delete removes the key from the embedded store and reports it on
// deletedKeys so tests can observe deletions (a nil channel makes
// Delete a no-op).
func (c *fakeThreadSafeMap) Delete(key string) {
if c.deletedKeys != nil {
c.ThreadSafeStore.Delete(key)
c.deletedKeys <- key
}
}
// FakeExpirationPolicy expires everything except the keys in NeverExpire,
// using RetrieveKeyFunc to recover the key from a timestampedEntry.
type FakeExpirationPolicy struct {
NeverExpire util.StringSet
RetrieveKeyFunc KeyFunc
}
func (p *FakeExpirationPolicy) IsExpired(obj *timestampedEntry) bool {
key, _ := p.RetrieveKeyFunc(obj)
return !p.NeverExpire.Has(key)
}
// NewFakeExpirationStore returns an ExpirationCache that reports deletes on
// the given channel and delegates expiry decisions to the given policy and clock.
func NewFakeExpirationStore(keyFunc KeyFunc, deletedKeys chan<- string, expirationPolicy ExpirationPolicy, cacheClock util.Clock) Store {
cacheStorage := NewThreadSafeStore(Indexers{}, Indices{})
return &ExpirationCache{
cacheStorage: &fakeThreadSafeMap{cacheStorage, deletedKeys},
keyFunc: keyFunc,
clock: cacheClock,
expirationPolicy: expirationPolicy,
}
}


@@ -0,0 +1,133 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"reflect"
"testing"
"time"
)
func TestTTLExpirationBasic(t *testing.T) {
testObj := testStoreObject{id: "foo", val: "bar"}
deleteChan := make(chan string)
ttlStore := NewFakeExpirationStore(
testStoreKeyFunc, deleteChan,
&FakeExpirationPolicy{
NeverExpire: util.NewStringSet(),
RetrieveKeyFunc: func(obj interface{}) (string, error) {
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
},
},
util.RealClock{},
)
err := ttlStore.Add(testObj)
if err != nil {
t.Errorf("Unable to add obj %#v", testObj)
}
item, exists, err := ttlStore.Get(testObj)
if err != nil {
t.Errorf("Failed to get from store, %v", err)
}
if exists || item != nil {
t.Errorf("Got unexpected item %#v", item)
}
key, _ := testStoreKeyFunc(testObj)
select {
case delKey := <-deleteChan:
if delKey != key {
t.Errorf("Unexpected delete for key %s", key)
}
case <-time.After(time.Millisecond * 100):
t.Errorf("Unexpected timeout waiting on delete")
}
close(deleteChan)
}
func TestTTLList(t *testing.T) {
testObjs := []testStoreObject{
{id: "foo", val: "bar"},
{id: "foo1", val: "bar1"},
{id: "foo2", val: "bar2"},
}
expireKeys := util.NewStringSet(testObjs[0].id, testObjs[2].id)
deleteChan := make(chan string)
defer close(deleteChan)
ttlStore := NewFakeExpirationStore(
testStoreKeyFunc, deleteChan,
&FakeExpirationPolicy{
NeverExpire: util.NewStringSet(testObjs[1].id),
RetrieveKeyFunc: func(obj interface{}) (string, error) {
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
},
},
util.RealClock{},
)
for _, obj := range testObjs {
err := ttlStore.Add(obj)
if err != nil {
t.Errorf("Unable to add obj %#v", obj)
}
}
listObjs := ttlStore.List()
if len(listObjs) != 1 || !reflect.DeepEqual(listObjs[0], testObjs[1]) {
t.Errorf("List returned unexpected results %#v", listObjs)
}
// Make sure all our deletes come through at an acceptable rate (1 per 100ms)
for expireKeys.Len() != 0 {
select {
case delKey := <-deleteChan:
if !expireKeys.Has(delKey) {
t.Errorf("Unexpected delete for key %s", delKey)
}
expireKeys.Delete(delKey)
case <-time.After(time.Millisecond * 100):
t.Errorf("Unexpected timeout waiting on delete")
return
}
}
}
func TestTTLPolicy(t *testing.T) {
fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
ttl := 30 * time.Second
exactlyOnTTL := fakeTime.Add(-ttl)
expiredTime := fakeTime.Add(-(ttl + 1))
policy := TTLPolicy{ttl, &util.FakeClock{fakeTime}}
fakeTimestampedEntry := &timestampedEntry{obj: struct{}{}, timestamp: exactlyOnTTL}
if policy.IsExpired(fakeTimestampedEntry) {
t.Errorf("TTL cache should not expire entries exactly on ttl")
}
fakeTimestampedEntry.timestamp = fakeTime
if policy.IsExpired(fakeTimestampedEntry) {
t.Errorf("TTL Cache should not expire entires before ttl")
}
fakeTimestampedEntry.timestamp = expiredTime
if !policy.IsExpired(fakeTimestampedEntry) {
t.Errorf("TTL Cache should expire entries older than ttl")
}
for _, ttl = range []time.Duration{0, -1} {
policy.Ttl = ttl
if policy.IsExpired(fakeTimestampedEntry) {
t.Errorf("TTL policy should only expire entries when initialized with a ttl > 0")
}
}
}


@@ -18,10 +18,7 @@ package cache
import (
"fmt"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// Store is a generic object storage interface. Reflector knows how to watch a server
@@ -77,16 +74,15 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
return meta.Name(), nil
}
// cache responsibilities are limited to:
// 1. Computing keys for objects via keyFunc
// 2. Invoking methods of a ThreadSafeStore interface
type cache struct {
lock sync.RWMutex
items map[string]interface{}
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
// Add inserts an item into the cache.
@@ -95,66 +91,7 @@ func (c *cache) Add(obj interface{}) error {
if err != nil {
return KeyError{obj, err}
}
// keep a pointer to whatever could have been there previously
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj)
return nil
}
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *cache) updateIndices(oldObj interface{}, newObj interface{}) error {
// if we got an old object, we need to remove it before we add it again
if oldObj != nil {
c.deleteFromIndices(oldObj)
}
key, err := c.keyFunc(newObj)
if err != nil {
return err
}
for name, indexFunc := range c.indexers {
indexValue, err := indexFunc(newObj)
if err != nil {
return err
}
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
set := index[indexValue]
if set == nil {
set = util.StringSet{}
index[indexValue] = set
}
set.Insert(key)
}
return nil
}
// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *cache) deleteFromIndices(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return err
}
for name, indexFunc := range c.indexers {
indexValue, err := indexFunc(obj)
if err != nil {
return err
}
index := c.indices[name]
if index != nil {
set := index[indexValue]
if set != nil {
set.Delete(key)
}
}
}
c.cacheStorage.Add(key, obj)
return nil
}
@@ -164,11 +101,7 @@ func (c *cache) Update(obj interface{}) error {
if err != nil {
return KeyError{obj, err}
}
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj)
c.cacheStorage.Update(key, obj)
return nil
}
@@ -178,59 +111,26 @@ func (c *cache) Delete(obj interface{}) error {
if err != nil {
return KeyError{obj, err}
}
c.lock.Lock()
defer c.lock.Unlock()
delete(c.items, key)
c.deleteFromIndices(obj)
c.cacheStorage.Delete(key)
return nil
}
// List returns a list of all the items.
// List is completely threadsafe as long as you treat all items as immutable.
func (c *cache) List() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]interface{}, 0, len(c.items))
for _, item := range c.items {
list = append(list, item)
}
return list
return c.cacheStorage.List()
}
// ListKeys returns a list of all the keys of the objects currently
// in the cache.
func (c *cache) ListKeys() []string {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]string, 0, len(c.items))
for key := range c.items {
list = append(list, key)
}
return list
return c.cacheStorage.ListKeys()
}
// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
indexKey, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]
set := index[indexKey]
list := make([]interface{}, 0, set.Len())
for _, key := range set.List() {
list = append(list, c.items[key])
}
return list, nil
return c.cacheStorage.Index(indexName, obj)
}
// Get returns the requested item, or sets exists=false.
@@ -246,9 +146,7 @@ func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey returns the requested item, or sets exists=false.
// GetByKey is completely threadsafe as long as you treat all items as immutable.
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[key]
item, exists = c.cacheStorage.Get(key)
return item, exists, nil
}
@@ -264,26 +162,22 @@ func (c *cache) Replace(list []interface{}) error {
}
items[key] = item
}
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
// rebuild any index
c.indices = Indices{}
for _, item := range c.items {
c.updateIndices(nil, item)
}
c.cacheStorage.Replace(items)
return nil
}
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: Indexers{}, indices: Indices{}}
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: indexers, indices: Indices{}}
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}

pkg/client/cache/thread_safe_store.go

@@ -0,0 +1,201 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
// the indexing feature in addition to not being thread safe.
//
// The guarantees of thread safety provided by List/Get are only valid if the caller
// treats returned items as read-only. For example, a pointer inserted in the store
// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
// on the same key and modify the pointer in a non-thread-safe way. Also note that
// modifying objects stored by the indexers (if any) will *not* automatically lead
// to a re-index. So it's not a good idea to directly modify the objects returned by
// Get/List, in general.
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{})
Index(indexName string, obj interface{}) ([]interface{}, error)
}
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
func (c *threadSafeMap) Update(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
c.deleteFromIndices(obj, key)
delete(c.items, key)
}
}
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[key]
return item, exists
}
func (c *threadSafeMap) List() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]interface{}, 0, len(c.items))
for _, item := range c.items {
list = append(list, item)
}
return list
}
// ListKeys returns a list of all the keys of the objects currently
// in the threadSafeMap.
func (c *threadSafeMap) ListKeys() []string {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]string, 0, len(c.items))
for key := range c.items {
list = append(list, key)
}
return list
}
func (c *threadSafeMap) Replace(items map[string]interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
// rebuild any index
c.indices = Indices{}
for key, item := range c.items {
c.updateIndices(nil, item, key)
}
}
// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
indexKey, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]
set := index[indexKey]
list := make([]interface{}, 0, set.Len())
for _, key := range set.List() {
list = append(list, c.items[key])
}
return list, nil
}
// updateIndices modifies the object's location in the managed indexes; if
// this is an update, you must provide an oldObj. updateIndices must be
// called from a function that already has a lock on the cache.
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
// if we got an old object, we need to remove it before we add it again
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
for name, indexFunc := range c.indexers {
indexValue, err := indexFunc(newObj)
if err != nil {
return err
}
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
set := index[indexValue]
if set == nil {
set = util.StringSet{}
index[indexValue] = set
}
set.Insert(key)
}
return nil
}
// deleteFromIndices removes the object from each of the managed indexes.
// It is intended to be called from a function that already has a lock on the cache.
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
for name, indexFunc := range c.indexers {
indexValue, err := indexFunc(obj)
if err != nil {
return err
}
index := c.indices[name]
if index != nil {
set := index[indexValue]
if set != nil {
set.Delete(key)
}
}
}
return nil
}
// NewThreadSafeStore returns a ThreadSafeStore backed by a locked map.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}
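
A hedged sketch of the indexing path, assuming a made-up indexer over string items (NewThreadSafeStore, Add, and Index come from this file; note that an IndexFunc here returns a single index value):

package main

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
)

func main() {
	// Hypothetical indexer: group string items by their first letter.
	indexers := cache.Indexers{
		"firstLetter": func(obj interface{}) (string, error) {
			return obj.(string)[:1], nil
		},
	}
	store := cache.NewThreadSafeStore(indexers, cache.Indices{})

	store.Add("k1", "apple")
	store.Add("k2", "avocado")
	store.Add("k3", "banana")

	// Index computes obj's index value ("a") and returns every stored
	// item that shares it.
	matches, err := store.Index("firstLetter", "apricot")
	if err != nil {
		panic(err)
	}
	fmt.Println(matches) // ["apple" "avocado"] in unspecified order
}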


@@ -20,16 +20,6 @@ import (
"bytes"
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"mime"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/metrics"
@@ -41,6 +31,15 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
"github.com/golang/glog"
"io"
"io/ioutil"
"mime"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
)
// specialParams lists parameters that are handled specially and which users of Request


@@ -18,10 +18,9 @@ package runtime
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"net/url"
"reflect"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
)
// Scheme defines methods for serializing and deserializing API objects. It


@@ -24,6 +24,7 @@ import (
// needs to do arbitrary things based on time.
type Clock interface {
Now() time.Time
Since(time.Time) time.Duration
}
// RealClock really calls time.Now()
@@ -34,6 +35,11 @@ func (r RealClock) Now() time.Time {
return time.Now()
}
// Since returns time since the specified timestamp.
func (r RealClock) Since(ts time.Time) time.Duration {
return time.Since(ts)
}
// FakeClock implements Clock, but returns an arbitrary time.
type FakeClock struct {
Time time.Time
@@ -43,3 +49,8 @@ type FakeClock struct {
func (f *FakeClock) Now() time.Time {
return f.Time
}
// Since returns time since the time in f.
func (f *FakeClock) Since(ts time.Time) time.Duration {
return f.Time.Sub(ts)
}
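
To illustrate why Since hangs off the Clock interface, a small sketch in which a test swaps in FakeClock and advances time by hand (the times chosen are arbitrary):

package main

import (
	"fmt"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

func main() {
	start := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
	fake := &util.FakeClock{start}

	// Advance the fake clock without sleeping; Since is computed
	// against the fake time, not the wall clock.
	fake.Time = fake.Time.Add(45 * time.Second)
	var clock util.Clock = fake
	fmt.Println(clock.Since(start)) // 45s

	clock = util.RealClock{}
	fmt.Println(clock.Since(time.Now()) < time.Second) // true
}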


@@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
@@ -95,7 +96,9 @@ func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModele
return &SimpleModeler{
queuedPods: queuedPods,
scheduledPods: scheduledPods,
assumedPods: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
assumedPods: &cache.StoreToPodLister{
cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second),
},
}
}
@@ -124,10 +127,6 @@ func (s *SimpleModeler) listPods(selector labels.Selector) (pods []api.Pod, err
// Since the assumed list will be short, just check every one.
// Goal here is to stop making assumptions about a pod once it shows
// up in one of these other lists.
// TODO: there's a possibility that a pod could get deleted at the
// exact wrong time and linger in assumedPods forever. So we
// need go through that periodically and check for deleted
// pods.
for _, pod := range assumed {
qExist, err := s.queuedPods.Exists(&pod)
if err != nil {
@@ -151,7 +150,7 @@ func (s *SimpleModeler) listPods(selector labels.Selector) (pods []api.Pod, err
if err != nil {
return nil, err
}
// re-get in case we deleted any.
// Listing purges the ttl cache and re-gets, in case we deleted any entries.
assumed, err = s.assumedPods.List(selector)
if err != nil {
return nil, err
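
For concreteness, a hedged sketch of the purge-on-List behavior the comment above relies on (the key func and entry are hypothetical; a short ttl and a Sleep stand in for the scheduler's 30s store):

package main

import (
	"fmt"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
)

// stringKeyFunc is a hypothetical KeyFunc for this sketch.
func stringKeyFunc(obj interface{}) (string, error) {
	return obj.(string), nil
}

func main() {
	store := cache.NewTTLStore(stringKeyFunc, 20*time.Millisecond)
	store.Add("assumed-pod")

	time.Sleep(40 * time.Millisecond)

	// List skips the expired entry and kicks off its deletion, so the
	// "re-get" described above sees a store with the stale pod gone.
	fmt.Println(len(store.List())) // 0
}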


@@ -18,11 +18,14 @@ package scheduler
import (
"errors"
"math/rand"
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -43,6 +46,14 @@ func podWithID(id, desiredHost string) *api.Pod {
}
}
func podWithPort(id, desiredHost string, port int) *api.Pod {
pod := podWithID(id, desiredHost)
pod.Spec.Containers = []api.Container{
{Name: "ctr", Ports: []api.ContainerPort{{HostPort: port}}},
}
return pod
}
type mockScheduler struct {
machine string
err error
@@ -144,3 +155,142 @@ func TestScheduler(t *testing.T) {
events.Stop()
}
}
func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
defer eventBroadcaster.StartLogging(t.Logf).Stop()
// Set up the modeler so we control the contents of all 3 stores: assumed,
// scheduled and queued.
scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
scheduledPodLister := &cache.StoreToPodLister{scheduledPodStore}
queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
queuedPodLister := &cache.StoreToPodLister{queuedPodStore}
modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister)
// Create a fake clock used to timestamp entries and calculate ttl. Nothing
// will expire until we advance the clock past the ttl, at which point all
// entries inserted at fakeTime will expire.
ttl := 30 * time.Second
fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
fakeClock := &util.FakeClock{fakeTime}
ttlPolicy := &cache.TTLPolicy{ttl, fakeClock}
assumedPodsStore := cache.NewFakeExpirationStore(
cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock)
modeler.assumedPods = &cache.StoreToPodLister{assumedPodsStore}
// Port is the easiest way to cause a fit predicate failure
podPort := 8080
firstPod := podWithPort("foo", "", podPort)
// Create the scheduler config
algo := scheduler.NewGenericScheduler(
map[string]scheduler.FitPredicate{"PodFitsPorts": scheduler.PodFitsPorts},
[]scheduler.PriorityConfig{},
modeler.PodLister(),
rand.New(rand.NewSource(time.Now().UnixNano())))
var gotBinding *api.Binding
c := &Config{
Modeler: modeler,
MinionLister: scheduler.FakeMinionLister(
api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
),
Algorithm: algo,
Binder: fakeBinder{func(b *api.Binding) error {
scheduledPodStore.Add(podWithPort(b.Name, b.Target.Name, podPort))
gotBinding = b
return nil
}},
NextPod: func() *api.Pod {
return queuedPodStore.Pop().(*api.Pod)
},
Error: func(p *api.Pod, err error) {
t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err)
},
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
}
// First scheduling pass should schedule the pod
s := New(c)
called := make(chan struct{})
events := eventBroadcaster.StartEventWatcher(func(e *api.Event) {
if e, a := "scheduled", e.Reason; e != a {
t.Errorf("expected %v, got %v", e, a)
}
close(called)
})
queuedPodStore.Add(firstPod)
// queuedPodStore: [foo:8080]
// scheduledPodStore: []
// assumedPods: []
s.scheduleOne()
// queuedPodStore: []
// scheduledPodStore: [foo:8080]
// assumedPods: [foo:8080]
pod, exists, _ := scheduledPodStore.GetByKey("foo")
if !exists {
t.Errorf("Expected scheduled pod store to contain pod")
}
pod, exists, _ = queuedPodStore.GetByKey("foo")
if exists {
t.Errorf("Did not expect a queued pod, found %+v", pod)
}
pod, exists, _ = assumedPodsStore.GetByKey("foo")
if !exists {
t.Errorf("Assumed pod store should contain stale pod")
}
expectBind := &api.Binding{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
}
if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
t.Errorf("Expected exact match on binding: %s", util.ObjectDiff(ex, ac))
}
<-called
events.Stop()
scheduledPodStore.Delete(pod)
_, exists, _ = assumedPodsStore.Get(pod)
if !exists {
t.Errorf("Expected pod %#v in assumed pod store", pod)
}
secondPod := podWithPort("bar", "", podPort)
queuedPodStore.Add(secondPod)
// queuedPodStore: [bar:8080]
// scheduledPodStore: []
// assumedPods: [foo:8080]
// Second scheduling pass will fail to schedule if the store hasn't expired
// the deleted pod. In production the entry would age out after the ttl;
// here we advance the fake clock past it instead.
fakeClock.Time = fakeClock.Time.Add(ttl + 1)
called = make(chan struct{})
events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {
if e, a := "scheduled", e.Reason; e != a {
t.Errorf("expected %v, got %v", e, a)
}
close(called)
})
s.scheduleOne()
expectBind = &api.Binding{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
}
if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
t.Errorf("Expected exact match on binding: %s", util.ObjectDiff(ex, ac))
}
<-called
events.Stop()
}