Make ResourceVersion a string internally instead of uint64

Allows us to define different watch versioning regimes in the future
as well as to encode information with the resource version.

This changes /watch/resources?resourceVersion=3 to start the watch at
4 instead of 3, which means clients can read a resource version and
then send it back to the server. Clients should no longer do math on
resource versions.
pull/6/head
Clayton Coleman 2014-10-07 16:51:28 -04:00
parent 31e02b882b
commit 82bcdd3b3b
54 changed files with 518 additions and 240 deletions

View File

@ -311,7 +311,7 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool {
validStorage := checkStorage(storage)
verb := ""
setBody := false
var version uint64
var version string
printer := getPrinter()
@ -369,7 +369,7 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool {
r.ParseSelectorParam("labels", *selector)
}
if setBody {
if version != 0 {
if len(version) > 0 {
data := readConfig(storage, c.RESTClient.Codec)
obj, err := latest.Codec.Decode(data)
if err != nil {

View File

@ -64,3 +64,10 @@ func InterpretDeleteError(err error, kind, name string) error {
return err
}
}
// InterpretResourceVersionError returns the appropriate api error
// for a failure to convert the resource version of an object sent
// to the API to an etcd uint64 index.
func InterpretResourceVersionError(err error, kind, value string) error {
return errors.NewInvalid(kind, "", errors.ErrorList{errors.NewFieldInvalid("resourceVersion", value)})
}

View File

@ -19,6 +19,7 @@ package latest
import (
"encoding/json"
"reflect"
"strconv"
"testing"
internal "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -41,7 +42,7 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs(
// TODO: Fix JSON/YAML packages and/or write custom encoding
// for uint64's. Somehow the LS *byte* of this is lost, but
// only when all 8 bytes are set.
j.ResourceVersion = c.RandUint64() >> 8
j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10)
j.SelfLink = c.RandString()
var sec, nsec int64
@ -49,6 +50,19 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs(
c.Fuzz(&nsec)
j.CreationTimestamp = util.Unix(sec, nsec).Rfc3339Copy()
},
func(j *internal.ObjectReference, c fuzz.Continue) {
// We have to customize the randomization of TypeMetas because their
// APIVersion and Kind must remain blank in memory.
j.APIVersion = c.RandString()
j.Kind = c.RandString()
j.Namespace = c.RandString()
j.Name = c.RandString()
// TODO: Fix JSON/YAML packages and/or write custom encoding
// for uint64's. Somehow the LS *byte* of this is lost, but
// only when all 8 bytes are set.
j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10)
j.FieldPath = c.RandString()
},
func(intstr *util.IntOrString, c fuzz.Continue) {
// util.IntOrString will panic if its kind is set wrong.
if c.RandBool() {
@ -120,12 +134,12 @@ func TestInternalRoundTrip(t *testing.T) {
}
func TestResourceVersioner(t *testing.T) {
pod := internal.Pod{TypeMeta: internal.TypeMeta{ResourceVersion: 10}}
pod := internal.Pod{TypeMeta: internal.TypeMeta{ResourceVersion: "10"}}
version, err := ResourceVersioner.ResourceVersion(&pod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if version != 10 {
if version != "10" {
t.Errorf("unexpected version %d", version)
}
}

View File

@ -37,7 +37,7 @@ func TestGetReference(t *testing.T) {
obj: &Pod{
TypeMeta: TypeMeta{
ID: "foo",
ResourceVersion: 42,
ResourceVersion: "42",
SelfLink: "/api/v1beta1/pods/foo",
},
},
@ -46,14 +46,14 @@ func TestGetReference(t *testing.T) {
APIVersion: "v1beta1",
Name: "foo",
UID: "foo",
ResourceVersion: 42,
ResourceVersion: "42",
},
},
"serviceList": {
obj: &ServiceList{
TypeMeta: TypeMeta{
ID: "foo",
ResourceVersion: 42,
ResourceVersion: "42",
SelfLink: "/api/v1beta2/services",
},
},
@ -62,14 +62,14 @@ func TestGetReference(t *testing.T) {
APIVersion: "v1beta2",
Name: "foo",
UID: "foo",
ResourceVersion: 42,
ResourceVersion: "42",
},
},
"badSelfLink": {
obj: &ServiceList{
TypeMeta: TypeMeta{
ID: "foo",
ResourceVersion: 42,
ResourceVersion: "42",
SelfLink: "v1beta2/services",
},
},

View File

@ -21,6 +21,7 @@ import (
"flag"
"math/rand"
"reflect"
"strconv"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -49,7 +50,7 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs(
// TODO: Fix JSON/YAML packages and/or write custom encoding
// for uint64's. Somehow the LS *byte* of this is lost, but
// only when all 8 bytes are set.
j.ResourceVersion = c.RandUint64() >> 8
j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10)
j.SelfLink = c.RandString()
var sec, nsec int64
@ -66,7 +67,7 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs(
// TODO: Fix JSON/YAML packages and/or write custom encoding
// for uint64's. Somehow the LS *byte* of this is lost, but
// only when all 8 bytes are set.
j.ResourceVersion = c.RandUint64() >> 8
j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10)
j.SelfLink = c.RandString()
var sec, nsec int64
@ -74,6 +75,19 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs(
c.Fuzz(&nsec)
j.CreationTimestamp = util.Unix(sec, nsec).Rfc3339Copy()
},
func(j *api.ObjectReference, c fuzz.Continue) {
// We have to customize the randomization of TypeMetas because their
// APIVersion and Kind must remain blank in memory.
j.APIVersion = c.RandString()
j.Kind = c.RandString()
j.Namespace = c.RandString()
j.Name = c.RandString()
// TODO: Fix JSON/YAML packages and/or write custom encoding
// for uint64's. Somehow the LS *byte* of this is lost, but
// only when all 8 bytes are set.
j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10)
j.FieldPath = c.RandString()
},
func(intstr *util.IntOrString, c fuzz.Continue) {
// util.IntOrString will panic if its kind is set wrong.
if c.RandBool() {

View File

@ -250,7 +250,7 @@ type TypeMeta struct {
ID string `json:"id,omitempty" yaml:"id,omitempty"`
CreationTimestamp util.Time `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"`
SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"`
ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"`
ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"`
APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"`
Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty"`
UID string `json:"uid,omitempty" yaml:"uid,omitempty"`
@ -675,7 +675,7 @@ type ObjectReference struct {
Name string `json:"name,omitempty" yaml:"name,omitempty"`
UID string `json:"uid,omitempty" yaml:"uid,omitempty"`
APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"`
ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"`
ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"`
// Optional. If referring to a piece of an object instead of an entire object, this string
// should contain a valid field access statement. For example,

View File

@ -17,12 +17,78 @@ limitations under the License.
package v1beta1
import (
"strconv"
newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
)
func init() {
newer.Scheme.AddConversionFuncs(
// TypeMeta has changed type of ResourceVersion internally
func(in *newer.TypeMeta, out *TypeMeta, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Kind = in.Kind
out.Namespace = in.Namespace
out.ID = in.ID
out.CreationTimestamp = in.CreationTimestamp
out.SelfLink = in.SelfLink
out.Annotations = in.Annotations
if len(in.ResourceVersion) > 0 {
v, err := strconv.ParseUint(in.ResourceVersion, 10, 64)
if err != nil {
return err
}
out.ResourceVersion = v
}
return nil
},
func(in *TypeMeta, out *newer.TypeMeta, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Kind = in.Kind
out.Namespace = in.Namespace
out.ID = in.ID
out.CreationTimestamp = in.CreationTimestamp
out.SelfLink = in.SelfLink
out.Annotations = in.Annotations
if in.ResourceVersion != 0 {
out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10)
}
return nil
},
// ObjectReference has changed type of ResourceVersion internally
func(in *newer.ObjectReference, out *ObjectReference, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Kind = in.Kind
out.Namespace = in.Namespace
out.Name = in.Name
out.FieldPath = in.FieldPath
if len(in.ResourceVersion) > 0 {
v, err := strconv.ParseUint(in.ResourceVersion, 10, 64)
if err != nil {
return err
}
out.ResourceVersion = v
}
return nil
},
func(in *ObjectReference, out *newer.ObjectReference, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Kind = in.Kind
out.Namespace = in.Namespace
out.Name = in.Name
out.FieldPath = in.FieldPath
if in.ResourceVersion != 0 {
out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10)
}
return nil
},
// EnvVar's Key is deprecated in favor of Name.
func(in *newer.EnvVar, out *EnvVar, s conversion.Scope) error {
out.Value = in.Value

View File

@ -17,12 +17,78 @@ limitations under the License.
package v1beta2
import (
"strconv"
newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
)
func init() {
newer.Scheme.AddConversionFuncs(
// TypeMeta has changed type of ResourceVersion internally
func(in *newer.TypeMeta, out *TypeMeta, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Kind = in.Kind
out.Namespace = in.Namespace
out.ID = in.ID
out.CreationTimestamp = in.CreationTimestamp
out.SelfLink = in.SelfLink
out.Annotations = in.Annotations
if len(in.ResourceVersion) > 0 {
v, err := strconv.ParseUint(in.ResourceVersion, 10, 64)
if err != nil {
return err
}
out.ResourceVersion = v
}
return nil
},
func(in *TypeMeta, out *newer.TypeMeta, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Kind = in.Kind
out.Namespace = in.Namespace
out.ID = in.ID
out.CreationTimestamp = in.CreationTimestamp
out.SelfLink = in.SelfLink
out.Annotations = in.Annotations
if in.ResourceVersion != 0 {
out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10)
}
return nil
},
// ObjectReference has changed type of ResourceVersion internally
func(in *newer.ObjectReference, out *ObjectReference, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Kind = in.Kind
out.Namespace = in.Namespace
out.Name = in.Name
out.FieldPath = in.FieldPath
if len(in.ResourceVersion) > 0 {
v, err := strconv.ParseUint(in.ResourceVersion, 10, 64)
if err != nil {
return err
}
out.ResourceVersion = v
}
return nil
},
func(in *ObjectReference, out *newer.ObjectReference, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Kind = in.Kind
out.Namespace = in.Namespace
out.Name = in.Name
out.FieldPath = in.FieldPath
if in.ResourceVersion != 0 {
out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10)
}
return nil
},
// EnvVar's Key is deprecated in favor of Name.
func(in *newer.EnvVar, out *EnvVar, s conversion.Scope) error {
out.Value = in.Value

View File

@ -78,7 +78,7 @@ type SimpleRESTStorage struct {
fakeWatch *watch.FakeWatcher
requestedLabelSelector labels.Selector
requestedFieldSelector labels.Selector
requestedResourceVersion uint64
requestedResourceVersion string
// The id requested, and location to return for ResourceLocation
requestedResourceLocationID string
@ -144,7 +144,7 @@ func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (<
}
// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (storage *SimpleRESTStorage) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
storage.requestedLabelSelector = label
storage.requestedFieldSelector = field
storage.requestedResourceVersion = resourceVersion

View File

@ -54,7 +54,7 @@ type ResourceWatcher interface {
// are supported; an error should be returned if 'field' tries to select on a field that
// isn't supported. 'resourceVersion' allows for continuing/starting a watch at a
// particular version.
Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
}
// Redirector know how to return a remote resource's location.

View File

@ -20,7 +20,6 @@ import (
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"code.google.com/p/go.net/websocket"
@ -37,7 +36,7 @@ type WatchHandler struct {
codec runtime.Codec
}
func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) {
func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string) {
if s, err := labels.ParseSelector(query.Get("labels")); err != nil {
label = labels.Everything()
} else {
@ -48,10 +47,8 @@ func getWatchParams(query url.Values) (label, field labels.Selector, resourceVer
} else {
field = s
}
if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil {
resourceVersion = rv
}
return label, field, resourceVersion
resourceVersion = query.Get("resourceVersion")
return
}
var connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")

View File

@ -156,28 +156,28 @@ func TestWatchParamParsing(t *testing.T) {
table := []struct {
rawQuery string
resourceVersion uint64
resourceVersion string
labelSelector string
fieldSelector string
}{
{
rawQuery: "resourceVersion=1234",
resourceVersion: 1234,
resourceVersion: "1234",
labelSelector: "",
fieldSelector: "",
}, {
rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo",
resourceVersion: 314159,
resourceVersion: "314159",
labelSelector: "name=foo",
fieldSelector: "Host=",
}, {
rawQuery: "fields=ID%3dfoo&resourceVersion=1492",
resourceVersion: 1492,
resourceVersion: "1492",
labelSelector: "",
fieldSelector: "ID=foo",
}, {
rawQuery: "",
resourceVersion: 0,
resourceVersion: "",
labelSelector: "",
fieldSelector: "",
},
@ -186,7 +186,7 @@ func TestWatchParamParsing(t *testing.T) {
for _, item := range table {
simpleStorage.requestedLabelSelector = nil
simpleStorage.requestedFieldSelector = nil
simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases
simpleStorage.requestedResourceVersion = "5" // Prove this is set in all cases
dest.RawQuery = item.rawQuery
resp, err := http.Get(dest.String())
if err != nil {

View File

@ -35,7 +35,7 @@ type ListerWatcher interface {
// ResourceVersion field will be used to start the watch in the right place.
List() (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(resourceVersion uint64) (watch.Interface, error)
Watch(resourceVersion string) (watch.Interface, error)
}
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
@ -71,7 +71,7 @@ func (r *Reflector) Run() {
}
func (r *Reflector) listAndWatch() {
var resourceVersion uint64
var resourceVersion string
list, err := r.listerWatcher.List()
if err != nil {
@ -124,7 +124,7 @@ func (r *Reflector) syncWith(items []runtime.Object) error {
}
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) error {
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) error {
start := time.Now()
eventCount := 0
for {
@ -157,7 +157,7 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err
default:
glog.Errorf("unable to understand watch event %#v", event)
}
*resourceVersion = jsonBase.ResourceVersion() + 1
*resourceVersion = jsonBase.ResourceVersion()
eventCount++
}

View File

@ -18,6 +18,7 @@ package cache
import (
"fmt"
"strconv"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -27,11 +28,11 @@ import (
type testLW struct {
ListFunc func() (runtime.Object, error)
WatchFunc func(resourceVersion uint64) (watch.Interface, error)
WatchFunc func(resourceVersion string) (watch.Interface, error)
}
func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() }
func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) {
func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) {
return t.WatchFunc(resourceVersion)
}
@ -42,7 +43,7 @@ func TestReflector_watchHandlerError(t *testing.T) {
go func() {
fw.Stop()
}()
var resumeRV uint64
var resumeRV string
err := g.watchHandler(fw, &resumeRV)
if err == nil {
t.Errorf("unexpected non-error")
@ -58,11 +59,11 @@ func TestReflector_watchHandler(t *testing.T) {
go func() {
fw.Add(&api.Service{TypeMeta: api.TypeMeta{ID: "rejected"}})
fw.Delete(&api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}})
fw.Modify(&api.Pod{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: 55}})
fw.Add(&api.Pod{TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: 32}})
fw.Modify(&api.Pod{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "55"}})
fw.Add(&api.Pod{TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: "32"}})
fw.Stop()
}()
var resumeRV uint64
var resumeRV string
err := g.watchHandler(fw, &resumeRV)
if err != nil {
t.Errorf("unexpected error %v", err)
@ -70,13 +71,13 @@ func TestReflector_watchHandler(t *testing.T) {
table := []struct {
ID string
RV uint64
RV string
exists bool
}{
{"foo", 0, false},
{"rejected", 0, false},
{"bar", 55, true},
{"baz", 32, true},
{"foo", "", false},
{"rejected", "", false},
{"bar", "55", true},
{"baz", "32", true},
}
for _, item := range table {
obj, exists := s.Get(item.ID)
@ -92,7 +93,7 @@ func TestReflector_watchHandler(t *testing.T) {
}
// RV should stay 1 higher than the last id we see.
if e, a := uint64(33), resumeRV; e != a {
if e, a := "33", resumeRV; e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
@ -103,9 +104,9 @@ func TestReflector_listAndWatch(t *testing.T) {
// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc
// to get called at the beginning of the watch with 1, and again with 4 when we
// inject an error at 3.
expectedRVs := []uint64{1, 4}
expectedRVs := []string{"1", "4"}
lw := &testLW{
WatchFunc: func(rv uint64) (watch.Interface, error) {
WatchFunc: func(rv string) (watch.Interface, error) {
fw := watch.NewFake()
if e, a := expectedRVs[0], rv; e != a {
t.Errorf("Expected rv %v, but got %v", e, a)
@ -117,7 +118,7 @@ func TestReflector_listAndWatch(t *testing.T) {
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
return &api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: 1}}, nil
return &api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: "1"}}, nil
},
}
s := NewFIFO()
@ -130,9 +131,9 @@ func TestReflector_listAndWatch(t *testing.T) {
if fw == nil {
fw = <-createdFakes
}
sendingRV := uint64(i + 2)
sendingRV := strconv.FormatUint(uint64(i+2), 10)
fw.Add(&api.Pod{TypeMeta: api.TypeMeta{ID: id, ResourceVersion: sendingRV}})
if sendingRV == 3 {
if sendingRV == "3" {
// Inject a failure.
fw.Stop()
fw = nil
@ -145,7 +146,7 @@ func TestReflector_listAndWatch(t *testing.T) {
if e, a := id, pod.ID; e != a {
t.Errorf("%v: Expected %v, got %v", i, e, a)
}
if e, a := uint64(i+2), pod.ResourceVersion; e != a {
if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a {
t.Errorf("%v: Expected %v, got %v", i, e, a)
}
}
@ -156,10 +157,10 @@ func TestReflector_listAndWatch(t *testing.T) {
}
func TestReflector_listAndWatchWithErrors(t *testing.T) {
mkPod := func(id string, rv uint64) *api.Pod {
mkPod := func(id string, rv string) *api.Pod {
return &api.Pod{TypeMeta: api.TypeMeta{ID: id, ResourceVersion: rv}}
}
mkList := func(rv uint64, pods ...*api.Pod) *api.PodList {
mkList := func(rv string, pods ...*api.Pod) *api.PodList {
list := &api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: rv}}
for _, pod := range pods {
list.Items = append(list.Items, *pod)
@ -173,29 +174,29 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) {
watchErr error
}{
{
list: mkList(1),
list: mkList("1"),
events: []watch.Event{
{watch.Added, mkPod("foo", 2)},
{watch.Added, mkPod("bar", 3)},
{watch.Added, mkPod("foo", "2")},
{watch.Added, mkPod("bar", "3")},
},
}, {
list: mkList(3, mkPod("foo", 2), mkPod("bar", 3)),
list: mkList("3", mkPod("foo", "2"), mkPod("bar", "3")),
events: []watch.Event{
{watch.Deleted, mkPod("foo", 4)},
{watch.Added, mkPod("qux", 5)},
{watch.Deleted, mkPod("foo", "4")},
{watch.Added, mkPod("qux", "5")},
},
}, {
listErr: fmt.Errorf("a list error"),
}, {
list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)),
list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
watchErr: fmt.Errorf("a watch error"),
}, {
list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)),
list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
events: []watch.Event{
{watch.Added, mkPod("baz", 6)},
{watch.Added, mkPod("baz", "6")},
},
}, {
list: mkList(6, mkPod("bar", 3), mkPod("qux", 5), mkPod("baz", 6)),
list: mkList("6", mkPod("bar", "3"), mkPod("qux", "5"), mkPod("baz", "6")),
},
}
@ -204,7 +205,7 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) {
if item.list != nil {
// Test that the list is what currently exists in the store.
current := s.List()
checkMap := map[string]uint64{}
checkMap := map[string]string{}
for _, item := range current {
pod := item.(*api.Pod)
checkMap[pod.ID] = pod.ResourceVersion
@ -220,7 +221,7 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) {
}
watchRet, watchErr := item.events, item.watchErr
lw := &testLW{
WatchFunc: func(rv uint64) (watch.Interface, error) {
WatchFunc: func(rv string) (watch.Interface, error) {
if watchErr != nil {
return nil, watchErr
}

View File

@ -53,7 +53,7 @@ type ReplicationControllerInterface interface {
CreateReplicationController(ctx api.Context, ctrl *api.ReplicationController) (*api.ReplicationController, error)
UpdateReplicationController(ctx api.Context, ctrl *api.ReplicationController) (*api.ReplicationController, error)
DeleteReplicationController(ctx api.Context, id string) error
WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
}
// ServiceInterface has methods to work with Service resources.
@ -63,14 +63,14 @@ type ServiceInterface interface {
CreateService(ctx api.Context, srv *api.Service) (*api.Service, error)
UpdateService(ctx api.Context, srv *api.Service) (*api.Service, error)
DeleteService(ctx api.Context, id string) error
WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
}
// EndpointsInterface has methods to work with Endpoints resources
type EndpointsInterface interface {
ListEndpoints(ctx api.Context, selector labels.Selector) (*api.EndpointsList, error)
GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error)
WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
}
// VersionInterface has a method to retrieve the server version.
@ -122,7 +122,7 @@ func (c *Client) CreatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err
// UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs.
func (c *Client) UpdatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err error) {
result = &api.Pod{}
if pod.ResourceVersion == 0 {
if len(pod.ResourceVersion) == 0 {
err = fmt.Errorf("invalid update object, missing resource version: %v", pod)
return
}
@ -154,7 +154,7 @@ func (c *Client) CreateReplicationController(ctx api.Context, controller *api.Re
// UpdateReplicationController updates an existing replication controller.
func (c *Client) UpdateReplicationController(ctx api.Context, controller *api.ReplicationController) (result *api.ReplicationController, err error) {
result = &api.ReplicationController{}
if controller.ResourceVersion == 0 {
if len(controller.ResourceVersion) == 0 {
err = fmt.Errorf("invalid update object, missing resource version: %v", controller)
return
}
@ -168,11 +168,11 @@ func (c *Client) DeleteReplicationController(ctx api.Context, id string) error {
}
// WatchReplicationControllers returns a watch.Interface that watches the requested controllers.
func (c *Client) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Client) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("replicationControllers").
UintParam("resourceVersion", resourceVersion).
Param("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
@ -202,7 +202,7 @@ func (c *Client) CreateService(ctx api.Context, svc *api.Service) (result *api.S
// UpdateService updates an existing service.
func (c *Client) UpdateService(ctx api.Context, svc *api.Service) (result *api.Service, err error) {
result = &api.Service{}
if svc.ResourceVersion == 0 {
if len(svc.ResourceVersion) == 0 {
err = fmt.Errorf("invalid update object, missing resource version: %v", svc)
return
}
@ -216,11 +216,11 @@ func (c *Client) DeleteService(ctx api.Context, id string) error {
}
// WatchServices returns a watch.Interface that watches the requested services.
func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("services").
UintParam("resourceVersion", resourceVersion).
Param("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
@ -241,11 +241,11 @@ func (c *Client) GetEndpoints(ctx api.Context, id string) (result *api.Endpoints
}
// WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service.
func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("endpoints").
UintParam("resourceVersion", resourceVersion).
Param("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
@ -259,7 +259,7 @@ func (c *Client) CreateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*ap
func (c *Client) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*api.Endpoints, error) {
result := &api.Endpoints{}
if endpoints.ResourceVersion == 0 {
if len(endpoints.ResourceVersion) == 0 {
return nil, fmt.Errorf("invalid update object, missing resource version: %v", endpoints)
}
err := c.Put().

View File

@ -265,7 +265,7 @@ func TestCreatePod(t *testing.T) {
func TestUpdatePod(t *testing.T) {
requestPod := &api.Pod{
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: 1},
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"},
CurrentState: api.PodState{
Status: "Foobar",
},
@ -330,7 +330,7 @@ func TestGetController(t *testing.T) {
func TestUpdateController(t *testing.T) {
requestController := &api.ReplicationController{
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: 1},
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"},
}
c := &testClient{
Request: testRequest{Method: "PUT", Path: "/replicationControllers/foo"},
@ -464,7 +464,7 @@ func TestCreateService(t *testing.T) {
}
func TestUpdateService(t *testing.T) {
svc := &api.Service{TypeMeta: api.TypeMeta{ID: "service-1", ResourceVersion: 1}}
svc := &api.Service{TypeMeta: api.TypeMeta{ID: "service-1", ResourceVersion: "1"}}
c := &testClient{
Request: testRequest{Method: "PUT", Path: "/services/service-1", Body: svc},
Response: Response{StatusCode: 200, Body: svc},

View File

@ -92,7 +92,7 @@ func (c *Fake) DeleteReplicationController(ctx api.Context, controller string) e
return nil
}
func (c *Fake) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Fake) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers", Value: resourceVersion})
return c.Watch, nil
}
@ -122,7 +122,7 @@ func (c *Fake) DeleteService(ctx api.Context, service string) error {
return nil
}
func (c *Fake) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Fake) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion})
return c.Watch, c.Err
}
@ -137,7 +137,7 @@ func (c *Fake) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error
return &api.Endpoints{}, nil
}
func (c *Fake) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Fake) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion})
return c.Watch, c.Err
}

View File

@ -114,6 +114,14 @@ func (r *Request) UintParam(paramName string, u uint64) *Request {
return r.setParam(paramName, strconv.FormatUint(u, 10))
}
// Param creates a query parameter with the given string value.
func (r *Request) Param(paramName, s string) *Request {
if r.err != nil {
return r
}
return r.setParam(paramName, s)
}
func (r *Request) setParam(paramName, value string) *Request {
if specialParams.Has(paramName) {
r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName)

View File

@ -87,12 +87,12 @@ func NewReplicationManager(kubeClient client.Interface) *ReplicationManager {
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(period time.Duration) {
rm.syncTime = time.Tick(period)
resourceVersion := uint64(0)
resourceVersion := ""
go util.Forever(func() { rm.watchControllers(&resourceVersion) }, period)
}
// resourceVersion is a pointer to the resource version to use/update.
func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) {
func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
ctx := api.NewContext()
watching, err := rm.kubeClient.WatchReplicationControllers(
ctx,
@ -124,7 +124,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) {
continue
}
// If we get disconnected, start where we left off.
*resourceVersion = rc.ResourceVersion + 1
*resourceVersion = rc.ResourceVersion
// Sync even if this is a deletion event, to ensure that we leave
// it in the desired state.
glog.V(4).Infof("About to sync from watch: %v", rc.ID)

View File

@ -324,7 +324,7 @@ type FakeWatcher struct {
*client.Fake
}
func (fw FakeWatcher) WatchReplicationControllers(ctx api.Context, l, f labels.Selector, rv uint64) (watch.Interface, error) {
func (fw FakeWatcher) WatchReplicationControllers(ctx api.Context, l, f labels.Selector, rv string) (watch.Interface, error) {
return fw.w, nil
}
@ -341,7 +341,7 @@ func TestWatchControllers(t *testing.T) {
return nil
}
resourceVersion := uint64(0)
resourceVersion := ""
go manager.watchControllers(&resourceVersion)
// Test normal case

View File

@ -47,7 +47,7 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface
helper := tools.EtcdHelper{
client,
latest.Codec,
latest.ResourceVersioner,
tools.RuntimeVersionAdapter{latest.ResourceVersioner},
}
source := &SourceEtcd{
key: key,

View File

@ -78,7 +78,7 @@ func NewEtcdHelper(etcdServers []string, version string) (helper tools.EtcdHelpe
if err != nil {
return helper, err
}
return tools.EtcdHelper{client, versionInterfaces.Codec, versionInterfaces.ResourceVersioner}, nil
return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.ResourceVersioner}}, nil
}
// New returns a new instance of Master connected to the given etcd server.

View File

@ -31,8 +31,8 @@ import (
type Watcher interface {
ListServices(ctx api.Context, label labels.Selector) (*api.ServiceList, error)
ListEndpoints(ctx api.Context, label labels.Selector) (*api.EndpointsList, error)
WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
}
// SourceAPI implements a configuration source for services and endpoints that
@ -57,12 +57,12 @@ func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceU
// prevent hot loops if the server starts to misbehave
reconnectDuration: time.Second * 1,
}
serviceVersion := uint64(0)
serviceVersion := ""
go util.Forever(func() {
config.runServices(&serviceVersion)
time.Sleep(wait.Jitter(config.reconnectDuration, 0.0))
}, period)
endpointVersion := uint64(0)
endpointVersion := ""
go util.Forever(func() {
config.runEndpoints(&endpointVersion)
time.Sleep(wait.Jitter(config.reconnectDuration, 0.0))
@ -71,9 +71,9 @@ func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceU
}
// runServices loops forever looking for changes to services.
func (s *SourceAPI) runServices(resourceVersion *uint64) {
func (s *SourceAPI) runServices(resourceVersion *string) {
ctx := api.NewContext()
if *resourceVersion == 0 {
if len(*resourceVersion) == 0 {
services, err := s.client.ListServices(ctx, labels.Everything())
if err != nil {
glog.Errorf("Unable to load services: %v", err)
@ -97,7 +97,7 @@ func (s *SourceAPI) runServices(resourceVersion *uint64) {
}
// handleServicesWatch loops over an event channel and delivers config changes to an update channel.
func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- ServiceUpdate) {
func handleServicesWatch(resourceVersion *string, ch <-chan watch.Event, updates chan<- ServiceUpdate) {
for {
select {
case event, ok := <-ch:
@ -107,7 +107,7 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates
}
service := event.Object.(*api.Service)
*resourceVersion = service.ResourceVersion + 1
*resourceVersion = service.ResourceVersion
switch event.Type {
case watch.Added, watch.Modified:
@ -121,9 +121,9 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates
}
// runEndpoints loops forever looking for changes to endpoints.
func (s *SourceAPI) runEndpoints(resourceVersion *uint64) {
func (s *SourceAPI) runEndpoints(resourceVersion *string) {
ctx := api.NewContext()
if *resourceVersion == 0 {
if len(*resourceVersion) == 0 {
endpoints, err := s.client.ListEndpoints(ctx, labels.Everything())
if err != nil {
glog.Errorf("Unable to load endpoints: %v", err)
@ -147,7 +147,7 @@ func (s *SourceAPI) runEndpoints(resourceVersion *uint64) {
}
// handleEndpointsWatch loops over an event channel and delivers config changes to an update channel.
func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- EndpointsUpdate) {
func handleEndpointsWatch(resourceVersion *string, ch <-chan watch.Event, updates chan<- EndpointsUpdate) {
for {
select {
case event, ok := <-ch:
@ -157,7 +157,7 @@ func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, update
}
endpoints := event.Object.(*api.Endpoints)
*resourceVersion = endpoints.ResourceVersion + 1
*resourceVersion = endpoints.ResourceVersion
switch event.Type {
case watch.Added, watch.Modified:

View File

@ -27,13 +27,13 @@ import (
)
func TestServices(t *testing.T) {
service := api.Service{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: uint64(2)}}
service := api.Service{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}}
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(1)
resourceVersion := "1"
go func() {
// called twice
source.runServices(&resourceVersion)
@ -42,7 +42,7 @@ func TestServices(t *testing.T) {
// test adding a service to the watch
fakeWatch.Add(&service)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}}) {
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}}) {
t.Errorf("expected call to watch-services, got %#v", fakeClient)
}
@ -66,26 +66,26 @@ func TestServices(t *testing.T) {
fakeWatch.Stop()
newFakeWatch.Add(&service)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}, {"watch-services", uint64(3)}}) {
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}, {"watch-services", "3"}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
}
}
func TestServicesFromZero(t *testing.T) {
service := api.Service{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: uint64(2)}}
service := api.Service{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}}
fakeWatch := watch.NewFake()
fakeWatch.Stop()
fakeClient := &client.Fake{Watch: fakeWatch}
fakeClient.ServiceList = api.ServiceList{
TypeMeta: api.TypeMeta{ResourceVersion: 2},
TypeMeta: api.TypeMeta{ResourceVersion: "2"},
Items: []api.Service{
service,
},
}
services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(0)
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
@ -101,10 +101,10 @@ func TestServicesFromZero(t *testing.T) {
// should have listed, then watched
<-ch
if resourceVersion != 2 {
if resourceVersion != "2" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}, {"watch-services", uint64(2)}}) {
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}, {"watch-services", "2"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
@ -113,7 +113,7 @@ func TestServicesError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(1)
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
@ -122,10 +122,10 @@ func TestServicesError(t *testing.T) {
// should have listed only
<-ch
if resourceVersion != 1 {
if resourceVersion != "1" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}}) {
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
@ -134,7 +134,7 @@ func TestServicesFromZeroError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(0)
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
@ -143,7 +143,7 @@ func TestServicesFromZeroError(t *testing.T) {
// should have listed only
<-ch
if resourceVersion != 0 {
if resourceVersion != "" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}}) {
@ -152,13 +152,13 @@ func TestServicesFromZeroError(t *testing.T) {
}
func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}}
endpoint := api.Endpoints{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}}
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(1)
resourceVersion := "1"
go func() {
// called twice
source.runEndpoints(&resourceVersion)
@ -167,7 +167,7 @@ func TestEndpoints(t *testing.T) {
// test adding an endpoint to the watch
fakeWatch.Add(&endpoint)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}}) {
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
}
@ -191,26 +191,26 @@ func TestEndpoints(t *testing.T) {
fakeWatch.Stop()
newFakeWatch.Add(&endpoint)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}, {"watch-endpoints", uint64(3)}}) {
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}, {"watch-endpoints", "3"}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
}
}
func TestEndpointsFromZero(t *testing.T) {
endpoint := api.Endpoints{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}}
endpoint := api.Endpoints{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}}
fakeWatch := watch.NewFake()
fakeWatch.Stop()
fakeClient := &client.Fake{Watch: fakeWatch}
fakeClient.EndpointsList = api.EndpointsList{
TypeMeta: api.TypeMeta{ResourceVersion: 2},
TypeMeta: api.TypeMeta{ResourceVersion: "2"},
Items: []api.Endpoints{
endpoint,
},
}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(0)
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
@ -226,10 +226,10 @@ func TestEndpointsFromZero(t *testing.T) {
// should have listed, then watched
<-ch
if resourceVersion != 2 {
if resourceVersion != "2" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}, {"watch-endpoints", uint64(2)}}) {
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}, {"watch-endpoints", "2"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
@ -238,7 +238,7 @@ func TestEndpointsError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(1)
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
@ -247,10 +247,10 @@ func TestEndpointsError(t *testing.T) {
// should have listed only
<-ch
if resourceVersion != 1 {
if resourceVersion != "1" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}}) {
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
@ -259,7 +259,7 @@ func TestEndpointsFromZeroError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(0)
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
@ -268,7 +268,7 @@ func TestEndpointsFromZeroError(t *testing.T) {
// should have listed only
<-ch
if resourceVersion != 0 {
if resourceVersion != "" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}}) {

View File

@ -24,7 +24,7 @@ import (
// Registry is an interface for things that know how to store ReplicationControllers.
type Registry interface {
ListControllers(ctx api.Context) (*api.ReplicationControllerList, error)
WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error)
WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error)
GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error)
CreateController(ctx api.Context, controller *api.ReplicationController) error
UpdateController(ctx api.Context, controller *api.ReplicationController) error

View File

@ -149,7 +149,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
// Watch returns ReplicationController events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
if !field.Empty() {
return nil, fmt.Errorf("no field selector implemented for controllers")
}

View File

@ -50,7 +50,7 @@ func TestListControllersError(t *testing.T) {
}
func TestListEmptyControllerList(t *testing.T) {
mockRegistry := registrytest.ControllerRegistry{nil, &api.ReplicationControllerList{TypeMeta: api.TypeMeta{ResourceVersion: 1}}}
mockRegistry := registrytest.ControllerRegistry{nil, &api.ReplicationControllerList{TypeMeta: api.TypeMeta{ResourceVersion: "1"}}}
storage := REST{
registry: &mockRegistry,
}
@ -63,7 +63,7 @@ func TestListEmptyControllerList(t *testing.T) {
if len(controllers.(*api.ReplicationControllerList).Items) != 0 {
t.Errorf("Unexpected non-zero ctrl list: %#v", controllers)
}
if controllers.(*api.ReplicationControllerList).ResourceVersion != 1 {
if controllers.(*api.ReplicationControllerList).ResourceVersion != "1" {
t.Errorf("Unexpected resource version: %#v", controllers)
}
}

View File

@ -26,6 +26,6 @@ import (
type Registry interface {
ListEndpoints(ctx api.Context) (*api.EndpointsList, error)
GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error)
WatchEndpoints(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(ctx api.Context, labels, fields labels.Selector, resourceVersion string) (watch.Interface, error)
UpdateEndpoints(ctx api.Context, e *api.Endpoints) error
}

View File

@ -55,7 +55,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
// Watch returns Endpoint events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.WatchEndpoints(ctx, label, field, resourceVersion)
}

View File

@ -74,7 +74,7 @@ func TestEndpointsRegistryList(t *testing.T) {
registry := registrytest.NewServiceRegistry()
storage := NewREST(registry)
registry.EndpointsList = api.EndpointsList{
TypeMeta: api.TypeMeta{ResourceVersion: 1},
TypeMeta: api.TypeMeta{ResourceVersion: "1"},
Items: []api.Endpoints{
{TypeMeta: api.TypeMeta{ID: "foo"}},
{TypeMeta: api.TypeMeta{ID: "bar"}},
@ -92,7 +92,7 @@ func TestEndpointsRegistryList(t *testing.T) {
if e, a := "bar", sl.Items[1].ID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if sl.ResourceVersion != 1 {
if sl.ResourceVersion != "1" {
t.Errorf("Unexpected resource version: %#v", sl)
}
}

View File

@ -18,6 +18,7 @@ package etcd
import (
"fmt"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
@ -53,6 +54,21 @@ func makePodKey(podID string) string {
return "/registry/pods/" + podID
}
// parseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func parseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, etcderr.InterpretResourceVersionError(err, kind, resourceVersion)
}
return version + 1, nil
}
// ListPods obtains a list of pods with labels that match selector.
func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool {
@ -63,7 +79,7 @@ func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.Pod
// ListPodsPredicate obtains a list of pods that match filter.
func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) {
allPods := api.PodList{}
err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion)
err := r.ExtractToList("/registry/pods", &allPods)
if err != nil {
return nil, err
}
@ -82,8 +98,12 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool
}
// WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
return r.WatchList("/registry/pods", resourceVersion, func(obj runtime.Object) bool {
func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) {
version, err := parseWatchResourceVersion(resourceVersion, "pod")
if err != nil {
return nil, err
}
return r.WatchList("/registry/pods", version, func(obj runtime.Object) bool {
switch t := obj.(type) {
case *api.Pod:
return filter(t)
@ -227,13 +247,17 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
// ListControllers obtains a list of ReplicationControllers.
func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) {
controllers := &api.ReplicationControllerList{}
err := r.ExtractList("/registry/controllers", &controllers.Items, &controllers.ResourceVersion)
err := r.ExtractToList("/registry/controllers", controllers)
return controllers, err
}
// WatchControllers begins watching for new, changed, or deleted controllers.
func (r *Registry) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) {
return r.WatchList("/registry/controllers", resourceVersion, tools.Everything)
func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) {
version, err := parseWatchResourceVersion(resourceVersion, "replicationControllers")
if err != nil {
return nil, err
}
return r.WatchList("/registry/controllers", version, tools.Everything)
}
func makeControllerKey(id string) string {
@ -277,7 +301,7 @@ func makeServiceKey(name string) string {
// ListServices obtains a list of Services.
func (r *Registry) ListServices(ctx api.Context) (*api.ServiceList, error) {
list := &api.ServiceList{}
err := r.ExtractList("/registry/services/specs", &list.Items, &list.ResourceVersion)
err := r.ExtractToList("/registry/services/specs", list)
return list, err
}
@ -337,15 +361,19 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error {
}
// WatchServices begins watching for new, changed, or deleted service configurations.
func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := parseWatchResourceVersion(resourceVersion, "service")
if err != nil {
return nil, err
}
if !label.Empty() {
return nil, fmt.Errorf("label selectors are not supported on services")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceKey(value), resourceVersion), nil
return r.Watch(makeServiceKey(value), version), nil
}
if field.Empty() {
return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything)
return r.WatchList("/registry/services/specs", version, tools.Everything)
}
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}
@ -353,7 +381,7 @@ func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector,
// ListEndpoints obtains a list of Services.
func (r *Registry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) {
list := &api.EndpointsList{}
err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion)
err := r.ExtractToList("/registry/services/endpoints", list)
return list, err
}
@ -369,15 +397,19 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error {
}
// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := parseWatchResourceVersion(resourceVersion, "endpoints")
if err != nil {
return nil, err
}
if !label.Empty() {
return nil, fmt.Errorf("label selectors are not supported on endpoints")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceEndpointsKey(value), resourceVersion), nil
return r.Watch(makeServiceEndpointsKey(value), version), nil
}
if field.Empty() {
return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything)
return r.WatchList("/registry/services/endpoints", version, tools.Everything)
}
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}
@ -388,7 +420,7 @@ func makeMinionKey(minionID string) string {
func (r *Registry) ListMinions(ctx api.Context) (*api.MinionList, error) {
minions := &api.MinionList{}
err := r.ExtractList("/registry/minions", &minions.Items, &minions.ResourceVersion)
err := r.ExtractToList("/registry/minions", minions)
return minions, err
}

View File

@ -18,6 +18,7 @@ package etcd
import (
"reflect"
"strconv"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -33,13 +34,48 @@ import (
)
func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, latest.ResourceVersioner},
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicManifestFactory{
ServiceRegistry: &registrytest.ServiceRegistry{},
})
return registry
}
func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
Kind string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 2},
{Version: "10", ExpectVersion: 11},
}
for _, testCase := range testCases {
version, err := parseWatchResourceVersion(testCase.Version, testCase.Kind)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !errors.IsInvalid(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}
func TestEtcdGetPod(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
@ -661,7 +697,7 @@ func TestEtcdUpdateController(t *testing.T) {
resp, _ := fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.UpdateController(ctx, &api.ReplicationController{
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10)},
DesiredState: api.ReplicationControllerState{
Replicas: 2,
},
@ -807,7 +843,7 @@ func TestEtcdUpdateService(t *testing.T) {
resp, _ := fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient)
testService := api.Service{
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10)},
Labels: map[string]string{
"baz": "bar",
},
@ -826,8 +862,8 @@ func TestEtcdUpdateService(t *testing.T) {
}
// Clear modified indices before the equality test.
svc.ResourceVersion = 0
testService.ResourceVersion = 0
svc.ResourceVersion = ""
testService.ResourceVersion = ""
if !reflect.DeepEqual(*svc, testService) {
t.Errorf("Unexpected service: got\n %#v\n, wanted\n %#v", svc, testService)
}
@ -919,7 +955,7 @@ func TestEtcdWatchServices(t *testing.T) {
watching, err := registry.WatchServices(ctx,
labels.Everything(),
labels.SelectorFromSet(labels.Set{"ID": "foo"}),
1,
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -948,7 +984,7 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) {
ctx,
labels.Everything(),
labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}),
0,
"",
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
@ -958,7 +994,7 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) {
ctx,
labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}),
labels.Everything(),
0,
"",
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
@ -973,7 +1009,7 @@ func TestEtcdWatchEndpoints(t *testing.T) {
ctx,
labels.Everything(),
labels.SelectorFromSet(labels.Set{"ID": "foo"}),
1,
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -1002,7 +1038,7 @@ func TestEtcdWatchEndpointsBadSelector(t *testing.T) {
ctx,
labels.Everything(),
labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}),
0,
"",
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
@ -1012,7 +1048,7 @@ func TestEtcdWatchEndpointsBadSelector(t *testing.T) {
ctx,
labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}),
labels.Everything(),
0,
"",
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)

View File

@ -29,7 +29,7 @@ type Registry interface {
// ListPodsPredicate obtains a list of pods for which filter returns true.
ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error)
// Watch for new/changed/deleted pods
WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error)
WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error)
// Get a specific pod
GetPod(ctx api.Context, podID string) (*api.Pod, error)
// Create a pod based on a specification.

View File

@ -176,7 +176,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
}
// Watch begins watching for new, changed, or deleted pods.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.WatchPods(ctx, resourceVersion, rs.filterFunc(label, field))
}

View File

@ -143,7 +143,7 @@ func TestListPodsError(t *testing.T) {
}
func TestListEmptyPodList(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(&api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: 1}})
podRegistry := registrytest.NewPodRegistry(&api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: "1"}})
storage := REST{
registry: podRegistry,
}
@ -156,7 +156,7 @@ func TestListEmptyPodList(t *testing.T) {
if len(pods.(*api.PodList).Items) != 0 {
t.Errorf("Unexpected non-zero pod list: %#v", pods)
}
if pods.(*api.PodList).ResourceVersion != 1 {
if pods.(*api.PodList).ResourceVersion != "1" {
t.Errorf("Unexpected resource version: %#v", pods)
}
}

View File

@ -47,6 +47,6 @@ func (r *ControllerRegistry) DeleteController(ctx api.Context, ID string) error
return r.Err
}
func (r *ControllerRegistry) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) {
func (r *ControllerRegistry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) {
return nil, r.Err
}

View File

@ -63,7 +63,7 @@ func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api.
})
}
func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.mux.Watch(), nil
}

View File

@ -63,7 +63,7 @@ func (r *ServiceRegistry) UpdateService(ctx api.Context, svc *api.Service) error
return r.Err
}
func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return nil, r.Err
}
@ -81,6 +81,6 @@ func (r *ServiceRegistry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) err
return r.Err
}
func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return nil, r.Err
}

View File

@ -30,7 +30,7 @@ type Registry interface {
GetService(ctx api.Context, name string) (*api.Service, error)
DeleteService(ctx api.Context, name string) error
UpdateService(ctx api.Context, svc *api.Service) error
WatchServices(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchServices(ctx api.Context, labels, fields labels.Selector, resourceVersion string) (watch.Interface, error)
// TODO: endpoints and their implementation should be separated, setting endpoints should be
// supported via the API, and the endpoints-controller should use the API to update endpoints.

View File

@ -142,7 +142,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
// Watch returns Services events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.WatchServices(ctx, label, field, resourceVersion)
}

View File

@ -371,7 +371,7 @@ func TestServiceRegistryList(t *testing.T) {
TypeMeta: api.TypeMeta{ID: "foo2"},
Selector: map[string]string{"bar2": "baz2"},
})
registry.List.ResourceVersion = 1
registry.List.ResourceVersion = "1"
s, _ := storage.List(ctx, labels.Everything(), labels.Everything())
sl := s.(*api.ServiceList)
if len(fakeCloud.Calls) != 0 {
@ -386,7 +386,7 @@ func TestServiceRegistryList(t *testing.T) {
if e, a := "foo2", sl.Items[1].ID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if sl.ResourceVersion != 1 {
if sl.ResourceVersion != "1" {
t.Errorf("Unexpected resource version: %#v", sl)
}
}

View File

@ -36,8 +36,8 @@ type Codec interface {
// ResourceVersioner provides methods for setting and retrieving
// the resource version from an API object.
type ResourceVersioner interface {
SetResourceVersion(obj Object, version uint64) error
ResourceVersion(obj Object) (uint64, error)
SetResourceVersion(obj Object, version string) error
ResourceVersion(obj Object) (string, error)
}
// SelfLinker provides methods for setting and retrieving the SelfLink field of an API object.

View File

@ -30,15 +30,15 @@ func NewTypeMetaResourceVersioner() ResourceVersioner {
// jsonBaseModifier implements ResourceVersioner and SelfLinker.
type jsonBaseModifier struct{}
func (v jsonBaseModifier) ResourceVersion(obj Object) (uint64, error) {
func (v jsonBaseModifier) ResourceVersion(obj Object) (string, error) {
json, err := FindTypeMeta(obj)
if err != nil {
return 0, err
return "", err
}
return json.ResourceVersion(), nil
}
func (v jsonBaseModifier) SetResourceVersion(obj Object, version uint64) error {
func (v jsonBaseModifier) SetResourceVersion(obj Object, version string) error {
json, err := FindTypeMeta(obj)
if err != nil {
return err
@ -86,8 +86,8 @@ type TypeMetaInterface interface {
SetAPIVersion(version string)
Kind() string
SetKind(kind string)
ResourceVersion() uint64
SetResourceVersion(version uint64)
ResourceVersion() string
SetResourceVersion(version string)
SelfLink() string
SetSelfLink(selfLink string)
}
@ -96,7 +96,7 @@ type genericTypeMeta struct {
id *string
apiVersion *string
kind *string
resourceVersion *uint64
resourceVersion *string
selfLink *string
}
@ -124,11 +124,11 @@ func (g genericTypeMeta) SetKind(kind string) {
*g.kind = kind
}
func (g genericTypeMeta) ResourceVersion() uint64 {
func (g genericTypeMeta) ResourceVersion() string {
return *g.resourceVersion
}
func (g genericTypeMeta) SetResourceVersion(version uint64) {
func (g genericTypeMeta) SetResourceVersion(version string) {
*g.resourceVersion = version
}

View File

@ -29,14 +29,14 @@ func TestGenericTypeMeta(t *testing.T) {
ID string `json:"id,omitempty" yaml:"id,omitempty"`
CreationTimestamp util.Time `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"`
SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"`
ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"`
ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"`
APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"`
}
j := TypeMeta{
ID: "foo",
APIVersion: "a",
Kind: "b",
ResourceVersion: 1,
ResourceVersion: "1",
SelfLink: "some/place/only/we/know",
}
g, err := newGenericTypeMeta(reflect.ValueOf(&j).Elem())
@ -54,7 +54,7 @@ func TestGenericTypeMeta(t *testing.T) {
if e, a := "b", jbi.Kind(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := uint64(1), jbi.ResourceVersion(); e != a {
if e, a := "1", jbi.ResourceVersion(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := "some/place/only/we/know", jbi.SelfLink(); e != a {
@ -64,7 +64,7 @@ func TestGenericTypeMeta(t *testing.T) {
jbi.SetID("bar")
jbi.SetAPIVersion("c")
jbi.SetKind("d")
jbi.SetResourceVersion(2)
jbi.SetResourceVersion("2")
jbi.SetSelfLink("google.com")
// Prove that jbi changes the original object.
@ -77,7 +77,7 @@ func TestGenericTypeMeta(t *testing.T) {
if e, a := "d", j.Kind; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := uint64(2), j.ResourceVersion; e != a {
if e, a := "2", j.ResourceVersion; e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := "google.com", j.SelfLink; e != a {
@ -99,12 +99,12 @@ func (*MyIncorrectlyMarkedAsAPIObject) IsAnAPIObject() {}
func TestResourceVersionerOfAPI(t *testing.T) {
type T struct {
Object
Expected uint64
Expected string
}
testCases := map[string]T{
"empty api object": {&MyAPIObject{}, 0},
"api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: 1}}, 1},
"pointer to api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: 1}}, 1},
"empty api object": {&MyAPIObject{}, ""},
"api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: "1"}}, "1"},
"pointer to api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: "1"}}, "1"},
}
versioning := NewTypeMetaResourceVersioner()
for key, testCase := range testCases {
@ -119,9 +119,9 @@ func TestResourceVersionerOfAPI(t *testing.T) {
failingCases := map[string]struct {
Object
Expected uint64
Expected string
}{
"not a valid object to try": {&MyIncorrectlyMarkedAsAPIObject{}, 1},
"not a valid object to try": {&MyIncorrectlyMarkedAsAPIObject{}, "1"},
}
for key, testCase := range failingCases {
_, err := versioning.ResourceVersion(testCase.Object)
@ -132,20 +132,20 @@ func TestResourceVersionerOfAPI(t *testing.T) {
setCases := map[string]struct {
Object
Expected uint64
Expected string
}{
"pointer to api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: 1}}, 1},
"pointer to api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: "1"}}, "1"},
}
for key, testCase := range setCases {
if err := versioning.SetResourceVersion(testCase.Object, 5); err != nil {
if err := versioning.SetResourceVersion(testCase.Object, "5"); err != nil {
t.Errorf("%s: unexpected error %#v", key, err)
}
actual, err := versioning.ResourceVersion(testCase.Object)
if err != nil {
t.Errorf("%s: unexpected error %#v", key, err)
}
if actual != 5 {
t.Errorf("%s: expected %d, got %d", key, 5, actual)
if actual != "5" {
t.Errorf("%s: expected %d, got %d", key, "5", actual)
}
}
}

View File

@ -39,7 +39,7 @@ type TypeMeta struct {
ID string `json:"id,omitempty" yaml:"id,omitempty"`
CreationTimestamp util.Time `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"`
SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"`
ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"`
ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"`
APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"`
}

View File

@ -90,7 +90,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
*newEndpoints = *currentEndpoints
newEndpoints.Endpoints = endpoints
if currentEndpoints.ResourceVersion == 0 {
if len(currentEndpoints.ResourceVersion) == 0 {
// No previous endpoints, create them
_, err = e.client.CreateEndpoints(nsCtx, newEndpoints)
} else {

View File

@ -194,7 +194,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
serverResponse{http.StatusOK, api.Endpoints{
TypeMeta: api.TypeMeta{
ID: "foo",
ResourceVersion: 1,
ResourceVersion: "1",
},
Endpoints: []string{"6.7.8.9:1000"},
}})
@ -206,7 +206,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
data := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), &api.Endpoints{
TypeMeta: api.TypeMeta{
ID: "foo",
ResourceVersion: 1,
ResourceVersion: "1",
},
Endpoints: []string{"1.2.3.4:8080"},
})
@ -229,7 +229,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
serverResponse{http.StatusOK, serviceList},
serverResponse{http.StatusOK, api.Endpoints{
TypeMeta: api.TypeMeta{
ResourceVersion: 1,
ResourceVersion: "1",
},
Endpoints: []string{"1.2.3.4:8080"},
}})
@ -263,7 +263,7 @@ func TestSyncEndpointsItems(t *testing.T) {
}
data := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), &api.Endpoints{
TypeMeta: api.TypeMeta{
ResourceVersion: 0,
ResourceVersion: "",
},
Endpoints: []string{"1.2.3.4:8080"},
})

View File

@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"reflect"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/coreos/go-etcd/etcd"
@ -62,12 +63,43 @@ type EtcdGetSet interface {
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
type EtcdResourceVersioner interface {
SetResourceVersion(obj runtime.Object, version uint64) error
ResourceVersion(obj runtime.Object) (uint64, error)
}
// RuntimeVersionAdapter converts a string based versioner to EtcdResourceVersioner
type RuntimeVersionAdapter struct {
Versioner runtime.ResourceVersioner
}
// SetResourceVersion implements EtcdResourceVersioner
func (a RuntimeVersionAdapter) SetResourceVersion(obj runtime.Object, version uint64) error {
if version == 0 {
return a.Versioner.SetResourceVersion(obj, "")
}
s := strconv.FormatUint(version, 10)
return a.Versioner.SetResourceVersion(obj, s)
}
// SetResourceVersion implements EtcdResourceVersioner
func (a RuntimeVersionAdapter) ResourceVersion(obj runtime.Object) (uint64, error) {
version, err := a.Versioner.ResourceVersion(obj)
if err != nil {
return 0, err
}
if version == "" {
return 0, nil
}
return strconv.ParseUint(version, 10, 64)
}
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
type EtcdHelper struct {
Client EtcdGetSet
Codec runtime.Codec
// optional, no atomic operations can be performed without this interface
ResourceVersioner runtime.ResourceVersioner
ResourceVersioner EtcdResourceVersioner
}
// IsEtcdNotFound returns true iff err is an etcd not found error.

View File

@ -44,7 +44,7 @@ func (*TestResource) IsAnAPIObject() {}
var scheme *runtime.Scheme
var codec runtime.Codec
var versioner = runtime.NewTypeMetaResourceVersioner()
var versioner = RuntimeVersionAdapter{runtime.NewTypeMetaResourceVersioner()}
func init() {
scheme = runtime.NewScheme()
@ -89,11 +89,11 @@ func TestExtractToList(t *testing.T) {
},
}
expect := api.PodList{
TypeMeta: api.TypeMeta{ResourceVersion: 10},
TypeMeta: api.TypeMeta{ResourceVersion: "10"},
Items: []api.Pod{
{TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: 1}},
{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: 2}},
{TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: 3}},
{TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}},
{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}},
{TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: "3"}},
},
}
@ -204,7 +204,7 @@ func TestSetObj(t *testing.T) {
}
func TestSetObjWithVersion(t *testing.T) {
obj := &api.Pod{TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: 1}}
obj := &api.Pod{TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}}
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
fakeClient.Data["/some/key"] = EtcdResponseWithError{
@ -254,7 +254,7 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
func TestAtomicUpdate(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := EtcdHelper{fakeClient, codec, runtime.NewTypeMetaResourceVersioner()}
helper := EtcdHelper{fakeClient, codec, versioner}
// Create a new node.
fakeClient.ExpectNotFoundGet("/some/key")
@ -308,7 +308,7 @@ func TestAtomicUpdate(t *testing.T) {
func TestAtomicUpdateNoChange(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := EtcdHelper{fakeClient, codec, runtime.NewTypeMetaResourceVersioner()}
helper := EtcdHelper{fakeClient, codec, versioner}
// Create a new node.
fakeClient.ExpectNotFoundGet("/some/key")
@ -339,7 +339,7 @@ func TestAtomicUpdateNoChange(t *testing.T) {
func TestAtomicUpdate_CreateCollision(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := EtcdHelper{fakeClient, codec, runtime.NewTypeMetaResourceVersioner()}
helper := EtcdHelper{fakeClient, codec, versioner}
fakeClient.ExpectNotFoundGet("/some/key")

View File

@ -82,7 +82,7 @@ type TransformFunc func(runtime.Object) (runtime.Object, error)
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding runtime.Codec
versioner runtime.ResourceVersioner
versioner EtcdResourceVersioner
transform TransformFunc
list bool // If we're doing a recursive watch, should be true.
@ -107,7 +107,7 @@ const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,

View File

@ -395,7 +395,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
testCases := map[string]struct {
Response EtcdResponseWithError
ExpectedVersion uint64
ExpectedVersion string
ExpectedType watch.EventType
}{
"get value created": {
@ -410,7 +410,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
EtcdIndex: 2,
},
},
1,
"1",
watch.Added,
},
"get value modified": {
@ -425,7 +425,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
EtcdIndex: 3,
},
},
2,
"2",
watch.Modified,
},
}
@ -510,10 +510,10 @@ func TestWatchListFromZeroIndex(t *testing.T) {
if !ok {
t.Fatalf("expected a pod, got %#v", event.Object)
}
if actualPod.ResourceVersion != 1 {
if actualPod.ResourceVersion != "1" {
t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod)
}
pod.ResourceVersion = 1
pod.ResourceVersion = "1"
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}

View File

@ -116,13 +116,13 @@ func (lw *listWatch) List() (runtime.Object, error) {
Get()
}
func (lw *listWatch) Watch(resourceVersion uint64) (watch.Interface, error) {
func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) {
return lw.client.
Get().
Path("watch").
Path(lw.resource).
SelectorParam("fields", lw.fieldSelector).
UintParam("resourceVersion", resourceVersion).
Param("resourceVersion", resourceVersion).
Watch()
}

View File

@ -85,37 +85,41 @@ func TestCreateLists(t *testing.T) {
func TestCreateWatches(t *testing.T) {
factory := ConfigFactory{nil}
table := []struct {
rv uint64
rv string
location string
factory func() *listWatch
}{
// Minion watch
{
rv: 0,
rv: "",
location: "/api/" + testapi.Version() + "/watch/minions?fields=&resourceVersion=",
factory: factory.createMinionLW,
}, {
rv: "0",
location: "/api/" + testapi.Version() + "/watch/minions?fields=&resourceVersion=0",
factory: factory.createMinionLW,
}, {
rv: 42,
rv: "42",
location: "/api/" + testapi.Version() + "/watch/minions?fields=&resourceVersion=42",
factory: factory.createMinionLW,
},
// Assigned pod watches
{
rv: 0,
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0",
rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=",
factory: factory.createAssignedPodLW,
}, {
rv: 42,
rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
factory: factory.createAssignedPodLW,
},
// Unassigned pod watches
{
rv: 0,
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0",
rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=",
factory: factory.createUnassignedPodLW,
}, {
rv: 42,
rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
factory: factory.createUnassignedPodLW,
},

View File

@ -19,6 +19,7 @@ limitations under the License.
package integration
import (
"strconv"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -92,7 +93,7 @@ func TestExtractObj(t *testing.T) {
func TestWatch(t *testing.T) {
client := newEtcdClient()
helper := tools.EtcdHelper{Client: client, Codec: latest.Codec, ResourceVersioner: latest.ResourceVersioner}
helper := tools.EtcdHelper{Client: client, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
withEtcdKey(func(key string) {
resp, err := client.Set(key, runtime.EncodeOrDie(v1beta1.Codec, &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
if err != nil {
@ -109,7 +110,7 @@ func TestWatch(t *testing.T) {
// version should match what we set
pod := event.Object.(*api.Pod)
if pod.ResourceVersion != expectedVersion {
if pod.ResourceVersion != strconv.FormatUint(expectedVersion, 10) {
t.Errorf("expected version %d, got %#v", expectedVersion, pod)
}
@ -134,7 +135,7 @@ func TestWatch(t *testing.T) {
t.Errorf("expected deleted event %#v", event)
}
pod = event.Object.(*api.Pod)
if pod.ResourceVersion != expectedVersion {
if pod.ResourceVersion != strconv.FormatUint(expectedVersion, 10) {
t.Errorf("expected version %d, got %#v", expectedVersion, pod)
}
})