diff --git a/cluster/vagrant/config-default.sh b/cluster/vagrant/config-default.sh index 595b41af58..f6e91ca1dd 100755 --- a/cluster/vagrant/config-default.sh +++ b/cluster/vagrant/config-default.sh @@ -48,6 +48,8 @@ PORTAL_NET=10.247.0.0/16 MASTER_USER=vagrant MASTER_PASSWD=vagrant +# Admission Controllers to invoke prior to persisting objects in cluster +ADMISSION_CONTROL=NamespaceExists,LimitRanger,ResourceQuota,AlwaysAdmit # Optional: Install node monitoring. ENABLE_NODE_MONITORING=true diff --git a/cluster/vagrant/provision-master.sh b/cluster/vagrant/provision-master.sh index bc3a46f556..01987e3856 100755 --- a/cluster/vagrant/provision-master.sh +++ b/cluster/vagrant/provision-master.sh @@ -83,7 +83,7 @@ grains: cloud_provider: vagrant roles: - kubernetes-master - admission_control: NamespaceExists,AlwaysAdmit + admission_control: '$(echo "$ADMISSION_CONTROL" | sed -e "s/'/''/g")' runtime_config: '$(echo "$RUNTIME_CONFIG" | sed -e "s/'/''/g")' EOF diff --git a/cluster/vagrant/util.sh b/cluster/vagrant/util.sh index 37f8d2d0a2..8b466f7dc9 100644 --- a/cluster/vagrant/util.sh +++ b/cluster/vagrant/util.sh @@ -88,6 +88,7 @@ function create-provision-scripts { echo "DNS_DOMAIN='${DNS_DOMAIN:-}'" echo "DNS_REPLICAS='${DNS_REPLICAS:-}'" echo "RUNTIME_CONFIG='${RUNTIME_CONFIG:-}'" + echo "ADMISSION_CONTROL='${ADMISSION_CONTROL:-}'" grep -v "^#" "${KUBE_ROOT}/cluster/vagrant/provision-master.sh" grep -v "^#" "${KUBE_ROOT}/cluster/vagrant/provision-network.sh" ) > "${KUBE_TEMP}/master-start.sh" diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 0577c57f89..b7960417a4 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -53,6 +53,14 @@ type Reflector struct { resyncPeriod time.Duration } +// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector +// The indexer is configured to key on namespace +func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { + indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc}) + reflector = NewReflector(lw, expectedType, indexer, resyncPeriod) + return indexer, reflector +} + // NewReflector creates a new Reflector object which will keep the given store up to // date with the server's contents for the given resource. Reflector promises to // only put things in the store that have the type of expectedType. diff --git a/pkg/client/fake_limit_ranges.go b/pkg/client/fake_limit_ranges.go index 9d1c7c237c..f45965587c 100644 --- a/pkg/client/fake_limit_ranges.go +++ b/pkg/client/fake_limit_ranges.go @@ -19,6 +19,7 @@ package client import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // FakeLimitRanges implements PodsInterface. Meant to be embedded into a struct to get a default @@ -52,3 +53,8 @@ func (c *FakeLimitRanges) Update(limitRange *api.LimitRange) (*api.LimitRange, e c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-limitRange", Value: limitRange.Name}) return &api.LimitRange{}, nil } + +func (c *FakeLimitRanges) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-limitRange", Value: resourceVersion}) + return c.Fake.Watch, nil +} diff --git a/pkg/client/fake_resource_quotas.go b/pkg/client/fake_resource_quotas.go index f7b870ed12..e24c160d49 100644 --- a/pkg/client/fake_resource_quotas.go +++ b/pkg/client/fake_resource_quotas.go @@ -19,6 +19,7 @@ package client import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // FakeResourceQuotas implements ResourceQuotaInterface. Meant to be embedded into a struct to get a default @@ -52,3 +53,8 @@ func (c *FakeResourceQuotas) Update(resourceQuota *api.ResourceQuota) (*api.Reso c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-resourceQuota", Value: resourceQuota.Name}) return &api.ResourceQuota{}, nil } + +func (c *FakeResourceQuotas) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-resourceQuota", Value: resourceVersion}) + return c.Fake.Watch, nil +} diff --git a/pkg/client/limit_ranges.go b/pkg/client/limit_ranges.go index fa03c61178..164bb66dc4 100644 --- a/pkg/client/limit_ranges.go +++ b/pkg/client/limit_ranges.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // LimitRangesNamespacer has methods to work with LimitRange resources in a namespace @@ -36,6 +37,7 @@ type LimitRangeInterface interface { Delete(name string) error Create(limitRange *api.LimitRange) (*api.LimitRange, error) Update(limitRange *api.LimitRange) (*api.LimitRange, error) + Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) } // limitRanges implements LimitRangesNamespacer interface @@ -92,3 +94,15 @@ func (c *limitRanges) Update(limitRange *api.LimitRange) (result *api.LimitRange err = c.r.Put().Namespace(c.ns).Resource("limitRanges").Name(limitRange.Name).Body(limitRange).Do().Into(result) return } + +// Watch returns a watch.Interface that watches the requested resource +func (c *limitRanges) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return c.r.Get(). + Prefix("watch"). + Namespace(c.ns). + Resource("limitRanges"). + Param("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} diff --git a/pkg/client/limit_ranges_test.go b/pkg/client/limit_ranges_test.go index 126baa7ac5..fab3f3fe53 100644 --- a/pkg/client/limit_ranges_test.go +++ b/pkg/client/limit_ranges_test.go @@ -17,12 +17,12 @@ limitations under the License. package client import ( + "net/url" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - //"github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) func TestLimitRangeCreate(t *testing.T) { @@ -192,3 +192,12 @@ func TestLimitRangeDelete(t *testing.T) { err := c.Setup().LimitRanges(ns).Delete("foo") c.Validate(t, nil, err) } + +func TestLimitRangeWatch(t *testing.T) { + c := &testClient{ + Request: testRequest{Method: "GET", Path: "/watch/limitRanges", Query: url.Values{"resourceVersion": []string{}}}, + Response: Response{StatusCode: 200}, + } + _, err := c.Setup().LimitRanges(api.NamespaceAll).Watch(labels.Everything(), labels.Everything(), "") + c.Validate(t, nil, err) +} diff --git a/pkg/client/resource_quotas.go b/pkg/client/resource_quotas.go index d0d51ea800..cced1e40f6 100644 --- a/pkg/client/resource_quotas.go +++ b/pkg/client/resource_quotas.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // ResourceQuotasNamespacer has methods to work with ResourceQuota resources in a namespace @@ -36,6 +37,7 @@ type ResourceQuotaInterface interface { Delete(name string) error Create(resourceQuota *api.ResourceQuota) (*api.ResourceQuota, error) Update(resourceQuota *api.ResourceQuota) (*api.ResourceQuota, error) + Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) } // resourceQuotas implements ResourceQuotasNamespacer interface @@ -92,3 +94,15 @@ func (c *resourceQuotas) Update(resourceQuota *api.ResourceQuota) (result *api.R err = c.r.Put().Namespace(c.ns).Resource("resourceQuotas").Name(resourceQuota.Name).Body(resourceQuota).Do().Into(result) return } + +// Watch returns a watch.Interface that watches the requested resource +func (c *resourceQuotas) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return c.r.Get(). + Prefix("watch"). + Namespace(c.ns). + Resource("resourceQuotas"). + Param("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} diff --git a/pkg/client/resource_quotas_test.go b/pkg/client/resource_quotas_test.go index 22eee5dada..e5b406a0b1 100644 --- a/pkg/client/resource_quotas_test.go +++ b/pkg/client/resource_quotas_test.go @@ -17,6 +17,7 @@ limitations under the License. package client import ( + "net/url" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -175,3 +176,12 @@ func TestResourceQuotaDelete(t *testing.T) { err := c.Setup().ResourceQuotas(ns).Delete("foo") c.Validate(t, nil, err) } + +func TestResourceQuotaWatch(t *testing.T) { + c := &testClient{ + Request: testRequest{Method: "GET", Path: "/watch/resourceQuotas", Query: url.Values{"resourceVersion": []string{}}}, + Response: Response{StatusCode: 200}, + } + _, err := c.Setup().ResourceQuotas(api.NamespaceAll).Watch(labels.Everything(), labels.Everything(), "") + c.Validate(t, nil, err) +} diff --git a/plugin/pkg/admission/limitranger/admission.go b/plugin/pkg/admission/limitranger/admission.go index bf7f4f83a4..61f81a7cd8 100644 --- a/plugin/pkg/admission/limitranger/admission.go +++ b/plugin/pkg/admission/limitranger/admission.go @@ -23,10 +23,13 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/admission" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func init() { @@ -39,6 +42,7 @@ func init() { type limitRanger struct { client client.Interface limitFunc LimitFunc + indexer cache.Indexer } // Admit admits resources into cluster that do not violate any defined LimitRange in the namespace @@ -48,16 +52,30 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) { return nil } - // look for a limit range in current namespace that requires enforcement - // TODO: Move to cache when issue is resolved: https://github.com/GoogleCloudPlatform/kubernetes/issues/2294 - items, err := l.client.LimitRanges(a.GetNamespace()).List(labels.Everything()) + obj := a.GetObject() + resource := a.GetResource() + name := "Unknown" + if obj != nil { + name, _ = meta.NewAccessor().Name(obj) + } + + key := &api.LimitRange{ + ObjectMeta: api.ObjectMeta{ + Namespace: a.GetNamespace(), + Name: "", + }, + } + items, err := l.indexer.Index("namespace", key) if err != nil { - return err + return apierrors.NewForbidden(a.GetResource(), name, fmt.Errorf("Unable to %s %s at this time because there was an error enforcing limit ranges", a.GetOperation(), resource)) + } + if len(items) == 0 { + return nil } // ensure it meets each prescribed min/max - for i := range items.Items { - limitRange := &items.Items[i] + for i := range items { + limitRange := items[i].(*api.LimitRange) err = l.limitFunc(limitRange, a.GetResource(), a.GetObject()) if err != nil { return err @@ -68,7 +86,17 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) { // NewLimitRanger returns an object that enforces limits based on the supplied limit function func NewLimitRanger(client client.Interface, limitFunc LimitFunc) admission.Interface { - return &limitRanger{client: client, limitFunc: limitFunc} + lw := &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return client.LimitRanges(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return client.LimitRanges(api.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } + indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.LimitRange{}, 0) + reflector.Run() + return &limitRanger{client: client, limitFunc: limitFunc, indexer: indexer} } func Min(a int64, b int64) int64 { diff --git a/plugin/pkg/admission/namespace/exists/admission.go b/plugin/pkg/admission/namespace/exists/admission.go index ac636b82a7..91c5879026 100644 --- a/plugin/pkg/admission/namespace/exists/admission.go +++ b/plugin/pkg/admission/namespace/exists/admission.go @@ -82,7 +82,6 @@ func (e *exists) Admit(a admission.Attributes) (err error) { func NewExists(c client.Interface) admission.Interface { store := cache.NewStore(cache.MetaNamespaceKeyFunc) - // TODO: look into a list/watch that can work with client.Interface, maybe pass it a ListFunc and a WatchFunc reflector := cache.NewReflector( &cache.ListWatch{ ListFunc: func() (runtime.Object, error) { diff --git a/plugin/pkg/admission/resourcequota/admission.go b/plugin/pkg/admission/resourcequota/admission.go index bad1edbb75..b8ab2764dc 100644 --- a/plugin/pkg/admission/resourcequota/admission.go +++ b/plugin/pkg/admission/resourcequota/admission.go @@ -26,8 +26,11 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func init() { @@ -37,11 +40,22 @@ func init() { } type quota struct { - client client.Interface + client client.Interface + indexer cache.Indexer } func NewResourceQuota(client client.Interface) admission.Interface { - return "a{client: client} + lw := &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return client.ResourceQuotas(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return client.ResourceQuotas(api.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } + indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0) + reflector.Run() + return "a{client: client, indexer: indexer} } var resourceToResourceName = map[string]api.ResourceName{ @@ -63,18 +77,36 @@ func (q *quota) Admit(a admission.Attributes) (err error) { name, _ = meta.NewAccessor().Name(obj) } - list, err := q.client.ResourceQuotas(a.GetNamespace()).List(labels.Everything()) + key := &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Namespace: a.GetNamespace(), + Name: "", + }, + } + items, err := q.indexer.Index("namespace", key) if err != nil { return apierrors.NewForbidden(a.GetResource(), name, fmt.Errorf("Unable to %s %s at this time because there was an error enforcing quota", a.GetOperation(), resource)) } - - if len(list.Items) == 0 { + if len(items) == 0 { return nil } - for i := range list.Items { - quota := list.Items[i] - dirty, err := IncrementUsage(a, "a.Status, q.client) + for i := range items { + quota := items[i].(*api.ResourceQuota) + + // we cannot modify the value directly in the cache, so we copy + status := &api.ResourceQuotaStatus{ + Hard: api.ResourceList{}, + Used: api.ResourceList{}, + } + for k, v := range quota.Status.Hard { + status.Hard[k] = *v.Copy() + } + for k, v := range quota.Status.Used { + status.Used[k] = *v.Copy() + } + + dirty, err := IncrementUsage(a, status, q.client) if err != nil { return err } @@ -87,7 +119,7 @@ func (q *quota) Admit(a admission.Attributes) (err error) { Namespace: quota.Namespace, ResourceVersion: quota.ResourceVersion}, } - usage.Status = quota.Status + usage.Status = *status err = q.client.ResourceQuotaUsages(usage.Namespace).Create(&usage) if err != nil { return apierrors.NewForbidden(a.GetResource(), name, fmt.Errorf("Unable to %s %s at this time because there was an error enforcing quota", a.GetOperation(), a.GetResource()))