diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index be398b0cf7..20549ef397 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -19,6 +19,7 @@ package etcd import ( "net/http" "sync" + "sync/atomic" "time" "k8s.io/kubernetes/pkg/api/unversioned" @@ -26,7 +27,6 @@ import ( "k8s.io/kubernetes/pkg/storage" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/util/atomic" "k8s.io/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" @@ -43,6 +43,23 @@ const ( EtcdExpire = "expire" ) +// HighWaterMark is a thread-safe object for tracking the maximum value seen +// for some quantity. +type HighWaterMark int64 + +// Update returns true if and only if 'current' is the highest value ever seen. +func (hwm *HighWaterMark) Update(current int64) bool { + for { + old := atomic.LoadInt64((*int64)(hwm)) + if current <= old { + return false + } + if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) { + return true + } + } +} + // TransformFunc attempts to convert an object to another object for use with a watcher. type TransformFunc func(runtime.Object) (runtime.Object, error) @@ -170,7 +187,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming } var ( - watchChannelHWM atomic.HighWaterMark + watchChannelHWM HighWaterMark ) // translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be @@ -215,7 +232,7 @@ func (w *etcdWatcher) translate() { return case res, ok := <-w.etcdIncoming: if ok { - if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Check(curLen) { + if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Update(curLen) { // Monitor if this gets backed up, and how much. glog.V(2).Infof("watch: %v objects queued in channel.", curLen) } diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index 561998bac2..12a7f6d07f 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -17,7 +17,9 @@ limitations under the License. package etcd import ( + "math/rand" rt "runtime" + "sync" "testing" "github.com/coreos/go-etcd/etcd" @@ -463,3 +465,34 @@ func TestWatchPurposefulShutdown(t *testing.T) { t.Errorf("Channel should be closed") } } + +func TestHighWaterMark(t *testing.T) { + var h HighWaterMark + + for i := int64(10); i < 20; i++ { + if !h.Update(i) { + t.Errorf("unexpected false for %v", i) + } + if h.Update(i - 1) { + t.Errorf("unexpected true for %v", i-1) + } + } + + m := int64(0) + wg := sync.WaitGroup{} + for i := 0; i < 300; i++ { + wg.Add(1) + v := rand.Int63() + go func(v int64) { + defer wg.Done() + h.Update(v) + }(v) + if v > m { + m = v + } + } + wg.Wait() + if m != int64(h) { + t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) + } +} diff --git a/pkg/util/atomic/highwatermark.go b/pkg/util/atomic/highwatermark.go deleted file mode 100644 index bdf19ffc4f..0000000000 --- a/pkg/util/atomic/highwatermark.go +++ /dev/null @@ -1,39 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package atomic - -import ( - "sync" - "sync/atomic" -) - -// HighWaterMark is a thread-safe object for tracking the maximum value seen -// for some quantity. -type HighWaterMark int64 - -// Check returns true if and only if 'current' is the highest value ever seen. -func (hwm *HighWaterMark) Update(current int64) bool { - for { - old := atomic.LoadInt64((*int64)(hwm)) - if current <= old { - return false - } - if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) { - return true - } - } -} diff --git a/pkg/util/atomic/highwatermark_test.go b/pkg/util/atomic/highwatermark_test.go deleted file mode 100644 index da724b52c7..0000000000 --- a/pkg/util/atomic/highwatermark_test.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package atomic - -import ( - "math/rand" - "sync" - "testing" -) - -func TestHighWaterMark(t *testing.T) { - var h HighWaterMark - - for i := int64(10); i < 20; i++ { - if !h.Check(i) { - t.Errorf("unexpected false for %v", i) - } - if h.Check(i - 1) { - t.Errorf("unexpected true for %v", i-1) - } - } - - m := int64(0) - wg := sync.WaitGroup{} - for i := 0; i < 300; i++ { - wg.Add(1) - v := rand.Int63() - go func(v int64) { - defer wg.Done() - h.Check(v) - }(v) - if v > m { - m = v - } - } - wg.Wait() - if m != int64(h) { - t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) - } -} diff --git a/pkg/util/atomic/value.go b/pkg/util/atomic/value.go index f87b9327d5..a9bc8cd813 100644 --- a/pkg/util/atomic/value.go +++ b/pkg/util/atomic/value.go @@ -18,7 +18,6 @@ package atomic import ( "sync" - "sync/atomic" ) // TODO(ArtfulCoder) diff --git a/pkg/util/atomic/value_test.go b/pkg/util/atomic/value_test.go index cbab545884..0cb839dd5e 100644 --- a/pkg/util/atomic/value_test.go +++ b/pkg/util/atomic/value_test.go @@ -17,15 +17,13 @@ limitations under the License. package atomic import ( - "math/rand" - "sync" "testing" "time" "k8s.io/kubernetes/pkg/util" ) -func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface{}) { +func ExpectValue(t *testing.T, atomicValue *Value, expectedValue interface{}) { actualValue := atomicValue.Load() if actualValue != expectedValue { t.Errorf("Expected to find %v, found %v", expectedValue, actualValue) @@ -47,39 +45,8 @@ func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface } func TestAtomicValue(t *testing.T) { - atomicValue := &AtomicValue{} + atomicValue := &Value{} ExpectValue(t, atomicValue, nil) atomicValue.Store(10) ExpectValue(t, atomicValue, 10) } - -func TestHighWaterMark(t *testing.T) { - var h HighWaterMark - - for i := int64(10); i < 20; i++ { - if !h.Check(i) { - t.Errorf("unexpected false for %v", i) - } - if h.Check(i - 1) { - t.Errorf("unexpected true for %v", i-1) - } - } - - m := int64(0) - wg := sync.WaitGroup{} - for i := 0; i < 300; i++ { - wg.Add(1) - v := rand.Int63() - go func(v int64) { - defer wg.Done() - h.Check(v) - }(v) - if v > m { - m = v - } - } - wg.Wait() - if m != int64(h) { - t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) - } -}