mirror of https://github.com/k3s-io/k3s
Make cache.Reflector more injectable. Add test for resource version state keeping.
parent
6ca912e07f
commit
4c3f509d94
|
@ -21,7 +21,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
@ -41,42 +40,49 @@ type Store interface {
|
||||||
|
|
||||||
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
||||||
type Reflector struct {
|
type Reflector struct {
|
||||||
kubeClient *client.Client
|
// The type of object we expect to place in the store.
|
||||||
resource string
|
|
||||||
expectedType reflect.Type
|
expectedType reflect.Type
|
||||||
|
// The destination to sync up with the watch source
|
||||||
store Store
|
store Store
|
||||||
|
// watchCreater is called to initiate watches.
|
||||||
|
watchFactory WatchFactory
|
||||||
|
// loopDelay controls timing between one watch ending and
|
||||||
|
// the beginning of the next one.
|
||||||
|
loopDelay time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WatchFactory should begin a watch at the specified version.
|
||||||
|
type WatchFactory func(resourceVersion uint64) (watch.Interface, error)
|
||||||
|
|
||||||
// NewReflector makes a new Reflector object which will keep the given store up to
|
// NewReflector makes a new Reflector object which will keep the given store up to
|
||||||
// date with the server's contents for the given resource. Reflector promises to
|
// date with the server's contents for the given resource. Reflector promises to
|
||||||
// only put things in the store that have the type of expectedType.
|
// only put things in the store that have the type of expectedType.
|
||||||
// TODO: define a query so you only locally cache a subset of items.
|
func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Store) *Reflector {
|
||||||
func NewReflector(resource string, kubeClient *client.Client, expectedType interface{}, store Store) *Reflector {
|
|
||||||
gc := &Reflector{
|
gc := &Reflector{
|
||||||
resource: resource,
|
watchFactory: watchFactory,
|
||||||
kubeClient: kubeClient,
|
|
||||||
store: store,
|
store: store,
|
||||||
expectedType: reflect.TypeOf(expectedType),
|
expectedType: reflect.TypeOf(expectedType),
|
||||||
|
loopDelay: time.Second,
|
||||||
}
|
}
|
||||||
return gc
|
return gc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||||
|
// Run starts a goroutine and returns immediately.
|
||||||
func (gc *Reflector) Run() {
|
func (gc *Reflector) Run() {
|
||||||
|
var resourceVersion uint64
|
||||||
go util.Forever(func() {
|
go util.Forever(func() {
|
||||||
w, err := gc.startWatch()
|
w, err := gc.watchFactory(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("failed to watch %v: %v", gc.resource, err)
|
glog.Errorf("failed to watch %v: %v", gc.expectedType, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
gc.watchHandler(w)
|
gc.watchHandler(w, &resourceVersion)
|
||||||
}, 5*time.Second)
|
}, gc.loopDelay)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gc *Reflector) startWatch() (watch.Interface, error) {
|
// watchHandler watches w and keeps *resourceVersion up to date.
|
||||||
return gc.kubeClient.Get().Path(gc.resource).Path("watch").Watch()
|
func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
|
||||||
}
|
|
||||||
|
|
||||||
func (gc *Reflector) watchHandler(w watch.Interface) {
|
|
||||||
for {
|
for {
|
||||||
event, ok := <-w.ResultChan()
|
event, ok := <-w.ResultChan()
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -102,5 +108,9 @@ func (gc *Reflector) watchHandler(w watch.Interface) {
|
||||||
default:
|
default:
|
||||||
glog.Errorf("unable to understand watch event %#v", event)
|
glog.Errorf("unable to understand watch event %#v", event)
|
||||||
}
|
}
|
||||||
|
next := jsonBase.ResourceVersion() + 1
|
||||||
|
if next > *resourceVersion {
|
||||||
|
*resourceVersion = next
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,29 +17,27 @@ limitations under the License.
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestReflector_watchHandler(t *testing.T) {
|
func TestReflector_watchHandler(t *testing.T) {
|
||||||
s := NewStore()
|
s := NewStore()
|
||||||
g := NewReflector("foo", nil, &api.Pod{}, s)
|
g := NewReflector(nil, &api.Pod{}, s)
|
||||||
fw := watch.NewFake()
|
fw := watch.NewFake()
|
||||||
s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
|
s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
|
||||||
s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}})
|
s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}})
|
||||||
go func() {
|
go func() {
|
||||||
fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}})
|
fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}})
|
||||||
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz"}})
|
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}})
|
||||||
fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}})
|
fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}})
|
||||||
fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
|
fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
|
||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
g.watchHandler(fw)
|
var resumeRV uint64
|
||||||
|
g.watchHandler(fw, &resumeRV)
|
||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
ID string
|
ID string
|
||||||
|
@ -49,7 +47,7 @@ func TestReflector_watchHandler(t *testing.T) {
|
||||||
{"foo", 0, false},
|
{"foo", 0, false},
|
||||||
{"rejected", 0, false},
|
{"rejected", 0, false},
|
||||||
{"bar", 55, true},
|
{"bar", 55, true},
|
||||||
{"baz", 0, true},
|
{"baz", 32, true},
|
||||||
}
|
}
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
obj, exists := s.Get(item.ID)
|
obj, exists := s.Get(item.ID)
|
||||||
|
@ -63,32 +61,62 @@ func TestReflector_watchHandler(t *testing.T) {
|
||||||
t.Errorf("%v: expected %v, got %v", item.ID, e, a)
|
t.Errorf("%v: expected %v, got %v", item.ID, e, a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RV should stay 1 higher than the highest id we see.
|
||||||
|
if e, a := uint64(56), resumeRV; e != a {
|
||||||
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReflector_startWatch(t *testing.T) {
|
func TestReflector_Run(t *testing.T) {
|
||||||
table := []struct{ resource, path string }{
|
createdFakes := make(chan *watch.FakeWatcher)
|
||||||
{"pods", "/api/v1beta1/pods/watch"},
|
|
||||||
{"services", "/api/v1beta1/services/watch"},
|
// Expect our starter to get called at the beginning of the watch with 0, and again with 3 when we
|
||||||
|
// inject an error at 2.
|
||||||
|
expectedRVs := []uint64{0, 3}
|
||||||
|
watchStarter := func(rv uint64) (watch.Interface, error) {
|
||||||
|
fw := watch.NewFake()
|
||||||
|
if e, a := expectedRVs[0], rv; e != a {
|
||||||
|
t.Errorf("Expected rv %v, but got %v", e, a)
|
||||||
}
|
}
|
||||||
for _, testItem := range table {
|
expectedRVs = expectedRVs[1:]
|
||||||
got := make(chan struct{})
|
// channel is not buffered because the for loop below needs to block. But
|
||||||
srv := httptest.NewServer(http.HandlerFunc(
|
// we don't want to block here, so report the new fake via a go routine.
|
||||||
func(w http.ResponseWriter, req *http.Request) {
|
go func() { createdFakes <- fw }()
|
||||||
w.WriteHeader(http.StatusNotFound)
|
return fw, nil
|
||||||
if req.URL.Path == testItem.path {
|
|
||||||
close(got)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
t.Errorf("unexpected path %v", req.URL.Path)
|
s := NewFIFO()
|
||||||
}))
|
r := NewReflector(watchStarter, &api.Pod{}, s)
|
||||||
s := NewStore()
|
r.loopDelay = 0
|
||||||
c := client.New(srv.URL, nil)
|
r.Run()
|
||||||
g := NewReflector(testItem.resource, c, &api.Pod{}, s)
|
|
||||||
_, err := g.startWatch()
|
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
|
||||||
// We're just checking that it watches the right path.
|
var fw *watch.FakeWatcher
|
||||||
if err == nil {
|
for i, id := range ids {
|
||||||
t.Errorf("unexpected non-error")
|
if fw == nil {
|
||||||
|
fw = <-createdFakes
|
||||||
}
|
}
|
||||||
<-got
|
sendingRV := uint64(i + 1)
|
||||||
|
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: sendingRV}})
|
||||||
|
if sendingRV == 2 {
|
||||||
|
// Inject a failure.
|
||||||
|
fw.Stop()
|
||||||
|
fw = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we received the right ids with the right resource versions.
|
||||||
|
for i, id := range ids {
|
||||||
|
pod := s.Pop().(*api.Pod)
|
||||||
|
if e, a := id, pod.ID; e != a {
|
||||||
|
t.Errorf("%v: Expected %v, got %v", i, e, a)
|
||||||
|
}
|
||||||
|
if e, a := uint64(i+1), pod.ResourceVersion; e != a {
|
||||||
|
t.Errorf("%v: Expected %v, got %v", i, e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(expectedRVs) != 0 {
|
||||||
|
t.Error("called watchStarter an unexpected number of times")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue