Merge pull request #6939 from yichengq/bump-go-etcd

*: update go-etcd dependency
pull/6/head
Daniel Smith 2015-04-17 15:46:27 -07:00
commit eefaac26b7
9 changed files with 226 additions and 34 deletions

4
Godeps/Godeps.json generated
View File

@ -74,8 +74,8 @@
}, },
{ {
"ImportPath": "github.com/coreos/go-etcd/etcd", "ImportPath": "github.com/coreos/go-etcd/etcd",
"Comment": "v0.4.6-8-g60e12ca", "Comment": "v2.0.0-3-g0424b5f",
"Rev": "60e12cac3db8ffce00b576b4af0e7b0a968f1003" "Rev": "0424b5f86ef0ca57a5309c599f74bbb3e97ecd9d"
}, },
{ {
"ImportPath": "github.com/coreos/go-systemd/dbus", "ImportPath": "github.com/coreos/go-systemd/dbus",

View File

@ -15,8 +15,6 @@ import (
"path" "path"
"strings" "strings"
"time" "time"
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
) )
// See SetConsistency for how to use these constants. // See SetConsistency for how to use these constants.
@ -44,10 +42,17 @@ type Config struct {
Consistency string `json:"consistency"` Consistency string `json:"consistency"`
} }
type credentials struct {
username string
password string
}
type Client struct { type Client struct {
config Config `json:"config"` config Config `json:"config"`
cluster *Cluster `json:"cluster"` cluster *Cluster `json:"cluster"`
httpClient *http.Client httpClient *http.Client
credentials *credentials
transport *http.Transport
persistence io.Writer persistence io.Writer
cURLch chan string cURLch chan string
// CheckRetry can be used to control the policy for failed requests // CheckRetry can be used to control the policy for failed requests
@ -172,17 +177,27 @@ func NewClientFromReader(reader io.Reader) (*Client, error) {
// Override the Client's HTTP Transport object // Override the Client's HTTP Transport object
func (c *Client) SetTransport(tr *http.Transport) { func (c *Client) SetTransport(tr *http.Transport) {
c.httpClient.Transport = tr c.httpClient.Transport = tr
c.transport = tr
}
func (c *Client) SetCredentials(username, password string) {
c.credentials = &credentials{username, password}
}
func (c *Client) Close() {
c.transport.DisableKeepAlives = true
c.transport.CloseIdleConnections()
} }
// initHTTPClient initializes a HTTP client for etcd client // initHTTPClient initializes a HTTP client for etcd client
func (c *Client) initHTTPClient() { func (c *Client) initHTTPClient() {
tr := &http.Transport{ c.transport = &http.Transport{
Dial: c.dial, Dial: c.dial,
TLSClientConfig: &tls.Config{ TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, InsecureSkipVerify: true,
}, },
} }
c.httpClient = &http.Client{Transport: tr} c.httpClient = &http.Client{Transport: c.transport}
} }
// initHTTPClient initializes a HTTPS client for etcd client // initHTTPClient initializes a HTTPS client for etcd client
@ -305,31 +320,49 @@ func (c *Client) internalSyncCluster(machines []string) bool {
continue continue
} }
b, err := ioutil.ReadAll(resp.Body) if resp.StatusCode != http.StatusOK { // fall-back to old endpoint
resp.Body.Close() httpPath := c.createHttpPath(machine, path.Join(version, "machines"))
if err != nil { resp, err := c.httpClient.Get(httpPath)
// try another machine in the cluster if err != nil {
continue // try another machine in the cluster
} continue
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
// try another machine in the cluster
continue
}
// update Machines List
c.cluster.updateFromStr(string(b))
} else {
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
// try another machine in the cluster
continue
}
var mCollection httptypes.MemberCollection var mCollection memberCollection
if err := json.Unmarshal(b, &mCollection); err != nil { if err := json.Unmarshal(b, &mCollection); err != nil {
// try another machine // try another machine
continue continue
} }
urls := make([]string, 0) urls := make([]string, 0)
for _, m := range mCollection { for _, m := range mCollection {
urls = append(urls, m.ClientURLs...) urls = append(urls, m.ClientURLs...)
} }
// update Machines List // update Machines List
c.cluster.updateFromStr(strings.Join(urls, ",")) c.cluster.updateFromStr(strings.Join(urls, ","))
}
logger.Debug("sync.machines ", c.cluster.Machines) logger.Debug("sync.machines ", c.cluster.Machines)
c.saveConfig() c.saveConfig()
return true return true
} }
return false return false
} }

View File

@ -94,3 +94,15 @@ func TestPersistence(t *testing.T) {
t.Fatalf("The two configs should be equal!") t.Fatalf("The two configs should be equal!")
} }
} }
func TestClientRetry(t *testing.T) {
c := NewClient([]string{"http://strange", "http://127.0.0.1:4001"})
// use first endpoint as the picked url
c.cluster.picked = 0
if _, err := c.Set("foo", "bar", 5); err != nil {
t.Fatal(err)
}
if _, err := c.Delete("foo", true); err != nil {
t.Fatal(err)
}
}

View File

@ -30,5 +30,8 @@ func (cl *Cluster) pick() string { return cl.Machines[cl.picked] }
func (cl *Cluster) updateFromStr(machines string) { func (cl *Cluster) updateFromStr(machines string) {
cl.Machines = strings.Split(machines, ",") cl.Machines = strings.Split(machines, ",")
for i := range cl.Machines {
cl.Machines[i] = strings.TrimSpace(cl.Machines[i])
}
cl.picked = rand.Intn(len(cl.Machines)) cl.picked = rand.Intn(len(cl.Machines))
} }

View File

@ -0,0 +1,30 @@
package etcd
import "encoding/json"
type Member struct {
ID string `json:"id"`
Name string `json:"name"`
PeerURLs []string `json:"peerURLs"`
ClientURLs []string `json:"clientURLs"`
}
type memberCollection []Member
func (c *memberCollection) UnmarshalJSON(data []byte) error {
d := struct {
Members []Member
}{}
if err := json.Unmarshal(data, &d); err != nil {
return err
}
if d.Members == nil {
*c = make([]Member, 0)
return nil
}
*c = d.Members
return nil
}

View File

@ -0,0 +1,71 @@
package etcd
import (
"encoding/json"
"reflect"
"testing"
)
func TestMemberCollectionUnmarshal(t *testing.T) {
tests := []struct {
body []byte
want memberCollection
}{
{
body: []byte(`{"members":[]}`),
want: memberCollection([]Member{}),
},
{
body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
want: memberCollection(
[]Member{
{
ID: "2745e2525fce8fe",
Name: "node3",
PeerURLs: []string{
"http://127.0.0.1:7003",
},
ClientURLs: []string{
"http://127.0.0.1:4003",
},
},
{
ID: "42134f434382925",
Name: "node1",
PeerURLs: []string{
"http://127.0.0.1:2380",
"http://127.0.0.1:7001",
},
ClientURLs: []string{
"http://127.0.0.1:2379",
"http://127.0.0.1:4001",
},
},
{
ID: "94088180e21eb87b",
Name: "node2",
PeerURLs: []string{
"http://127.0.0.1:7002",
},
ClientURLs: []string{
"http://127.0.0.1:4002",
},
},
},
),
},
}
for i, tt := range tests {
var got memberCollection
err := json.Unmarshal(tt.body, &got)
if err != nil {
t.Errorf("#%d: unexpected error: %v", i, err)
continue
}
if !reflect.DeepEqual(tt.want, got) {
t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.want, got)
}
}
}

View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
@ -189,7 +188,10 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath) logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
httpPath = c.getHttpPath(rr.RelativePath) // get httpPath if not set
if httpPath == "" {
httpPath = c.getHttpPath(rr.RelativePath)
}
// Return a cURL command if curlChan is set // Return a cURL command if curlChan is set
if c.cURLch != nil { if c.cURLch != nil {
@ -197,6 +199,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
for key, value := range rr.Values { for key, value := range rr.Values {
command += fmt.Sprintf(" -d %s=%s", key, value[0]) command += fmt.Sprintf(" -d %s=%s", key, value[0])
} }
if c.credentials != nil {
command += fmt.Sprintf(" -u %s", c.credentials.username)
}
c.sendCURL(command) c.sendCURL(command)
} }
@ -226,7 +231,13 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
return nil, err return nil, err
} }
if c.credentials != nil {
req.SetBasicAuth(c.credentials.username, c.credentials.password)
}
resp, err = c.httpClient.Do(req) resp, err = c.httpClient.Do(req)
// clear previous httpPath
httpPath = ""
defer func() { defer func() {
if resp != nil { if resp != nil {
resp.Body.Close() resp.Body.Close()
@ -281,6 +292,19 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
} }
} }
if resp.StatusCode == http.StatusTemporaryRedirect {
u, err := resp.Location()
if err != nil {
logger.Warning(err)
} else {
// set httpPath for following redirection
httpPath = u.String()
}
resp.Body.Close()
continue
}
if checkErr := checkRetry(c.cluster, numReqs, *resp, if checkErr := checkRetry(c.cluster, numReqs, *resp,
errors.New("Unexpected HTTP status code")); checkErr != nil { errors.New("Unexpected HTTP status code")); checkErr != nil {
return nil, checkErr return nil, checkErr
@ -304,9 +328,8 @@ func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
err error) error { err error) error {
if isEmptyResponse(lastResp) { if isEmptyResponse(lastResp) {
if !isConnectionError(err) { // always retry if it failed to get response from one machine
return err return nil
}
} else if !shouldRetry(lastResp) { } else if !shouldRetry(lastResp) {
body := []byte("nil") body := []byte("nil")
if lastResp.Body != nil { if lastResp.Body != nil {
@ -333,11 +356,6 @@ func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 } func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 }
func isConnectionError(err error) bool {
_, ok := err.(*net.OpError)
return ok
}
// shouldRetry returns whether the reponse deserves retry. // shouldRetry returns whether the reponse deserves retry.
func shouldRetry(r http.Response) bool { func shouldRetry(r http.Response) bool {
// TODO: only retry when the cluster is in leader election // TODO: only retry when the cluster is in leader election

View File

@ -0,0 +1,22 @@
package etcd
import "testing"
func TestKeyToPath(t *testing.T) {
tests := []struct {
key string
wpath string
}{
{"", "keys/"},
{"foo", "keys/foo"},
{"foo/bar", "keys/foo/bar"},
{"%z", "keys/%25z"},
{"/", "keys/"},
}
for i, tt := range tests {
path := keyToPath(tt.key)
if path != tt.wpath {
t.Errorf("#%d: path = %s, want %s", i, path, tt.wpath)
}
}
}

View File

@ -1,3 +1,6 @@
package etcd package etcd
const version = "v2" const (
version = "v2"
packageVersion = "v2.0.0+git"
)