2020-03-26 21:07:15 +00:00
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"crypto/sha256"
"encoding/binary"
"encoding/json"
"fmt"
"math"
"sort"
"sync"
"time"
"github.com/pkg/errors"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
apitypes "k8s.io/apimachinery/pkg/types"
apierrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/apihelpers"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
2020-08-10 17:43:49 +00:00
"k8s.io/klog/v2"
2020-03-26 21:07:15 +00:00
2020-12-01 01:06:26 +00:00
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta1"
2020-03-26 21:07:15 +00:00
)
// This file contains a simple local (to the apiserver) controller
// that digests API Priority and Fairness config objects (FlowSchema
// and PriorityLevelConfiguration) into the data structure that the
// filter uses. At this first level of development this controller
// takes the simplest possible approach: whenever notified of any
// change to any config object, or when any priority level that is
// undesired becomes completely unused, all the config objects are
// read and processed as a whole.
// StartFunction begins the process of handling a request. If the
// request gets queued then this function uses the given hashValue as
// the source of entropy as it shuffle-shards the request into a
// queue. This method does not return until the queuing, if any, for
// this request is done. If `execute` is false then `afterExecution`
// is irrelevant and the request should be rejected. Otherwise the
// request should be executed and `afterExecution` must be called
// exactly once.
//
// NOTE(review): this comment used to say that descr1 and descr2
// values play no role in the logic but appear in log messages. The
// signature below has no such parameters, so that sentence looks
// stale -- confirm before relying on it.
type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, afterExecution func())
// RequestDigest holds necessary info from request for flow-control
type RequestDigest struct {
	// RequestInfo is the request's *request.RequestInfo.
	RequestInfo *request.RequestInfo
	// User is the user.Info for the request.
	User user.Info
}
// `*configController` maintains eventual consistency with the API
// objects that configure API Priority and Fairness, and provides a
// procedural interface to the configured behavior. The methods of
// this type and cfgMeal follow the convention that the suffix
// "Locked" means that the caller must hold the configController lock.
type configController struct {
2020-08-10 17:43:49 +00:00
queueSetFactory fq . QueueSetFactory
obsPairGenerator metrics . TimedObserverPairGenerator
2020-03-26 21:07:15 +00:00
// configQueue holds `(interface{})(0)` when the configuration
// objects need to be reprocessed.
configQueue workqueue . RateLimitingInterface
2020-12-01 01:06:26 +00:00
plLister flowcontrollister . PriorityLevelConfigurationLister
2020-03-26 21:07:15 +00:00
plInformerSynced cache . InformerSynced
2020-12-01 01:06:26 +00:00
fsLister flowcontrollister . FlowSchemaLister
2020-03-26 21:07:15 +00:00
fsInformerSynced cache . InformerSynced
2020-12-01 01:06:26 +00:00
flowcontrolClient flowcontrolclient . FlowcontrolV1beta1Interface
2020-03-26 21:07:15 +00:00
// serverConcurrencyLimit is the limit on the server's total
// number of non-exempt requests being served at once. This comes
// from server configuration.
serverConcurrencyLimit int
// requestWaitLimit comes from server configuration.
requestWaitLimit time . Duration
// This must be locked while accessing flowSchemas or
// priorityLevelStates. It is the lock involved in
// LockingWriteMultiple.
lock sync . Mutex
// flowSchemas holds the flow schema objects, sorted by increasing
// numerical (decreasing logical) matching precedence. Every
// FlowSchema in this slice is immutable.
flowSchemas apihelpers . FlowSchemaSequence
// priorityLevelStates maps the PriorityLevelConfiguration object
// name to the state for that level. Every name referenced from a
// member of `flowSchemas` has an entry here.
priorityLevelStates map [ string ] * priorityLevelState
}
// priorityLevelState holds the state specific to a priority level.
type priorityLevelState struct {
// the API object or prototype prescribing this level. Nothing
// reached through this pointer is mutable.
2020-12-01 01:06:26 +00:00
pl * flowcontrol . PriorityLevelConfiguration
2020-03-26 21:07:15 +00:00
// qsCompleter holds the QueueSetCompleter derived from `config`
// and `queues` if config is not exempt, nil otherwise.
qsCompleter fq . QueueSetCompleter
// The QueueSet for this priority level. This is nil if and only
// if the priority level is exempt.
queues fq . QueueSet
// quiescing==true indicates that this priority level should be
// removed when its queues have all drained. May be true only if
// queues is non-nil.
quiescing bool
// number of goroutines between Controller::Match and calling the
// returned StartFunction
numPending int
2020-08-10 17:43:49 +00:00
// Observers tracking number waiting, executing
obsPair metrics . TimedObserverPair
2020-03-26 21:07:15 +00:00
}
// NewTestableController is extra flexible to facilitate testing
func newTestableController (
informerFactory kubeinformers . SharedInformerFactory ,
2020-12-01 01:06:26 +00:00
flowcontrolClient flowcontrolclient . FlowcontrolV1beta1Interface ,
2020-03-26 21:07:15 +00:00
serverConcurrencyLimit int ,
requestWaitLimit time . Duration ,
2020-08-10 17:43:49 +00:00
obsPairGenerator metrics . TimedObserverPairGenerator ,
2020-03-26 21:07:15 +00:00
queueSetFactory fq . QueueSetFactory ,
) * configController {
2020-08-10 17:43:49 +00:00
cfgCtlr := & configController {
2020-03-26 21:07:15 +00:00
queueSetFactory : queueSetFactory ,
2020-08-10 17:43:49 +00:00
obsPairGenerator : obsPairGenerator ,
2020-03-26 21:07:15 +00:00
serverConcurrencyLimit : serverConcurrencyLimit ,
requestWaitLimit : requestWaitLimit ,
flowcontrolClient : flowcontrolClient ,
priorityLevelStates : make ( map [ string ] * priorityLevelState ) ,
}
klog . V ( 2 ) . Infof ( "NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s" , serverConcurrencyLimit , requestWaitLimit )
2020-08-10 17:43:49 +00:00
cfgCtlr . initializeConfigController ( informerFactory )
2020-03-26 21:07:15 +00:00
// ensure the data structure reflects the mandatory config
2020-08-10 17:43:49 +00:00
cfgCtlr . lockAndDigestConfigObjects ( nil , nil )
return cfgCtlr
2020-03-26 21:07:15 +00:00
}
// initializeConfigController sets up the controller that processes
// config API objects.
2020-08-10 17:43:49 +00:00
func ( cfgCtlr * configController ) initializeConfigController ( informerFactory kubeinformers . SharedInformerFactory ) {
cfgCtlr . configQueue = workqueue . NewNamedRateLimitingQueue ( workqueue . NewItemExponentialFailureRateLimiter ( 200 * time . Millisecond , 8 * time . Hour ) , "priority_and_fairness_config_queue" )
2020-12-01 01:06:26 +00:00
fci := informerFactory . Flowcontrol ( ) . V1beta1 ( )
2020-03-26 21:07:15 +00:00
pli := fci . PriorityLevelConfigurations ( )
fsi := fci . FlowSchemas ( )
2020-08-10 17:43:49 +00:00
cfgCtlr . plLister = pli . Lister ( )
cfgCtlr . plInformerSynced = pli . Informer ( ) . HasSynced
cfgCtlr . fsLister = fsi . Lister ( )
cfgCtlr . fsInformerSynced = fsi . Informer ( ) . HasSynced
2020-03-26 21:07:15 +00:00
pli . Informer ( ) . AddEventHandler ( cache . ResourceEventHandlerFuncs {
AddFunc : func ( obj interface { } ) {
2020-12-01 01:06:26 +00:00
pl := obj . ( * flowcontrol . PriorityLevelConfiguration )
2020-03-26 21:07:15 +00:00
klog . V ( 7 ) . Infof ( "Triggered API priority and fairness config reloading due to creation of PLC %s" , pl . Name )
2020-08-10 17:43:49 +00:00
cfgCtlr . configQueue . Add ( 0 )
2020-03-26 21:07:15 +00:00
} ,
UpdateFunc : func ( oldObj , newObj interface { } ) {
2020-12-01 01:06:26 +00:00
newPL := newObj . ( * flowcontrol . PriorityLevelConfiguration )
oldPL := oldObj . ( * flowcontrol . PriorityLevelConfiguration )
2020-03-26 21:07:15 +00:00
if ! apiequality . Semantic . DeepEqual ( oldPL . Spec , newPL . Spec ) {
klog . V ( 7 ) . Infof ( "Triggered API priority and fairness config reloading due to spec update of PLC %s" , newPL . Name )
2020-08-10 17:43:49 +00:00
cfgCtlr . configQueue . Add ( 0 )
2020-03-26 21:07:15 +00:00
}
} ,
DeleteFunc : func ( obj interface { } ) {
name , _ := cache . DeletionHandlingMetaNamespaceKeyFunc ( obj )
klog . V ( 7 ) . Infof ( "Triggered API priority and fairness config reloading due to deletion of PLC %s" , name )
2020-08-10 17:43:49 +00:00
cfgCtlr . configQueue . Add ( 0 )
2020-03-26 21:07:15 +00:00
} } )
fsi . Informer ( ) . AddEventHandler ( cache . ResourceEventHandlerFuncs {
AddFunc : func ( obj interface { } ) {
2020-12-01 01:06:26 +00:00
fs := obj . ( * flowcontrol . FlowSchema )
2020-03-26 21:07:15 +00:00
klog . V ( 7 ) . Infof ( "Triggered API priority and fairness config reloading due to creation of FS %s" , fs . Name )
2020-08-10 17:43:49 +00:00
cfgCtlr . configQueue . Add ( 0 )
2020-03-26 21:07:15 +00:00
} ,
UpdateFunc : func ( oldObj , newObj interface { } ) {
2020-12-01 01:06:26 +00:00
newFS := newObj . ( * flowcontrol . FlowSchema )
oldFS := oldObj . ( * flowcontrol . FlowSchema )
2020-03-26 21:07:15 +00:00
if ! apiequality . Semantic . DeepEqual ( oldFS . Spec , newFS . Spec ) {
klog . V ( 7 ) . Infof ( "Triggered API priority and fairness config reloading due to spec update of FS %s" , newFS . Name )
2020-08-10 17:43:49 +00:00
cfgCtlr . configQueue . Add ( 0 )
2020-03-26 21:07:15 +00:00
}
} ,
DeleteFunc : func ( obj interface { } ) {
name , _ := cache . DeletionHandlingMetaNamespaceKeyFunc ( obj )
klog . V ( 7 ) . Infof ( "Triggered API priority and fairness config reloading due to deletion of FS %s" , name )
2020-08-10 17:43:49 +00:00
cfgCtlr . configQueue . Add ( 0 )
2020-03-26 21:07:15 +00:00
} } )
}
2020-08-10 17:43:49 +00:00
// MaintainObservations keeps the observers from
// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling
// too far behind. It does so by handing updateObservations to
// wait.Until with a ten second period and the given stop channel.
func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
	const refreshPeriod = 10 * time.Second
	wait.Until(cfgCtlr.updateObservations, refreshPeriod, stopCh)
}
// updateObservations takes the controller lock and calls
// UpdateObservations on the QueueSet of every priority level that has
// one.
func (cfgCtlr *configController) updateObservations() {
	cfgCtlr.lock.Lock()
	defer cfgCtlr.lock.Unlock()
	for _, plState := range cfgCtlr.priorityLevelStates {
		if plState.queues == nil {
			// No QueueSet, nothing to update.
			continue
		}
		plState.queues.UpdateObservations()
	}
}
func ( cfgCtlr * configController ) Run ( stopCh <- chan struct { } ) error {
defer cfgCtlr . configQueue . ShutDown ( )
2020-03-26 21:07:15 +00:00
klog . Info ( "Starting API Priority and Fairness config controller" )
2020-08-10 17:43:49 +00:00
if ok := cache . WaitForCacheSync ( stopCh , cfgCtlr . plInformerSynced , cfgCtlr . fsInformerSynced ) ; ! ok {
2020-03-26 21:07:15 +00:00
return fmt . Errorf ( "Never achieved initial sync" )
}
klog . Info ( "Running API Priority and Fairness config worker" )
2020-08-10 17:43:49 +00:00
wait . Until ( cfgCtlr . runWorker , time . Second , stopCh )
2020-03-26 21:07:15 +00:00
klog . Info ( "Shutting down API Priority and Fairness config worker" )
return nil
}
2020-08-10 17:43:49 +00:00
func ( cfgCtlr * configController ) runWorker ( ) {
for cfgCtlr . processNextWorkItem ( ) {
2020-03-26 21:07:15 +00:00
}
}
2020-08-10 17:43:49 +00:00
func ( cfgCtlr * configController ) processNextWorkItem ( ) bool {
obj , shutdown := cfgCtlr . configQueue . Get ( )
2020-03-26 21:07:15 +00:00
if shutdown {
return false
}
func ( obj interface { } ) {
2020-08-10 17:43:49 +00:00
defer cfgCtlr . configQueue . Done ( obj )
if ! cfgCtlr . syncOne ( ) {
cfgCtlr . configQueue . AddRateLimited ( obj )
2020-03-26 21:07:15 +00:00
} else {
2020-08-10 17:43:49 +00:00
cfgCtlr . configQueue . Forget ( obj )
2020-03-26 21:07:15 +00:00
}
} ( obj )
return true
}
// syncOne attempts to sync all the API Priority and Fairness config
// objects. It either succeeds and returns `true` or logs an error
// and returns `false`.
2020-08-10 17:43:49 +00:00
func ( cfgCtlr * configController ) syncOne ( ) bool {
2020-03-26 21:07:15 +00:00
all := labels . Everything ( )
2020-08-10 17:43:49 +00:00
newPLs , err := cfgCtlr . plLister . List ( all )
2020-03-26 21:07:15 +00:00
if err != nil {
klog . Errorf ( "Unable to list PriorityLevelConfiguration objects: %s" , err . Error ( ) )
return false
}
2020-08-10 17:43:49 +00:00
newFSs , err := cfgCtlr . fsLister . List ( all )
2020-03-26 21:07:15 +00:00
if err != nil {
klog . Errorf ( "Unable to list FlowSchema objects: %s" , err . Error ( ) )
return false
}
2020-08-10 17:43:49 +00:00
err = cfgCtlr . digestConfigObjects ( newPLs , newFSs )
2020-03-26 21:07:15 +00:00
if err == nil {
return true
}
klog . Error ( err )
return false
}
// cfgMeal is the data involved in the process of digesting the API
// objects that configure API Priority and Fairness. All the config
// objects are digested together, because this is the simplest way to
// cope with the various dependencies between objects. The process of
// digestion is done in four passes over config objects --- three
// passes over PriorityLevelConfigurations and one pass over the
// FlowSchemas --- with the work dvided among the passes according to
// those dependencies.
type cfgMeal struct {
2020-08-10 17:43:49 +00:00
cfgCtlr * configController
2020-03-26 21:07:15 +00:00
newPLStates map [ string ] * priorityLevelState
// The sum of the concurrency shares of the priority levels in the
// new configuration
shareSum float64
// These keep track of which mandatory priority level config
// objects have been digested
haveExemptPL , haveCatchAllPL bool
// Buffered FlowSchema status updates to do. Do them when the
// lock is not held, to avoid a deadlock due to such a request
// provoking a call into this controller while the lock held
// waiting on that request to complete.
fsStatusUpdates [ ] fsStatusUpdate
}
// A buffered set of status updates for a FlowSchema
type fsStatusUpdate struct {
2020-12-01 01:06:26 +00:00
flowSchema * flowcontrol . FlowSchema
condition flowcontrol . FlowSchemaCondition
oldValue flowcontrol . FlowSchemaCondition
2020-03-26 21:07:15 +00:00
}
// digestConfigObjects is given all the API objects that configure
2020-08-10 17:43:49 +00:00
// cfgCtlr and writes its consequent new configState.
2020-12-01 01:06:26 +00:00
func ( cfgCtlr * configController ) digestConfigObjects ( newPLs [ ] * flowcontrol . PriorityLevelConfiguration , newFSs [ ] * flowcontrol . FlowSchema ) error {
2020-08-10 17:43:49 +00:00
fsStatusUpdates := cfgCtlr . lockAndDigestConfigObjects ( newPLs , newFSs )
2020-03-26 21:07:15 +00:00
var errs [ ] error
for _ , fsu := range fsStatusUpdates {
enc , err := json . Marshal ( fsu . condition )
if err != nil {
// should never happen because these conditions are created here and well formed
panic ( fmt . Sprintf ( "Failed to json.Marshall(%#+v): %s" , fsu . condition , err . Error ( ) ) )
}
klog . V ( 4 ) . Infof ( "Writing Condition %s to FlowSchema %s because its previous value was %s" , string ( enc ) , fsu . flowSchema . Name , fcfmt . Fmt ( fsu . oldValue ) )
2020-08-10 17:43:49 +00:00
_ , err = cfgCtlr . flowcontrolClient . FlowSchemas ( ) . Patch ( context . TODO ( ) , fsu . flowSchema . Name , apitypes . StrategicMergePatchType , [ ] byte ( fmt . Sprintf ( ` { "status": { "conditions": [ %s ] } } ` , string ( enc ) ) ) , metav1 . PatchOptions { FieldManager : "api-priority-and-fairness-config-consumer-v1" } , "status" )
2020-03-26 21:07:15 +00:00
if err != nil {
errs = append ( errs , errors . Wrap ( err , fmt . Sprintf ( "failed to set a status.condition for FlowSchema %s" , fsu . flowSchema . Name ) ) )
}
}
if len ( errs ) == 0 {
return nil
}
return apierrors . NewAggregate ( errs )
}
2020-12-01 01:06:26 +00:00
func ( cfgCtlr * configController ) lockAndDigestConfigObjects ( newPLs [ ] * flowcontrol . PriorityLevelConfiguration , newFSs [ ] * flowcontrol . FlowSchema ) [ ] fsStatusUpdate {
2020-08-10 17:43:49 +00:00
cfgCtlr . lock . Lock ( )
defer cfgCtlr . lock . Unlock ( )
2020-03-26 21:07:15 +00:00
meal := cfgMeal {
2020-08-10 17:43:49 +00:00
cfgCtlr : cfgCtlr ,
2020-03-26 21:07:15 +00:00
newPLStates : make ( map [ string ] * priorityLevelState ) ,
}
meal . digestNewPLsLocked ( newPLs )
meal . digestFlowSchemasLocked ( newFSs )
meal . processOldPLsLocked ( )
// Supply missing mandatory PriorityLevelConfiguration objects
if ! meal . haveExemptPL {
2020-08-10 17:43:49 +00:00
meal . imaginePL ( fcboot . MandatoryPriorityLevelConfigurationExempt , cfgCtlr . requestWaitLimit )
2020-03-26 21:07:15 +00:00
}
if ! meal . haveCatchAllPL {
2020-08-10 17:43:49 +00:00
meal . imaginePL ( fcboot . MandatoryPriorityLevelConfigurationCatchAll , cfgCtlr . requestWaitLimit )
2020-03-26 21:07:15 +00:00
}
meal . finishQueueSetReconfigsLocked ( )
// The new config has been constructed
2020-08-10 17:43:49 +00:00
cfgCtlr . priorityLevelStates = meal . newPLStates
2020-03-26 21:07:15 +00:00
klog . V ( 5 ) . Infof ( "Switched to new API Priority and Fairness configuration" )
return meal . fsStatusUpdates
}
// Digest the new set of PriorityLevelConfiguration objects.
// Pretend broken ones do not exist.
2020-12-01 01:06:26 +00:00
func ( meal * cfgMeal ) digestNewPLsLocked ( newPLs [ ] * flowcontrol . PriorityLevelConfiguration ) {
2020-03-26 21:07:15 +00:00
for _ , pl := range newPLs {
2020-08-10 17:43:49 +00:00
state := meal . cfgCtlr . priorityLevelStates [ pl . Name ]
2020-03-26 21:07:15 +00:00
if state == nil {
2020-08-10 17:43:49 +00:00
state = & priorityLevelState { obsPair : meal . cfgCtlr . obsPairGenerator . Generate ( 1 , 1 , [ ] string { pl . Name } ) }
2020-03-26 21:07:15 +00:00
}
2020-08-10 17:43:49 +00:00
qsCompleter , err := queueSetCompleterForPL ( meal . cfgCtlr . queueSetFactory , state . queues , pl , meal . cfgCtlr . requestWaitLimit , state . obsPair )
2020-03-26 21:07:15 +00:00
if err != nil {
klog . Warningf ( "Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s" , pl . Name , fcfmt . Fmt ( pl . Spec ) , err )
continue
}
meal . newPLStates [ pl . Name ] = state
state . pl = pl
state . qsCompleter = qsCompleter
if state . quiescing { // it was undesired, but no longer
klog . V ( 3 ) . Infof ( "Priority level %q was undesired and has become desired again" , pl . Name )
state . quiescing = false
}
if state . pl . Spec . Limited != nil {
meal . shareSum += float64 ( state . pl . Spec . Limited . AssuredConcurrencyShares )
}
2020-12-01 01:06:26 +00:00
meal . haveExemptPL = meal . haveExemptPL || pl . Name == flowcontrol . PriorityLevelConfigurationNameExempt
meal . haveCatchAllPL = meal . haveCatchAllPL || pl . Name == flowcontrol . PriorityLevelConfigurationNameCatchAll
2020-03-26 21:07:15 +00:00
}
}
// Digest the given FlowSchema objects. Ones that reference a missing
// or broken priority level are not to be passed on to the filter for
// use. We do this before holding over old priority levels so that
// requests stop going to those levels and FlowSchemaStatus values
// reflect this. This function also adds any missing mandatory
// FlowSchema objects. The given objects must all have distinct
// names.
2020-12-01 01:06:26 +00:00
func ( meal * cfgMeal ) digestFlowSchemasLocked ( newFSs [ ] * flowcontrol . FlowSchema ) {
2020-03-26 21:07:15 +00:00
fsSeq := make ( apihelpers . FlowSchemaSequence , 0 , len ( newFSs ) )
2020-12-01 01:06:26 +00:00
fsMap := make ( map [ string ] * flowcontrol . FlowSchema , len ( newFSs ) )
2020-03-26 21:07:15 +00:00
var haveExemptFS , haveCatchAllFS bool
for i , fs := range newFSs {
otherFS := fsMap [ fs . Name ]
if otherFS != nil {
// This client is forbidden to do this.
panic ( fmt . Sprintf ( "Given two FlowSchema objects with the same name: %s and %s" , fcfmt . Fmt ( otherFS ) , fcfmt . Fmt ( fs ) ) )
}
fsMap [ fs . Name ] = fs
_ , goodPriorityRef := meal . newPLStates [ fs . Spec . PriorityLevelConfiguration . Name ]
// Ensure the object's status reflects whether its priority
// level reference is broken.
//
// TODO: consider not even trying if server is not handling
// requests yet.
meal . presyncFlowSchemaStatus ( fs , ! goodPriorityRef , fs . Spec . PriorityLevelConfiguration . Name )
if ! goodPriorityRef {
klog . V ( 6 ) . Infof ( "Ignoring FlowSchema %s because of bad priority level reference %q" , fs . Name , fs . Spec . PriorityLevelConfiguration . Name )
continue
}
fsSeq = append ( fsSeq , newFSs [ i ] )
2020-12-01 01:06:26 +00:00
haveExemptFS = haveExemptFS || fs . Name == flowcontrol . FlowSchemaNameExempt
haveCatchAllFS = haveCatchAllFS || fs . Name == flowcontrol . FlowSchemaNameCatchAll
2020-03-26 21:07:15 +00:00
}
// sort into the order to be used for matching
sort . Sort ( fsSeq )
// Supply missing mandatory FlowSchemas, in correct position
if ! haveExemptFS {
fsSeq = append ( apihelpers . FlowSchemaSequence { fcboot . MandatoryFlowSchemaExempt } , fsSeq ... )
}
if ! haveCatchAllFS {
fsSeq = append ( fsSeq , fcboot . MandatoryFlowSchemaCatchAll )
}
2020-08-10 17:43:49 +00:00
meal . cfgCtlr . flowSchemas = fsSeq
if klog . V ( 5 ) . Enabled ( ) {
2020-03-26 21:07:15 +00:00
for _ , fs := range fsSeq {
klog . Infof ( "Using FlowSchema %s" , fcfmt . Fmt ( fs ) )
}
}
}
// Consider all the priority levels in the previous configuration.
// Keep the ones that are in the new config, supply mandatory
// behavior, or are still busy; for the rest: drop it if it has no
// queues, otherwise start the quiescing process if that has not
// already been started.
func ( meal * cfgMeal ) processOldPLsLocked ( ) {
2020-08-10 17:43:49 +00:00
for plName , plState := range meal . cfgCtlr . priorityLevelStates {
2020-03-26 21:07:15 +00:00
if meal . newPLStates [ plName ] != nil {
// Still desired and already updated
continue
}
2020-12-01 01:06:26 +00:00
if plName == flowcontrol . PriorityLevelConfigurationNameExempt && ! meal . haveExemptPL || plName == flowcontrol . PriorityLevelConfigurationNameCatchAll && ! meal . haveCatchAllPL {
2020-03-26 21:07:15 +00:00
// BTW, we know the Spec has not changed because the
// mandatory objects have immutable Specs
klog . V ( 3 ) . Infof ( "Retaining mandatory priority level %q despite lack of API object" , plName )
} else {
if plState . queues == nil || plState . numPending == 0 && plState . queues . IsIdle ( ) {
// Either there are no queues or they are done
// draining and no use is coming from another
// goroutine
klog . V ( 3 ) . Infof ( "Removing undesired priority level %q (nilQueues=%v), Type=%v" , plName , plState . queues == nil , plState . pl . Spec . Type )
continue
}
if ! plState . quiescing {
klog . V ( 3 ) . Infof ( "Priority level %q became undesired" , plName )
plState . quiescing = true
}
}
var err error
2020-08-10 17:43:49 +00:00
plState . qsCompleter , err = queueSetCompleterForPL ( meal . cfgCtlr . queueSetFactory , plState . queues , plState . pl , meal . cfgCtlr . requestWaitLimit , plState . obsPair )
2020-03-26 21:07:15 +00:00
if err != nil {
2020-08-10 17:43:49 +00:00
// This can not happen because queueSetCompleterForPL already approved this config
2020-03-26 21:07:15 +00:00
panic ( fmt . Sprintf ( "%s from name=%q spec=%s" , err , plName , fcfmt . Fmt ( plState . pl . Spec ) ) )
}
if plState . pl . Spec . Limited != nil {
// We deliberately include the lingering priority levels
// here so that their queues get some concurrency and they
// continue to drain. During this interim a lingering
// priority level continues to get a concurrency
// allocation determined by all the share values in the
// regular way.
meal . shareSum += float64 ( plState . pl . Spec . Limited . AssuredConcurrencyShares )
}
2020-12-01 01:06:26 +00:00
meal . haveExemptPL = meal . haveExemptPL || plName == flowcontrol . PriorityLevelConfigurationNameExempt
meal . haveCatchAllPL = meal . haveCatchAllPL || plName == flowcontrol . PriorityLevelConfigurationNameCatchAll
2020-03-26 21:07:15 +00:00
meal . newPLStates [ plName ] = plState
}
}
// For all the priority levels of the new config, divide up the
// server's total concurrency limit among them and create/update their
// QueueSets.
func ( meal * cfgMeal ) finishQueueSetReconfigsLocked ( ) {
for plName , plState := range meal . newPLStates {
if plState . pl . Spec . Limited == nil {
klog . V ( 5 ) . Infof ( "Using exempt priority level %q: quiescing=%v" , plName , plState . quiescing )
continue
}
// The use of math.Ceil here means that the results might sum
// to a little more than serverConcurrencyLimit but the
// difference will be negligible.
2020-08-10 17:43:49 +00:00
concurrencyLimit := int ( math . Ceil ( float64 ( meal . cfgCtlr . serverConcurrencyLimit ) * float64 ( plState . pl . Spec . Limited . AssuredConcurrencyShares ) / meal . shareSum ) )
2020-03-26 21:07:15 +00:00
metrics . UpdateSharedConcurrencyLimit ( plName , concurrencyLimit )
if plState . queues == nil {
klog . V ( 5 ) . Infof ( "Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)" , plName , fcfmt . Fmt ( plState . pl . Spec ) , concurrencyLimit , plState . quiescing , plState . pl . Spec . Limited . AssuredConcurrencyShares , meal . shareSum )
} else {
klog . V ( 5 ) . Infof ( "Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)" , plName , fcfmt . Fmt ( plState . pl . Spec ) , concurrencyLimit , plState . quiescing , plState . numPending , plState . pl . Spec . Limited . AssuredConcurrencyShares , meal . shareSum )
}
plState . queues = plState . qsCompleter . Complete ( fq . DispatchingConfig { ConcurrencyLimit : concurrencyLimit } )
}
}
2020-08-10 17:43:49 +00:00
// queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
// given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given
2020-03-26 21:07:15 +00:00
// object is malformed in a way that is a problem for this package.
2020-12-01 01:06:26 +00:00
func queueSetCompleterForPL ( qsf fq . QueueSetFactory , queues fq . QueueSet , pl * flowcontrol . PriorityLevelConfiguration , requestWaitLimit time . Duration , intPair metrics . TimedObserverPair ) ( fq . QueueSetCompleter , error ) {
if ( pl . Spec . Type == flowcontrol . PriorityLevelEnablementExempt ) != ( pl . Spec . Limited == nil ) {
2020-03-26 21:07:15 +00:00
return nil , errors . New ( "broken union structure at the top" )
}
2020-12-01 01:06:26 +00:00
if ( pl . Spec . Type == flowcontrol . PriorityLevelEnablementExempt ) != ( pl . Name == flowcontrol . PriorityLevelConfigurationNameExempt ) {
2020-03-26 21:07:15 +00:00
// This package does not attempt to cope with a priority level dynamically switching between exempt and not.
return nil , errors . New ( "non-alignment between name and type" )
}
if pl . Spec . Limited == nil {
return nil , nil
}
2020-12-01 01:06:26 +00:00
if ( pl . Spec . Limited . LimitResponse . Type == flowcontrol . LimitResponseTypeReject ) != ( pl . Spec . Limited . LimitResponse . Queuing == nil ) {
2020-03-26 21:07:15 +00:00
return nil , errors . New ( "broken union structure for limit response" )
}
qcAPI := pl . Spec . Limited . LimitResponse . Queuing
qcQS := fq . QueuingConfig { Name : pl . Name }
if qcAPI != nil {
qcQS = fq . QueuingConfig { Name : pl . Name ,
DesiredNumQueues : int ( qcAPI . Queues ) ,
QueueLengthLimit : int ( qcAPI . QueueLengthLimit ) ,
HandSize : int ( qcAPI . HandSize ) ,
RequestWaitLimit : requestWaitLimit ,
}
}
var qsc fq . QueueSetCompleter
var err error
if queues != nil {
qsc , err = queues . BeginConfigChange ( qcQS )
} else {
2020-08-10 17:43:49 +00:00
qsc , err = qsf . BeginConstruction ( qcQS , intPair )
2020-03-26 21:07:15 +00:00
}
if err != nil {
err = errors . Wrap ( err , fmt . Sprintf ( "priority level %q has QueuingConfiguration %#+v, which is invalid" , pl . Name , qcAPI ) )
}
return qsc , err
}
2020-12-01 01:06:26 +00:00
func ( meal * cfgMeal ) presyncFlowSchemaStatus ( fs * flowcontrol . FlowSchema , isDangling bool , plName string ) {
danglingCondition := apihelpers . GetFlowSchemaConditionByType ( fs , flowcontrol . FlowSchemaConditionDangling )
2020-03-26 21:07:15 +00:00
if danglingCondition == nil {
2020-12-01 01:06:26 +00:00
danglingCondition = & flowcontrol . FlowSchemaCondition {
Type : flowcontrol . FlowSchemaConditionDangling ,
2020-03-26 21:07:15 +00:00
}
}
2020-12-01 01:06:26 +00:00
desiredStatus := flowcontrol . ConditionFalse
2020-03-26 21:07:15 +00:00
var desiredReason , desiredMessage string
if isDangling {
2020-12-01 01:06:26 +00:00
desiredStatus = flowcontrol . ConditionTrue
2020-03-26 21:07:15 +00:00
desiredReason = "NotFound"
desiredMessage = fmt . Sprintf ( "This FlowSchema references the PriorityLevelConfiguration object named %q but there is no such object" , plName )
} else {
desiredReason = "Found"
desiredMessage = fmt . Sprintf ( "This FlowSchema references the PriorityLevelConfiguration object named %q and it exists" , plName )
}
if danglingCondition . Status == desiredStatus && danglingCondition . Reason == desiredReason && danglingCondition . Message == desiredMessage {
return
}
meal . fsStatusUpdates = append ( meal . fsStatusUpdates , fsStatusUpdate {
flowSchema : fs ,
2020-12-01 01:06:26 +00:00
condition : flowcontrol . FlowSchemaCondition {
Type : flowcontrol . FlowSchemaConditionDangling ,
2020-03-26 21:07:15 +00:00
Status : desiredStatus ,
LastTransitionTime : metav1 . Now ( ) ,
Reason : desiredReason ,
Message : desiredMessage ,
} ,
oldValue : * danglingCondition } )
}
// imaginePL adds a priority level based on one of the mandatory ones
2020-08-10 17:43:49 +00:00
// that does not actually exist (right now) as a real API object.
2020-12-01 01:06:26 +00:00
func ( meal * cfgMeal ) imaginePL ( proto * flowcontrol . PriorityLevelConfiguration , requestWaitLimit time . Duration ) {
2020-03-26 21:07:15 +00:00
klog . V ( 3 ) . Infof ( "No %s PriorityLevelConfiguration found, imagining one" , proto . Name )
2020-08-10 17:43:49 +00:00
obsPair := meal . cfgCtlr . obsPairGenerator . Generate ( 1 , 1 , [ ] string { proto . Name } )
qsCompleter , err := queueSetCompleterForPL ( meal . cfgCtlr . queueSetFactory , nil , proto , requestWaitLimit , obsPair )
2020-03-26 21:07:15 +00:00
if err != nil {
// This can not happen because proto is one of the mandatory
// objects and these are not erroneous
panic ( err )
}
meal . newPLStates [ proto . Name ] = & priorityLevelState {
pl : proto ,
qsCompleter : qsCompleter ,
2020-08-10 17:43:49 +00:00
obsPair : obsPair ,
2020-03-26 21:07:15 +00:00
}
if proto . Spec . Limited != nil {
meal . shareSum += float64 ( proto . Spec . Limited . AssuredConcurrencyShares )
}
return
}
// immediateRequest is a stateless request value whose Finish method
// runs the given function right away.
type immediateRequest struct{}

// Finish calls execute synchronously, on the caller's goroutine, and
// then returns false unconditionally.
func (immediateRequest) Finish(execute func()) bool {
	// No queuing or bookkeeping is involved: just run the function.
	execute()

	// The result does not depend on what execute did.
	return false
}
// startRequest classifies and, if appropriate, enqueues the request.
// Returns a nil Request if and only if the request is to be rejected.
// The returned bool indicates whether the request is exempt from
// limitation. The startWaitingTime is when the request started
// waiting in its queue, or `Time{}` if this did not happen.
2020-12-01 01:06:26 +00:00
func ( cfgCtlr * configController ) startRequest ( ctx context . Context , rd RequestDigest , queueNoteFn fq . QueueNoteFn ) ( fs * flowcontrol . FlowSchema , pl * flowcontrol . PriorityLevelConfiguration , isExempt bool , req fq . Request , startWaitingTime time . Time ) {
2020-03-26 21:07:15 +00:00
klog . V ( 7 ) . Infof ( "startRequest(%#+v)" , rd )
2020-08-10 17:43:49 +00:00
cfgCtlr . lock . Lock ( )
defer cfgCtlr . lock . Unlock ( )
2020-12-01 01:06:26 +00:00
var selectedFlowSchema * flowcontrol . FlowSchema
2020-08-10 17:43:49 +00:00
for _ , fs := range cfgCtlr . flowSchemas {
2020-03-26 21:07:15 +00:00
if matchesFlowSchema ( rd , fs ) {
2020-12-01 01:06:26 +00:00
selectedFlowSchema = fs
break
2020-03-26 21:07:15 +00:00
}
}
2020-12-01 01:06:26 +00:00
if selectedFlowSchema == nil {
// This should never happen. If the requestDigest's User is a part of
// system:authenticated or system:unauthenticated, the catch-all flow
// schema should match it. However, if that invariant somehow fails,
// fallback to the catch-all flow schema anyway.
for _ , fs := range cfgCtlr . flowSchemas {
if fs . Name == flowcontrol . FlowSchemaNameCatchAll {
selectedFlowSchema = fs
break
}
2020-03-26 21:07:15 +00:00
}
2020-12-01 01:06:26 +00:00
if selectedFlowSchema == nil {
// This should absolutely never, ever happen! APF guarantees two
// undeletable flow schemas at all times: an exempt flow schema and a
// catch-all flow schema.
panic ( fmt . Sprintf ( "no fallback catch-all flow schema found for request %#+v and user %#+v" , rd . RequestInfo , rd . User ) )
}
klog . Warningf ( "no match found for request %#+v and user %#+v; selecting catchAll=%s as fallback flow schema" , rd . RequestInfo , rd . User , fcfmt . Fmt ( selectedFlowSchema ) )
2020-03-26 21:07:15 +00:00
}
2020-12-01 01:06:26 +00:00
plName := selectedFlowSchema . Spec . PriorityLevelConfiguration . Name
plState := cfgCtlr . priorityLevelStates [ plName ]
if plState . pl . Spec . Type == flowcontrol . PriorityLevelEnablementExempt {
klog . V ( 7 ) . Infof ( "startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate" , rd , selectedFlowSchema . Name , selectedFlowSchema . Spec . DistinguisherMethod , plName )
return selectedFlowSchema , plState . pl , true , immediateRequest { } , time . Time { }
}
var numQueues int32
if plState . pl . Spec . Limited . LimitResponse . Type == flowcontrol . LimitResponseTypeQueue {
numQueues = plState . pl . Spec . Limited . LimitResponse . Queuing . Queues
}
var flowDistinguisher string
var hashValue uint64
if numQueues > 1 {
flowDistinguisher = computeFlowDistinguisher ( rd , selectedFlowSchema . Spec . DistinguisherMethod )
hashValue = hashFlowID ( selectedFlowSchema . Name , flowDistinguisher )
}
startWaitingTime = time . Now ( )
klog . V ( 7 ) . Infof ( "startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d" , rd , selectedFlowSchema . Name , selectedFlowSchema . Spec . DistinguisherMethod , plName , numQueues )
req , idle := plState . queues . StartRequest ( ctx , hashValue , flowDistinguisher , selectedFlowSchema . Name , rd . RequestInfo , rd . User , queueNoteFn )
if idle {
cfgCtlr . maybeReapLocked ( plName , plState )
}
return selectedFlowSchema , plState . pl , false , req , startWaitingTime
2020-03-26 21:07:15 +00:00
}
// Call this after getting a clue that the given priority level is undesired and idle
2020-08-10 17:43:49 +00:00
func ( cfgCtlr * configController ) maybeReap ( plName string ) {
cfgCtlr . lock . Lock ( )
defer cfgCtlr . lock . Unlock ( )
plState := cfgCtlr . priorityLevelStates [ plName ]
2020-03-26 21:07:15 +00:00
if plState == nil {
klog . V ( 7 ) . Infof ( "plName=%s, plState==nil" , plName )
return
}
if plState . queues != nil {
useless := plState . quiescing && plState . numPending == 0 && plState . queues . IsIdle ( )
klog . V ( 7 ) . Infof ( "plState.quiescing=%v, plState.numPending=%d, useless=%v" , plState . quiescing , plState . numPending , useless )
if ! useless {
return
}
}
klog . V ( 3 ) . Infof ( "Triggered API priority and fairness config reloading because priority level %s is undesired and idle" , plName )
2020-08-10 17:43:49 +00:00
cfgCtlr . configQueue . Add ( 0 )
2020-03-26 21:07:15 +00:00
}
// Call this if both (1) plState.queues is non-nil and reported being
2020-08-10 17:43:49 +00:00
// idle, and (2) cfgCtlr's lock has not been released since then.
func ( cfgCtlr * configController ) maybeReapLocked ( plName string , plState * priorityLevelState ) {
2020-03-26 21:07:15 +00:00
if ! ( plState . quiescing && plState . numPending == 0 ) {
return
}
klog . V ( 3 ) . Infof ( "Triggered API priority and fairness config reloading because priority level %s is undesired and idle" , plName )
2020-08-10 17:43:49 +00:00
cfgCtlr . configQueue . Add ( 0 )
2020-03-26 21:07:15 +00:00
}
// computeFlowDistinguisher extracts the flow distinguisher according to the given method
2020-12-01 01:06:26 +00:00
func computeFlowDistinguisher ( rd RequestDigest , method * flowcontrol . FlowDistinguisherMethod ) string {
2020-03-26 21:07:15 +00:00
if method == nil {
return ""
}
switch method . Type {
2020-12-01 01:06:26 +00:00
case flowcontrol . FlowDistinguisherMethodByUserType :
2020-03-26 21:07:15 +00:00
return rd . User . GetName ( )
2020-12-01 01:06:26 +00:00
case flowcontrol . FlowDistinguisherMethodByNamespaceType :
2020-03-26 21:07:15 +00:00
return rd . RequestInfo . Namespace
default :
// this line shall never reach
panic ( "invalid flow-distinguisher method" )
}
}
// hashFlowID derives a 64-bit flow identifier from a flow schema name
// and a flow distinguisher. It computes the SHA-256 digest of
// fsName, a single zero byte, and fDistinguisher concatenated in that
// order, and returns the first eight digest bytes decoded as a
// little-endian unsigned integer. The zero byte separates the two
// parts, so ("a", "b") and ("ab", "") are hashed from different input.
func hashFlowID(fsName, fDistinguisher string) uint64 {
	material := make([]byte, 0, len(fsName)+1+len(fDistinguisher))
	material = append(material, fsName...)
	material = append(material, 0)
	material = append(material, fDistinguisher...)

	digest := sha256.Sum256(material)
	return binary.LittleEndian.Uint64(digest[:8])
}