diff --git a/cluster/addons/dns/kube2sky/kube2sky.go b/cluster/addons/dns/kube2sky/kube2sky.go index 958b80fd76..d151ff5e22 100644 --- a/cluster/addons/dns/kube2sky/kube2sky.go +++ b/cluster/addons/dns/kube2sky/kube2sky.go @@ -40,7 +40,7 @@ import ( kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" kframework "k8s.io/kubernetes/pkg/controller/framework" kselector "k8s.io/kubernetes/pkg/fields" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/wait" ) @@ -418,7 +418,7 @@ func newEtcdClient(etcdServer string) (*etcd.Client, error) { err error ) for attempt := 1; attempt <= maxConnectAttempts; attempt++ { - if _, err = etcdstorage.GetEtcdVersion(etcdServer); err == nil { + if _, err = etcdutil.GetEtcdVersion(etcdServer); err == nil { break } if attempt == maxConnectAttempts { diff --git a/contrib/mesos/pkg/election/etcd_master.go b/contrib/mesos/pkg/election/etcd_master.go index 8eb4fdb69b..d5dba9445e 100644 --- a/contrib/mesos/pkg/election/etcd_master.go +++ b/contrib/mesos/pkg/election/etcd_master.go @@ -22,7 +22,7 @@ import ( "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" @@ -91,10 +91,10 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd. // We don't handle the TTL delete w/o a write case here, it's handled in the next loop // iteration. _, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex) - if err != nil && !etcdstorage.IsEtcdTestFailed(err) { + if err != nil && !etcdutil.IsEtcdTestFailed(err) { return "", err } - if err != nil && etcdstorage.IsEtcdTestFailed(err) { + if err != nil && etcdutil.IsEtcdTestFailed(err) { return "", nil } return id, nil @@ -106,11 +106,11 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd. // returns "", err if an error occurred func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) { _, err := e.etcd.Create(path, id, ttl) - if err != nil && !etcdstorage.IsEtcdNodeExist(err) { + if err != nil && !etcdutil.IsEtcdNodeExist(err) { // unexpected error return "", err } - if err != nil && etcdstorage.IsEtcdNodeExist(err) { + if err != nil && etcdutil.IsEtcdNodeExist(err) { return "", nil } return id, nil @@ -125,12 +125,12 @@ func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, e res, err := e.etcd.Get(path, false, false) // Unexpected error, bail out - if err != nil && !etcdstorage.IsEtcdNotFound(err) { + if err != nil && !etcdutil.IsEtcdNotFound(err) { return "", err } // There is no master, try to become the master. - if err != nil && etcdstorage.IsEtcdNotFound(err) { + if err != nil && etcdutil.IsEtcdNotFound(err) { return e.becomeMaster(path, id, ttl) } diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 313079521b..facc28b921 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -73,7 +73,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/master/ports" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/tools" // lock to this API version, compilation will fail when this becomes unsupported @@ -894,7 +894,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) { if s.failoverTimeout > 0 { if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil { - if !etcdstorage.IsEtcdNotFound(err) { + if !etcdutil.IsEtcdNotFound(err) { return nil, fmt.Errorf("unexpected failure attempting to load framework ID from etcd: %v", err) } log.V(1).Infof("did not find framework ID in etcd") @@ -905,7 +905,7 @@ func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.Fram } else { //TODO(jdef) this seems like a totally hackish way to clean up the framework ID if _, err := client.Delete(meta.FrameworkIDKey, true); err != nil { - if !etcdstorage.IsEtcdNotFound(err) { + if !etcdutil.IsEtcdNotFound(err) { return nil, fmt.Errorf("failed to delete framework ID from etcd: %v", err) } log.V(1).Infof("nothing to delete: did not find framework ID in etcd") diff --git a/pkg/api/errors/etcd/etcd.go b/pkg/api/errors/etcd/etcd.go index 03580c8f68..45569fa458 100644 --- a/pkg/api/errors/etcd/etcd.go +++ b/pkg/api/errors/etcd/etcd.go @@ -18,16 +18,16 @@ package etcd import ( "k8s.io/kubernetes/pkg/api/errors" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" ) // InterpretListError converts a generic etcd error on a retrieval // operation into the appropriate API error. func InterpretListError(err error, kind string) error { switch { - case etcdstorage.IsEtcdNotFound(err): + case etcdutil.IsEtcdNotFound(err): return errors.NewNotFound(kind, "") - case etcdstorage.IsEtcdUnreachable(err): + case etcdutil.IsEtcdUnreachable(err): return errors.NewServerTimeout(kind, "list", 2) // TODO: make configurable or handled at a higher level default: return err @@ -38,9 +38,9 @@ func InterpretListError(err error, kind string) error { // operation into the appropriate API error. func InterpretGetError(err error, kind, name string) error { switch { - case etcdstorage.IsEtcdNotFound(err): + case etcdutil.IsEtcdNotFound(err): return errors.NewNotFound(kind, name) - case etcdstorage.IsEtcdUnreachable(err): + case etcdutil.IsEtcdUnreachable(err): return errors.NewServerTimeout(kind, "get", 2) // TODO: make configurable or handled at a higher level default: return err @@ -51,9 +51,9 @@ func InterpretGetError(err error, kind, name string) error { // operation into the appropriate API error. func InterpretCreateError(err error, kind, name string) error { switch { - case etcdstorage.IsEtcdNodeExist(err): + case etcdutil.IsEtcdNodeExist(err): return errors.NewAlreadyExists(kind, name) - case etcdstorage.IsEtcdUnreachable(err): + case etcdutil.IsEtcdUnreachable(err): return errors.NewServerTimeout(kind, "create", 2) // TODO: make configurable or handled at a higher level default: return err @@ -64,9 +64,9 @@ func InterpretCreateError(err error, kind, name string) error { // operation into the appropriate API error. func InterpretUpdateError(err error, kind, name string) error { switch { - case etcdstorage.IsEtcdTestFailed(err), etcdstorage.IsEtcdNodeExist(err): + case etcdutil.IsEtcdTestFailed(err), etcdutil.IsEtcdNodeExist(err): return errors.NewConflict(kind, name, err) - case etcdstorage.IsEtcdUnreachable(err): + case etcdutil.IsEtcdUnreachable(err): return errors.NewServerTimeout(kind, "update", 2) // TODO: make configurable or handled at a higher level default: return err @@ -77,9 +77,9 @@ func InterpretUpdateError(err error, kind, name string) error { // operation into the appropriate API error. func InterpretDeleteError(err error, kind, name string) error { switch { - case etcdstorage.IsEtcdNotFound(err): + case etcdutil.IsEtcdNotFound(err): return errors.NewNotFound(kind, name) - case etcdstorage.IsEtcdUnreachable(err): + case etcdutil.IsEtcdUnreachable(err): return errors.NewServerTimeout(kind, "delete", 2) // TODO: make configurable or handled at a higher level default: return err diff --git a/pkg/apiserver/errors.go b/pkg/apiserver/errors.go index f9c47250b1..00508be4c8 100644 --- a/pkg/apiserver/errors.go +++ b/pkg/apiserver/errors.go @@ -21,7 +21,7 @@ import ( "net/http" "k8s.io/kubernetes/pkg/api/unversioned" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/util" ) @@ -52,7 +52,7 @@ func errToAPIStatus(err error) *unversioned.Status { status := http.StatusInternalServerError switch { //TODO: replace me with NewConflictErr - case etcdstorage.IsEtcdTestFailed(err): + case etcdutil.IsEtcdTestFailed(err): status = http.StatusConflict } // Log errors that were not converted to an error status diff --git a/pkg/master/master.go b/pkg/master/master.go index 23b40c2f6d..c8b0c502a6 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -76,6 +76,7 @@ import ( thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd" "k8s.io/kubernetes/pkg/storage" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/ui" "k8s.io/kubernetes/pkg/util" @@ -855,7 +856,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server { addr = etcdUrl.Host port = 4001 } - serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/health", Validate: etcdstorage.EtcdHealthCheck} + serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/health", Validate: etcdutil.EtcdHealthCheck} } return serversToValidate } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index f1c6cb3d7e..655210c3df 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -49,6 +49,7 @@ import ( etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/intstr" @@ -806,7 +807,7 @@ func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) { thirdPartyObj := extensions.ThirdPartyResourceData{} err = master.thirdPartyStorage.Get( context.TODO(), expectedDeletedKey, &thirdPartyObj, false) - if !etcdstorage.IsEtcdNotFound(err) { + if !etcdutil.IsEtcdNotFound(err) { t.Errorf("expected deletion didn't happen: %v", err) } } @@ -893,7 +894,7 @@ func testInstallThirdPartyResourceRemove(t *testing.T, version string) { for _, key := range expectedDeletedKeys { thirdPartyObj := extensions.ThirdPartyResourceData{} err := master.thirdPartyStorage.Get(context.TODO(), key, &thirdPartyObj, false) - if !etcdstorage.IsEtcdNotFound(err) { + if !etcdutil.IsEtcdNotFound(err) { t.Errorf("expected deletion didn't happen: %v", err) } } diff --git a/pkg/registry/service/allocator/etcd/etcd.go b/pkg/registry/service/allocator/etcd/etcd.go index 35831a66db..3ef4a4fbed 100644 --- a/pkg/registry/service/allocator/etcd/etcd.go +++ b/pkg/registry/service/allocator/etcd/etcd.go @@ -28,7 +28,7 @@ import ( "k8s.io/kubernetes/pkg/registry/service/allocator" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "golang.org/x/net/context" ) @@ -174,7 +174,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) { existing := &api.RangeAllocation{} if err := e.storage.Get(context.TODO(), e.baseKey, existing, false); err != nil { - if etcdstorage.IsEtcdNotFound(err) { + if etcdutil.IsEtcdNotFound(err) { return nil, nil } return nil, etcderr.InterpretGetError(err, e.kind, "") diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index ff3d1294c0..ffb16ea92c 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage/etcd/metrics" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" @@ -179,7 +180,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object) startTime := time.Now() response, err := h.client.Delete(key, false) metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) - if !IsEtcdNotFound(err) { + if !etcdutil.IsEtcdNotFound(err) { // if the object that existed prior to the delete is returned by etcd, update out. if err != nil || response.PrevNode != nil { _, _, err = h.extractObj(response, err, out, false, true) @@ -230,7 +231,7 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r response, err := h.client.Get(key, false, false) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) - if err != nil && !IsEtcdNotFound(err) { + if err != nil && !etcdutil.IsEtcdNotFound(err) { return "", nil, nil, err } body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) @@ -284,7 +285,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) trace.Step("Etcd node read") if err != nil { - if IsEtcdNotFound(err) { + if etcdutil.IsEtcdNotFound(err) { return nil } return err @@ -387,12 +388,12 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node } result, err := h.client.Get(key, true, true) if err != nil { - index, ok := etcdErrorIndex(err) - if !ok { - index = 0 + var index uint64 + if etcdError, ok := err.(*etcd.EtcdError); ok { + index = etcdError.Index } nodes := make([]*etcd.Node, 0) - if IsEtcdNotFound(err) { + if etcdutil.IsEtcdNotFound(err) { return nodes, index, nil } else { return nodes, index, err @@ -464,7 +465,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType startTime := time.Now() response, err := h.client.Create(key, string(data), ttl) metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) - if IsEtcdNodeExist(err) { + if etcdutil.IsEtcdNodeExist(err) { continue } _, _, err = h.extractObj(response, err, ptrToType, false, false) @@ -479,7 +480,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType // Swap origBody with data, if origBody is the latest etcd data. response, err := h.client.CompareAndSwap(key, string(data), ttl, origBody, index) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) - if IsEtcdTestFailed(err) { + if etcdutil.IsEtcdTestFailed(err) { // Try again. continue } diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 455ed3e9bb..16c9c82223 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -17,19 +17,11 @@ limitations under the License. package etcd import ( - "fmt" - "math/rand" - "net" - "net/http" - "net/http/httptest" "path" "reflect" - "strconv" "sync" "testing" - "time" - "github.com/coreos/go-etcd/etcd" "github.com/stretchr/testify/assert" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/api" @@ -40,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" storagetesting "k8s.io/kubernetes/pkg/storage/testing" // TODO: once fakeClient has been purged move utils @@ -69,18 +62,6 @@ func newEtcdHelper(client tools.EtcdClient, codec runtime.Codec, prefix string) return *NewEtcdStorage(client, codec, prefix).(*etcdHelper) } -func TestIsEtcdNotFound(t *testing.T) { - try := func(err error, isNotFound bool) { - if IsEtcdNotFound(err) != isNotFound { - t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound) - } - } - try(tools.EtcdErrorNotFound, true) - try(&etcd.EtcdError{ErrorCode: 101}, false) - try(nil, false) - try(fmt.Errorf("some other kind of error"), false) -} - // Returns an encoded version of api.Pod with the given name. func getEncodedPod(name string) string { pod, _ := testapi.Default.Codec().Encode(&api.Pod{ @@ -265,7 +246,7 @@ func TestGetNotFoundErr(t *testing.T) { var got api.Pod err := helper.Get(context.TODO(), boguskey, &got, false) - if !IsEtcdNotFound(err) { + if !etcdutil.IsEtcdNotFound(err) { t.Errorf("Unexpected reponse on key=%v, err=%v", key, err) } } @@ -539,53 +520,6 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) { } } -func TestGetEtcdVersion_ValidVersion(t *testing.T) { - testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprint(w, validEtcdVersion) - })) - defer testServer.Close() - - var version string - var err error - if version, err = GetEtcdVersion(testServer.URL); err != nil { - t.Errorf("Unexpected error: %v", err) - } - assert.Equal(t, validEtcdVersion, version, "Unexpected version") - assert.Nil(t, err) -} - -func TestGetEtcdVersion_ErrorStatus(t *testing.T) { - testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusServiceUnavailable) - })) - defer testServer.Close() - - _, err := GetEtcdVersion(testServer.URL) - assert.NotNil(t, err) -} - -func TestGetEtcdVersion_NotListening(t *testing.T) { - portIsOpen := func(port int) bool { - conn, err := net.DialTimeout("tcp", "127.0.0.1:"+strconv.Itoa(port), 1*time.Second) - if err == nil { - conn.Close() - return true - } - return false - } - - port := rand.Intn((1 << 16) - 1) - for tried := 0; portIsOpen(port); tried++ { - if tried >= 10 { - t.Fatal("Couldn't find a closed TCP port to continue testing") - } - port++ - } - - _, err := GetEtcdVersion("http://127.0.0.1:" + strconv.Itoa(port)) - assert.NotNil(t, err) -} - func TestPrefixEtcdKey(t *testing.T) { server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) @@ -606,32 +540,3 @@ func TestPrefixEtcdKey(t *testing.T) { assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper") } - -func TestEtcdHealthCheck(t *testing.T) { - tests := []struct { - data string - expectErr bool - }{ - { - data: "{\"health\": \"true\"}", - expectErr: false, - }, - { - data: "{\"health\": \"false\"}", - expectErr: true, - }, - { - data: "invalid json", - expectErr: true, - }, - } - for _, test := range tests { - err := EtcdHealthCheck([]byte(test.data)) - if err != nil && !test.expectErr { - t.Errorf("unexpected error: %v", err) - } - if err == nil && test.expectErr { - t.Error("unexpected non-error") - } - } -} diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 5eefcd1ca5..dd81234e15 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" + etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" @@ -139,12 +140,12 @@ func (w *etcdWatcher) etcdWatch(client tools.EtcdClient, key string, resourceVer func etcdGetInitialWatchState(client tools.EtcdClient, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { resp, err := client.Get(key, false, recursive) if err != nil { - if !IsEtcdNotFound(err) { + if !etcdutil.IsEtcdNotFound(err) { glog.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err) return resourceVersion, err } - if index, ok := etcdErrorIndex(err); ok { - resourceVersion = index + if etcdError, ok := err.(*etcd.EtcdError); ok { + resourceVersion = etcdError.Index } return resourceVersion, nil } @@ -184,7 +185,7 @@ func (w *etcdWatcher) translate() { if err != nil { var status *unversioned.Status switch { - case IsEtcdWatchExpired(err): + case etcdutil.IsEtcdWatchExpired(err): status = &unversioned.Status{ Status: unversioned.StatusFailure, Message: err.Error(), @@ -193,7 +194,7 @@ func (w *etcdWatcher) translate() { } // TODO: need to generate errors using api/errors which has a circular dependency on this package // no other way to inject errors - // case IsEtcdUnreachable(err): + // case etcdutil.IsEtcdUnreachable(err): // status = errors.NewServerTimeout(...) default: status = &unversioned.Status{ diff --git a/pkg/storage/etcd/util/doc.go b/pkg/storage/etcd/util/doc.go new file mode 100644 index 0000000000..aa1039fafa --- /dev/null +++ b/pkg/storage/etcd/util/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package util holds generic etcd-related utility functions that any user of ectd might want to +// use, without pulling in kubernetes-specific code. +package util diff --git a/pkg/storage/etcd/etcd_util.go b/pkg/storage/etcd/util/etcd_util.go similarity index 74% rename from pkg/storage/etcd/etcd_util.go rename to pkg/storage/etcd/util/etcd_util.go index e58a9fb94f..d4cebf26bd 100644 --- a/pkg/storage/etcd/etcd_util.go +++ b/pkg/storage/etcd/util/etcd_util.go @@ -14,17 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package etcd +package util import ( "encoding/json" "fmt" "io/ioutil" "net/http" - "os/exec" goetcd "github.com/coreos/go-etcd/etcd" - "github.com/golang/glog" "k8s.io/kubernetes/pkg/tools" ) @@ -53,26 +51,12 @@ func IsEtcdUnreachable(err error) bool { return isEtcdErrorNum(err, tools.EtcdErrorCodeUnreachable) } -// IsEtcdWatchStoppedByUser returns true if and only if err is a client triggered stop. -func IsEtcdWatchStoppedByUser(err error) bool { - return goetcd.ErrWatchStoppedByUser == err -} - // isEtcdErrorNum returns true if and only if err is an etcd error, whose errorCode matches errorCode func isEtcdErrorNum(err error, errorCode int) bool { etcdError, ok := err.(*goetcd.EtcdError) return ok && etcdError != nil && etcdError.ErrorCode == errorCode } -// etcdErrorIndex returns the index associated with the error message and whether the -// index was available. -func etcdErrorIndex(err error) (uint64, bool) { - if etcdError, ok := err.(*goetcd.EtcdError); ok { - return etcdError.Index, true - } - return 0, false -} - // GetEtcdVersion performs a version check against the provided Etcd server, // returning the string response, and error (if any). func GetEtcdVersion(host string) (string, error) { @@ -91,29 +75,6 @@ func GetEtcdVersion(host string) (string, error) { return string(versionBytes), nil } -func startEtcd() (*exec.Cmd, error) { - cmd := exec.Command("etcd") - err := cmd.Start() - if err != nil { - return nil, err - } - return cmd, nil -} - -func NewEtcdClientStartServerIfNecessary(server string) (tools.EtcdClient, error) { - _, err := GetEtcdVersion(server) - if err != nil { - glog.Infof("Failed to find etcd, attempting to start.") - _, err := startEtcd() - if err != nil { - return nil, err - } - } - - servers := []string{server} - return goetcd.NewClient(servers), nil -} - type etcdHealth struct { // Note this has to be public so the json library can modify it. Health string `json:"health"` diff --git a/pkg/storage/etcd/util/etcd_util_test.go b/pkg/storage/etcd/util/etcd_util_test.go new file mode 100644 index 0000000000..86f50784b3 --- /dev/null +++ b/pkg/storage/etcd/util/etcd_util_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "math/rand" + "net" + "net/http" + "net/http/httptest" + "strconv" + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" + "github.com/stretchr/testify/assert" + + // TODO: once fakeClient has been purged move utils + // and eliminate these deps + "k8s.io/kubernetes/pkg/tools" +) + +const validEtcdVersion = "etcd 2.0.9" + +func TestIsEtcdNotFound(t *testing.T) { + try := func(err error, isNotFound bool) { + if IsEtcdNotFound(err) != isNotFound { + t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound) + } + } + try(tools.EtcdErrorNotFound, true) + try(&etcd.EtcdError{ErrorCode: 101}, false) + try(nil, false) + try(fmt.Errorf("some other kind of error"), false) +} + +func TestGetEtcdVersion_ValidVersion(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, validEtcdVersion) + })) + defer testServer.Close() + + var version string + var err error + if version, err = GetEtcdVersion(testServer.URL); err != nil { + t.Errorf("Unexpected error: %v", err) + } + assert.Equal(t, validEtcdVersion, version, "Unexpected version") + assert.Nil(t, err) +} + +func TestGetEtcdVersion_ErrorStatus(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer testServer.Close() + + _, err := GetEtcdVersion(testServer.URL) + assert.NotNil(t, err) +} + +func TestGetEtcdVersion_NotListening(t *testing.T) { + portIsOpen := func(port int) bool { + conn, err := net.DialTimeout("tcp", "127.0.0.1:"+strconv.Itoa(port), 1*time.Second) + if err == nil { + conn.Close() + return true + } + return false + } + + port := rand.Intn((1 << 16) - 1) + for tried := 0; portIsOpen(port); tried++ { + if tried >= 10 { + t.Fatal("Couldn't find a closed TCP port to continue testing") + } + port++ + } + + _, err := GetEtcdVersion("http://127.0.0.1:" + strconv.Itoa(port)) + assert.NotNil(t, err) +} + +func TestEtcdHealthCheck(t *testing.T) { + tests := []struct { + data string + expectErr bool + }{ + { + data: "{\"health\": \"true\"}", + expectErr: false, + }, + { + data: "{\"health\": \"false\"}", + expectErr: true, + }, + { + data: "invalid json", + expectErr: true, + }, + } + for _, test := range tests { + err := EtcdHealthCheck([]byte(test.data)) + if err != nil && !test.expectErr { + t.Errorf("unexpected error: %v", err) + } + if err == nil && test.expectErr { + t.Error("unexpected non-error") + } + } +}