Update highwatermark

pull/6/head
Harry Zhang 2015-10-26 03:35:57 +00:00 committed by harry
parent 5405a5d98d
commit 8fe92c69d2
6 changed files with 55 additions and 132 deletions

View File

@ -19,6 +19,7 @@ package etcd
import (
"net/http"
"sync"
"sync/atomic"
"time"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -26,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/storage"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/atomic"
"k8s.io/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
@ -43,6 +43,23 @@ const (
EtcdExpire = "expire"
)
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Update returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Update(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}
// TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error)
@ -170,7 +187,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
}
var (
watchChannelHWM atomic.HighWaterMark
watchChannelHWM HighWaterMark
)
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
@ -215,7 +232,7 @@ func (w *etcdWatcher) translate() {
return
case res, ok := <-w.etcdIncoming:
if ok {
if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Check(curLen) {
if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(2).Infof("watch: %v objects queued in channel.", curLen)
}

View File

@ -17,7 +17,9 @@ limitations under the License.
package etcd
import (
"math/rand"
rt "runtime"
"sync"
"testing"
"github.com/coreos/go-etcd/etcd"
@ -463,3 +465,34 @@ func TestWatchPurposefulShutdown(t *testing.T) {
t.Errorf("Channel should be closed")
}
}
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark
for i := int64(10); i < 20; i++ {
if !h.Update(i) {
t.Errorf("unexpected false for %v", i)
}
if h.Update(i - 1) {
t.Errorf("unexpected true for %v", i-1)
}
}
m := int64(0)
wg := sync.WaitGroup{}
for i := 0; i < 300; i++ {
wg.Add(1)
v := rand.Int63()
go func(v int64) {
defer wg.Done()
h.Update(v)
}(v)
if v > m {
m = v
}
}
wg.Wait()
if m != int64(h) {
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
}
}

View File

@ -1,39 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package atomic
import (
"sync"
"sync/atomic"
)
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Check returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Update(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}

View File

@ -1,54 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package atomic
import (
"math/rand"
"sync"
"testing"
)
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark
for i := int64(10); i < 20; i++ {
if !h.Check(i) {
t.Errorf("unexpected false for %v", i)
}
if h.Check(i - 1) {
t.Errorf("unexpected true for %v", i-1)
}
}
m := int64(0)
wg := sync.WaitGroup{}
for i := 0; i < 300; i++ {
wg.Add(1)
v := rand.Int63()
go func(v int64) {
defer wg.Done()
h.Check(v)
}(v)
if v > m {
m = v
}
}
wg.Wait()
if m != int64(h) {
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
}
}

View File

@ -18,7 +18,6 @@ package atomic
import (
"sync"
"sync/atomic"
)
// TODO(ArtfulCoder)

View File

@ -17,15 +17,13 @@ limitations under the License.
package atomic
import (
"math/rand"
"sync"
"testing"
"time"
"k8s.io/kubernetes/pkg/util"
)
func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface{}) {
func ExpectValue(t *testing.T, atomicValue *Value, expectedValue interface{}) {
actualValue := atomicValue.Load()
if actualValue != expectedValue {
t.Errorf("Expected to find %v, found %v", expectedValue, actualValue)
@ -47,39 +45,8 @@ func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface
}
func TestAtomicValue(t *testing.T) {
atomicValue := &AtomicValue{}
atomicValue := &Value{}
ExpectValue(t, atomicValue, nil)
atomicValue.Store(10)
ExpectValue(t, atomicValue, 10)
}
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark
for i := int64(10); i < 20; i++ {
if !h.Check(i) {
t.Errorf("unexpected false for %v", i)
}
if h.Check(i - 1) {
t.Errorf("unexpected true for %v", i-1)
}
}
m := int64(0)
wg := sync.WaitGroup{}
for i := 0; i < 300; i++ {
wg.Add(1)
v := rand.Int63()
go func(v int64) {
defer wg.Done()
h.Check(v)
}(v)
if v > m {
m = v
}
}
wg.Wait()
if m != int64(h) {
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
}
}