construct master URIs from MasterInfo.Address if present; prefer /state over /state.json

pull/6/head
James DeFelice 2016-01-12 17:49:19 +00:00
parent e20a0db159
commit ad1803a4ce
1 changed files with 75 additions and 27 deletions

View File

@ -19,6 +19,7 @@ package mesos
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
@ -37,7 +38,7 @@ import (
const defaultClusterName = "mesos"
var noLeadingMasterError = fmt.Errorf("there is no current leading master available to query")
var noLeadingMasterError = errors.New("there is no current leading master available to query")
type mesosClient struct {
masterLock sync.RWMutex
@ -136,17 +137,11 @@ func createMesosClient(
client.state.refill = client.pollMasterForState
first := true
if err := md.Detect(detector.OnMasterChanged(func(info *mesos.MasterInfo) {
client.masterLock.Lock()
defer client.masterLock.Unlock()
if info == nil {
client.master = ""
} else if host := info.GetHostname(); host != "" {
client.master = host
} else {
client.master = unpackIPv4(info.GetIp())
}
if len(client.master) > 0 {
client.master = fmt.Sprintf("%s:%d", client.master, info.GetPort())
host, port := extractMasterAddress(info)
if len(host) > 0 {
client.masterLock.Lock()
defer client.masterLock.Unlock()
client.master = fmt.Sprintf("%s:%d", host, port)
if first {
first = false
close(initialMaster)
@ -160,6 +155,28 @@ func createMesosClient(
return client, nil
}
func extractMasterAddress(info *mesos.MasterInfo) (host string, port int) {
if info != nil {
host = info.GetAddress().GetHostname()
if host == "" {
host = info.GetAddress().GetIp()
}
if host != "" {
// use port from Address
port = int(info.GetAddress().GetPort())
} else {
// deprecated: get host and port directly from MasterInfo (and not Address)
host = info.GetHostname()
if host == "" {
host = unpackIPv4(info.GetIp())
}
port = int(info.GetPort())
}
}
return
}
func unpackIPv4(ip uint32) string {
octets := make([]byte, 4, 4)
binary.BigEndian.PutUint32(octets, ip)
@ -198,20 +215,8 @@ func (c *mesosClient) pollMasterForState(ctx context.Context) (*mesosState, erro
//TODO(jdef) should not assume master uses http (what about https?)
uri := fmt.Sprintf("http://%s/state.json", master)
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, err
}
var state *mesosState
err = c.httpDo(ctx, req, func(res *http.Response, err error) error {
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("HTTP request failed with code %d: %v", res.StatusCode, res.Status)
}
successHandler := func(res *http.Response) error {
blob, err1 := ioutil.ReadAll(res.Body)
if err1 != nil {
return err1
@ -219,8 +224,51 @@ func (c *mesosClient) pollMasterForState(ctx context.Context) (*mesosState, erro
log.V(3).Infof("Got mesos state, content length %v", len(blob))
state, err1 = parseMesosState(blob)
return err1
})
return state, err
}
// thinking here is that we may get some other status codes from mesos at some point:
// - authentication
// - redirection (possibly from http to https)
// ...
for _, tt := range []struct {
uri string
handlers map[int]func(*http.Response) error
}{
{
uri: fmt.Sprintf("http://%s/state", master),
handlers: map[int]func(*http.Response) error{
200: successHandler,
},
},
{
uri: fmt.Sprintf("http://%s/state.json", master),
handlers: map[int]func(*http.Response) error{
200: successHandler,
},
},
} {
req, err := http.NewRequest("GET", tt.uri, nil)
if err != nil {
return nil, err
}
err = c.httpDo(ctx, req, func(res *http.Response, err error) error {
if err != nil {
return err
}
defer res.Body.Close()
if handler, ok := tt.handlers[res.StatusCode]; ok {
err1 := handler(res)
if err1 != nil {
return err1
}
}
// no handler for this error code, proceed to the next connection type
return nil
})
if state != nil || err != nil {
return state, err
}
}
return nil, errors.New("failed to sync with Mesos master")
}
func parseMesosState(blob []byte) (*mesosState, error) {