Refactor throttle into util pkg

Fix missing throttle.go
pull/6/head
harry 2016-03-09 13:54:59 +08:00 committed by Harry Zhang
parent dae5ac4828
commit 8472cfa214
15 changed files with 51 additions and 49 deletions

View File

@ -56,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/util/flowcontrol"
utilnet "k8s.io/kubernetes/pkg/util/net"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
@ -201,7 +202,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
Run(3, wait.NeverStop)
nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(),
nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second)
cadvisorInterface := new(cadvisortest.Fake)

View File

@ -64,9 +64,9 @@ import (
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
@ -207,8 +207,8 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
// this cidr has been validated already
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)

View File

@ -58,8 +58,8 @@ import (
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/contrib/mesos/pkg/profile"
@ -154,8 +154,8 @@ func (s *CMServer) Run(_ []string) error {
}
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
const (
@ -53,7 +54,7 @@ type RESTClient struct {
contentConfig ContentConfig
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle util.RateLimiter
Throttle flowcontrol.RateLimiter
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
@ -77,9 +78,9 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
config.ContentType = "application/json"
}
var throttle util.RateLimiter
var throttle flowcontrol.RateLimiter
if maxQPS > 0 {
throttle = util.NewTokenBucketRateLimiter(maxQPS, maxBurst)
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
}
return &RESTClient{
base: &base,

View File

@ -39,7 +39,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
@ -117,11 +117,11 @@ type Request struct {
resp *http.Response
backoffMgr BackoffManager
throttle util.RateLimiter
throttle flowcontrol.RateLimiter
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle util.RateLimiter) *Request {
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
if backoff == nil {
glog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}

View File

@ -33,8 +33,8 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
netsets "k8s.io/kubernetes/pkg/util/net/sets"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
@ -80,7 +80,7 @@ type GCECloud struct {
managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master)
networkURL string
useMetadataServer bool
operationPollRateLimiter util.RateLimiter
operationPollRateLimiter flowcontrol.RateLimiter
}
type Config struct {
@ -297,7 +297,7 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
glog.Infof("managing multiple zones: %v", managedZones)
}
operationPollRateLimiter := util.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
return &GCECloud{
service: svc,

View File

@ -22,7 +22,7 @@ import (
"strings"
"time"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/oauth2"
@ -61,7 +61,7 @@ type altTokenSource struct {
oauthClient *http.Client
tokenURL string
tokenBody string
throttle util.RateLimiter
throttle flowcontrol.RateLimiter
}
func (a *altTokenSource) Token() (*oauth2.Token, error) {
@ -106,7 +106,7 @@ func newAltTokenSource(tokenURL, tokenBody string) oauth2.TokenSource {
oauthClient: client,
tokenURL: tokenURL,
tokenBody: tokenBody,
throttle: util.NewTokenBucketRateLimiter(tokenURLQPS, tokenURLBurst),
throttle: flowcontrol.NewTokenBucketRateLimiter(tokenURLQPS, tokenURLBurst),
}
return oauth2.ReuseTokenSource(nil, a)
}

View File

@ -40,7 +40,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
@ -69,7 +69,7 @@ type NodeController struct {
allocateNodeCIDRs bool
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
deletingPodsRateLimiter util.RateLimiter
deletingPodsRateLimiter flowcontrol.RateLimiter
knownNodeSet sets.String
kubeClient clientset.Interface
// Method for easy mocking in unittest.
@ -129,8 +129,8 @@ func NewNodeController(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
deletionEvictionLimiter util.RateLimiter,
terminationEvictionLimiter util.RateLimiter,
deletionEvictionLimiter flowcontrol.RateLimiter,
terminationEvictionLimiter flowcontrol.RateLimiter,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,

View File

@ -31,8 +31,8 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@ -418,7 +418,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod,
evictionTimeout, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
for _, ds := range item.daemonSets {
@ -487,7 +487,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
deleteWaitChan: make(chan struct{}),
}
nodeController := NewNodeController(nil, fnh, 10*time.Minute,
util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, false)
nodeController.cloud = &fakecloud.FakeCloud{}
@ -720,8 +720,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}
for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(),
util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@ -870,8 +870,8 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
}
for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(),
util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
@ -952,7 +952,7 @@ func TestNodeDeletion(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(),
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {

View File

@ -22,7 +22,7 @@ import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -137,11 +137,11 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
// of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
queue UniqueQueue
limiter util.RateLimiter
limiter flowcontrol.RateLimiter
}
// Creates new queue which will use given RateLimiter to oversee execution.
func NewRateLimitedTimedQueue(limiter util.RateLimiter) *RateLimitedTimedQueue {
func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
return &RateLimitedTimedQueue{
queue: UniqueQueue{
queue: TimedQueue{},

View File

@ -21,7 +21,7 @@ import (
"testing"
"time"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -39,7 +39,7 @@ func CheckSetEq(lhs, rhs sets.String) bool {
}
func TestAddNode(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
@ -62,7 +62,7 @@ func TestAddNode(t *testing.T) {
}
func TestDelNode(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
@ -84,7 +84,7 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}
evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
@ -106,7 +106,7 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}
evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
@ -130,7 +130,7 @@ func TestDelNode(t *testing.T) {
}
func TestTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
@ -152,7 +152,7 @@ func TestTry(t *testing.T) {
}
func TestTryOrdering(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
@ -184,7 +184,7 @@ func TestTryOrdering(t *testing.T) {
}
func TestTryRemovingWhileTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")

View File

@ -32,8 +32,8 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/leaky"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/parsers"
)
@ -107,7 +107,7 @@ type dockerPuller struct {
type throttledDockerPuller struct {
puller dockerPuller
limiter util.RateLimiter
limiter flowcontrol.RateLimiter
}
// newDockerPuller creates a new instance of the default implementation of DockerPuller.
@ -122,7 +122,7 @@ func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPulle
}
return &throttledDockerPuller{
puller: dp,
limiter: util.NewTokenBucketRateLimiter(qps, burst),
limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst),
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package flowcontrol
import (
"sync"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package flowcontrol
import (
"math"

View File

@ -26,7 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
@ -66,7 +66,7 @@ var _ = KubeDescribe("Service endpoints latency", func() {
// Turn off rate limiting--it interferes with our measurements.
oldThrottle := f.Client.RESTClient.Throttle
f.Client.RESTClient.Throttle = util.NewFakeAlwaysRateLimiter()
f.Client.RESTClient.Throttle = flowcontrol.NewFakeAlwaysRateLimiter()
defer func() { f.Client.RESTClient.Throttle = oldThrottle }()
failing := sets.NewString()