Merge pull request #36889 from wojtek-t/reuse_fields_and_labels

Automatic merge from submit-queue

Reuse fields and labels

This should significantly reduce memory allocations in apiserver in large cluster.
Explanation:
- every kubelet is refreshing watch every 5-10 minutes (this generally is not causing relist - it just renews watch)
- that means, in 5000-node cluster, we are issuing ~10 watches per second
- since we don't have "watch heartbets", the watch is issued from previously received resourceVersion
- to make some assumption, let's assume pods are evenly spread across pods, and writes for them are evenly spread - that means, that a given kubelet is interested in 1 per 5000 pod changes
- with that assumption, each watch, has to process 2500 (on average) previous watch events
- for each of such even, we are currently computing fields.

This PR is fixing this problem.
pull/6/head
Kubernetes Submit Queue 2016-12-02 21:49:43 -08:00 committed by GitHub
commit cd560926bd
78 changed files with 558 additions and 362 deletions

View File

@ -40,8 +40,15 @@ func NewREST(config *storagebackend.Config, storageDecorator generic.StorageDeco
newListFunc := func() runtime.Object { return &testgroup.TestTypeList{} }
// Usually you should reuse your RESTCreateStrategy.
strategy := &NotNamespaceScoped{}
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
testObj, ok := obj.(*testgroup.TestType)
if !ok {
return nil, nil, fmt.Errorf("not a TestType")
}
return labels.Set(testObj.Labels), nil, nil
}
storageInterface, _ := storageDecorator(
config, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher)
config, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, getAttrs, storage.NoTriggerPublisher)
store := &registry.Store{
NewFunc: func() runtime.Object { return &testgroup.TestType{} },
// NewListFunc returns an object capable of storing results of an etcd list.

View File

@ -56,6 +56,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
cluster.Strategy,
newListFunc,
cluster.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -45,17 +45,20 @@ func ClusterToSelectableFields(cluster *federation.Cluster) fields.Set {
return generic.ObjectMetaFieldsSet(&cluster.ObjectMeta, false)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
cluster, ok := obj.(*federation.Cluster)
if !ok {
return nil, nil, fmt.Errorf("given object is not a cluster.")
}
return labels.Set(cluster.ObjectMeta.Labels), ClusterToSelectableFields(cluster), nil
}
func MatchCluster(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
cluster, ok := obj.(*federation.Cluster)
if !ok {
return nil, nil, fmt.Errorf("given object is not a cluster.")
}
return labels.Set(cluster.ObjectMeta.Labels), ClusterToSelectableFields(cluster), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -45,6 +45,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
petset.Strategy,
newListFunc,
petset.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -102,19 +102,22 @@ func StatefulSetToSelectableFields(statefulSet *apps.StatefulSet) fields.Set {
return generic.ObjectMetaFieldsSet(&statefulSet.ObjectMeta, true)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return nil, nil, fmt.Errorf("given object is not an StatefulSet.")
}
return labels.Set(statefulSet.ObjectMeta.Labels), StatefulSetToSelectableFields(statefulSet), nil
}
// MatchStatefulSet is the filter used by the generic etcd backend to watch events
// from etcd to clients of the apiserver only interested in specific labels/fields.
func MatchStatefulSet(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return nil, nil, fmt.Errorf("given object is not an StatefulSet.")
}
return labels.Set(statefulSet.ObjectMeta.Labels), StatefulSetToSelectableFields(statefulSet), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -44,6 +44,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
horizontalpodautoscaler.Strategy,
newListFunc,
horizontalpodautoscaler.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -88,17 +88,20 @@ func AutoscalerToSelectableFields(hpa *autoscaling.HorizontalPodAutoscaler) fiel
return nil
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
hpa, ok := obj.(*autoscaling.HorizontalPodAutoscaler)
if !ok {
return nil, nil, fmt.Errorf("given object is not a horizontal pod autoscaler.")
}
return labels.Set(hpa.ObjectMeta.Labels), AutoscalerToSelectableFields(hpa), nil
}
func MatchAutoscaler(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
hpa, ok := obj.(*autoscaling.HorizontalPodAutoscaler)
if !ok {
return nil, nil, fmt.Errorf("given object is not a horizontal pod autoscaler.")
}
return labels.Set(hpa.ObjectMeta.Labels), AutoscalerToSelectableFields(hpa), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -45,6 +45,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
cronjob.Strategy,
newListFunc,
cronjob.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -102,19 +102,22 @@ func CronJobToSelectableFields(scheduledJob *batch.CronJob) fields.Set {
return generic.ObjectMetaFieldsSet(&scheduledJob.ObjectMeta, true)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
scheduledJob, ok := obj.(*batch.CronJob)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a scheduled job.")
}
return labels.Set(scheduledJob.ObjectMeta.Labels), CronJobToSelectableFields(scheduledJob), nil
}
// MatchCronJob is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchCronJob(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
scheduledJob, ok := obj.(*batch.CronJob)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a scheduled job.")
}
return labels.Set(scheduledJob.ObjectMeta.Labels), CronJobToSelectableFields(scheduledJob), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -45,6 +45,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
job.Strategy,
newListFunc,
job.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -164,19 +164,22 @@ func JobToSelectableFields(job *batch.Job) fields.Set {
return generic.MergeFieldsSets(objectMetaFieldsSet, specificFieldsSet)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
job, ok := obj.(*batch.Job)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a job.")
}
return labels.Set(job.ObjectMeta.Labels), JobToSelectableFields(job), nil
}
// MatchJob is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchJob(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
job, ok := obj.(*batch.Job)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a job.")
}
return labels.Set(job.ObjectMeta.Labels), JobToSelectableFields(job), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -47,6 +47,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *ApprovalREST) {
prefix,
csrregistry.Strategy,
newListFunc,
csrregistry.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -168,18 +168,21 @@ func (csrApprovalStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Obje
return validation.ValidateCertificateSigningRequestUpdate(obj.(*certificates.CertificateSigningRequest), old.(*certificates.CertificateSigningRequest))
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
sa, ok := obj.(*certificates.CertificateSigningRequest)
if !ok {
return nil, nil, fmt.Errorf("not a CertificateSigningRequest")
}
return labels.Set(sa.Labels), SelectableFields(sa), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
sa, ok := obj.(*certificates.CertificateSigningRequest)
if !ok {
return nil, nil, fmt.Errorf("not a CertificateSigningRequest")
}
return labels.Set(sa.Labels), SelectableFields(sa), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -43,6 +43,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
configmap.Strategy,
newListFunc,
configmap.GetAttrs,
storage.NoTriggerPublisher)
store := &registry.Store{

View File

@ -88,18 +88,20 @@ func ConfigMapToSelectableFields(cfg *api.ConfigMap) fields.Set {
return generic.ObjectMetaFieldsSet(&cfg.ObjectMeta, true)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
cfg, ok := obj.(*api.ConfigMap)
if !ok {
return nil, nil, fmt.Errorf("given object is not a ConfigMap")
}
return labels.Set(cfg.ObjectMeta.Labels), ConfigMapToSelectableFields(cfg), nil
}
// MatchConfigMap returns a generic matcher for a given label and field selector.
func MatchConfigMap(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
cfg, ok := obj.(*api.ConfigMap)
if !ok {
return nil, nil, fmt.Errorf("given object is not a ConfigMap")
}
return labels.Set(cfg.ObjectMeta.Labels), ConfigMapToSelectableFields(cfg), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -69,6 +69,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
controller.Strategy,
newListFunc,
controller.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -118,20 +118,23 @@ func ControllerToSelectableFields(controller *api.ReplicationController) fields.
return generic.MergeFieldsSets(objectMetaFieldsSet, controllerSpecificFieldsSet)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
rc, ok := obj.(*api.ReplicationController)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a replication controller.")
}
return labels.Set(rc.ObjectMeta.Labels), ControllerToSelectableFields(rc), nil
}
// MatchController is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchController(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
rc, ok := obj.(*api.ReplicationController)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a replication controller.")
}
return labels.Set(rc.ObjectMeta.Labels), ControllerToSelectableFields(rc), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -42,6 +42,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
endpoint.Strategy,
newListFunc,
endpoint.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -79,27 +79,21 @@ func (endpointsStrategy) AllowUnconditionalUpdate() bool {
return true
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
return nil, nil, fmt.Errorf("invalid object type %#v", obj)
}
return endpoints.Labels, EndpointsToSelectableFields(endpoints), nil
}
// MatchEndpoints returns a generic matcher for a given label and field selector.
func MatchEndpoints(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate {
return pkgstorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
return nil, nil, fmt.Errorf("invalid object type %#v", obj)
}
// Compute fields only if field selectors is non-empty
// (otherwise those won't be used).
// Those are generally also not needed if label selector does
// not match labels, but additional computation of it is expensive.
var endpointsFields fields.Set
if !field.Empty() {
endpointsFields = EndpointsToSelectableFields(endpoints)
}
return endpoints.Labels, endpointsFields, nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -70,17 +70,20 @@ func (eventStrategy) AllowUnconditionalUpdate() bool {
return true
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
event, ok := obj.(*api.Event)
if !ok {
return nil, nil, fmt.Errorf("not an event")
}
return labels.Set(event.Labels), EventToSelectableFields(event), nil
}
func MatchEvent(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
event, ok := obj.(*api.Event)
if !ok {
return nil, nil, fmt.Errorf("not an event")
}
return labels.Set(event.Labels), EventToSelectableFields(event), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -42,6 +42,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
limitrange.Strategy,
newListFunc,
limitrange.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -85,16 +85,19 @@ func (limitrangeStrategy) Export(api.Context, runtime.Object, bool) error {
return nil
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
lr, ok := obj.(*api.LimitRange)
if !ok {
return nil, nil, fmt.Errorf("given object is not a limit range.")
}
return labels.Set(lr.ObjectMeta.Labels), LimitRangeToSelectableFields(lr), nil
}
func MatchLimitRange(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
lr, ok := obj.(*api.LimitRange)
if !ok {
return nil, nil, fmt.Errorf("given object is not a limit range.")
}
return labels.Set(lr.ObjectMeta.Labels), LimitRangeToSelectableFields(lr), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -60,6 +60,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *FinalizeREST) {
prefix,
namespace.Strategy,
newListFunc,
namespace.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -135,18 +135,21 @@ func (namespaceFinalizeStrategy) PrepareForUpdate(ctx api.Context, obj, old runt
newNamespace.Status = oldNamespace.Status
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
namespaceObj, ok := obj.(*api.Namespace)
if !ok {
return nil, nil, fmt.Errorf("not a namespace")
}
return labels.Set(namespaceObj.Labels), NamespaceToSelectableFields(namespaceObj), nil
}
// MatchNamespace returns a generic matcher for a given label and field selector.
func MatchNamespace(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
namespaceObj, ok := obj.(*api.Namespace)
if !ok {
return nil, nil, fmt.Errorf("not a namespace")
}
return labels.Set(namespaceObj.Labels), NamespaceToSelectableFields(namespaceObj), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -79,6 +79,7 @@ func NewStorage(opts generic.RESTOptions, kubeletClientConfig client.KubeletClie
prefix,
node.Strategy,
newListFunc,
node.GetAttrs,
node.NodeNameTriggerFunc)
store := &registry.Store{

View File

@ -144,27 +144,21 @@ func NodeToSelectableFields(node *api.Node) fields.Set {
return generic.MergeFieldsSets(objectMetaFieldsSet, specificFieldsSet)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
nodeObj, ok := obj.(*api.Node)
if !ok {
return nil, nil, fmt.Errorf("not a node")
}
return labels.Set(nodeObj.ObjectMeta.Labels), NodeToSelectableFields(nodeObj), nil
}
// MatchNode returns a generic matcher for a given label and field selector.
func MatchNode(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate {
return pkgstorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
nodeObj, ok := obj.(*api.Node)
if !ok {
return nil, nil, fmt.Errorf("not a node")
}
// Compute fields only if field selectors is non-empty
// (otherwise those won't be used).
// Those are generally also not needed if label selector does
// not match labels, but additional computation of it is expensive.
var nodeFields fields.Set
if !field.Empty() {
nodeFields = NodeToSelectableFields(nodeObj)
}
return labels.Set(nodeObj.ObjectMeta.Labels), nodeFields, nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
IndexFields: []string{"metadata.name"},
}
}

View File

@ -43,6 +43,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
persistentvolume.Strategy,
newListFunc,
persistentvolume.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -95,18 +95,21 @@ func (persistentvolumeStatusStrategy) ValidateUpdate(ctx api.Context, obj, old r
return validation.ValidatePersistentVolumeStatusUpdate(obj.(*api.PersistentVolume), old.(*api.PersistentVolume))
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
persistentvolumeObj, ok := obj.(*api.PersistentVolume)
if !ok {
return nil, nil, fmt.Errorf("not a persistentvolume")
}
return labels.Set(persistentvolumeObj.Labels), PersistentVolumeToSelectableFields(persistentvolumeObj), nil
}
// MatchPersistentVolume returns a generic matcher for a given label and field selector.
func MatchPersistentVolumes(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
persistentvolumeObj, ok := obj.(*api.PersistentVolume)
if !ok {
return nil, nil, fmt.Errorf("not a persistentvolume")
}
return labels.Set(persistentvolumeObj.Labels), PersistentVolumeToSelectableFields(persistentvolumeObj), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -43,6 +43,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
persistentvolumeclaim.Strategy,
newListFunc,
persistentvolumeclaim.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -95,18 +95,21 @@ func (persistentvolumeclaimStatusStrategy) ValidateUpdate(ctx api.Context, obj,
return validation.ValidatePersistentVolumeClaimStatusUpdate(obj.(*api.PersistentVolumeClaim), old.(*api.PersistentVolumeClaim))
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
persistentvolumeclaimObj, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
return nil, nil, fmt.Errorf("not a persistentvolumeclaim")
}
return labels.Set(persistentvolumeclaimObj.Labels), PersistentVolumeClaimToSelectableFields(persistentvolumeclaimObj), nil
}
// MatchPersistentVolumeClaim returns a generic matcher for a given label and field selector.
func MatchPersistentVolumeClaim(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
persistentvolumeclaimObj, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
return nil, nil, fmt.Errorf("not a persistentvolumeclaim")
}
return labels.Set(persistentvolumeclaimObj.Labels), PersistentVolumeClaimToSelectableFields(persistentvolumeclaimObj), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -69,6 +69,7 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr
prefix,
pod.Strategy,
newListFunc,
pod.GetAttrs,
pod.NodeNameTriggerFunc,
)

View File

@ -155,27 +155,21 @@ func (podStatusStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object
return validation.ValidatePodStatusUpdate(obj.(*api.Pod), old.(*api.Pod))
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, nil, fmt.Errorf("not a pod")
}
return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil
}
// MatchPod returns a generic matcher for a given label and field selector.
func MatchPod(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, nil, fmt.Errorf("not a pod")
}
// Compute fields only if field selectors is non-empty
// (otherwise those won't be used).
// Those are generally also not needed if label selector does
// not match labels, but additional computation of it is expensive.
var podFields fields.Set
if !field.Empty() {
podFields = PodToSelectableFields(pod)
}
return labels.Set(pod.ObjectMeta.Labels), podFields, nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
IndexFields: []string{"spec.nodeName"},
}
}

View File

@ -42,6 +42,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
podtemplate.Strategy,
newListFunc,
podtemplate.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -86,16 +86,19 @@ func PodTemplateToSelectableFields(podTemplate *api.PodTemplate) fields.Set {
return nil
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
pt, ok := obj.(*api.PodTemplate)
if !ok {
return nil, nil, fmt.Errorf("given object is not a pod template.")
}
return labels.Set(pt.ObjectMeta.Labels), PodTemplateToSelectableFields(pt), nil
}
func MatchPodTemplate(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pt, ok := obj.(*api.PodTemplate)
if !ok {
return nil, nil, fmt.Errorf("given object is not a pod template.")
}
return labels.Set(pt.ObjectMeta.Labels), PodTemplateToSelectableFields(pt), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -43,6 +43,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
resourcequota.Strategy,
newListFunc,
resourcequota.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -98,18 +98,21 @@ func (resourcequotaStatusStrategy) ValidateUpdate(ctx api.Context, obj, old runt
return validation.ValidateResourceQuotaStatusUpdate(obj.(*api.ResourceQuota), old.(*api.ResourceQuota))
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
resourcequotaObj, ok := obj.(*api.ResourceQuota)
if !ok {
return nil, nil, fmt.Errorf("not a resourcequota")
}
return labels.Set(resourcequotaObj.Labels), ResourceQuotaToSelectableFields(resourcequotaObj), nil
}
// MatchResourceQuota returns a generic matcher for a given label and field selector.
func MatchResourceQuota(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
resourcequotaObj, ok := obj.(*api.ResourceQuota)
if !ok {
return nil, nil, fmt.Errorf("not a resourcequota")
}
return labels.Set(resourcequotaObj.Labels), ResourceQuotaToSelectableFields(resourcequotaObj), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -42,6 +42,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
secret.Strategy,
newListFunc,
secret.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -94,18 +94,21 @@ func (s strategy) Export(ctx api.Context, obj runtime.Object, exact bool) error
return nil
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
secret, ok := obj.(*api.Secret)
if !ok {
return nil, nil, fmt.Errorf("not a secret")
}
return labels.Set(secret.Labels), SelectableFields(secret), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
secret, ok := obj.(*api.Secret)
if !ok {
return nil, nil, fmt.Errorf("not a secret")
}
return labels.Set(secret.Labels), SelectableFields(secret), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -43,6 +43,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
service.Strategy,
newListFunc,
service.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -101,17 +101,20 @@ func (svcStrategy) Export(ctx api.Context, obj runtime.Object, exact bool) error
return nil
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
service, ok := obj.(*api.Service)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a service")
}
return labels.Set(service.ObjectMeta.Labels), ServiceToSelectableFields(service), nil
}
func MatchServices(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
service, ok := obj.(*api.Service)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a service")
}
return labels.Set(service.ObjectMeta.Labels), ServiceToSelectableFields(service), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -42,6 +42,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
serviceaccount.Strategy,
newListFunc,
serviceaccount.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -77,18 +77,21 @@ func (strategy) AllowUnconditionalUpdate() bool {
return true
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
sa, ok := obj.(*api.ServiceAccount)
if !ok {
return nil, nil, fmt.Errorf("not a serviceaccount")
}
return labels.Set(sa.Labels), SelectableFields(sa), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
sa, ok := obj.(*api.ServiceAccount)
if !ok {
return nil, nil, fmt.Errorf("not a serviceaccount")
}
return labels.Set(sa.Labels), SelectableFields(sa), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -45,6 +45,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
daemonset.Strategy,
newListFunc,
daemonset.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -110,20 +110,23 @@ func DaemonSetToSelectableFields(daemon *extensions.DaemonSet) fields.Set {
return generic.ObjectMetaFieldsSet(&daemon.ObjectMeta, true)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
ds, ok := obj.(*extensions.DaemonSet)
if !ok {
return nil, nil, fmt.Errorf("given object is not a ds.")
}
return labels.Set(ds.ObjectMeta.Labels), DaemonSetToSelectableFields(ds), nil
}
// MatchSetDaemon is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchDaemonSet(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
ds, ok := obj.(*extensions.DaemonSet)
if !ok {
return nil, nil, fmt.Errorf("given object is not a ds.")
}
return labels.Set(ds.ObjectMeta.Labels), DaemonSetToSelectableFields(ds), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -71,6 +71,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *RollbackREST) {
prefix,
deployment.Strategy,
newListFunc,
deployment.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -128,19 +128,22 @@ func DeploymentToSelectableFields(deployment *extensions.Deployment) fields.Set
return generic.ObjectMetaFieldsSet(&deployment.ObjectMeta, true)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
deployment, ok := obj.(*extensions.Deployment)
if !ok {
return nil, nil, fmt.Errorf("given object is not a deployment.")
}
return labels.Set(deployment.ObjectMeta.Labels), DeploymentToSelectableFields(deployment), nil
}
// MatchDeployment is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchDeployment(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
deployment, ok := obj.(*extensions.Deployment)
if !ok {
return nil, nil, fmt.Errorf("given object is not a deployment.")
}
return labels.Set(deployment.ObjectMeta.Labels), DeploymentToSelectableFields(deployment), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -45,6 +45,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
ingress.Strategy,
newListFunc,
ingress.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -103,20 +103,23 @@ func IngressToSelectableFields(ingress *extensions.Ingress) fields.Set {
return generic.ObjectMetaFieldsSet(&ingress.ObjectMeta, true)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
ingress, ok := obj.(*extensions.Ingress)
if !ok {
return nil, nil, fmt.Errorf("Given object is not an Ingress.")
}
return labels.Set(ingress.ObjectMeta.Labels), IngressToSelectableFields(ingress), nil
}
// MatchIngress is the filter used by the generic etcd backend to ingress
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchIngress(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
ingress, ok := obj.(*extensions.Ingress)
if !ok {
return nil, nil, fmt.Errorf("Given object is not an Ingress.")
}
return labels.Set(ingress.ObjectMeta.Labels), IngressToSelectableFields(ingress), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -44,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
networkpolicy.Strategy,
newListFunc,
networkpolicy.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -96,18 +96,21 @@ func NetworkPolicyToSelectableFields(networkPolicy *extensions.NetworkPolicy) fi
return generic.ObjectMetaFieldsSet(&networkPolicy.ObjectMeta, true)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
networkPolicy, ok := obj.(*extensions.NetworkPolicy)
if !ok {
return nil, nil, fmt.Errorf("given object is not a NetworkPolicy.")
}
return labels.Set(networkPolicy.ObjectMeta.Labels), NetworkPolicyToSelectableFields(networkPolicy), nil
}
// MatchNetworkPolicy is the filter used by the generic etcd backend to watch events
// from etcd to clients of the apiserver only interested in specific labels/fields.
func MatchNetworkPolicy(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
networkPolicy, ok := obj.(*extensions.NetworkPolicy)
if !ok {
return nil, nil, fmt.Errorf("given object is not a NetworkPolicy.")
}
return labels.Set(networkPolicy.ObjectMeta.Labels), NetworkPolicyToSelectableFields(networkPolicy), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -44,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
podsecuritypolicy.Strategy,
newListFunc,
podsecuritypolicy.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -74,18 +74,21 @@ func (strategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) field.E
return validation.ValidatePodSecurityPolicyUpdate(old.(*extensions.PodSecurityPolicy), obj.(*extensions.PodSecurityPolicy))
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
psp, ok := obj.(*extensions.PodSecurityPolicy)
if !ok {
return nil, nil, fmt.Errorf("given object is not a pod security policy.")
}
return labels.Set(psp.ObjectMeta.Labels), PodSecurityPolicyToSelectableFields(psp), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func MatchPodSecurityPolicy(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
psp, ok := obj.(*extensions.PodSecurityPolicy)
if !ok {
return nil, nil, fmt.Errorf("given object is not a pod security policy.")
}
return labels.Set(psp.ObjectMeta.Labels), PodSecurityPolicyToSelectableFields(psp), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -68,6 +68,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
replicaset.Strategy,
newListFunc,
replicaset.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -119,20 +119,23 @@ func ReplicaSetToSelectableFields(rs *extensions.ReplicaSet) fields.Set {
return generic.MergeFieldsSets(objectMetaFieldsSet, rsSpecificFieldsSet)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
rs, ok := obj.(*extensions.ReplicaSet)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a ReplicaSet.")
}
return labels.Set(rs.ObjectMeta.Labels), ReplicaSetToSelectableFields(rs), nil
}
// MatchReplicaSet is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchReplicaSet(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
rs, ok := obj.(*extensions.ReplicaSet)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a ReplicaSet.")
}
return labels.Set(rs.ObjectMeta.Labels), ReplicaSetToSelectableFields(rs), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -77,18 +77,21 @@ func (strategy) AllowUnconditionalUpdate() bool {
return true
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
tpr, ok := obj.(*extensions.ThirdPartyResource)
if !ok {
return nil, nil, fmt.Errorf("not a ThirdPartyResource")
}
return labels.Set(tpr.Labels), SelectableFields(tpr), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
tpr, ok := obj.(*extensions.ThirdPartyResource)
if !ok {
return nil, nil, fmt.Errorf("not a ThirdPartyResource")
}
return labels.Set(tpr.Labels), SelectableFields(tpr), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -74,18 +74,21 @@ func (strategy) AllowUnconditionalUpdate() bool {
return true
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
tprd, ok := obj.(*extensions.ThirdPartyResourceData)
if !ok {
return nil, nil, fmt.Errorf("not a ThirdPartyResourceData")
}
return labels.Set(tprd.Labels), SelectableFields(tprd), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
tprd, ok := obj.(*extensions.ThirdPartyResourceData)
if !ok {
return nil, nil, fmt.Errorf("not a ThirdPartyResourceData")
}
return labels.Set(tprd.Labels), SelectableFields(tprd), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/rest:go_default_library",
"//pkg/fields:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/storage:go_default_library",
"//pkg/storage/storagebackend:go_default_library",

View File

@ -18,6 +18,8 @@ package registry
import (
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
@ -34,6 +36,7 @@ func StorageWithCacher(
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object,
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
s, d := generic.NewRawStorage(storageConfig)
@ -46,6 +49,7 @@ func StorageWithCacher(
Type: objectType,
ResourcePrefix: resourcePrefix,
NewListFunc: newListFunc,
GetAttrsFunc: getAttrsFunc,
TriggerPublisherFunc: triggerFunc,
Codec: storageConfig.Codec,
}

View File

@ -103,6 +103,11 @@ func NewTestGenericStoreRegistry(t *testing.T) (factory.DestroyFunc, *Store) {
return newTestGenericStoreRegistry(t, false)
}
func getPodAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod)
return labels.Set{"name": pod.ObjectMeta.Name}, nil, nil
}
// matchPodName returns selection predicate that matches any pod with name in the set.
// Makes testing simpler.
func matchPodName(names ...string) storage.SelectionPredicate {
@ -113,12 +118,9 @@ func matchPodName(names ...string) storage.SelectionPredicate {
panic("Labels requirement must validate successfully")
}
return storage.SelectionPredicate{
Label: labels.Everything().Add(*l),
Field: fields.Everything(),
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) {
pod := obj.(*api.Pod)
return labels.Set{"name": pod.ObjectMeta.Name}, nil, nil
},
Label: labels.Everything().Add(*l),
Field: fields.Everything(),
GetAttrs: getPodAttrs,
}
}
@ -1249,6 +1251,7 @@ func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (factory.De
Type: &api.Pod{},
ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs,
NewListFunc: func() runtime.Object { return &api.PodList{} },
Codec: sc.Codec,
}

View File

@ -19,6 +19,8 @@ package generic
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
@ -34,6 +36,7 @@ type StorageDecorator func(
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object,
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc)
// Returns given 'storageInterface' without any decoration.
@ -44,6 +47,7 @@ func UndecoratedStorage(
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object,
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
return NewRawStorage(config)
}

View File

@ -45,6 +45,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix,
poddisruptionbudget.Strategy,
newListFunc,
poddisruptionbudget.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -102,19 +102,22 @@ func PodDisruptionBudgetToSelectableFields(podDisruptionBudget *policy.PodDisrup
return generic.ObjectMetaFieldsSet(&podDisruptionBudget.ObjectMeta, true)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
podDisruptionBudget, ok := obj.(*policy.PodDisruptionBudget)
if !ok {
return nil, nil, fmt.Errorf("given object is not a PodDisruptionBudget.")
}
return labels.Set(podDisruptionBudget.ObjectMeta.Labels), PodDisruptionBudgetToSelectableFields(podDisruptionBudget), nil
}
// MatchPodDisruptionBudget is the filter used by the generic etcd backend to watch events
// from etcd to clients of the apiserver only interested in specific labels/fields.
func MatchPodDisruptionBudget(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
podDisruptionBudget, ok := obj.(*policy.PodDisruptionBudget)
if !ok {
return nil, nil, fmt.Errorf("given object is not a PodDisruptionBudget.")
}
return labels.Set(podDisruptionBudget.ObjectMeta.Labels), PodDisruptionBudgetToSelectableFields(podDisruptionBudget), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -44,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
clusterrole.Strategy,
newListFunc,
clusterrole.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -101,18 +101,21 @@ func (s strategy) Export(ctx api.Context, obj runtime.Object, exact bool) error
return nil
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
role, ok := obj.(*rbac.ClusterRole)
if !ok {
return nil, nil, fmt.Errorf("not a ClusterRole")
}
return labels.Set(role.Labels), SelectableFields(role), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
role, ok := obj.(*rbac.ClusterRole)
if !ok {
return nil, nil, fmt.Errorf("not a ClusterRole")
}
return labels.Set(role.Labels), SelectableFields(role), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -44,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
clusterrolebinding.Strategy,
newListFunc,
clusterrolebinding.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -101,18 +101,21 @@ func (s strategy) Export(ctx api.Context, obj runtime.Object, exact bool) error
return nil
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
roleBinding, ok := obj.(*rbac.ClusterRoleBinding)
if !ok {
return nil, nil, fmt.Errorf("not a ClusterRoleBinding")
}
return labels.Set(roleBinding.Labels), SelectableFields(roleBinding), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
roleBinding, ok := obj.(*rbac.ClusterRoleBinding)
if !ok {
return nil, nil, fmt.Errorf("not a ClusterRoleBinding")
}
return labels.Set(roleBinding.Labels), SelectableFields(roleBinding), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -44,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
role.Strategy,
newListFunc,
role.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -101,18 +101,21 @@ func (s strategy) Export(ctx api.Context, obj runtime.Object, exact bool) error
return nil
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
role, ok := obj.(*rbac.Role)
if !ok {
return nil, nil, fmt.Errorf("not a Role")
}
return labels.Set(role.Labels), SelectableFields(role), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
role, ok := obj.(*rbac.Role)
if !ok {
return nil, nil, fmt.Errorf("not a Role")
}
return labels.Set(role.Labels), SelectableFields(role), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -44,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
rolebinding.Strategy,
newListFunc,
rolebinding.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -101,18 +101,21 @@ func (s strategy) Export(ctx api.Context, obj runtime.Object, exact bool) error
return nil
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
roleBinding, ok := obj.(*rbac.RoleBinding)
if !ok {
return nil, nil, fmt.Errorf("not a RoleBinding")
}
return labels.Set(roleBinding.Labels), SelectableFields(roleBinding), nil
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
roleBinding, ok := obj.(*rbac.RoleBinding)
if !ok {
return nil, nil, fmt.Errorf("not a RoleBinding")
}
return labels.Set(roleBinding.Labels), SelectableFields(roleBinding), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -43,6 +43,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
storageclass.Strategy,
newListFunc,
storageclass.GetAttrs,
storage.NoTriggerPublisher,
)

View File

@ -77,19 +77,21 @@ func (storageClassStrategy) AllowUnconditionalUpdate() bool {
return true
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
cls, ok := obj.(*storage.StorageClass)
if !ok {
return nil, nil, fmt.Errorf("given object is not of type StorageClass")
}
return labels.Set(cls.ObjectMeta.Labels), StorageClassToSelectableFields(cls), nil
}
// MatchStorageClass returns a generic matcher for a given label and field selector.
func MatchStorageClasses(label labels.Selector, field fields.Selector) apistorage.SelectionPredicate {
return apistorage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
cls, ok := obj.(*storage.StorageClass)
if !ok {
return nil, nil, fmt.Errorf("given object is not of type StorageClass")
}
return labels.Set(cls.ObjectMeta.Labels), StorageClassToSelectableFields(cls), nil
},
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@ -85,6 +85,7 @@ go_test(
"//pkg/api/unversioned:go_default_library",
"//pkg/fields:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/registry/core/pod:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/storage:go_default_library",
"//pkg/storage/etcd:go_default_library",

View File

@ -31,6 +31,8 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -60,6 +62,9 @@ type CacherConfig struct {
// KeyFunc is used to get a key in the underyling storage for a given object.
KeyFunc func(runtime.Object) (string, error)
// GetAttrsFunc is used to get object labels and fields.
GetAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
// TriggerPublisherFunc is used for optimizing amount of watchers that
// needs to process an incoming event.
TriggerPublisherFunc TriggerPublisherFunc
@ -126,7 +131,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
}
}
type filterObjectFunc func(string, runtime.Object) bool
type watchFilterFunc func(string, labels.Set, fields.Set) bool
// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
@ -183,7 +188,7 @@ type Cacher struct {
// internal cache and updating its cache in the background based on the given
// configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
// Give this error when it is constructed rather than when you get the
@ -327,7 +332,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
c.Lock()
defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, pred), forget)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
@ -599,7 +604,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
}
}
func filterFunction(key string, p SelectionPredicate) filterObjectFunc {
func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool {
f := SimpleFilter(p)
filterFunc := func(objKey string, obj runtime.Object) bool {
if !hasPathPrefix(objKey, key) {
@ -610,6 +615,16 @@ func filterFunction(key string, p SelectionPredicate) filterObjectFunc {
return filterFunc
}
func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc {
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
if !hasPathPrefix(objKey, key) {
return false
}
return p.MatchesLabelsAndFields(label, field)
}
return filterFunc
}
// Returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
c.ready.wait()
@ -696,12 +711,12 @@ type cacheWatcher struct {
sync.Mutex
input chan watchCacheEvent
result chan watch.Event
filter filterObjectFunc
filter watchFilterFunc
stopped bool
forget func(bool)
}
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
@ -779,10 +794,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) {
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.Object)
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
oldObjPasses := false
if event.PrevObject != nil {
oldObjPasses = c.filter(event.Key, event.PrevObject)
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
corepod "k8s.io/kubernetes/pkg/registry/core/pod"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -60,6 +61,7 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
Type: &api.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: corepod.GetAttrs,
NewListFunc: func() runtime.Object { return &api.PodList{} },
Codec: testapi.Default.Codec(),
}

View File

@ -46,12 +46,25 @@ func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
return false, err
}
matched := s.Label.Matches(labels)
if s.Field != nil {
if matched && s.Field != nil {
matched = (matched && s.Field.Matches(fields))
}
return matched, nil
}
// MatchesLabelsAndFields returns true if the given labels and fields
// match s.Label and s.Field.
func (s *SelectionPredicate) MatchesLabelsAndFields(l labels.Set, f fields.Set) bool {
if s.Label.Empty() && s.Field.Empty() {
return true
}
matched := s.Label.Matches(l)
if matched && s.Field != nil {
matched = (matched && s.Field.Matches(f))
}
return matched
}
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
// name.
func (s *SelectionPredicate) MatchesSingle() (string, bool) {

View File

@ -26,6 +26,8 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/clock"
@ -45,7 +47,11 @@ const (
type watchCacheEvent struct {
Type watch.EventType
Object runtime.Object
ObjLabels labels.Set
ObjFields fields.Set
PrevObject runtime.Object
PrevObjLabels labels.Set
PrevObjFields fields.Set
Key string
ResourceVersion uint64
}
@ -93,6 +99,9 @@ type watchCache struct {
// keyFunc is used to get a key in the underlying storage for a given object.
keyFunc func(runtime.Object) (string, error)
// getAttrsFunc is used to get labels and fields of an object.
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
// cache is used a cyclic buffer - its first element (with the smallest
// resourceVersion) is defined by startIndex, its last element is defined
// by endIndex (if cache is full it will be startIndex + capacity).
@ -122,10 +131,14 @@ type watchCache struct {
clock clock.Clock
}
func newWatchCache(capacity int, keyFunc func(runtime.Object) (string, error)) *watchCache {
func newWatchCache(
capacity int,
keyFunc func(runtime.Object) (string, error),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]watchCacheElement, capacity),
startIndex: 0,
endIndex: 0,
@ -213,14 +226,28 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
if err != nil {
return err
}
objLabels, objFields, err := w.getAttrsFunc(event.Object)
if err != nil {
return err
}
var prevObject runtime.Object
var prevObjLabels labels.Set
var prevObjFields fields.Set
if exists {
prevObject = previous.(*storeElement).Object
prevObjLabels, prevObjFields, err = w.getAttrsFunc(prevObject)
if err != nil {
return err
}
}
watchCacheEvent := watchCacheEvent{
Type: event.Type,
Object: event.Object,
ObjLabels: objLabels,
ObjFields: objFields,
PrevObject: prevObject,
PrevObjLabels: prevObjLabels,
PrevObjFields: prevObjFields,
Key: key,
ResourceVersion: resourceVersion,
}
@ -394,9 +421,15 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", elem)
}
objLabels, objFields, err := w.getAttrsFunc(elem.Object)
if err != nil {
return nil, err
}
result[i] = watchCacheEvent{
Type: watch.Added,
Object: elem.Object,
ObjLabels: objLabels,
ObjFields: objFields,
Key: elem.Key,
ResourceVersion: w.resourceVersion,
}

View File

@ -26,6 +26,8 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/clock"
"k8s.io/kubernetes/pkg/util/sets"
@ -48,7 +50,10 @@ func newTestWatchCache(capacity int) *watchCache {
keyFunc := func(obj runtime.Object) (string, error) {
return NamespaceKeyFunc("prefix", obj)
}
wc := newWatchCache(capacity, keyFunc)
getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) {
return nil, nil, nil
}
wc := newWatchCache(capacity, keyFunc, getAttrsFunc)
wc.clock = clock.NewFakeClock(time.Now())
return wc
}