Merge pull request #34588 from caesarxuchao/copy-workqueue

Automatic merge from submit-queue

Copy workqueue to client-go

The workqueue is very useful when building controllers, so this PR copied it to client-go's staging area.

Fix https://github.com/kubernetes/kubernetes/issues/33497
pull/6/head
Kubernetes Submit Queue 2017-01-02 01:28:52 -08:00 committed by GitHub
commit 1ab598e973
21 changed files with 1961 additions and 169 deletions

View File

@ -77,6 +77,9 @@ mkcp "/pkg/client/unversioned/clientcmd" "/pkg/client/unversioned"
mkcp "/pkg/client/unversioned/portforward" "/pkg/client/unversioned"
mkcp "/plugin/pkg/client/auth" "/plugin/pkg/client"
mkcp "/pkg/util/workqueue" "pkg/util"
# remove this folder because it imports prometheus
rm -rf "${CLIENT_REPO_TEMP}/pkg/util/workqueue/prometheus"
# remove this test because it imports the internal clientset
rm "${CLIENT_REPO_TEMP}"/pkg/client/unversioned/portforward/portforward_test.go

View File

@ -404,6 +404,12 @@ func IsForbidden(err error) bool {
return reasonForError(err) == metav1.StatusReasonForbidden
}
// IsTimeout determines if err is an error which indicates that request times out due to long
// processing.
func IsTimeout(err error) bool {
return reasonForError(err) == metav1.StatusReasonTimeout
}
// IsServerTimeout determines if err is an error which indicates that the request needs to be retried
// by the client.
func IsServerTimeout(err error) bool {

View File

@ -3249,6 +3249,9 @@ const (
// - Secret.Data["token"] - a token that identifies the service account to the API
SecretTypeServiceAccountToken SecretType = "kubernetes.io/service-account-token"
// SecretTypeBootstrapToken is the key for tokens used by kubeadm to validate cluster info during discovery.
SecretTypeBootstrapToken = "bootstrap.kubernetes.io/token"
// ServiceAccountNameKey is the key of the required annotation for SecretTypeServiceAccountToken secrets
ServiceAccountNameKey = "kubernetes.io/service-account.name"
// ServiceAccountUIDKey is the key of the required annotation for SecretTypeServiceAccountToken secrets

View File

@ -19,6 +19,7 @@ package kubeadm
import (
"fmt"
"os"
"path"
"runtime"
"strings"
)
@ -46,9 +47,9 @@ func SetEnvParams() *EnvParams {
}
return &EnvParams{
KubernetesDir: envParams["kubernetes_dir"],
HostPKIPath: envParams["host_pki_path"],
HostEtcdPath: envParams["host_etcd_path"],
KubernetesDir: path.Clean(envParams["kubernetes_dir"]),
HostPKIPath: path.Clean(envParams["host_pki_path"]),
HostEtcdPath: path.Clean(envParams["host_etcd_path"]),
HyperkubeImage: envParams["hyperkube_image"],
RepositoryPrefix: envParams["repo_prefix"],
DiscoveryImage: envParams["discovery_image"],

View File

@ -27,6 +27,15 @@ const ConfigMirrorAnnotationKey = "kubernetes.io/config.mirror"
const ConfigFirstSeenAnnotationKey = "kubernetes.io/config.seen"
const ConfigHashAnnotationKey = "kubernetes.io/config.hash"
// This key needs to sync with the key used by the rescheduler, which currently
// lives in contrib. Its presence indicates 2 things, as far as the kubelet is
// concerned:
// 1. Resource related admission checks will prioritize the admission of
// pods bearing the key, over pods without the key, regardless of QoS.
// 2. The OOM score of pods bearing the key will be <= pods without
// the key (where the <= part is determied by QoS).
const CriticalPodAnnotationKey = "scheduler.alpha.kubernetes.io/critical-pod"
// PodOperation defines what changes will be made on a pod configuration.
type PodOperation int

View File

@ -194,7 +194,7 @@ func (s *Serializer) RecognizesData(peek io.Reader) (ok, unknown bool, err error
// we could potentially look for '---'
return false, true, nil
}
_, ok = utilyaml.GuessJSONStream(peek, 2048)
_, _, ok = utilyaml.GuessJSONStream(peek, 2048)
return ok, false, nil
}

View File

@ -0,0 +1,211 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"math"
"sync"
"time"
"github.com/juju/ratelimit"
)
type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))},
)
}
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
*ratelimit.Bucket
}
var _ RateLimiter = &BucketRateLimiter{}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Bucket.Take(1)
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
return 0
}
func (r *BucketRateLimiter) Forget(item interface{}) {
}
// ItemExponentialFailureRateLimiter does a simple baseDelay*10^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
baseDelay time.Duration
maxDelay time.Duration
}
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
return &ItemExponentialFailureRateLimiter{
failures: map[interface{}]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}
func DefaultItemBasedRateLimiter() RateLimiter {
return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int
fastDelay time.Duration
slowDelay time.Duration
}
var _ RateLimiter = &ItemFastSlowRateLimiter{}
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
return &ItemFastSlowRateLimiter{
failures: map[interface{}]int{},
fastDelay: fastDelay,
slowDelay: slowDelay,
maxFastAttempts: maxFastAttempts,
}
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
return &MaxOfRateLimiter{limiters: limiters}
}
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) Forget(item interface{}) {
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}

View File

@ -0,0 +1,184 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"testing"
"time"
)
func TestItemExponentialFailureRateLimiter(t *testing.T) {
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 4*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 8*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 16*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5, limiter.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1*time.Millisecond, limiter.When("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2*time.Millisecond, limiter.When("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, limiter.NumRequeues("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
limiter.Forget("one")
if e, a := 0, limiter.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) {
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1000*time.Second)
for i := 0; i < 5; i++ {
limiter.When("one")
}
if e, a := 32*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
for i := 0; i < 1000; i++ {
limiter.When("overflow1")
}
if e, a := 1000*time.Second, limiter.When("overflow1"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
limiter = NewItemExponentialFailureRateLimiter(1*time.Minute, 1000*time.Hour)
for i := 0; i < 2; i++ {
limiter.When("two")
}
if e, a := 4*time.Minute, limiter.When("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
for i := 0; i < 1000; i++ {
limiter.When("overflow2")
}
if e, a := 1000*time.Hour, limiter.When("overflow2"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
func TestItemFastSlowRateLimiter(t *testing.T) {
limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 10*time.Second, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 10*time.Second, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5, limiter.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, limiter.NumRequeues("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
limiter.Forget("one")
if e, a := 0, limiter.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
func TestMaxOfRateLimiter(t *testing.T) {
limiter := NewMaxOfRateLimiter(
NewItemFastSlowRateLimiter(5*time.Millisecond, 3*time.Second, 3),
NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second),
)
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 3*time.Second, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 3*time.Second, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5, limiter.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, limiter.NumRequeues("two"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
limiter.Forget("one")
if e, a := 0, limiter.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

View File

@ -0,0 +1,246 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sort"
"time"
"k8s.io/client-go/pkg/util/clock"
utilruntime "k8s.io/client-go/pkg/util/runtime"
)
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
// NewDelayingQueue constructs a new workqueue with delayed queuing ability
func NewDelayingQueue() DelayingInterface {
return newDelayingQueue(clock.RealClock{}, "")
}
func NewNamedDelayingQueue(name string) DelayingInterface {
return newDelayingQueue(clock.RealClock{}, name)
}
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingTimeByEntry: map[t]time.Time{},
waitingForAddCh: make(chan waitFor, 1000),
metrics: newRetryMetrics(name),
}
go ret.waitingLoop()
return ret
}
// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
Interface
// clock tracks time for delayed firing
clock clock.Clock
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// heartbeat ensures we wait no more than maxWait before firing
//
// TODO: replace with Ticker (and add to clock) so this can be cleaned up.
// clock.Tick will leak.
heartbeat <-chan time.Time
// waitingForAdd is an ordered slice of items to be added to the contained work queue
waitingForAdd []waitFor
// waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes
waitingTimeByEntry map[t]time.Time
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan waitFor
// metrics counts the number of retries
metrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
type waitFor struct {
data t
readyAt time.Time
}
// ShutDown gives a way to shut off this queue
func (q *delayingType) ShutDown() {
q.Interface.ShutDown()
close(q.stopCh)
}
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
// expired item sitting for more than 10 seconds.
const maxWait = 10 * time.Second
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)
for {
if q.Interface.ShuttingDown() {
// discard waiting entries
q.waitingForAdd = nil
q.waitingTimeByEntry = nil
return
}
now := q.clock.Now()
// Add ready entries
readyEntries := 0
for _, entry := range q.waitingForAdd {
if entry.readyAt.After(now) {
break
}
q.Add(entry.data)
delete(q.waitingTimeByEntry, entry.data)
readyEntries++
}
q.waitingForAdd = q.waitingForAdd[readyEntries:]
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if len(q.waitingForAdd) > 0 {
nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now))
}
select {
case <-q.stopCh:
return
case <-q.heartbeat:
// continue the loop, which will add ready items
case <-nextReadyAt:
// continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
//
// TODO: This should probably be converted to use container/heap to improve
// running time for a large number of items.
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
// if the entry is already in our retry list and the existing time is before the new one, just skip it
existingTime, exists := knownEntries[entry.data]
if exists && existingTime.Before(entry.readyAt) {
return entries
}
// if the entry exists and is scheduled for later, go ahead and remove the entry
if exists {
if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) {
entries = append(entries[:existingIndex], entries[existingIndex+1:]...)
}
}
insertionIndex := sort.Search(len(entries), func(i int) bool {
return entry.readyAt.Before(entries[i].readyAt)
})
// grow by 1
entries = append(entries, waitFor{})
// shift items from the insertion point to the end
copy(entries[insertionIndex+1:], entries[insertionIndex:])
// insert the record
entries[insertionIndex] = entry
knownEntries[entry.data] = entry.readyAt
return entries
}
// findEntryIndex returns the index for an existing entry
func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
index := sort.Search(len(entries), func(i int) bool {
return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
})
// we know this is the earliest possible index, but there could be multiple with the same time
// iterate from here to find the dupe
for ; index < len(entries); index++ {
if entries[index].data == data {
break
}
}
return index
}

View File

@ -0,0 +1,236 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"fmt"
"reflect"
"testing"
"time"
"k8s.io/client-go/pkg/util/clock"
"k8s.io/client-go/pkg/util/wait"
)
func TestSimpleQueue(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock, "")
first := "foo"
q.AddAfter(first, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ := q.Get()
q.Done(item)
// step past the next heartbeat
fakeClock.Step(10 * time.Second)
err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
if q.Len() > 0 {
return false, fmt.Errorf("added to queue")
}
return false, nil
})
if err != wait.ErrWaitTimeout {
t.Errorf("expected timeout, got: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
}
func TestDeduping(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock, "")
first := "foo"
q.AddAfter(first, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
q.AddAfter(first, 70*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
// step past the first block, we should receive now
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ := q.Get()
q.Done(item)
// step past the second add
fakeClock.Step(20 * time.Millisecond)
if q.Len() != 0 {
t.Errorf("should not have added")
}
// test again, but this time the earlier should override
q.AddAfter(first, 50*time.Millisecond)
q.AddAfter(first, 30*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
fakeClock.Step(40 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ = q.Get()
q.Done(item)
// step past the second add
fakeClock.Step(20 * time.Millisecond)
if q.Len() != 0 {
t.Errorf("should not have added")
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
}
func TestAddTwoFireEarly(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock, "")
first := "foo"
second := "bar"
third := "baz"
q.AddAfter(first, 1*time.Second)
q.AddAfter(second, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ := q.Get()
if !reflect.DeepEqual(item, second) {
t.Errorf("expected %v, got %v", second, item)
}
q.AddAfter(third, 2*time.Second)
fakeClock.Step(1 * time.Second)
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ = q.Get()
if !reflect.DeepEqual(item, first) {
t.Errorf("expected %v, got %v", first, item)
}
fakeClock.Step(2 * time.Second)
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ = q.Get()
if !reflect.DeepEqual(item, third) {
t.Errorf("expected %v, got %v", third, item)
}
}
func TestCopyShifting(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock, "")
first := "foo"
second := "bar"
third := "baz"
q.AddAfter(first, 1*time.Second)
q.AddAfter(second, 500*time.Millisecond)
q.AddAfter(third, 250*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
fakeClock.Step(2 * time.Second)
if err := waitForAdded(q, 3); err != nil {
t.Fatalf("unexpected err: %v", err)
}
actualFirst, _ := q.Get()
if !reflect.DeepEqual(actualFirst, third) {
t.Errorf("expected %v, got %v", third, actualFirst)
}
actualSecond, _ := q.Get()
if !reflect.DeepEqual(actualSecond, second) {
t.Errorf("expected %v, got %v", second, actualSecond)
}
actualThird, _ := q.Get()
if !reflect.DeepEqual(actualThird, first) {
t.Errorf("expected %v, got %v", first, actualThird)
}
}
func waitForAdded(q DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {
return true, nil
}
return false, nil
})
}
func waitForWaitingQueueToFill(q DelayingInterface) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if len(q.(*delayingType).waitingForAddCh) == 0 {
return true, nil
}
return false, nil
})
}

View File

@ -0,0 +1,26 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package workqueue provides a simple queue that supports the following
// features:
// * Fair: items processed in the order in which they are added.
// * Stingy: a single item will not be processed multiple times concurrently,
// and if an item is added multiple times before it can be processed, it
// will only be processed once.
// * Multiple consumers and producers. In particular, it is allowed for an
// item to be reenqueued while it is being processed.
// * Shutdown notifications.
package workqueue // import "k8s.io/client-go/pkg/util/workqueue"

View File

@ -0,0 +1,195 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
"time"
)
// This file provides abstractions for setting the provider (e.g., prometheus)
// of metrics.
type queueMetrics interface {
add(item t)
get(item t)
done(item t)
}
// GaugeMetric represents a single numerical value that can arbitrarily go up
// and down.
type GaugeMetric interface {
Inc()
Dec()
}
// CounterMetric represents a single numerical value that only ever
// goes up.
type CounterMetric interface {
Inc()
}
// SummaryMetric captures individual observations.
type SummaryMetric interface {
Observe(float64)
}
type noopMetric struct{}
func (noopMetric) Inc() {}
func (noopMetric) Dec() {}
func (noopMetric) Observe(float64) {}
type defaultQueueMetrics struct {
// current depth of a workqueue
depth GaugeMetric
// total number of adds handled by a workqueue
adds CounterMetric
// how long an item stays in a workqueue
latency SummaryMetric
// how long processing an item from a workqueue takes
workDuration SummaryMetric
addTimes map[t]time.Time
processingStartTimes map[t]time.Time
}
func (m *defaultQueueMetrics) add(item t) {
if m == nil {
return
}
m.adds.Inc()
m.depth.Inc()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = time.Now()
}
}
func (m *defaultQueueMetrics) get(item t) {
if m == nil {
return
}
m.depth.Dec()
m.processingStartTimes[item] = time.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(sinceInMicroseconds(startTime))
delete(m.addTimes, item)
}
}
func (m *defaultQueueMetrics) done(item t) {
if m == nil {
return
}
if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item)
}
}
// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
type retryMetrics interface {
retry()
}
type defaultRetryMetrics struct {
retries CounterMetric
}
func (m *defaultRetryMetrics) retry() {
if m == nil {
return
}
m.retries.Inc()
}
// MetricsProvider generates various metrics used by the queue.
type MetricsProvider interface {
NewDepthMetric(name string) GaugeMetric
NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric
NewRetriesMetric(name string) CounterMetric
}
type noopMetricsProvider struct{}
func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
var metricsFactory = struct {
metricsProvider MetricsProvider
setProviders sync.Once
}{
metricsProvider: noopMetricsProvider{},
}
func newQueueMetrics(name string) queueMetrics {
var ret *defaultQueueMetrics
if len(name) == 0 {
return ret
}
return &defaultQueueMetrics{
depth: metricsFactory.metricsProvider.NewDepthMetric(name),
adds: metricsFactory.metricsProvider.NewAddsMetric(name),
latency: metricsFactory.metricsProvider.NewLatencyMetric(name),
workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
}
}
func newRetryMetrics(name string) retryMetrics {
var ret *defaultRetryMetrics
if len(name) == 0 {
return ret
}
return &defaultRetryMetrics{
retries: metricsFactory.metricsProvider.NewRetriesMetric(name),
}
}
// SetProvider sets the metrics provider of the metricsFactory.
func SetProvider(metricsProvider MetricsProvider) {
metricsFactory.setProviders.Do(func() {
metricsFactory.metricsProvider = metricsProvider
})
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
utilruntime "k8s.io/client-go/pkg/util/runtime"
)
type DoWorkPieceFunc func(piece int)
// Parallelize is a very simple framework that allow for parallelizing
// N independent pieces of work.
func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
toProcess := make(chan int, pieces)
for i := 0; i < pieces; i++ {
toProcess <- i
}
close(toProcess)
if pieces < workers {
workers = pieces
}
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
for piece := range toProcess {
doWorkPiece(piece)
}
}()
}
wg.Wait()
}

View File

@ -0,0 +1,172 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
)
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
// New constructs a new workqueue (see the package comment).
func New() *Type {
return NewNamed("")
}
func NewNamed(name string) *Type {
return &Type{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: newQueueMetrics(name),
}
}
// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
}
type empty struct{}
type t interface{}
type set map[t]empty
func (s set) has(item t) bool {
_, exists := s[item]
return exists
}
func (s set) insert(item t) {
s[item] = empty{}
}
func (s set) delete(item t) {
delete(s, item)
}
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
return
}
q.metrics.add(item)
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value, that can't be synchronized properly.
func (q *Type) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.queue)
}
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Type) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast()
}
func (q *Type) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown
}

View File

@ -0,0 +1,161 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue_test
import (
"sync"
"testing"
"time"
"k8s.io/client-go/pkg/util/workqueue"
)
func TestBasic(t *testing.T) {
// If something is seriously wrong this test will never complete.
q := workqueue.New()
// Start producers
const producers = 50
producerWG := sync.WaitGroup{}
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
for j := 0; j < 50; j++ {
q.Add(i)
time.Sleep(time.Millisecond)
}
}(i)
}
// Start consumers
const consumers = 10
consumerWG := sync.WaitGroup{}
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
for {
item, quit := q.Get()
if item == "added after shutdown!" {
t.Errorf("Got an item added after shutdown.")
}
if quit {
return
}
t.Logf("Worker %v: begin processing %v", i, item)
time.Sleep(3 * time.Millisecond)
t.Logf("Worker %v: done processing %v", i, item)
q.Done(item)
}
}(i)
}
producerWG.Wait()
q.ShutDown()
q.Add("added after shutdown!")
consumerWG.Wait()
}
func TestAddWhileProcessing(t *testing.T) {
q := workqueue.New()
// Start producers
const producers = 50
producerWG := sync.WaitGroup{}
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
q.Add(i)
}(i)
}
// Start consumers
const consumers = 10
consumerWG := sync.WaitGroup{}
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
// Every worker will re-add every item up to two times.
// This tests the dirty-while-processing case.
counters := map[interface{}]int{}
for {
item, quit := q.Get()
if quit {
return
}
counters[item]++
if counters[item] < 2 {
q.Add(item)
}
q.Done(item)
}
}(i)
}
producerWG.Wait()
q.ShutDown()
consumerWG.Wait()
}
func TestLen(t *testing.T) {
q := workqueue.New()
q.Add("foo")
if e, a := 1, q.Len(); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
q.Add("bar")
if e, a := 2, q.Len(); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
q.Add("foo") // should not increase the queue length.
if e, a := 2, q.Len(); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestReinsert(t *testing.T) {
q := workqueue.New()
q.Add("foo")
// Start processing
i, _ := q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
// Add it back while processing
q.Add(i)
// Finish it up
q.Done(i)
// It should be back on the queue
i, _ = q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
// Finish that one up
q.Done(i)
if a := q.Len(); a != 0 {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
// AddRateLimited adds an item to the workqueue after the rate limiter says its ok
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
// Remember to call Forget! If you don't, you may end up tracking failures forever.
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewDelayingQueue(),
rateLimiter: rateLimiter,
}
}
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewNamedDelayingQueue(name),
rateLimiter: rateLimiter,
}
}
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter
}
// AddRateLimited AddAfter's the item based on the time when the rate limiter says its ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}

View File

@ -0,0 +1,75 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"testing"
"time"
"k8s.io/client-go/pkg/util/clock"
)
func TestRateLimitingQueue(t *testing.T) {
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
fakeClock := clock.NewFakeClock(time.Now())
delayingQueue := &delayingType{
Interface: New(),
clock: fakeClock,
heartbeat: fakeClock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan waitFor, 1000),
metrics: newRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue
queue.AddRateLimited("one")
waitEntry := <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("one")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, queue.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("two")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("two")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.Forget("one")
if e, a := 0, queue.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("one")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import "time"
type TimedWorkQueue struct {
*Type
}
type TimedWorkQueueItem struct {
StartTime time.Time
Object interface{}
}
func NewTimedWorkQueue() *TimedWorkQueue {
return &TimedWorkQueue{New()}
}
// Add adds the obj along with the current timestamp to the queue.
func (q TimedWorkQueue) Add(timedItem *TimedWorkQueueItem) {
q.Type.Add(timedItem)
}
// Get gets the obj along with its timestamp from the queue.
func (q TimedWorkQueue) Get() (timedItem *TimedWorkQueueItem, shutdown bool) {
origin, shutdown := q.Type.Get()
if origin == nil {
return nil, shutdown
}
timedItem, _ = origin.(*TimedWorkQueueItem)
return timedItem, shutdown
}
func (q TimedWorkQueue) Done(timedItem *TimedWorkQueueItem) error {
q.Type.Done(timedItem)
return nil
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"testing"
"time"
"k8s.io/client-go/pkg/api/v1"
)
func TestNoMemoryLeak(t *testing.T) {
timedQueue := NewTimedWorkQueue()
timedQueue.Add(&TimedWorkQueueItem{Object: &v1.Pod{}, StartTime: time.Time{}})
item, _ := timedQueue.Get()
timedQueue.Add(item)
// The item should still be in the timedQueue.
timedQueue.Done(item)
item, _ = timedQueue.Get()
timedQueue.Done(item)
if len(timedQueue.Type.processing) != 0 {
t.Errorf("expect timedQueue.Type.processing to be empty!")
}
}

View File

@ -181,6 +181,7 @@ type YAMLOrJSONDecoder struct {
bufferSize int
decoder decoder
rawData []byte
}
// NewYAMLOrJSONDecoder returns a decoder that will process YAML documents
@ -198,10 +199,11 @@ func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder {
// provide object, or returns an error.
func (d *YAMLOrJSONDecoder) Decode(into interface{}) error {
if d.decoder == nil {
buffer, isJSON := GuessJSONStream(d.r, d.bufferSize)
buffer, origData, isJSON := GuessJSONStream(d.r, d.bufferSize)
if isJSON {
glog.V(4).Infof("decoding stream as JSON")
d.decoder = json.NewDecoder(buffer)
d.rawData = origData
} else {
glog.V(4).Infof("decoding stream as YAML")
d.decoder = NewYAMLToJSONDecoder(buffer)
@ -215,6 +217,13 @@ func (d *YAMLOrJSONDecoder) Decode(into interface{}) error {
glog.V(4).Infof("reading stream failed: %v", readErr)
}
js := string(data)
// if contents from io.Reader are not complete,
// use the original raw data to prevent panic
if int64(len(js)) <= syntax.Offset {
js = string(d.rawData)
}
start := strings.LastIndex(js[:syntax.Offset], "\n") + 1
line := strings.Count(js[:start], "\n")
return fmt.Errorf("json: line %d: %s", line, syntax.Error())
@ -296,10 +305,10 @@ func (r *LineReader) Read() ([]byte, error) {
// GuessJSONStream scans the provided reader up to size, looking
// for an open brace indicating this is JSON. It will return the
// bufio.Reader it creates for the consumer.
func GuessJSONStream(r io.Reader, size int) (io.Reader, bool) {
func GuessJSONStream(r io.Reader, size int) (io.Reader, []byte, bool) {
buffer := bufio.NewReaderSize(r, size)
b, _ := buffer.Peek(size)
return buffer, hasJSONPrefix(b)
return buffer, b, hasJSONPrefix(b)
}
var jsonPrefix = []byte("{")

368
vendor/BUILD vendored
View File

@ -802,64 +802,6 @@ go_library(
tags = ["automanaged"],
)
go_library(
name = "github.com/chai2010/gettext-go/gettext",
srcs = [
"github.com/chai2010/gettext-go/gettext/caller.go",
"github.com/chai2010/gettext-go/gettext/doc.go",
"github.com/chai2010/gettext-go/gettext/domain.go",
"github.com/chai2010/gettext-go/gettext/domain_helper.go",
"github.com/chai2010/gettext-go/gettext/fs.go",
"github.com/chai2010/gettext-go/gettext/gettext.go",
"github.com/chai2010/gettext-go/gettext/local.go",
"github.com/chai2010/gettext-go/gettext/tr.go",
],
tags = ["automanaged"],
deps = [
"//vendor:github.com/chai2010/gettext-go/gettext/mo",
"//vendor:github.com/chai2010/gettext-go/gettext/plural",
"//vendor:github.com/chai2010/gettext-go/gettext/po",
],
)
go_library(
name = "github.com/chai2010/gettext-go/gettext/mo",
srcs = [
"github.com/chai2010/gettext-go/gettext/mo/doc.go",
"github.com/chai2010/gettext-go/gettext/mo/encoder.go",
"github.com/chai2010/gettext-go/gettext/mo/file.go",
"github.com/chai2010/gettext-go/gettext/mo/header.go",
"github.com/chai2010/gettext-go/gettext/mo/message.go",
"github.com/chai2010/gettext-go/gettext/mo/util.go",
],
tags = ["automanaged"],
)
go_library(
name = "github.com/chai2010/gettext-go/gettext/plural",
srcs = [
"github.com/chai2010/gettext-go/gettext/plural/doc.go",
"github.com/chai2010/gettext-go/gettext/plural/formula.go",
"github.com/chai2010/gettext-go/gettext/plural/table.go",
],
tags = ["automanaged"],
)
go_library(
name = "github.com/chai2010/gettext-go/gettext/po",
srcs = [
"github.com/chai2010/gettext-go/gettext/po/comment.go",
"github.com/chai2010/gettext-go/gettext/po/doc.go",
"github.com/chai2010/gettext-go/gettext/po/file.go",
"github.com/chai2010/gettext-go/gettext/po/header.go",
"github.com/chai2010/gettext-go/gettext/po/line_reader.go",
"github.com/chai2010/gettext-go/gettext/po/message.go",
"github.com/chai2010/gettext-go/gettext/po/re.go",
"github.com/chai2010/gettext-go/gettext/po/util.go",
],
tags = ["automanaged"],
)
go_library(
name = "github.com/cloudflare/cfssl/auth",
srcs = ["github.com/cloudflare/cfssl/auth/auth.go"],
@ -4316,17 +4258,6 @@ go_binary(
tags = ["automanaged"],
)
go_library(
name = "github.com/jteeuwen/go-bindata/go-bindata",
srcs = [
"github.com/jteeuwen/go-bindata/go-bindata/AppendSliceValue.go",
"github.com/jteeuwen/go-bindata/go-bindata/main.go",
"github.com/jteeuwen/go-bindata/go-bindata/version.go",
],
tags = ["automanaged"],
deps = ["//vendor:github.com/jteeuwen/go-bindata"],
)
go_library(
name = "github.com/juju/ratelimit",
srcs = [
@ -4633,37 +4564,6 @@ go_binary(
tags = ["automanaged"],
)
go_library(
name = "github.com/onsi/ginkgo/ginkgo",
srcs = [
"github.com/onsi/ginkgo/ginkgo/bootstrap_command.go",
"github.com/onsi/ginkgo/ginkgo/build_command.go",
"github.com/onsi/ginkgo/ginkgo/convert_command.go",
"github.com/onsi/ginkgo/ginkgo/generate_command.go",
"github.com/onsi/ginkgo/ginkgo/help_command.go",
"github.com/onsi/ginkgo/ginkgo/main.go",
"github.com/onsi/ginkgo/ginkgo/nodot_command.go",
"github.com/onsi/ginkgo/ginkgo/notifications.go",
"github.com/onsi/ginkgo/ginkgo/run_command.go",
"github.com/onsi/ginkgo/ginkgo/run_watch_and_build_command_flags.go",
"github.com/onsi/ginkgo/ginkgo/suite_runner.go",
"github.com/onsi/ginkgo/ginkgo/unfocus_command.go",
"github.com/onsi/ginkgo/ginkgo/version_command.go",
"github.com/onsi/ginkgo/ginkgo/watch_command.go",
],
tags = ["automanaged"],
deps = [
"//vendor:github.com/onsi/ginkgo/config",
"//vendor:github.com/onsi/ginkgo/ginkgo/convert",
"//vendor:github.com/onsi/ginkgo/ginkgo/interrupthandler",
"//vendor:github.com/onsi/ginkgo/ginkgo/nodot",
"//vendor:github.com/onsi/ginkgo/ginkgo/testrunner",
"//vendor:github.com/onsi/ginkgo/ginkgo/testsuite",
"//vendor:github.com/onsi/ginkgo/ginkgo/watch",
"//vendor:github.com/onsi/ginkgo/types",
],
)
go_library(
name = "github.com/onsi/ginkgo/ginkgo/convert",
srcs = [
@ -6472,15 +6372,6 @@ go_binary(
tags = ["automanaged"],
)
go_library(
name = "github.com/ugorji/go/codec/codecgen",
srcs = [
"github.com/ugorji/go/codec/codecgen/gen.go",
"github.com/ugorji/go/codec/codecgen/z.go",
],
tags = ["automanaged"],
)
go_library(
name = "github.com/vishvananda/netlink",
srcs = [
@ -7284,65 +7175,12 @@ go_library(
],
)
go_library(
name = "golang.org/x/text/encoding",
srcs = ["golang.org/x/text/encoding/encoding.go"],
tags = ["automanaged"],
deps = [
"//vendor:golang.org/x/text/encoding/internal/identifier",
"//vendor:golang.org/x/text/transform",
],
)
go_library(
name = "golang.org/x/text/encoding/internal",
srcs = ["golang.org/x/text/encoding/internal/internal.go"],
tags = ["automanaged"],
deps = [
"//vendor:golang.org/x/text/encoding",
"//vendor:golang.org/x/text/encoding/internal/identifier",
"//vendor:golang.org/x/text/transform",
],
)
go_library(
name = "golang.org/x/text/encoding/internal/identifier",
srcs = [
"golang.org/x/text/encoding/internal/identifier/identifier.go",
"golang.org/x/text/encoding/internal/identifier/mib.go",
],
tags = ["automanaged"],
)
go_library(
name = "golang.org/x/text/encoding/unicode",
srcs = [
"golang.org/x/text/encoding/unicode/override.go",
"golang.org/x/text/encoding/unicode/unicode.go",
],
tags = ["automanaged"],
deps = [
"//vendor:golang.org/x/text/encoding",
"//vendor:golang.org/x/text/encoding/internal",
"//vendor:golang.org/x/text/encoding/internal/identifier",
"//vendor:golang.org/x/text/internal/utf8internal",
"//vendor:golang.org/x/text/runes",
"//vendor:golang.org/x/text/transform",
],
)
go_library(
name = "golang.org/x/text/internal/tag",
srcs = ["golang.org/x/text/internal/tag/tag.go"],
tags = ["automanaged"],
)
go_library(
name = "golang.org/x/text/internal/utf8internal",
srcs = ["golang.org/x/text/internal/utf8internal/utf8internal.go"],
tags = ["automanaged"],
)
go_library(
name = "golang.org/x/text/language",
srcs = [
@ -12188,6 +12026,159 @@ go_library(
],
)
go_test(
name = "k8s.io/client-go/pkg/util/workqueue_test",
srcs = [
"k8s.io/client-go/pkg/util/workqueue/default_rate_limiters_test.go",
"k8s.io/client-go/pkg/util/workqueue/delaying_queue_test.go",
"k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue_test.go",
"k8s.io/client-go/pkg/util/workqueue/timed_queue_test.go",
],
library = ":k8s.io/client-go/pkg/util/workqueue",
tags = ["automanaged"],
deps = [
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/pkg/util/clock",
"//vendor:k8s.io/client-go/pkg/util/wait",
],
)
go_library(
name = "k8s.io/client-go/pkg/util/workqueue",
srcs = [
"k8s.io/client-go/pkg/util/workqueue/default_rate_limiters.go",
"k8s.io/client-go/pkg/util/workqueue/delaying_queue.go",
"k8s.io/client-go/pkg/util/workqueue/doc.go",
"k8s.io/client-go/pkg/util/workqueue/metrics.go",
"k8s.io/client-go/pkg/util/workqueue/parallelizer.go",
"k8s.io/client-go/pkg/util/workqueue/queue.go",
"k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue.go",
"k8s.io/client-go/pkg/util/workqueue/timed_queue.go",
],
tags = ["automanaged"],
deps = [
"//vendor:github.com/juju/ratelimit",
"//vendor:k8s.io/client-go/pkg/util/clock",
"//vendor:k8s.io/client-go/pkg/util/runtime",
],
)
go_test(
name = "k8s.io/client-go/pkg/util/workqueue_xtest",
srcs = ["k8s.io/client-go/pkg/util/workqueue/queue_test.go"],
tags = ["automanaged"],
deps = ["//vendor:k8s.io/client-go/pkg/util/workqueue"],
)
go_library(
name = "github.com/chai2010/gettext-go/gettext",
srcs = [
"github.com/chai2010/gettext-go/gettext/caller.go",
"github.com/chai2010/gettext-go/gettext/doc.go",
"github.com/chai2010/gettext-go/gettext/domain.go",
"github.com/chai2010/gettext-go/gettext/domain_helper.go",
"github.com/chai2010/gettext-go/gettext/fs.go",
"github.com/chai2010/gettext-go/gettext/gettext.go",
"github.com/chai2010/gettext-go/gettext/local.go",
"github.com/chai2010/gettext-go/gettext/tr.go",
],
tags = ["automanaged"],
deps = [
"//vendor:github.com/chai2010/gettext-go/gettext/mo",
"//vendor:github.com/chai2010/gettext-go/gettext/plural",
"//vendor:github.com/chai2010/gettext-go/gettext/po",
],
)
go_library(
name = "github.com/chai2010/gettext-go/gettext/mo",
srcs = [
"github.com/chai2010/gettext-go/gettext/mo/doc.go",
"github.com/chai2010/gettext-go/gettext/mo/encoder.go",
"github.com/chai2010/gettext-go/gettext/mo/file.go",
"github.com/chai2010/gettext-go/gettext/mo/header.go",
"github.com/chai2010/gettext-go/gettext/mo/message.go",
"github.com/chai2010/gettext-go/gettext/mo/util.go",
],
tags = ["automanaged"],
)
go_library(
name = "github.com/chai2010/gettext-go/gettext/plural",
srcs = [
"github.com/chai2010/gettext-go/gettext/plural/doc.go",
"github.com/chai2010/gettext-go/gettext/plural/formula.go",
"github.com/chai2010/gettext-go/gettext/plural/table.go",
],
tags = ["automanaged"],
)
go_library(
name = "github.com/chai2010/gettext-go/gettext/po",
srcs = [
"github.com/chai2010/gettext-go/gettext/po/comment.go",
"github.com/chai2010/gettext-go/gettext/po/doc.go",
"github.com/chai2010/gettext-go/gettext/po/file.go",
"github.com/chai2010/gettext-go/gettext/po/header.go",
"github.com/chai2010/gettext-go/gettext/po/line_reader.go",
"github.com/chai2010/gettext-go/gettext/po/message.go",
"github.com/chai2010/gettext-go/gettext/po/re.go",
"github.com/chai2010/gettext-go/gettext/po/util.go",
],
tags = ["automanaged"],
)
go_library(
name = "github.com/jteeuwen/go-bindata/go-bindata",
srcs = [
"github.com/jteeuwen/go-bindata/go-bindata/AppendSliceValue.go",
"github.com/jteeuwen/go-bindata/go-bindata/main.go",
"github.com/jteeuwen/go-bindata/go-bindata/version.go",
],
tags = ["automanaged"],
deps = ["//vendor:github.com/jteeuwen/go-bindata"],
)
go_library(
name = "github.com/onsi/ginkgo/ginkgo",
srcs = [
"github.com/onsi/ginkgo/ginkgo/bootstrap_command.go",
"github.com/onsi/ginkgo/ginkgo/build_command.go",
"github.com/onsi/ginkgo/ginkgo/convert_command.go",
"github.com/onsi/ginkgo/ginkgo/generate_command.go",
"github.com/onsi/ginkgo/ginkgo/help_command.go",
"github.com/onsi/ginkgo/ginkgo/main.go",
"github.com/onsi/ginkgo/ginkgo/nodot_command.go",
"github.com/onsi/ginkgo/ginkgo/notifications.go",
"github.com/onsi/ginkgo/ginkgo/run_command.go",
"github.com/onsi/ginkgo/ginkgo/run_watch_and_build_command_flags.go",
"github.com/onsi/ginkgo/ginkgo/suite_runner.go",
"github.com/onsi/ginkgo/ginkgo/unfocus_command.go",
"github.com/onsi/ginkgo/ginkgo/version_command.go",
"github.com/onsi/ginkgo/ginkgo/watch_command.go",
],
tags = ["automanaged"],
deps = [
"//vendor:github.com/onsi/ginkgo/config",
"//vendor:github.com/onsi/ginkgo/ginkgo/convert",
"//vendor:github.com/onsi/ginkgo/ginkgo/interrupthandler",
"//vendor:github.com/onsi/ginkgo/ginkgo/nodot",
"//vendor:github.com/onsi/ginkgo/ginkgo/testrunner",
"//vendor:github.com/onsi/ginkgo/ginkgo/testsuite",
"//vendor:github.com/onsi/ginkgo/ginkgo/watch",
"//vendor:github.com/onsi/ginkgo/types",
],
)
go_library(
name = "github.com/ugorji/go/codec/codecgen",
srcs = [
"github.com/ugorji/go/codec/codecgen/gen.go",
"github.com/ugorji/go/codec/codecgen/z.go",
],
tags = ["automanaged"],
)
go_library(
name = "golang.org/x/crypto/ed25519",
srcs = ["golang.org/x/crypto/ed25519/ed25519.go"],
@ -12203,3 +12194,56 @@ go_library(
],
tags = ["automanaged"],
)
go_library(
name = "golang.org/x/text/encoding",
srcs = ["golang.org/x/text/encoding/encoding.go"],
tags = ["automanaged"],
deps = [
"//vendor:golang.org/x/text/encoding/internal/identifier",
"//vendor:golang.org/x/text/transform",
],
)
go_library(
name = "golang.org/x/text/encoding/internal",
srcs = ["golang.org/x/text/encoding/internal/internal.go"],
tags = ["automanaged"],
deps = [
"//vendor:golang.org/x/text/encoding",
"//vendor:golang.org/x/text/encoding/internal/identifier",
"//vendor:golang.org/x/text/transform",
],
)
go_library(
name = "golang.org/x/text/encoding/internal/identifier",
srcs = [
"golang.org/x/text/encoding/internal/identifier/identifier.go",
"golang.org/x/text/encoding/internal/identifier/mib.go",
],
tags = ["automanaged"],
)
go_library(
name = "golang.org/x/text/encoding/unicode",
srcs = [
"golang.org/x/text/encoding/unicode/override.go",
"golang.org/x/text/encoding/unicode/unicode.go",
],
tags = ["automanaged"],
deps = [
"//vendor:golang.org/x/text/encoding",
"//vendor:golang.org/x/text/encoding/internal",
"//vendor:golang.org/x/text/encoding/internal/identifier",
"//vendor:golang.org/x/text/internal/utf8internal",
"//vendor:golang.org/x/text/runes",
"//vendor:golang.org/x/text/transform",
],
)
go_library(
name = "golang.org/x/text/internal/utf8internal",
srcs = ["golang.org/x/text/internal/utf8internal/utf8internal.go"],
tags = ["automanaged"],
)