diff --git a/cmd/kube-controller-manager/controller-manager.go b/cmd/kube-controller-manager/controller-manager.go index fd7d52599f..02d98ad068 100644 --- a/cmd/kube-controller-manager/controller-manager.go +++ b/cmd/kube-controller-manager/controller-manager.go @@ -34,6 +34,7 @@ import ( replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" + "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" "github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" @@ -55,8 +56,9 @@ var ( machineList util.StringList // TODO: Discover these by pinging the host machines, and rip out these flags. // TODO: in the meantime, use resource.QuantityFlag() instead of these - nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") - nodeMemory = resource.QuantityFlag("node_memory", "3Gi", "The amount of memory (in bytes) provisioned on each node") + nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") + nodeMemory = resource.QuantityFlag("node_memory", "3Gi", "The amount of memory (in bytes) provisioned on each node") + resourceQuotaSyncPeriod = flag.Duration("resource_quota_sync_period", 10*time.Second, "The period for syncing quota usage status in the system") ) func init() { @@ -112,5 +114,8 @@ func main() { nodeController := nodeControllerPkg.NewNodeController(cloud, *minionRegexp, machineList, nodeResources, kubeClient) nodeController.Run(*nodeSyncPeriod) + resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) + resourceQuotaManager.Run(*resourceQuotaSyncPeriod) + select {} } diff --git a/examples/resourcequota/resource-quota.json b/examples/resourcequota/resource-quota.json new file mode 100644 index 0000000000..7d0c40aefb --- /dev/null +++ b/examples/resourcequota/resource-quota.json @@ -0,0 +1,15 @@ +{ + "id": "quota", + "kind": "ResourceQuota", + "apiVersion": "v1beta1", + "spec": { + "hard": { + "memory": "1073741824", + "cpu": "20", + "pods": "10", + "services": "5", + "replicationcontrollers":"20", + "resourcequotas":"1", + }, + } +} diff --git a/pkg/resourcequota/doc.go b/pkg/resourcequota/doc.go new file mode 100644 index 0000000000..2e31b04933 --- /dev/null +++ b/pkg/resourcequota/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// resourcequota contains a controller that makes resource quota usage observations +package resourcequota diff --git a/pkg/resourcequota/resource_quota_controller.go b/pkg/resourcequota/resource_quota_controller.go new file mode 100644 index 0000000000..1dc5b9b151 --- /dev/null +++ b/pkg/resourcequota/resource_quota_controller.go @@ -0,0 +1,180 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequota + +import ( + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" +) + +// ResourceQuotaManager is responsible for tracking quota usage status in the system +type ResourceQuotaManager struct { + kubeClient client.Interface + syncTime <-chan time.Time + + // To allow injection of syncUsage for testing. + syncHandler func(quota api.ResourceQuota) error +} + +// NewResourceQuotaManager creates a new ResourceQuotaManager +func NewResourceQuotaManager(kubeClient client.Interface) *ResourceQuotaManager { + + rm := &ResourceQuotaManager{ + kubeClient: kubeClient, + } + + // set the synchronization handler + rm.syncHandler = rm.syncResourceQuota + return rm +} + +// Run begins watching and syncing. +func (rm *ResourceQuotaManager) Run(period time.Duration) { + rm.syncTime = time.Tick(period) + go util.Forever(func() { rm.synchronize() }, period) +} + +func (rm *ResourceQuotaManager) synchronize() { + var resourceQuotas []api.ResourceQuota + list, err := rm.kubeClient.ResourceQuotas(api.NamespaceAll).List(labels.Everything()) + if err != nil { + glog.Errorf("Synchronization error: %v (%#v)", err, err) + } + resourceQuotas = list.Items + wg := sync.WaitGroup{} + wg.Add(len(resourceQuotas)) + for ix := range resourceQuotas { + go func(ix int) { + defer wg.Done() + glog.V(4).Infof("periodic sync of %v/%v", resourceQuotas[ix].Namespace, resourceQuotas[ix].Name) + err := rm.syncHandler(resourceQuotas[ix]) + if err != nil { + glog.Errorf("Error synchronizing: %v", err) + } + }(ix) + } + wg.Wait() +} + +// syncResourceQuota runs a complete sync of current status +func (rm *ResourceQuotaManager) syncResourceQuota(quota api.ResourceQuota) (err error) { + + // dirty tracks if the usage status differs from the previous sync, + // if so, we send a new usage with latest status + // if this is our first sync, it will be dirty by default, since we need track usage + dirty := quota.Status.Hard == nil || quota.Status.Used == nil + + // Create a usage object that is based on the quota resource version + usage := api.ResourceQuotaUsage{ + ObjectMeta: api.ObjectMeta{ + Name: quota.Name, + Namespace: quota.Namespace, + ResourceVersion: quota.ResourceVersion}, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{}, + Used: api.ResourceList{}, + }, + } + // populate the usage with the current observed hard/used limits + usage.Status.Hard = quota.Spec.Hard + usage.Status.Used = quota.Status.Used + + set := map[api.ResourceName]bool{} + for k := range usage.Status.Hard { + set[k] = true + } + + pods := &api.PodList{} + if set[api.ResourcePods] || set[api.ResourceMemory] || set[api.ResourceCPU] { + pods, err = rm.kubeClient.Pods(usage.Namespace).List(labels.Everything()) + if err != nil { + return err + } + } + + // iterate over each resource, and update observation + for k := range usage.Status.Hard { + + // look if there is a used value, if none, we are definitely dirty + prevQuantity, found := usage.Status.Used[k] + if !found { + dirty = true + } + + var value *resource.Quantity + + switch k { + case api.ResourcePods: + value = resource.NewQuantity(int64(len(pods.Items)), resource.DecimalSI) + case api.ResourceMemory: + val := int64(0) + for i := range pods.Items { + for j := range pods.Items[i].Spec.Containers { + val = val + pods.Items[i].Spec.Containers[j].Memory.Value() + } + } + value = resource.NewQuantity(int64(val), resource.DecimalSI) + case api.ResourceCPU: + val := int64(0) + for i := range pods.Items { + for j := range pods.Items[i].Spec.Containers { + val = val + pods.Items[i].Spec.Containers[j].CPU.MilliValue() + } + } + value = resource.NewMilliQuantity(int64(val), resource.DecimalSI) + case api.ResourceServices: + items, err := rm.kubeClient.Services(usage.Namespace).List(labels.Everything()) + if err != nil { + return err + } + value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI) + case api.ResourceReplicationControllers: + items, err := rm.kubeClient.ReplicationControllers(usage.Namespace).List(labels.Everything()) + if err != nil { + return err + } + value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI) + case api.ResourceQuotas: + items, err := rm.kubeClient.ResourceQuotas(usage.Namespace).List(labels.Everything()) + if err != nil { + return err + } + value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI) + } + + // ignore fields we do not understand (assume another controller is tracking it) + if value != nil { + // see if the value has changed + dirty = dirty || (value.Value() != prevQuantity.Value()) + // just update the value + usage.Status.Used[k] = *value + } + } + + // update the usage only if it changed + if dirty { + return rm.kubeClient.ResourceQuotaUsages(usage.Namespace).Create(&usage) + } + return nil +}