Make admission control plug-ins work from indexes

pull/6/head
derekwaynecarr 2015-02-16 10:54:29 -05:00
parent d4755704b1
commit 2ed8eed004
13 changed files with 148 additions and 19 deletions

View File

@ -48,6 +48,8 @@ PORTAL_NET=10.247.0.0/16
MASTER_USER=vagrant
MASTER_PASSWD=vagrant
# Admission Controllers to invoke prior to persisting objects in cluster
ADMISSION_CONTROL=NamespaceExists,LimitRanger,ResourceQuota,AlwaysAdmit
# Optional: Install node monitoring.
ENABLE_NODE_MONITORING=true

View File

@ -83,7 +83,7 @@ grains:
cloud_provider: vagrant
roles:
- kubernetes-master
admission_control: NamespaceExists,AlwaysAdmit
admission_control: '$(echo "$ADMISSION_CONTROL" | sed -e "s/'/''/g")'
runtime_config: '$(echo "$RUNTIME_CONFIG" | sed -e "s/'/''/g")'
EOF

View File

@ -88,6 +88,7 @@ function create-provision-scripts {
echo "DNS_DOMAIN='${DNS_DOMAIN:-}'"
echo "DNS_REPLICAS='${DNS_REPLICAS:-}'"
echo "RUNTIME_CONFIG='${RUNTIME_CONFIG:-}'"
echo "ADMISSION_CONTROL='${ADMISSION_CONTROL:-}'"
grep -v "^#" "${KUBE_ROOT}/cluster/vagrant/provision-master.sh"
grep -v "^#" "${KUBE_ROOT}/cluster/vagrant/provision-network.sh"
) > "${KUBE_TEMP}/master-start.sh"

View File

@ -53,6 +53,14 @@ type Reflector struct {
resyncPeriod time.Duration
}
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
// The indexer is configured to key on namespace
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
return indexer, reflector
}
// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType.

View File

@ -19,6 +19,7 @@ package client
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// FakeLimitRanges implements PodsInterface. Meant to be embedded into a struct to get a default
@ -52,3 +53,8 @@ func (c *FakeLimitRanges) Update(limitRange *api.LimitRange) (*api.LimitRange, e
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-limitRange", Value: limitRange.Name})
return &api.LimitRange{}, nil
}
func (c *FakeLimitRanges) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-limitRange", Value: resourceVersion})
return c.Fake.Watch, nil
}

View File

@ -19,6 +19,7 @@ package client
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// FakeResourceQuotas implements ResourceQuotaInterface. Meant to be embedded into a struct to get a default
@ -52,3 +53,8 @@ func (c *FakeResourceQuotas) Update(resourceQuota *api.ResourceQuota) (*api.Reso
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-resourceQuota", Value: resourceQuota.Name})
return &api.ResourceQuota{}, nil
}
func (c *FakeResourceQuotas) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-resourceQuota", Value: resourceVersion})
return c.Fake.Watch, nil
}

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// LimitRangesNamespacer has methods to work with LimitRange resources in a namespace
@ -36,6 +37,7 @@ type LimitRangeInterface interface {
Delete(name string) error
Create(limitRange *api.LimitRange) (*api.LimitRange, error)
Update(limitRange *api.LimitRange) (*api.LimitRange, error)
Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
}
// limitRanges implements LimitRangesNamespacer interface
@ -92,3 +94,15 @@ func (c *limitRanges) Update(limitRange *api.LimitRange) (result *api.LimitRange
err = c.r.Put().Namespace(c.ns).Resource("limitRanges").Name(limitRange.Name).Body(limitRange).Do().Into(result)
return
}
// Watch returns a watch.Interface that watches the requested resource
func (c *limitRanges) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return c.r.Get().
Prefix("watch").
Namespace(c.ns).
Resource("limitRanges").
Param("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}

View File

@ -17,12 +17,12 @@ limitations under the License.
package client
import (
"net/url"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
//"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestLimitRangeCreate(t *testing.T) {
@ -192,3 +192,12 @@ func TestLimitRangeDelete(t *testing.T) {
err := c.Setup().LimitRanges(ns).Delete("foo")
c.Validate(t, nil, err)
}
func TestLimitRangeWatch(t *testing.T) {
c := &testClient{
Request: testRequest{Method: "GET", Path: "/watch/limitRanges", Query: url.Values{"resourceVersion": []string{}}},
Response: Response{StatusCode: 200},
}
_, err := c.Setup().LimitRanges(api.NamespaceAll).Watch(labels.Everything(), labels.Everything(), "")
c.Validate(t, nil, err)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// ResourceQuotasNamespacer has methods to work with ResourceQuota resources in a namespace
@ -36,6 +37,7 @@ type ResourceQuotaInterface interface {
Delete(name string) error
Create(resourceQuota *api.ResourceQuota) (*api.ResourceQuota, error)
Update(resourceQuota *api.ResourceQuota) (*api.ResourceQuota, error)
Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
}
// resourceQuotas implements ResourceQuotasNamespacer interface
@ -92,3 +94,15 @@ func (c *resourceQuotas) Update(resourceQuota *api.ResourceQuota) (result *api.R
err = c.r.Put().Namespace(c.ns).Resource("resourceQuotas").Name(resourceQuota.Name).Body(resourceQuota).Do().Into(result)
return
}
// Watch returns a watch.Interface that watches the requested resource
func (c *resourceQuotas) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return c.r.Get().
Prefix("watch").
Namespace(c.ns).
Resource("resourceQuotas").
Param("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package client
import (
"net/url"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -175,3 +176,12 @@ func TestResourceQuotaDelete(t *testing.T) {
err := c.Setup().ResourceQuotas(ns).Delete("foo")
c.Validate(t, nil, err)
}
func TestResourceQuotaWatch(t *testing.T) {
c := &testClient{
Request: testRequest{Method: "GET", Path: "/watch/resourceQuotas", Query: url.Values{"resourceVersion": []string{}}},
Response: Response{StatusCode: 200},
}
_, err := c.Setup().ResourceQuotas(api.NamespaceAll).Watch(labels.Everything(), labels.Everything(), "")
c.Validate(t, nil, err)
}

View File

@ -23,10 +23,13 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func init() {
@ -39,6 +42,7 @@ func init() {
type limitRanger struct {
client client.Interface
limitFunc LimitFunc
indexer cache.Indexer
}
// Admit admits resources into cluster that do not violate any defined LimitRange in the namespace
@ -48,16 +52,30 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) {
return nil
}
// look for a limit range in current namespace that requires enforcement
// TODO: Move to cache when issue is resolved: https://github.com/GoogleCloudPlatform/kubernetes/issues/2294
items, err := l.client.LimitRanges(a.GetNamespace()).List(labels.Everything())
obj := a.GetObject()
resource := a.GetResource()
name := "Unknown"
if obj != nil {
name, _ = meta.NewAccessor().Name(obj)
}
key := &api.LimitRange{
ObjectMeta: api.ObjectMeta{
Namespace: a.GetNamespace(),
Name: "",
},
}
items, err := l.indexer.Index("namespace", key)
if err != nil {
return err
return apierrors.NewForbidden(a.GetResource(), name, fmt.Errorf("Unable to %s %s at this time because there was an error enforcing limit ranges", a.GetOperation(), resource))
}
if len(items) == 0 {
return nil
}
// ensure it meets each prescribed min/max
for i := range items.Items {
limitRange := &items.Items[i]
for i := range items {
limitRange := items[i].(*api.LimitRange)
err = l.limitFunc(limitRange, a.GetResource(), a.GetObject())
if err != nil {
return err
@ -68,7 +86,17 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) {
// NewLimitRanger returns an object that enforces limits based on the supplied limit function
func NewLimitRanger(client client.Interface, limitFunc LimitFunc) admission.Interface {
return &limitRanger{client: client, limitFunc: limitFunc}
lw := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return client.LimitRanges(api.NamespaceAll).List(labels.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return client.LimitRanges(api.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion)
},
}
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.LimitRange{}, 0)
reflector.Run()
return &limitRanger{client: client, limitFunc: limitFunc, indexer: indexer}
}
func Min(a int64, b int64) int64 {

View File

@ -82,7 +82,6 @@ func (e *exists) Admit(a admission.Attributes) (err error) {
func NewExists(c client.Interface) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
// TODO: look into a list/watch that can work with client.Interface, maybe pass it a ListFunc and a WatchFunc
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {

View File

@ -26,8 +26,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func init() {
@ -37,11 +40,22 @@ func init() {
}
type quota struct {
client client.Interface
client client.Interface
indexer cache.Indexer
}
func NewResourceQuota(client client.Interface) admission.Interface {
return &quota{client: client}
lw := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return client.ResourceQuotas(api.NamespaceAll).List(labels.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return client.ResourceQuotas(api.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion)
},
}
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
reflector.Run()
return &quota{client: client, indexer: indexer}
}
var resourceToResourceName = map[string]api.ResourceName{
@ -63,18 +77,36 @@ func (q *quota) Admit(a admission.Attributes) (err error) {
name, _ = meta.NewAccessor().Name(obj)
}
list, err := q.client.ResourceQuotas(a.GetNamespace()).List(labels.Everything())
key := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: a.GetNamespace(),
Name: "",
},
}
items, err := q.indexer.Index("namespace", key)
if err != nil {
return apierrors.NewForbidden(a.GetResource(), name, fmt.Errorf("Unable to %s %s at this time because there was an error enforcing quota", a.GetOperation(), resource))
}
if len(list.Items) == 0 {
if len(items) == 0 {
return nil
}
for i := range list.Items {
quota := list.Items[i]
dirty, err := IncrementUsage(a, &quota.Status, q.client)
for i := range items {
quota := items[i].(*api.ResourceQuota)
// we cannot modify the value directly in the cache, so we copy
status := &api.ResourceQuotaStatus{
Hard: api.ResourceList{},
Used: api.ResourceList{},
}
for k, v := range quota.Status.Hard {
status.Hard[k] = *v.Copy()
}
for k, v := range quota.Status.Used {
status.Used[k] = *v.Copy()
}
dirty, err := IncrementUsage(a, status, q.client)
if err != nil {
return err
}
@ -87,7 +119,7 @@ func (q *quota) Admit(a admission.Attributes) (err error) {
Namespace: quota.Namespace,
ResourceVersion: quota.ResourceVersion},
}
usage.Status = quota.Status
usage.Status = *status
err = q.client.ResourceQuotaUsages(usage.Namespace).Create(&usage)
if err != nil {
return apierrors.NewForbidden(a.GetResource(), name, fmt.Errorf("Unable to %s %s at this time because there was an error enforcing quota", a.GetOperation(), a.GetResource()))