Implement required sync changes everywhere.

Make requests sync by default, so that usage of them doesn't have to
change.
pull/6/head
Daniel Smith 2014-06-25 16:23:15 -07:00
parent 59a6489e84
commit c9246dc130
10 changed files with 225 additions and 80 deletions

View File

@ -338,7 +338,7 @@ func TestParseTimeout(t *testing.T) {
func TestSyncCreate(t *testing.T) {
storage := SimpleRESTStorage{
injectedFunction: func(obj interface{}) (interface{}, error) {
time.Sleep(2 * time.Second)
time.Sleep(200 * time.Millisecond)
return obj, nil
},
}
@ -375,7 +375,7 @@ func TestSyncCreate(t *testing.T) {
func TestSyncCreateTimeout(t *testing.T) {
storage := SimpleRESTStorage{
injectedFunction: func(obj interface{}) (interface{}, error) {
time.Sleep(10 * time.Second)
time.Sleep(400 * time.Millisecond)
return obj, nil
},
}
@ -387,7 +387,7 @@ func TestSyncCreateTimeout(t *testing.T) {
simple := Simple{Name: "foo"}
data, _ := api.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true&timeout=2s", bytes.NewBuffer(data))
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true&timeout=200ms", bytes.NewBuffer(data))
expectNoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)

View File

@ -27,7 +27,7 @@ func TestOperation(t *testing.T) {
c := make(chan interface{})
op := ops.NewOperation(c)
go func() {
time.Sleep(5 * time.Second)
time.Sleep(500 * time.Millisecond)
c <- "All done"
}()
@ -39,26 +39,26 @@ func TestOperation(t *testing.T) {
t.Errorf("expire incorrectly removed the operation %#v", ops)
}
op.WaitFor(time.Second)
op.WaitFor(10 * time.Millisecond)
if _, completed := op.Describe(); completed {
t.Errorf("Unexpectedly fast completion")
}
op.WaitFor(5 * time.Second)
op.WaitFor(time.Second)
if _, completed := op.Describe(); !completed {
t.Errorf("Unexpectedly slow completion")
}
time.Sleep(900 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
if op.expired(time.Now().Add(-time.Second)) {
t.Errorf("Should not be expired: %#v", op)
}
if !op.expired(time.Now().Add(-800 * time.Millisecond)) {
if !op.expired(time.Now().Add(-80 * time.Millisecond)) {
t.Errorf("Should be expired: %#v", op)
}
ops.expire(800 * time.Millisecond)
ops.expire(80 * time.Millisecond)
if tmp := ops.Get(op.ID); tmp != nil {
t.Errorf("expire failed to remove the operation %#v", ops)
}

View File

@ -43,9 +43,11 @@ import (
// Begin a request with a verb (GET, POST, PUT, DELETE)
func (c *Client) Verb(verb string) *Request {
return &Request{
verb: verb,
c: c,
path: "/api/v1beta1",
verb: verb,
c: c,
path: "/api/v1beta1",
sync: true,
timeout: 10 * time.Second,
}
}
@ -80,6 +82,7 @@ type Request struct {
body io.Reader
selector labels.Selector
timeout time.Duration
sync bool
}
// Append an item to the request path. You must call Path at least once.
@ -91,6 +94,15 @@ func (r *Request) Path(item string) *Request {
return r
}
// Set sync/async call status.
func (r *Request) Sync(sync bool) *Request {
if r.err != nil {
return r
}
r.sync = sync
return r
}
// Overwrite an existing path with the path parameter.
func (r *Request) AbsPath(path string) *Request {
if r.err != nil {
@ -168,8 +180,11 @@ func (r *Request) Do() Result {
if r.selector != nil {
query.Add("labels", r.selector.String())
}
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
if r.sync {
query.Add("sync", "true")
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
}
}
finalUrl += "?" + query.Encode()
req, err := http.NewRequest(r.verb, finalUrl, r.body)

View File

@ -58,7 +58,7 @@ func TestDoRequestNewWay(t *testing.T) {
t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
}
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &reqBody)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" {
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
@ -83,6 +83,7 @@ func TestDoRequestNewWayReader(t *testing.T) {
Path("foo/bar").
Path("baz").
Selector(labels.Set{"name": "foo"}.AsSelector()).
Sync(false).
Timeout(time.Second).
Body(bytes.NewBuffer(reqBodyExpected)).
Do().Get()
@ -97,7 +98,7 @@ func TestDoRequestNewWayReader(t *testing.T) {
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" {
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
@ -136,7 +137,7 @@ func TestDoRequestNewWayObj(t *testing.T) {
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" {
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
@ -181,7 +182,7 @@ func TestDoRequestNewWayFile(t *testing.T) {
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" {
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
@ -213,3 +214,19 @@ func TestAbsPath(t *testing.T) {
t.Errorf("unexpected path: %s, expected %s", r.path, expectedPath)
}
}
func TestSync(t *testing.T) {
c := New("", nil)
r := c.Get()
if !r.sync {
t.Errorf("sync has wrong default")
}
r.Sync(false)
if r.sync {
t.Errorf("'Sync' doesn't work")
}
r.Sync(true)
if !r.sync {
t.Errorf("'Sync' doesn't work")
}
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package registry
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -55,7 +57,9 @@ func (storage *ControllerRegistryStorage) Get(id string) (interface{}, error) {
}
func (storage *ControllerRegistryStorage) Delete(id string) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.DeleteController(id)
return apiserver.MakeAsync(func() (interface{}, error) {
return api.Status{Status: api.StatusSuccess}, storage.registry.DeleteController(id)
}), nil
}
func (storage *ControllerRegistryStorage) Extract(body []byte) (interface{}, error) {
@ -64,10 +68,36 @@ func (storage *ControllerRegistryStorage) Extract(body []byte) (interface{}, err
return result, err
}
func (storage *ControllerRegistryStorage) Create(controller interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.CreateController(controller.(api.ReplicationController))
func (storage *ControllerRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
controller, ok := obj.(api.ReplicationController)
if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj)
}
if controller.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", controller)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.CreateController(controller)
if err != nil {
return nil, err
}
return storage.registry.GetController(controller.ID)
}), nil
}
func (storage *ControllerRegistryStorage) Update(controller interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.UpdateController(controller.(api.ReplicationController))
func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
controller, ok := obj.(api.ReplicationController)
if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj)
}
if controller.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", controller)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.UpdateController(controller)
if err != nil {
return nil, err
}
return storage.registry.GetController(controller.ID)
}), nil
}

View File

@ -140,8 +140,28 @@ func (storage *MinionRegistryStorage) Extract(body []byte) (interface{}, error)
return minion, err
}
func (storage *MinionRegistryStorage) Create(minion interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return minion }), storage.registry.Insert(minion.(api.Minion).ID)
func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
minion, ok := obj.(api.Minion)
if !ok {
return nil, fmt.Errorf("not a minion: %#v", obj)
}
if minion.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", minion)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.Insert(minion.ID)
if err != nil {
return nil, err
}
contains, err := storage.registry.Contains(minion.ID)
if err != nil {
return nil, err
}
if contains {
return storage.toApiMinion(minion.ID), nil
}
return nil, fmt.Errorf("unable to add minion %#v", minion)
}), nil
}
func (storage *MinionRegistryStorage) Update(minion interface{}) (<-chan interface{}, error) {
@ -156,5 +176,7 @@ func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, err
if err != nil {
return nil, err
}
return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.Delete(id)
return apiserver.MakeAsync(func() (interface{}, error) {
return api.Status{Status: api.StatusSuccess}, storage.registry.Delete(id)
}), nil
}

View File

@ -73,20 +73,32 @@ func TestMinionRegistryStorage(t *testing.T) {
t.Errorf("has unexpected object")
}
if _, err := ms.Create(api.Minion{JSONBase: api.JSONBase{ID: "baz"}}); err != nil {
c, err := ms.Create(api.Minion{JSONBase: api.JSONBase{ID: "baz"}})
if err != nil {
t.Errorf("insert failed")
}
obj := <-c
if m, ok := obj.(api.Minion); !ok || m.ID != "baz" {
t.Errorf("insert return value was weird: %#v", obj)
}
if obj, err := ms.Get("baz"); err != nil || obj.(api.Minion).ID != "baz" {
t.Errorf("insert didn't actually insert")
}
if _, err := ms.Delete("bar"); err != nil {
c, err = ms.Delete("bar")
if err != nil {
t.Errorf("delete failed")
}
obj = <-c
if s, ok := obj.(api.Status); !ok || s.Status != api.StatusSuccess {
t.Errorf("delete return value was weird: %#v", obj)
}
if _, err := ms.Get("bar"); err != ErrDoesNotExist {
t.Errorf("delete didn't actually delete")
}
if _, err := ms.Delete("bar"); err != ErrDoesNotExist {
_, err = ms.Delete("bar")
if err != ErrDoesNotExist {
t.Errorf("delete returned wrong error")
}

View File

@ -131,7 +131,9 @@ func (storage *PodRegistryStorage) Get(id string) (interface{}, error) {
}
func (storage *PodRegistryStorage) Delete(id string) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.DeletePod(id)
return apiserver.MakeAsync(func() (interface{}, error) {
return api.Status{Status: api.StatusSuccess}, storage.registry.DeletePod(id)
}), nil
}
func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) {
@ -140,19 +142,37 @@ func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) {
return pod, err
}
func (storage *PodRegistryStorage) Create(pod interface{}) (<-chan interface{}, error) {
podObj := pod.(api.Pod)
if len(podObj.ID) == 0 {
func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
pod := obj.(api.Pod)
if len(pod.ID) == 0 {
return nil, fmt.Errorf("id is unspecified: %#v", pod)
}
machine, err := storage.scheduler.Schedule(podObj)
if err != nil {
return nil, err
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO(lavalamp): Separate scheduler more cleanly.
machine, err := storage.scheduler.Schedule(pod)
if err != nil {
return nil, err
}
err = storage.registry.CreatePod(machine, pod)
if err != nil {
return nil, err
}
return storage.registry.GetPod(pod.ID)
}), nil
}
func (storage *PodRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
pod := obj.(api.Pod)
if len(pod.ID) == 0 {
return nil, fmt.Errorf("id is unspecified: %#v", pod)
}
return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.CreatePod(machine, podObj)
}
func (storage *PodRegistryStorage) Update(pod interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.UpdatePod(pod.(api.Pod))
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.UpdatePod(pod)
if err != nil {
return nil, err
}
return storage.registry.GetPod(pod.ID)
}), nil
}

View File

@ -82,24 +82,26 @@ func (sr *ServiceRegistryStorage) Get(id string) (interface{}, error) {
}
func (sr *ServiceRegistryStorage) Delete(id string) (<-chan interface{}, error) {
svc, err := sr.Get(id)
service, err := sr.registry.GetService(id)
if err != nil {
return nil, err
}
if svc.(*api.Service).CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil {
balancer, ok = sr.cloud.TCPLoadBalancer()
}
if ok && balancer != nil {
err = balancer.DeleteTCPLoadBalancer(id, "us-central1")
if err != nil {
return nil, err
return apiserver.MakeAsync(func() (interface{}, error) {
if service.CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil {
balancer, ok = sr.cloud.TCPLoadBalancer()
}
if ok && balancer != nil {
err = balancer.DeleteTCPLoadBalancer(id, "us-central1")
if err != nil {
return nil, err
}
}
}
}
return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), sr.registry.DeleteService(id)
return api.Status{Status: api.StatusSuccess}, sr.registry.DeleteService(id)
}), nil
}
func (sr *ServiceRegistryStorage) Extract(body []byte) (interface{}, error) {
@ -110,29 +112,51 @@ func (sr *ServiceRegistryStorage) Extract(body []byte) (interface{}, error) {
func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
srv := obj.(api.Service)
if srv.CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil {
balancer, ok = sr.cloud.TCPLoadBalancer()
}
if ok && balancer != nil {
hosts, err := sr.machines.List()
if err != nil {
return nil, err
}
err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.")
}
if srv.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", srv)
}
// TODO actually wait for the object to be fully created here.
return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.CreateService(srv)
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers
// correctly no matter what http operations happen.
if srv.CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil {
balancer, ok = sr.cloud.TCPLoadBalancer()
}
if ok && balancer != nil {
hosts, err := sr.machines.List()
if err != nil {
return nil, err
}
err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.")
}
}
// TODO actually wait for the object to be fully created here.
err := sr.registry.CreateService(srv)
if err != nil {
return nil, err
}
return sr.registry.GetService(srv.ID)
}), nil
}
func (sr *ServiceRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.UpdateService(obj.(api.Service))
srv := obj.(api.Service)
if srv.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", srv)
}
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO: check to see if external load balancer status changed
err := sr.registry.UpdateService(srv)
if err != nil {
return nil, err
}
return sr.registry.GetService(srv.ID)
}), nil
}

View File

@ -34,7 +34,8 @@ func TestServiceRegistry(t *testing.T) {
svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"},
}
storage.Create(svc)
c, _ := storage.Create(svc)
<-c
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
@ -57,7 +58,8 @@ func TestServiceRegistryExternalService(t *testing.T) {
JSONBase: api.JSONBase{ID: "foo"},
CreateExternalLoadBalancer: true,
}
storage.Create(svc)
c, _ := storage.Create(svc)
<-c
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
@ -82,7 +84,8 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
JSONBase: api.JSONBase{ID: "foo"},
CreateExternalLoadBalancer: true,
}
storage.Create(svc)
c, _ := storage.Create(svc)
<-c
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
@ -106,7 +109,8 @@ func TestServiceRegistryDelete(t *testing.T) {
}
memory.CreateService(svc)
storage.Delete(svc.ID)
c, _ := storage.Delete(svc.ID)
<-c
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
@ -131,7 +135,8 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
}
memory.CreateService(svc)
storage.Delete(svc.ID)
c, _ := storage.Delete(svc.ID)
<-c
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "delete" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)