mirror of https://github.com/k3s-io/k3s
Move etcd_util.go to separate package
parent
07664a6104
commit
20ead45af9
|
@ -40,7 +40,7 @@ import (
|
|||
kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
kframework "k8s.io/kubernetes/pkg/controller/framework"
|
||||
kselector "k8s.io/kubernetes/pkg/fields"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
@ -418,7 +418,7 @@ func newEtcdClient(etcdServer string) (*etcd.Client, error) {
|
|||
err error
|
||||
)
|
||||
for attempt := 1; attempt <= maxConnectAttempts; attempt++ {
|
||||
if _, err = etcdstorage.GetEtcdVersion(etcdServer); err == nil {
|
||||
if _, err = etcdutil.GetEtcdVersion(etcdServer); err == nil {
|
||||
break
|
||||
}
|
||||
if attempt == maxConnectAttempts {
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/golang/glog"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
@ -91,10 +91,10 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.
|
|||
// We don't handle the TTL delete w/o a write case here, it's handled in the next loop
|
||||
// iteration.
|
||||
_, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex)
|
||||
if err != nil && !etcdstorage.IsEtcdTestFailed(err) {
|
||||
if err != nil && !etcdutil.IsEtcdTestFailed(err) {
|
||||
return "", err
|
||||
}
|
||||
if err != nil && etcdstorage.IsEtcdTestFailed(err) {
|
||||
if err != nil && etcdutil.IsEtcdTestFailed(err) {
|
||||
return "", nil
|
||||
}
|
||||
return id, nil
|
||||
|
@ -106,11 +106,11 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.
|
|||
// returns "", err if an error occurred
|
||||
func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) {
|
||||
_, err := e.etcd.Create(path, id, ttl)
|
||||
if err != nil && !etcdstorage.IsEtcdNodeExist(err) {
|
||||
if err != nil && !etcdutil.IsEtcdNodeExist(err) {
|
||||
// unexpected error
|
||||
return "", err
|
||||
}
|
||||
if err != nil && etcdstorage.IsEtcdNodeExist(err) {
|
||||
if err != nil && etcdutil.IsEtcdNodeExist(err) {
|
||||
return "", nil
|
||||
}
|
||||
return id, nil
|
||||
|
@ -125,12 +125,12 @@ func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, e
|
|||
res, err := e.etcd.Get(path, false, false)
|
||||
|
||||
// Unexpected error, bail out
|
||||
if err != nil && !etcdstorage.IsEtcdNotFound(err) {
|
||||
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// There is no master, try to become the master.
|
||||
if err != nil && etcdstorage.IsEtcdNotFound(err) {
|
||||
if err != nil && etcdutil.IsEtcdNotFound(err) {
|
||||
return e.becomeMaster(path, id, ttl)
|
||||
}
|
||||
|
||||
|
|
|
@ -73,7 +73,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
|
||||
// lock to this API version, compilation will fail when this becomes unsupported
|
||||
|
@ -894,7 +894,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
|
|||
func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) {
|
||||
if s.failoverTimeout > 0 {
|
||||
if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil {
|
||||
if !etcdstorage.IsEtcdNotFound(err) {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
return nil, fmt.Errorf("unexpected failure attempting to load framework ID from etcd: %v", err)
|
||||
}
|
||||
log.V(1).Infof("did not find framework ID in etcd")
|
||||
|
@ -905,7 +905,7 @@ func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.Fram
|
|||
} else {
|
||||
//TODO(jdef) this seems like a totally hackish way to clean up the framework ID
|
||||
if _, err := client.Delete(meta.FrameworkIDKey, true); err != nil {
|
||||
if !etcdstorage.IsEtcdNotFound(err) {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
return nil, fmt.Errorf("failed to delete framework ID from etcd: %v", err)
|
||||
}
|
||||
log.V(1).Infof("nothing to delete: did not find framework ID in etcd")
|
||||
|
|
|
@ -18,16 +18,16 @@ package etcd
|
|||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
)
|
||||
|
||||
// InterpretListError converts a generic etcd error on a retrieval
|
||||
// operation into the appropriate API error.
|
||||
func InterpretListError(err error, kind string) error {
|
||||
switch {
|
||||
case etcdstorage.IsEtcdNotFound(err):
|
||||
case etcdutil.IsEtcdNotFound(err):
|
||||
return errors.NewNotFound(kind, "")
|
||||
case etcdstorage.IsEtcdUnreachable(err):
|
||||
case etcdutil.IsEtcdUnreachable(err):
|
||||
return errors.NewServerTimeout(kind, "list", 2) // TODO: make configurable or handled at a higher level
|
||||
default:
|
||||
return err
|
||||
|
@ -38,9 +38,9 @@ func InterpretListError(err error, kind string) error {
|
|||
// operation into the appropriate API error.
|
||||
func InterpretGetError(err error, kind, name string) error {
|
||||
switch {
|
||||
case etcdstorage.IsEtcdNotFound(err):
|
||||
case etcdutil.IsEtcdNotFound(err):
|
||||
return errors.NewNotFound(kind, name)
|
||||
case etcdstorage.IsEtcdUnreachable(err):
|
||||
case etcdutil.IsEtcdUnreachable(err):
|
||||
return errors.NewServerTimeout(kind, "get", 2) // TODO: make configurable or handled at a higher level
|
||||
default:
|
||||
return err
|
||||
|
@ -51,9 +51,9 @@ func InterpretGetError(err error, kind, name string) error {
|
|||
// operation into the appropriate API error.
|
||||
func InterpretCreateError(err error, kind, name string) error {
|
||||
switch {
|
||||
case etcdstorage.IsEtcdNodeExist(err):
|
||||
case etcdutil.IsEtcdNodeExist(err):
|
||||
return errors.NewAlreadyExists(kind, name)
|
||||
case etcdstorage.IsEtcdUnreachable(err):
|
||||
case etcdutil.IsEtcdUnreachable(err):
|
||||
return errors.NewServerTimeout(kind, "create", 2) // TODO: make configurable or handled at a higher level
|
||||
default:
|
||||
return err
|
||||
|
@ -64,9 +64,9 @@ func InterpretCreateError(err error, kind, name string) error {
|
|||
// operation into the appropriate API error.
|
||||
func InterpretUpdateError(err error, kind, name string) error {
|
||||
switch {
|
||||
case etcdstorage.IsEtcdTestFailed(err), etcdstorage.IsEtcdNodeExist(err):
|
||||
case etcdutil.IsEtcdTestFailed(err), etcdutil.IsEtcdNodeExist(err):
|
||||
return errors.NewConflict(kind, name, err)
|
||||
case etcdstorage.IsEtcdUnreachable(err):
|
||||
case etcdutil.IsEtcdUnreachable(err):
|
||||
return errors.NewServerTimeout(kind, "update", 2) // TODO: make configurable or handled at a higher level
|
||||
default:
|
||||
return err
|
||||
|
@ -77,9 +77,9 @@ func InterpretUpdateError(err error, kind, name string) error {
|
|||
// operation into the appropriate API error.
|
||||
func InterpretDeleteError(err error, kind, name string) error {
|
||||
switch {
|
||||
case etcdstorage.IsEtcdNotFound(err):
|
||||
case etcdutil.IsEtcdNotFound(err):
|
||||
return errors.NewNotFound(kind, name)
|
||||
case etcdstorage.IsEtcdUnreachable(err):
|
||||
case etcdutil.IsEtcdUnreachable(err):
|
||||
return errors.NewServerTimeout(kind, "delete", 2) // TODO: make configurable or handled at a higher level
|
||||
default:
|
||||
return err
|
||||
|
|
|
@ -21,7 +21,7 @@ import (
|
|||
"net/http"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
|
@ -52,7 +52,7 @@ func errToAPIStatus(err error) *unversioned.Status {
|
|||
status := http.StatusInternalServerError
|
||||
switch {
|
||||
//TODO: replace me with NewConflictErr
|
||||
case etcdstorage.IsEtcdTestFailed(err):
|
||||
case etcdutil.IsEtcdTestFailed(err):
|
||||
status = http.StatusConflict
|
||||
}
|
||||
// Log errors that were not converted to an error status
|
||||
|
|
|
@ -76,6 +76,7 @@ import (
|
|||
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/ui"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
|
@ -855,7 +856,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
|
|||
addr = etcdUrl.Host
|
||||
port = 4001
|
||||
}
|
||||
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/health", Validate: etcdstorage.EtcdHealthCheck}
|
||||
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/health", Validate: etcdutil.EtcdHealthCheck}
|
||||
}
|
||||
return serversToValidate
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ import (
|
|||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/intstr"
|
||||
|
||||
|
@ -806,7 +807,7 @@ func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) {
|
|||
thirdPartyObj := extensions.ThirdPartyResourceData{}
|
||||
err = master.thirdPartyStorage.Get(
|
||||
context.TODO(), expectedDeletedKey, &thirdPartyObj, false)
|
||||
if !etcdstorage.IsEtcdNotFound(err) {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
t.Errorf("expected deletion didn't happen: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -893,7 +894,7 @@ func testInstallThirdPartyResourceRemove(t *testing.T, version string) {
|
|||
for _, key := range expectedDeletedKeys {
|
||||
thirdPartyObj := extensions.ThirdPartyResourceData{}
|
||||
err := master.thirdPartyStorage.Get(context.TODO(), key, &thirdPartyObj, false)
|
||||
if !etcdstorage.IsEtcdNotFound(err) {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
t.Errorf("expected deletion didn't happen: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/registry/service/allocator"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -174,7 +174,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) {
|
|||
|
||||
existing := &api.RangeAllocation{}
|
||||
if err := e.storage.Get(context.TODO(), e.baseKey, existing, false); err != nil {
|
||||
if etcdstorage.IsEtcdNotFound(err) {
|
||||
if etcdutil.IsEtcdNotFound(err) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, etcderr.InterpretGetError(err, e.kind, "")
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/metrics"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
@ -179,7 +180,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object)
|
|||
startTime := time.Now()
|
||||
response, err := h.client.Delete(key, false)
|
||||
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
||||
if !IsEtcdNotFound(err) {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
// if the object that existed prior to the delete is returned by etcd, update out.
|
||||
if err != nil || response.PrevNode != nil {
|
||||
_, _, err = h.extractObj(response, err, out, false, true)
|
||||
|
@ -230,7 +231,7 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
|
|||
response, err := h.client.Get(key, false, false)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
||||
|
||||
if err != nil && !IsEtcdNotFound(err) {
|
||||
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
|
||||
|
@ -284,7 +285,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
|
|||
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||
trace.Step("Etcd node read")
|
||||
if err != nil {
|
||||
if IsEtcdNotFound(err) {
|
||||
if etcdutil.IsEtcdNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
|
@ -387,12 +388,12 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
|
|||
}
|
||||
result, err := h.client.Get(key, true, true)
|
||||
if err != nil {
|
||||
index, ok := etcdErrorIndex(err)
|
||||
if !ok {
|
||||
index = 0
|
||||
var index uint64
|
||||
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
||||
index = etcdError.Index
|
||||
}
|
||||
nodes := make([]*etcd.Node, 0)
|
||||
if IsEtcdNotFound(err) {
|
||||
if etcdutil.IsEtcdNotFound(err) {
|
||||
return nodes, index, nil
|
||||
} else {
|
||||
return nodes, index, err
|
||||
|
@ -464,7 +465,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
|
|||
startTime := time.Now()
|
||||
response, err := h.client.Create(key, string(data), ttl)
|
||||
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
|
||||
if IsEtcdNodeExist(err) {
|
||||
if etcdutil.IsEtcdNodeExist(err) {
|
||||
continue
|
||||
}
|
||||
_, _, err = h.extractObj(response, err, ptrToType, false, false)
|
||||
|
@ -479,7 +480,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
|
|||
// Swap origBody with data, if origBody is the latest etcd data.
|
||||
response, err := h.client.CompareAndSwap(key, string(data), ttl, origBody, index)
|
||||
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
||||
if IsEtcdTestFailed(err) {
|
||||
if etcdutil.IsEtcdTestFailed(err) {
|
||||
// Try again.
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -17,19 +17,11 @@ limitations under the License.
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
@ -40,6 +32,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
|
||||
|
||||
// TODO: once fakeClient has been purged move utils
|
||||
|
@ -69,18 +62,6 @@ func newEtcdHelper(client tools.EtcdClient, codec runtime.Codec, prefix string)
|
|||
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper)
|
||||
}
|
||||
|
||||
func TestIsEtcdNotFound(t *testing.T) {
|
||||
try := func(err error, isNotFound bool) {
|
||||
if IsEtcdNotFound(err) != isNotFound {
|
||||
t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound)
|
||||
}
|
||||
}
|
||||
try(tools.EtcdErrorNotFound, true)
|
||||
try(&etcd.EtcdError{ErrorCode: 101}, false)
|
||||
try(nil, false)
|
||||
try(fmt.Errorf("some other kind of error"), false)
|
||||
}
|
||||
|
||||
// Returns an encoded version of api.Pod with the given name.
|
||||
func getEncodedPod(name string) string {
|
||||
pod, _ := testapi.Default.Codec().Encode(&api.Pod{
|
||||
|
@ -265,7 +246,7 @@ func TestGetNotFoundErr(t *testing.T) {
|
|||
|
||||
var got api.Pod
|
||||
err := helper.Get(context.TODO(), boguskey, &got, false)
|
||||
if !IsEtcdNotFound(err) {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
t.Errorf("Unexpected reponse on key=%v, err=%v", key, err)
|
||||
}
|
||||
}
|
||||
|
@ -539,53 +520,6 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestGetEtcdVersion_ValidVersion(t *testing.T) {
|
||||
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprint(w, validEtcdVersion)
|
||||
}))
|
||||
defer testServer.Close()
|
||||
|
||||
var version string
|
||||
var err error
|
||||
if version, err = GetEtcdVersion(testServer.URL); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
assert.Equal(t, validEtcdVersion, version, "Unexpected version")
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestGetEtcdVersion_ErrorStatus(t *testing.T) {
|
||||
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}))
|
||||
defer testServer.Close()
|
||||
|
||||
_, err := GetEtcdVersion(testServer.URL)
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestGetEtcdVersion_NotListening(t *testing.T) {
|
||||
portIsOpen := func(port int) bool {
|
||||
conn, err := net.DialTimeout("tcp", "127.0.0.1:"+strconv.Itoa(port), 1*time.Second)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
port := rand.Intn((1 << 16) - 1)
|
||||
for tried := 0; portIsOpen(port); tried++ {
|
||||
if tried >= 10 {
|
||||
t.Fatal("Couldn't find a closed TCP port to continue testing")
|
||||
}
|
||||
port++
|
||||
}
|
||||
|
||||
_, err := GetEtcdVersion("http://127.0.0.1:" + strconv.Itoa(port))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestPrefixEtcdKey(t *testing.T) {
|
||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||
defer server.Terminate(t)
|
||||
|
@ -606,32 +540,3 @@ func TestPrefixEtcdKey(t *testing.T) {
|
|||
|
||||
assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper")
|
||||
}
|
||||
|
||||
func TestEtcdHealthCheck(t *testing.T) {
|
||||
tests := []struct {
|
||||
data string
|
||||
expectErr bool
|
||||
}{
|
||||
{
|
||||
data: "{\"health\": \"true\"}",
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
data: "{\"health\": \"false\"}",
|
||||
expectErr: true,
|
||||
},
|
||||
{
|
||||
data: "invalid json",
|
||||
expectErr: true,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
err := EtcdHealthCheck([]byte(test.data))
|
||||
if err != nil && !test.expectErr {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if err == nil && test.expectErr {
|
||||
t.Error("unexpected non-error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
@ -139,12 +140,12 @@ func (w *etcdWatcher) etcdWatch(client tools.EtcdClient, key string, resourceVer
|
|||
func etcdGetInitialWatchState(client tools.EtcdClient, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||
resp, err := client.Get(key, false, recursive)
|
||||
if err != nil {
|
||||
if !IsEtcdNotFound(err) {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
glog.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err)
|
||||
return resourceVersion, err
|
||||
}
|
||||
if index, ok := etcdErrorIndex(err); ok {
|
||||
resourceVersion = index
|
||||
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
||||
resourceVersion = etcdError.Index
|
||||
}
|
||||
return resourceVersion, nil
|
||||
}
|
||||
|
@ -184,7 +185,7 @@ func (w *etcdWatcher) translate() {
|
|||
if err != nil {
|
||||
var status *unversioned.Status
|
||||
switch {
|
||||
case IsEtcdWatchExpired(err):
|
||||
case etcdutil.IsEtcdWatchExpired(err):
|
||||
status = &unversioned.Status{
|
||||
Status: unversioned.StatusFailure,
|
||||
Message: err.Error(),
|
||||
|
@ -193,7 +194,7 @@ func (w *etcdWatcher) translate() {
|
|||
}
|
||||
// TODO: need to generate errors using api/errors which has a circular dependency on this package
|
||||
// no other way to inject errors
|
||||
// case IsEtcdUnreachable(err):
|
||||
// case etcdutil.IsEtcdUnreachable(err):
|
||||
// status = errors.NewServerTimeout(...)
|
||||
default:
|
||||
status = &unversioned.Status{
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package util holds generic etcd-related utility functions that any user of ectd might want to
|
||||
// use, without pulling in kubernetes-specific code.
|
||||
package util
|
|
@ -14,17 +14,15 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd
|
||||
package util
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os/exec"
|
||||
|
||||
goetcd "github.com/coreos/go-etcd/etcd"
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
)
|
||||
|
||||
|
@ -53,26 +51,12 @@ func IsEtcdUnreachable(err error) bool {
|
|||
return isEtcdErrorNum(err, tools.EtcdErrorCodeUnreachable)
|
||||
}
|
||||
|
||||
// IsEtcdWatchStoppedByUser returns true if and only if err is a client triggered stop.
|
||||
func IsEtcdWatchStoppedByUser(err error) bool {
|
||||
return goetcd.ErrWatchStoppedByUser == err
|
||||
}
|
||||
|
||||
// isEtcdErrorNum returns true if and only if err is an etcd error, whose errorCode matches errorCode
|
||||
func isEtcdErrorNum(err error, errorCode int) bool {
|
||||
etcdError, ok := err.(*goetcd.EtcdError)
|
||||
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
|
||||
}
|
||||
|
||||
// etcdErrorIndex returns the index associated with the error message and whether the
|
||||
// index was available.
|
||||
func etcdErrorIndex(err error) (uint64, bool) {
|
||||
if etcdError, ok := err.(*goetcd.EtcdError); ok {
|
||||
return etcdError.Index, true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// GetEtcdVersion performs a version check against the provided Etcd server,
|
||||
// returning the string response, and error (if any).
|
||||
func GetEtcdVersion(host string) (string, error) {
|
||||
|
@ -91,29 +75,6 @@ func GetEtcdVersion(host string) (string, error) {
|
|||
return string(versionBytes), nil
|
||||
}
|
||||
|
||||
func startEtcd() (*exec.Cmd, error) {
|
||||
cmd := exec.Command("etcd")
|
||||
err := cmd.Start()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cmd, nil
|
||||
}
|
||||
|
||||
func NewEtcdClientStartServerIfNecessary(server string) (tools.EtcdClient, error) {
|
||||
_, err := GetEtcdVersion(server)
|
||||
if err != nil {
|
||||
glog.Infof("Failed to find etcd, attempting to start.")
|
||||
_, err := startEtcd()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
servers := []string{server}
|
||||
return goetcd.NewClient(servers), nil
|
||||
}
|
||||
|
||||
type etcdHealth struct {
|
||||
// Note this has to be public so the json library can modify it.
|
||||
Health string `json:"health"`
|
|
@ -0,0 +1,125 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
// TODO: once fakeClient has been purged move utils
|
||||
// and eliminate these deps
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
)
|
||||
|
||||
const validEtcdVersion = "etcd 2.0.9"
|
||||
|
||||
func TestIsEtcdNotFound(t *testing.T) {
|
||||
try := func(err error, isNotFound bool) {
|
||||
if IsEtcdNotFound(err) != isNotFound {
|
||||
t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound)
|
||||
}
|
||||
}
|
||||
try(tools.EtcdErrorNotFound, true)
|
||||
try(&etcd.EtcdError{ErrorCode: 101}, false)
|
||||
try(nil, false)
|
||||
try(fmt.Errorf("some other kind of error"), false)
|
||||
}
|
||||
|
||||
func TestGetEtcdVersion_ValidVersion(t *testing.T) {
|
||||
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprint(w, validEtcdVersion)
|
||||
}))
|
||||
defer testServer.Close()
|
||||
|
||||
var version string
|
||||
var err error
|
||||
if version, err = GetEtcdVersion(testServer.URL); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
assert.Equal(t, validEtcdVersion, version, "Unexpected version")
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestGetEtcdVersion_ErrorStatus(t *testing.T) {
|
||||
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}))
|
||||
defer testServer.Close()
|
||||
|
||||
_, err := GetEtcdVersion(testServer.URL)
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestGetEtcdVersion_NotListening(t *testing.T) {
|
||||
portIsOpen := func(port int) bool {
|
||||
conn, err := net.DialTimeout("tcp", "127.0.0.1:"+strconv.Itoa(port), 1*time.Second)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
port := rand.Intn((1 << 16) - 1)
|
||||
for tried := 0; portIsOpen(port); tried++ {
|
||||
if tried >= 10 {
|
||||
t.Fatal("Couldn't find a closed TCP port to continue testing")
|
||||
}
|
||||
port++
|
||||
}
|
||||
|
||||
_, err := GetEtcdVersion("http://127.0.0.1:" + strconv.Itoa(port))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestEtcdHealthCheck(t *testing.T) {
|
||||
tests := []struct {
|
||||
data string
|
||||
expectErr bool
|
||||
}{
|
||||
{
|
||||
data: "{\"health\": \"true\"}",
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
data: "{\"health\": \"false\"}",
|
||||
expectErr: true,
|
||||
},
|
||||
{
|
||||
data: "invalid json",
|
||||
expectErr: true,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
err := EtcdHealthCheck([]byte(test.data))
|
||||
if err != nil && !test.expectErr {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if err == nil && test.expectErr {
|
||||
t.Error("unexpected non-error")
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue