Merge pull request #10733 from mesosphere/mesos-go-update

Update godep mesos-go
pull/6/head
Yu-Ju Hong 2015-07-06 14:15:09 -07:00
commit a98be3b275
29 changed files with 1549 additions and 97 deletions

2
Godeps/Godeps.json generated
View File

@ -387,7 +387,7 @@
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/auth", "ImportPath": "github.com/mesos/mesos-go/auth",
"Rev": "4b1767c0dfc51020e01f35da5b38472f40ce572a" "Rev": "6440c09c9d8a1b365f3c3e9b2297dd856abd017c"
}, },
{ {
"ImportPath": "github.com/mesos/mesos-go/detector", "ImportPath": "github.com/mesos/mesos-go/detector",

View File

@ -65,26 +65,27 @@ func TestAuthticatee_validLogin(t *testing.T) {
transport.On("Stop").Return(nil) transport.On("Stop").Return(nil)
transport.On("Send", mock.Anything, &server, &mesos.AuthenticateMessage{ transport.On("Send", mock.Anything, &server, &mesos.AuthenticateMessage{
Pid: proto.String(client.String()), Pid: proto.String(client.String()),
}).Return(nil).Once() }).Return(nil).Run(func(_ mock.Arguments) {
transport.Recv(&server, &mesos.AuthenticationMechanismsMessage{
Mechanisms: []string{crammd5.Name},
})
}).Once()
transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStartMessage{ transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStartMessage{
Mechanism: proto.String(crammd5.Name), Mechanism: proto.String(crammd5.Name),
Data: proto.String(""), // may be nil, depends on init step Data: proto.String(""), // may be nil, depends on init step
}).Return(nil).Once() }).Return(nil).Run(func(_ mock.Arguments) {
transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStepMessage{
Data: []byte(`foo cc7fd96cd80123ea844a7dba29a594ed`),
}).Return(nil).Once()
go func() {
transport.Recv(&server, &mesos.AuthenticationMechanismsMessage{
Mechanisms: []string{crammd5.Name},
})
transport.Recv(&server, &mesos.AuthenticationStepMessage{ transport.Recv(&server, &mesos.AuthenticationStepMessage{
Data: []byte(`lsd;lfkgjs;dlfkgjs;dfklg`), Data: []byte(`lsd;lfkgjs;dlfkgjs;dfklg`),
}) })
}).Once()
transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStepMessage{
Data: []byte(`foo cc7fd96cd80123ea844a7dba29a594ed`),
}).Return(nil).Run(func(_ mock.Arguments) {
transport.Recv(&server, &mesos.AuthenticationCompletedMessage{}) transport.Recv(&server, &mesos.AuthenticationCompletedMessage{})
}() }).Once()
return transport return transport
}) })
login, err := makeAuthenticatee(handler, factory) login, err := makeAuthenticatee(handler, factory)

View File

@ -130,7 +130,7 @@ func CreateMasterInfo(pid *upid.UPID) *mesos.MasterInfo {
// This is needed for the people cross-compiling from macos to linux. // This is needed for the people cross-compiling from macos to linux.
// The cross-compiled version of net.LookupIP() fails to handle plain IPs. // The cross-compiled version of net.LookupIP() fails to handle plain IPs.
// See https://github.com/mesos/mesos-go/pull/117 // See https://github.com/mesos/mesos-go/pull/117
} else if addrs, err := net.LookupIP(pid.Host); err == nil { } else if addrs, err := net.LookupIP(pid.Host); err == nil {
for _, ip := range addrs { for _, ip := range addrs {
if ip = ip.To4(); ip != nil { if ip = ip.To4(); ip != nil {
ipv4 = ip ipv4 = ip

View File

@ -27,6 +27,21 @@ type MasterChanged interface {
OnMasterChanged(*mesos.MasterInfo) OnMasterChanged(*mesos.MasterInfo)
} }
// AllMasters defines an optional interface that, if implemented by the same
// struct as implements MasterChanged, will receive an additional callbacks
// independently of leadership changes. it's possible that, as a result of a
// leadership change, both the OnMasterChanged and UpdatedMasters callbacks
// would be invoked.
//
// **NOTE:** Detector implementations are not required to support this optional
// interface. Please RTFM of the detector implementation that you want to use.
type AllMasters interface {
// UpdatedMasters is invoked upon a change in the membership of mesos
// masters, and is useful to clients that wish to know the entire set
// of Mesos masters currently running.
UpdatedMasters([]*mesos.MasterInfo)
}
// func/interface adapter // func/interface adapter
type OnMasterChanged func(*mesos.MasterInfo) type OnMasterChanged func(*mesos.MasterInfo)

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -237,9 +238,9 @@ func TestWatchChildren_flappy(t *testing.T) {
})) }))
go c.connect() go c.connect()
watchChildrenCount := 0 var watchChildrenCount uint64
watcherFunc := ChildWatcher(func(zkc *Client, path string) { watcherFunc := ChildWatcher(func(zkc *Client, path string) {
log.V(1).Infof("ChildWatcher invoked %d", watchChildrenCount) log.V(1).Infof("ChildWatcher invoked %d", atomic.LoadUint64(&watchChildrenCount))
}) })
startTime := time.Now() startTime := time.Now()
endTime := startTime.Add(2 * time.Second) endTime := startTime.Add(2 * time.Second)
@ -252,7 +253,7 @@ watcherLoop:
if _, err := c.watchChildren(currentPath, watcherFunc); err == nil { if _, err := c.watchChildren(currentPath, watcherFunc); err == nil {
// watching children succeeded!! // watching children succeeded!!
t.Logf("child watch success") t.Logf("child watch success")
watchChildrenCount++ atomic.AddUint64(&watchChildrenCount, 1)
} else { } else {
// setting the watch failed // setting the watch failed
t.Logf("setting child watch failed: %v", err) t.Logf("setting child watch failed: %v", err)
@ -264,7 +265,9 @@ watcherLoop:
case <-time.After(endTime.Sub(time.Now())): case <-time.After(endTime.Sub(time.Now())):
} }
} }
assert.Equal(t, 5, watchChildrenCount, "expected watchChildrenCount = 5 instead of %d, should be reinvoked upon initial ChildrenW failures", watchChildrenCount)
wantChildrenCount := atomic.LoadUint64(&watchChildrenCount)
assert.Equal(t, uint64(5), wantChildrenCount, "expected watchChildrenCount = 5 instead of %d, should be reinvoked upon initial ChildrenW failures", wantChildrenCount)
} }
func makeClient() (*Client, error) { func makeClient() (*Client, error) {

View File

@ -112,8 +112,12 @@ func (md *MasterDetector) childrenChanged(zkc *Client, path string, obs detector
return return
} }
topNode := selectTopNode(list) md.notifyMasterChanged(path, list, obs)
md.notifyAllMasters(path, list, obs)
}
func (md *MasterDetector) notifyMasterChanged(path string, list []string, obs detector.MasterChanged) {
topNode := selectTopNode(list)
if md.leaderNode == topNode { if md.leaderNode == topNode {
log.V(2).Infof("ignoring children-changed event, leader has not changed: %v", path) log.V(2).Infof("ignoring children-changed event, leader has not changed: %v", path)
return return
@ -124,21 +128,57 @@ func (md *MasterDetector) childrenChanged(zkc *Client, path string, obs detector
var masterInfo *mesos.MasterInfo var masterInfo *mesos.MasterInfo
if md.leaderNode != "" { if md.leaderNode != "" {
data, err := zkc.data(fmt.Sprintf("%s/%s", path, topNode)) var err error
if err != nil { if masterInfo, err = md.pullMasterInfo(path, topNode); err != nil {
log.Errorf("unable to retrieve leader data: %v", err.Error()) log.Errorln(err.Error())
return
}
masterInfo = new(mesos.MasterInfo)
err = proto.Unmarshal(data, masterInfo)
if err != nil {
log.Errorf("unable to unmarshall MasterInfo data from zookeeper: %v", err)
return
} }
} }
log.V(2).Infof("detected master info: %+v", masterInfo) log.V(2).Infof("detected master info: %+v", masterInfo)
obs.OnMasterChanged(masterInfo) logPanic(func() { obs.OnMasterChanged(masterInfo) })
}
// logPanic safely executes the given func, recovering from and logging a panic if one occurs.
func logPanic(f func()) {
defer func() {
if r := recover(); r != nil {
log.Errorf("recovered from client panic: %v", r)
}
}()
f()
}
func (md *MasterDetector) pullMasterInfo(path, node string) (*mesos.MasterInfo, error) {
data, err := md.client.data(fmt.Sprintf("%s/%s", path, node))
if err != nil {
return nil, fmt.Errorf("failed to retrieve leader data: %v", err)
}
masterInfo := &mesos.MasterInfo{}
err = proto.Unmarshal(data, masterInfo)
if err != nil {
return nil, fmt.Errorf("failed to unmarshall MasterInfo data from zookeeper: %v", err)
}
return masterInfo, nil
}
func (md *MasterDetector) notifyAllMasters(path string, list []string, obs detector.MasterChanged) {
all, ok := obs.(detector.AllMasters)
if !ok {
// not interested in entire master list
return
}
masters := []*mesos.MasterInfo{}
for _, node := range list {
info, err := md.pullMasterInfo(path, node)
if err != nil {
log.Errorln(err.Error())
} else {
masters = append(masters, info)
}
}
log.V(2).Infof("notifying of master membership change: %+v", masters)
logPanic(func() { all.UpdatedMasters(masters) })
} }
// the first call to Detect will kickstart a connection to zookeeper. a nil change listener may // the first call to Detect will kickstart a connection to zookeeper. a nil change listener may

View File

@ -8,11 +8,13 @@ import (
"testing" "testing"
"time" "time"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog" log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector" "github.com/mesos/mesos-go/detector"
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/samuel/go-zookeeper/zk" "github.com/samuel/go-zookeeper/zk"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
) )
const ( const (
@ -324,10 +326,10 @@ func TestMasterDetectMultiple(t *testing.T) {
// **** Test 4 consecutive ChildrenChangedEvents ****** // **** Test 4 consecutive ChildrenChangedEvents ******
// setup event changes // setup event changes
sequences := [][]string{ sequences := [][]string{
[]string{"info_014", "info_010", "info_005"}, {"info_014", "info_010", "info_005"},
[]string{"info_005", "info_004", "info_022"}, {"info_005", "info_004", "info_022"},
[]string{}, // indicates no master {}, // indicates no master
[]string{"info_017", "info_099", "info_200"}, {"info_017", "info_099", "info_200"},
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -425,3 +427,138 @@ func TestMasterDetect_selectTopNode_mixedEntries(t *testing.T) {
node := selectTopNode(nodeList) node := selectTopNode(nodeList)
assert.Equal("info_0000000032", node) assert.Equal("info_0000000032", node)
} }
// implements MasterChanged and AllMasters extension
type allMastersListener struct {
mock.Mock
}
func (a *allMastersListener) OnMasterChanged(mi *mesos.MasterInfo) {
a.Called(mi)
}
func (a *allMastersListener) UpdatedMasters(mi []*mesos.MasterInfo) {
a.Called(mi)
}
func afterFunc(f func()) <-chan struct{} {
ch := make(chan struct{})
go func() {
defer close(ch)
f()
}()
return ch
}
func fatalAfter(t *testing.T, d time.Duration, f func(), msg string, args ...interface{}) {
ch := afterFunc(f)
select {
case <-ch:
return
case <-time.After(d):
t.Fatalf(msg, args...)
}
}
func TestNotifyAllMasters(t *testing.T) {
c, err := newClient(test_zk_hosts, test_zk_path)
assert.NoError(t, err)
childEvents := make(chan zk.Event, 5)
connector := NewMockConnector()
c.setFactory(asFactory(func() (Connector, <-chan zk.Event, error) {
sessionEvents := make(chan zk.Event, 1)
sessionEvents <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnected,
}
return connector, sessionEvents, nil
}))
md, err := NewMasterDetector(zkurl)
defer md.Cancel()
assert.NoError(t, err)
c.errorHandler = ErrorHandler(func(c *Client, e error) {
t.Fatalf("unexpected error: %v", e)
})
md.client = c
listener := &allMastersListener{}
//-- expect primer
var primer sync.WaitGroup
ignoreArgs := func(f func()) func(mock.Arguments) {
primer.Add(1)
return func(_ mock.Arguments) {
f()
}
}
connector.On("Children", test_zk_path).Return([]string{}, &zk.Stat{}, nil).Run(ignoreArgs(primer.Done)).Once()
listener.On("UpdatedMasters", []*mesos.MasterInfo{}).Return().Run(ignoreArgs(primer.Done)).Once()
connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(childEvents), nil).Run(ignoreArgs(primer.Done)).Once()
md.Detect(listener)
fatalAfter(t, 3*time.Second, primer.Wait, "timed out waiting for detection primer")
listener.AssertExpectations(t)
connector.AssertExpectations(t)
//-- test membership changes
type expectedGets struct {
info []byte
err error
}
tt := []struct {
zkEntry []string
gets []expectedGets
leaderIdx int
}{
{[]string{"info_004"}, []expectedGets{{newTestMasterInfo(1), nil}}, 0},
{[]string{"info_007", "info_005", "info_006"}, []expectedGets{{newTestMasterInfo(2), nil}, {newTestMasterInfo(3), nil}, {newTestMasterInfo(4), nil}}, 1},
{nil, nil, -1},
}
for j, tc := range tt {
// expectations
var tcwait sync.WaitGroup
ignoreArgs = func(f func()) func(mock.Arguments) {
tcwait.Add(1)
return func(_ mock.Arguments) {
f()
}
}
expectedInfos := []*mesos.MasterInfo{}
for i, zke := range tc.zkEntry {
connector.On("Get", fmt.Sprintf("%s/%s", test_zk_path, zke)).Return(tc.gets[i].info, &zk.Stat{}, tc.gets[i].err).Run(ignoreArgs(tcwait.Done)).Once()
masterInfo := &mesos.MasterInfo{}
err = proto.Unmarshal(tc.gets[i].info, masterInfo)
if err != nil {
t.Fatalf("failed to unmarshall MasterInfo data: %v", err)
}
expectedInfos = append(expectedInfos, masterInfo)
}
if len(tc.zkEntry) > 0 {
connector.On("Get", fmt.Sprintf("%s/%s", test_zk_path, tc.zkEntry[tc.leaderIdx])).Return(
tc.gets[tc.leaderIdx].info, &zk.Stat{}, tc.gets[tc.leaderIdx].err).Run(ignoreArgs(tcwait.Done)).Once()
}
connector.On("Children", test_zk_path).Return(tc.zkEntry, &zk.Stat{}, nil).Run(ignoreArgs(tcwait.Done)).Once()
listener.On("OnMasterChanged", mock.AnythingOfType("*mesosproto.MasterInfo")).Return().Run(ignoreArgs(tcwait.Done)).Once()
listener.On("UpdatedMasters", expectedInfos).Return().Run(ignoreArgs(tcwait.Done)).Once()
connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(childEvents), nil).Run(ignoreArgs(tcwait.Done)).Once()
// fire the event that triggers the test case
childEvents <- zk.Event{
Type: zk.EventNodeChildrenChanged,
Path: test_zk_path,
}
// allow plenty of time for all the async processing to happen
fatalAfter(t, 5*time.Second, tcwait.Wait, "timed out waiting for all-masters test case %d", j+1)
listener.AssertExpectations(t)
connector.AssertExpectations(t)
}
connector.On("Close").Return(nil)
}

View File

@ -0,0 +1,3 @@
// Zookeeper-based mesos-master leaderhip detection.
// Implements support for optional detector.AllMasters interface.
package zoo

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,121 @@
// +build example-exec
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"flag"
"fmt"
exec "github.com/mesos/mesos-go/executor"
mesos "github.com/mesos/mesos-go/mesosproto"
)
type exampleExecutor struct {
tasksLaunched int
}
func newExampleExecutor() *exampleExecutor {
return &exampleExecutor{tasksLaunched: 0}
}
func (exec *exampleExecutor) Registered(driver exec.ExecutorDriver, execInfo *mesos.ExecutorInfo, fwinfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) {
fmt.Println("Registered Executor on slave ", slaveInfo.GetHostname())
}
func (exec *exampleExecutor) Reregistered(driver exec.ExecutorDriver, slaveInfo *mesos.SlaveInfo) {
fmt.Println("Re-registered Executor on slave ", slaveInfo.GetHostname())
}
func (exec *exampleExecutor) Disconnected(exec.ExecutorDriver) {
fmt.Println("Executor disconnected.")
}
func (exec *exampleExecutor) LaunchTask(driver exec.ExecutorDriver, taskInfo *mesos.TaskInfo) {
fmt.Println("Launching task", taskInfo.GetName(), "with command", taskInfo.Command.GetValue())
runStatus := &mesos.TaskStatus{
TaskId: taskInfo.GetTaskId(),
State: mesos.TaskState_TASK_RUNNING.Enum(),
}
_, err := driver.SendStatusUpdate(runStatus)
if err != nil {
fmt.Println("Got error", err)
}
exec.tasksLaunched++
fmt.Println("Total tasks launched ", exec.tasksLaunched)
//
// this is where one would perform the requested task
//
// finish task
fmt.Println("Finishing task", taskInfo.GetName())
finStatus := &mesos.TaskStatus{
TaskId: taskInfo.GetTaskId(),
State: mesos.TaskState_TASK_FINISHED.Enum(),
}
_, err = driver.SendStatusUpdate(finStatus)
if err != nil {
fmt.Println("Got error", err)
}
fmt.Println("Task finished", taskInfo.GetName())
}
func (exec *exampleExecutor) KillTask(exec.ExecutorDriver, *mesos.TaskID) {
fmt.Println("Kill task")
}
func (exec *exampleExecutor) FrameworkMessage(driver exec.ExecutorDriver, msg string) {
fmt.Println("Got framework message: ", msg)
}
func (exec *exampleExecutor) Shutdown(exec.ExecutorDriver) {
fmt.Println("Shutting down the executor")
}
func (exec *exampleExecutor) Error(driver exec.ExecutorDriver, err string) {
fmt.Println("Got error message:", err)
}
// -------------------------- func inits () ----------------- //
func init() {
flag.Parse()
}
func main() {
fmt.Println("Starting Example Executor (Go)")
dconfig := exec.DriverConfig{
Executor: newExampleExecutor(),
}
driver, err := exec.NewMesosExecutorDriver(dconfig)
if err != nil {
fmt.Println("Unable to create a ExecutorDriver ", err.Error())
}
_, err = driver.Start()
if err != nil {
fmt.Println("Got error:", err)
return
}
fmt.Println("Executor process has started and running.")
driver.Join()
}

View File

@ -0,0 +1,294 @@
// +build example-sched
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"flag"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/auth"
"github.com/mesos/mesos-go/auth/sasl"
"github.com/mesos/mesos-go/auth/sasl/mech"
mesos "github.com/mesos/mesos-go/mesosproto"
util "github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"golang.org/x/net/context"
)
const (
CPUS_PER_TASK = 1
MEM_PER_TASK = 128
defaultArtifactPort = 12345
)
var (
address = flag.String("address", "127.0.0.1", "Binding address for artifact server")
artifactPort = flag.Int("artifactPort", defaultArtifactPort, "Binding port for artifact server")
authProvider = flag.String("mesos_authentication_provider", sasl.ProviderName,
fmt.Sprintf("Authentication provider to use, default is SASL that supports mechanisms: %+v", mech.ListSupported()))
master = flag.String("master", "127.0.0.1:5050", "Master address <ip:port>")
executorPath = flag.String("executor", "./example_executor", "Path to test executor")
taskCount = flag.String("task-count", "5", "Total task count to run.")
mesosAuthPrincipal = flag.String("mesos_authentication_principal", "", "Mesos authentication principal.")
mesosAuthSecretFile = flag.String("mesos_authentication_secret_file", "", "Mesos authentication secret file.")
)
type ExampleScheduler struct {
executor *mesos.ExecutorInfo
tasksLaunched int
tasksFinished int
totalTasks int
}
func newExampleScheduler(exec *mesos.ExecutorInfo) *ExampleScheduler {
total, err := strconv.Atoi(*taskCount)
if err != nil {
total = 5
}
return &ExampleScheduler{
executor: exec,
tasksLaunched: 0,
tasksFinished: 0,
totalTasks: total,
}
}
func (sched *ExampleScheduler) Registered(driver sched.SchedulerDriver, frameworkId *mesos.FrameworkID, masterInfo *mesos.MasterInfo) {
log.Infoln("Framework Registered with Master ", masterInfo)
}
func (sched *ExampleScheduler) Reregistered(driver sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
log.Infoln("Framework Re-Registered with Master ", masterInfo)
}
func (sched *ExampleScheduler) Disconnected(sched.SchedulerDriver) {}
func (sched *ExampleScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
for _, offer := range offers {
cpuResources := util.FilterResources(offer.Resources, func(res *mesos.Resource) bool {
return res.GetName() == "cpus"
})
cpus := 0.0
for _, res := range cpuResources {
cpus += res.GetScalar().GetValue()
}
memResources := util.FilterResources(offer.Resources, func(res *mesos.Resource) bool {
return res.GetName() == "mem"
})
mems := 0.0
for _, res := range memResources {
mems += res.GetScalar().GetValue()
}
log.Infoln("Received Offer <", offer.Id.GetValue(), "> with cpus=", cpus, " mem=", mems)
remainingCpus := cpus
remainingMems := mems
var tasks []*mesos.TaskInfo
for sched.tasksLaunched < sched.totalTasks &&
CPUS_PER_TASK <= remainingCpus &&
MEM_PER_TASK <= remainingMems {
sched.tasksLaunched++
taskId := &mesos.TaskID{
Value: proto.String(strconv.Itoa(sched.tasksLaunched)),
}
task := &mesos.TaskInfo{
Name: proto.String("go-task-" + taskId.GetValue()),
TaskId: taskId,
SlaveId: offer.SlaveId,
Executor: sched.executor,
Resources: []*mesos.Resource{
util.NewScalarResource("cpus", CPUS_PER_TASK),
util.NewScalarResource("mem", MEM_PER_TASK),
},
}
log.Infof("Prepared task: %s with offer %s for launch\n", task.GetName(), offer.Id.GetValue())
tasks = append(tasks, task)
remainingCpus -= CPUS_PER_TASK
remainingMems -= MEM_PER_TASK
}
log.Infoln("Launching ", len(tasks), "tasks for offer", offer.Id.GetValue())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, &mesos.Filters{RefuseSeconds: proto.Float64(1)})
}
}
func (sched *ExampleScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Infoln("Status update: task", status.TaskId.GetValue(), " is in state ", status.State.Enum().String())
if status.GetState() == mesos.TaskState_TASK_FINISHED {
sched.tasksFinished++
}
if sched.tasksFinished >= sched.totalTasks {
log.Infoln("Total tasks completed, stopping framework.")
driver.Stop(false)
}
if status.GetState() == mesos.TaskState_TASK_LOST ||
status.GetState() == mesos.TaskState_TASK_KILLED ||
status.GetState() == mesos.TaskState_TASK_FAILED {
log.Infoln(
"Aborting because task", status.TaskId.GetValue(),
"is in unexpected state", status.State.String(),
"with message", status.GetMessage(),
)
driver.Abort()
}
}
func (sched *ExampleScheduler) OfferRescinded(sched.SchedulerDriver, *mesos.OfferID) {}
func (sched *ExampleScheduler) FrameworkMessage(sched.SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID, string) {
}
func (sched *ExampleScheduler) SlaveLost(sched.SchedulerDriver, *mesos.SlaveID) {}
func (sched *ExampleScheduler) ExecutorLost(sched.SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID, int) {
}
func (sched *ExampleScheduler) Error(driver sched.SchedulerDriver, err string) {
log.Infoln("Scheduler received error:", err)
}
// ----------------------- func init() ------------------------- //
func init() {
flag.Parse()
log.Infoln("Initializing the Example Scheduler...")
}
// returns (downloadURI, basename(path))
func serveExecutorArtifact(path string) (*string, string) {
serveFile := func(pattern string, filename string) {
http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, filename)
})
}
// Create base path (http://foobar:5000/<base>)
pathSplit := strings.Split(path, "/")
var base string
if len(pathSplit) > 0 {
base = pathSplit[len(pathSplit)-1]
} else {
base = path
}
serveFile("/"+base, path)
hostURI := fmt.Sprintf("http://%s:%d/%s", *address, *artifactPort, base)
log.V(2).Infof("Hosting artifact '%s' at '%s'", path, hostURI)
return &hostURI, base
}
func prepareExecutorInfo() *mesos.ExecutorInfo {
executorUris := []*mesos.CommandInfo_URI{}
uri, executorCmd := serveExecutorArtifact(*executorPath)
executorUris = append(executorUris, &mesos.CommandInfo_URI{Value: uri, Executable: proto.Bool(true)})
executorCommand := fmt.Sprintf("./%s", executorCmd)
go http.ListenAndServe(fmt.Sprintf("%s:%d", *address, *artifactPort), nil)
log.V(2).Info("Serving executor artifacts...")
// Create mesos scheduler driver.
return &mesos.ExecutorInfo{
ExecutorId: util.NewExecutorID("default"),
Name: proto.String("Test Executor (Go)"),
Source: proto.String("go_test"),
Command: &mesos.CommandInfo{
Value: proto.String(executorCommand),
Uris: executorUris,
},
}
}
func parseIP(address string) net.IP {
addr, err := net.LookupIP(address)
if err != nil {
log.Fatal(err)
}
if len(addr) < 1 {
log.Fatalf("failed to parse IP from address '%v'", address)
}
return addr[0]
}
// ----------------------- func main() ------------------------- //
func main() {
// build command executor
exec := prepareExecutorInfo()
// the framework
fwinfo := &mesos.FrameworkInfo{
User: proto.String(""), // Mesos-go will fill in user.
Name: proto.String("Test Framework (Go)"),
}
cred := (*mesos.Credential)(nil)
if *mesosAuthPrincipal != "" {
fwinfo.Principal = proto.String(*mesosAuthPrincipal)
secret, err := ioutil.ReadFile(*mesosAuthSecretFile)
if err != nil {
log.Fatal(err)
}
cred = &mesos.Credential{
Principal: proto.String(*mesosAuthPrincipal),
Secret: secret,
}
}
bindingAddress := parseIP(*address)
config := sched.DriverConfig{
Scheduler: newExampleScheduler(exec),
Framework: fwinfo,
Master: *master,
Credential: cred,
BindingAddress: bindingAddress,
WithAuthContext: func(ctx context.Context) context.Context {
ctx = auth.WithLoginProvider(ctx, *authProvider)
ctx = sasl.WithBindingAddress(ctx, bindingAddress)
return ctx
},
}
driver, err := sched.NewMesosSchedulerDriver(config)
if err != nil {
log.Errorln("Unable to create a SchedulerDriver ", err.Error())
}
if stat, err := driver.Run(); err != nil {
log.Infof("Framework stopped with status %s and error: %s\n", stat.String(), err.Error())
}
}

View File

@ -185,6 +185,8 @@ func (driver *MesosExecutorDriver) setStatus(stat mesosproto.Status) {
} }
func (driver *MesosExecutorDriver) Stopped() bool { func (driver *MesosExecutorDriver) Stopped() bool {
driver.lock.RLock()
defer driver.lock.RUnlock()
return driver.stopped return driver.stopped
} }
@ -195,6 +197,8 @@ func (driver *MesosExecutorDriver) setStopped(val bool) {
} }
func (driver *MesosExecutorDriver) Connected() bool { func (driver *MesosExecutorDriver) Connected() bool {
driver.lock.RLock()
defer driver.lock.RUnlock()
return driver.connected return driver.connected
} }
@ -338,6 +342,8 @@ func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(from *upid.UPID,
taskID := msg.GetTaskId() taskID := msg.GetTaskId()
uuid := uuid.UUID(msg.GetUuid()) uuid := uuid.UUID(msg.GetUuid())
driver.lock.Lock()
defer driver.lock.Unlock()
if driver.stopped { if driver.stopped {
log.Infof("Ignoring status update acknowledgement %v for task %v of framework %v because the driver is stopped!\n", log.Infof("Ignoring status update acknowledgement %v for task %v of framework %v because the driver is stopped!\n",
uuid, taskID, frameworkID) uuid, taskID, frameworkID)
@ -526,7 +532,9 @@ func (driver *MesosExecutorDriver) SendStatusUpdate(taskStatus *mesosproto.TaskS
log.Infof("Executor sending status update %v\n", update.String()) log.Infof("Executor sending status update %v\n", update.String())
// Capture the status update. // Capture the status update.
driver.lock.Lock()
driver.updates[uuid.UUID(update.GetUuid()).String()] = update driver.updates[uuid.UUID(update.GetUuid()).String()] = update
driver.lock.Unlock()
// Put the status update in the message. // Put the status update in the message.
message := &mesosproto.StatusUpdateMessage{ message := &mesosproto.StatusUpdateMessage{

View File

@ -193,7 +193,7 @@ func TestExecutorDriverExecutorRegisteredEvent(t *testing.T) {
} }
c := testutil.NewMockMesosClient(t, server.PID) c := testutil.NewMockMesosClient(t, server.PID)
c.SendMessage(driver.self, pbMsg) c.SendMessage(driver.self, pbMsg)
assert.True(t, driver.connected) assert.True(t, driver.Connected())
select { select {
case <-ch: case <-ch:
case <-time.After(time.Millisecond * 2): case <-time.After(time.Millisecond * 2):

View File

@ -98,8 +98,10 @@ func TestExecutorDriverStartFailedToParseEnvironment(t *testing.T) {
clearEnvironments(t) clearEnvironments(t)
exec := NewMockedExecutor() exec := NewMockedExecutor()
exec.On("Error").Return(nil) exec.On("Error").Return(nil)
driver := newTestExecutorDriver(t, exec) dconfig := DriverConfig{Executor: exec}
driver, err := NewMesosExecutorDriver(dconfig)
assert.Nil(t, driver) assert.Nil(t, driver)
assert.Error(t, err)
} }
func TestExecutorDriverStartFailedToStartMessenger(t *testing.T) { func TestExecutorDriverStartFailedToStartMessenger(t *testing.T) {
@ -206,7 +208,7 @@ func TestExecutorDriverRun(t *testing.T) {
assert.Equal(t, mesosproto.Status_DRIVER_STOPPED, stat) assert.Equal(t, mesosproto.Status_DRIVER_STOPPED, stat)
}() }()
time.Sleep(time.Millisecond * 1) // allow for things to settle time.Sleep(time.Millisecond * 1) // allow for things to settle
assert.False(t, driver.stopped) assert.False(t, driver.Stopped())
assert.Equal(t, mesosproto.Status_DRIVER_RUNNING, driver.Status()) assert.Equal(t, mesosproto.Status_DRIVER_RUNNING, driver.Status())
// mannually close it all // mannually close it all
@ -394,3 +396,19 @@ func TestExecutorDriverSendFrameworkMessage(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, mesosproto.Status_DRIVER_RUNNING, stat) assert.Equal(t, mesosproto.Status_DRIVER_RUNNING, stat)
} }
func TestStatusUpdateAckRace_Issue103(t *testing.T) {
driver, _, _ := createTestExecutorDriver(t)
_, err := driver.Start()
assert.NoError(t, err)
msg := &mesosproto.StatusUpdateAcknowledgementMessage{}
go driver.statusUpdateAcknowledgement(nil, msg)
taskStatus := util.NewTaskStatus(
util.NewTaskID("test-task-001"),
mesosproto.TaskState_TASK_STAGING,
)
driver.SendStatusUpdate(taskStatus)
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthchecker
import (
"time"
"github.com/mesos/mesos-go/upid"
)
// HealthChecker defines the interface of a health checker.
type HealthChecker interface {
// Start will start the health checker, and returns a notification channel.
// if the checker thinks the slave is unhealthy, it will send the timestamp
// via the channel.
Start() <-chan time.Time
// Pause will pause the slave health checker.
Pause()
// Continue will continue the slave health checker with a new slave upid.
Continue(slaveUPID *upid.UPID)
// Stop will stop the health checker. it should be called only once during
// the life span of the checker.
Stop()
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthchecker
import (
"time"
"github.com/mesos/mesos-go/upid"
"github.com/stretchr/testify/mock"
)
type MockedHealthChecker struct {
mock.Mock
ch chan time.Time
}
// NewMockedHealthChecker returns a new mocked health checker.
func NewMockedHealthChecker() *MockedHealthChecker {
return &MockedHealthChecker{ch: make(chan time.Time, 1)}
}
// Start will start the checker and returns the notification channel.
func (m *MockedHealthChecker) Start() <-chan time.Time {
m.Called()
return m.ch
}
// Pause will pause the slave health checker.
func (m *MockedHealthChecker) Pause() {
m.Called()
}
// Continue will continue the slave health checker with a new slave upid.
func (m *MockedHealthChecker) Continue(slaveUPID *upid.UPID) {
m.Called()
}
// Stop will stop the checker.
func (m *MockedHealthChecker) Stop() {
m.Called()
}
func (m *MockedHealthChecker) TriggerUnhealthyEvent() {
m.ch <- time.Now()
}

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthchecker
import (
"fmt"
"net/http"
"sync"
"time"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/upid"
)
const (
defaultTimeout = time.Second
defaultCheckDuration = time.Second
defaultThreshold = 5
)
// SlaveHealthChecker is for checking the slave's health.
type SlaveHealthChecker struct {
sync.RWMutex
slaveUPID *upid.UPID
client *http.Client
threshold int
checkDuration time.Duration
continuousUnhealthyCount int
stop chan struct{}
ch chan time.Time
paused bool
}
// NewSlaveHealthChecker creates a slave health checker and return a notification channel.
// Each time the checker thinks the slave is unhealthy, it will send a notification through the channel.
func NewSlaveHealthChecker(slaveUPID *upid.UPID, threshold int, checkDuration time.Duration, timeout time.Duration) *SlaveHealthChecker {
checker := &SlaveHealthChecker{
slaveUPID: slaveUPID,
client: &http.Client{Timeout: timeout},
threshold: threshold,
checkDuration: checkDuration,
stop: make(chan struct{}),
ch: make(chan time.Time, 1),
}
if timeout == 0 {
checker.client.Timeout = defaultTimeout
}
if checkDuration == 0 {
checker.checkDuration = defaultCheckDuration
}
if threshold <= 0 {
checker.threshold = defaultThreshold
}
return checker
}
// Start will start the health checker and returns the notification channel.
func (s *SlaveHealthChecker) Start() <-chan time.Time {
go func() {
ticker := time.Tick(s.checkDuration)
for {
select {
case <-ticker:
s.RLock()
if !s.paused {
s.doCheck()
}
s.RUnlock()
case <-s.stop:
return
}
}
}()
return s.ch
}
// Pause will pause the slave health checker.
func (s *SlaveHealthChecker) Pause() {
s.Lock()
defer s.Unlock()
s.paused = true
}
// Continue will continue the slave health checker with a new slave upid.
func (s *SlaveHealthChecker) Continue(slaveUPID *upid.UPID) {
s.Lock()
defer s.Unlock()
s.paused = false
s.slaveUPID = slaveUPID
}
// Stop will stop the slave health checker.
// It should be called only once during the life span of the checker.
func (s *SlaveHealthChecker) Stop() {
close(s.stop)
}
func (s *SlaveHealthChecker) doCheck() {
path := fmt.Sprintf("http://%s:%s/%s/health", s.slaveUPID.Host, s.slaveUPID.Port, s.slaveUPID.ID)
resp, err := s.client.Head(path)
unhealthy := false
if err != nil {
log.Errorf("Failed to request the health path: %v\n", err)
unhealthy = true
} else if resp.StatusCode != http.StatusOK {
log.Errorf("Failed to request the health path: status: %v\n", resp.StatusCode)
unhealthy = true
}
if unhealthy {
s.continuousUnhealthyCount++
if s.continuousUnhealthyCount >= s.threshold {
select {
case s.ch <- time.Now(): // If no one is receiving the channel, then just skip it.
default:
}
s.continuousUnhealthyCount = 0
}
return
}
s.continuousUnhealthyCount = 0
resp.Body.Close()
}

View File

@ -0,0 +1,262 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package healthchecker
import (
"fmt"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/mesos/mesos-go/upid"
"github.com/stretchr/testify/assert"
)
type thresholdMonitor struct {
cnt int32
threshold int32
}
func newThresholdMonitor(threshold int) *thresholdMonitor {
return &thresholdMonitor{threshold: int32(threshold)}
}
// incAndTest returns true if the threshold is reached.
func (t *thresholdMonitor) incAndTest() bool {
if atomic.AddInt32(&t.cnt, 1) >= t.threshold {
return false
}
return true
}
// blockedServer replies only threshold times, after that
// it will block.
type blockedServer struct {
th *thresholdMonitor
ch chan struct{}
}
func newBlockedServer(threshold int) *blockedServer {
return &blockedServer{
th: newThresholdMonitor(threshold),
ch: make(chan struct{}),
}
}
func (s *blockedServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.th.incAndTest() {
return
}
<-s.ch
}
func (s *blockedServer) stop() {
close(s.ch)
}
// eofServer will close the connection after it replies for threshold times.
// Thus the health checker will get an EOF error.
type eofServer struct {
th *thresholdMonitor
}
func newEOFServer(threshold int) *eofServer {
return &eofServer{newThresholdMonitor(threshold)}
}
func (s *eofServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.th.incAndTest() {
return
}
hj := w.(http.Hijacker)
conn, _, err := hj.Hijack()
if err != nil {
panic("Cannot hijack")
}
conn.Close()
}
// errorStatusCodeServer will reply error status code (e.g. 503) after the
// it replies for threhold time.
type errorStatusCodeServer struct {
th *thresholdMonitor
}
func newErrorStatusServer(threshold int) *errorStatusCodeServer {
return &errorStatusCodeServer{newThresholdMonitor(threshold)}
}
func (s *errorStatusCodeServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.th.incAndTest() {
return
}
w.WriteHeader(http.StatusServiceUnavailable)
}
// goodServer always returns status ok.
type goodServer bool
func (s *goodServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {}
// partitionedServer returns status ok at some first requests.
// Then it will block for a while, and then reply again.
type partitionedServer struct {
healthyCnt int32
partitionCnt int32
cnt int32
mutex *sync.Mutex
cond *sync.Cond
}
func newPartitionedServer(healthyCnt, partitionCnt int) *partitionedServer {
mutex := new(sync.Mutex)
cond := sync.NewCond(mutex)
return &partitionedServer{
healthyCnt: int32(healthyCnt),
partitionCnt: int32(partitionCnt),
mutex: mutex,
cond: cond,
}
}
func (s *partitionedServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cnt := atomic.AddInt32(&s.cnt, 1)
if cnt < s.healthyCnt {
return
}
if cnt < s.healthyCnt+s.partitionCnt {
s.mutex.Lock()
defer s.mutex.Unlock()
s.cond.Wait()
return
}
s.mutex.Lock()
defer s.mutex.Unlock()
s.cond.Broadcast()
}
func TestSlaveHealthCheckerFailedOnBlockedSlave(t *testing.T) {
s := newBlockedServer(5)
ts := httptest.NewUnstartedServer(s)
ts.Start()
defer ts.Close()
upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String()))
assert.NoError(t, err)
checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10)
ch := checker.Start()
defer checker.Stop()
select {
case <-time.After(time.Second):
s.stop()
t.Fatal("timeout")
case <-ch:
s.stop()
assert.True(t, atomic.LoadInt32(&s.th.cnt) > 10)
}
}
func TestSlaveHealthCheckerFailedOnEOFSlave(t *testing.T) {
s := newEOFServer(5)
ts := httptest.NewUnstartedServer(s)
ts.Start()
defer ts.Close()
upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String()))
assert.NoError(t, err)
checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10)
ch := checker.Start()
defer checker.Stop()
select {
case <-time.After(time.Second):
t.Fatal("timeout")
case <-ch:
assert.True(t, atomic.LoadInt32(&s.th.cnt) > 10)
}
}
func TestSlaveHealthCheckerFailedOnErrorStatusSlave(t *testing.T) {
s := newErrorStatusServer(5)
ts := httptest.NewUnstartedServer(s)
ts.Start()
defer ts.Close()
upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String()))
assert.NoError(t, err)
checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10)
ch := checker.Start()
defer checker.Stop()
select {
case <-time.After(time.Second):
t.Fatal("timeout")
case <-ch:
assert.True(t, atomic.LoadInt32(&s.th.cnt) > 10)
}
}
func TestSlaveHealthCheckerSucceed(t *testing.T) {
s := new(goodServer)
ts := httptest.NewUnstartedServer(s)
ts.Start()
defer ts.Close()
upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String()))
assert.NoError(t, err)
checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10)
ch := checker.Start()
defer checker.Stop()
select {
case <-time.After(time.Second):
assert.Equal(t, 0, checker.continuousUnhealthyCount)
case <-ch:
t.Fatal("Shouldn't get unhealthy notification")
}
}
func TestSlaveHealthCheckerPartitonedSlave(t *testing.T) {
s := newPartitionedServer(5, 9)
ts := httptest.NewUnstartedServer(s)
ts.Start()
defer ts.Close()
upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String()))
assert.NoError(t, err)
checker := NewSlaveHealthChecker(upid, 10, time.Millisecond*10, time.Millisecond*10)
ch := checker.Start()
defer checker.Stop()
select {
case <-time.After(time.Second):
assert.Equal(t, 0, checker.continuousUnhealthyCount)
case <-ch:
t.Fatal("Shouldn't get unhealthy notification")
}
}

View File

@ -0,0 +1,3 @@
// This package was previously the home of the native bindings. Please use the
// native branch if you need to build against the native bindings.
package mesos

View File

@ -1,7 +1,7 @@
####Benchmark of the messenger. ####Benchmark of the messenger.
```shell ```shell
$ go test -v -run=Benckmark* -bench=. $ go test -v -run=Benckmark* -bench=.
PASS PASS
BenchmarkMessengerSendSmallMessage 50000 70568 ns/op BenchmarkMessengerSendSmallMessage 50000 70568 ns/op
BenchmarkMessengerSendMediumMessage 50000 70265 ns/op BenchmarkMessengerSendMediumMessage 50000 70265 ns/op
@ -29,7 +29,7 @@ BenchmarkMessengerSendRecvLargeMessage-4 50000 45472 ns/op
BenchmarkMessengerSendRecvMixedMessage-4 50000 47393 ns/op BenchmarkMessengerSendRecvMixedMessage-4 50000 47393 ns/op
ok github.com/mesos/mesos-go/messenger 105.173s ok github.com/mesos/mesos-go/messenger 105.173s
``` ```
####environment: ####environment:
``` ```

View File

@ -21,7 +21,6 @@ package messenger
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/mesos/mesos-go/upid"
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
@ -33,6 +32,7 @@ import (
"time" "time"
log "github.com/golang/glog" log "github.com/golang/glog"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -235,7 +235,14 @@ func (t *HTTPTransporter) listen() error {
} else { } else {
host = t.upid.Host host = t.upid.Host
} }
port := t.upid.Port
var port string
if t.upid.Port != "" {
port = t.upid.Port
} else {
port = "0"
}
// NOTE: Explicitly specifies IPv4 because Libprocess // NOTE: Explicitly specifies IPv4 because Libprocess
// only supports IPv4 for now. // only supports IPv4 for now.
ln, err := net.Listen("tcp4", net.JoinHostPort(host, port)) ln, err := net.Listen("tcp4", net.JoinHostPort(host, port))
@ -245,7 +252,15 @@ func (t *HTTPTransporter) listen() error {
} }
// Save the host:port in case they are not specified in upid. // Save the host:port in case they are not specified in upid.
host, port, _ = net.SplitHostPort(ln.Addr().String()) host, port, _ = net.SplitHostPort(ln.Addr().String())
t.upid.Host, t.upid.Port = host, port
if len(t.upid.Host) == 0 {
t.upid.Host = host
}
if len(t.upid.Port) == 0 || t.upid.Port == "0" {
t.upid.Port = port
}
t.listener = ln t.listener = ln
return nil return nil
} }

View File

@ -2,6 +2,7 @@ package messenger
import ( import (
"fmt" "fmt"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"regexp" "regexp"
@ -266,6 +267,66 @@ func TestTransporterStartAndStop(t *testing.T) {
} }
} }
func TestMutatedHostUPid(t *testing.T) {
serverId := "testserver"
serverPort := getNewPort()
serverHost := "127.0.0.1"
serverAddr := serverHost + ":" + strconv.Itoa(serverPort)
// override the upid.Host with this listener IP
addr := net.ParseIP("127.0.0.2")
// setup receiver (server) process
uPid, err := upid.Parse(fmt.Sprintf("%s@%s", serverId, serverAddr))
assert.NoError(t, err)
receiver := NewHTTPTransporter(uPid, addr)
err = receiver.listen()
assert.NoError(t, err)
if receiver.upid.Host != "127.0.0.1" {
t.Fatalf("reciever.upid.Host was expected to return %s, got %s\n", serverHost, receiver.upid.Host)
}
if receiver.upid.Port != strconv.Itoa(serverPort) {
t.Fatalf("receiver.upid.Port was expected to return %d, got %s\n", serverPort, receiver.upid.Port)
}
}
func TestEmptyHostPortUPid(t *testing.T) {
serverId := "testserver"
serverPort := getNewPort()
serverHost := "127.0.0.1"
serverAddr := serverHost + ":" + strconv.Itoa(serverPort)
// setup receiver (server) process
uPid, err := upid.Parse(fmt.Sprintf("%s@%s", serverId, serverAddr))
assert.NoError(t, err)
// Unset upid host and port
uPid.Host = ""
uPid.Port = ""
// override the upid.Host with this listener IP
addr := net.ParseIP("127.0.0.2")
receiver := NewHTTPTransporter(uPid, addr)
err = receiver.listen()
assert.NoError(t, err)
// This should be the host that overrides as uPid.Host is empty
if receiver.upid.Host != "127.0.0.2" {
t.Fatalf("reciever.upid.Host was expected to return %s, got %s\n", serverHost, receiver.upid.Host)
}
// This should end up being a random port, not the server port as uPid
// port is empty
if receiver.upid.Port == strconv.Itoa(serverPort) {
t.Fatalf("receiver.upid.Port was not expected to return %d, got %s\n", serverPort, receiver.upid.Port)
}
}
func makeMockServer(path string, handler func(rsp http.ResponseWriter, req *http.Request)) *httptest.Server { func makeMockServer(path string, handler func(rsp http.ResponseWriter, req *http.Request)) *httptest.Server {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc(path, handler) mux.HandleFunc(path, handler)

View File

@ -76,41 +76,62 @@ type MesosMessenger struct {
tr Transporter tr Transporter
} }
// create a new default messenger (HTTP). If a non-nil, non-wildcard bindingAddress is // ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to
// specified then it will be used for both the UPID and Transport binding address. Otherwise // determine the binding-address used for both the UPID.Host and Transport binding address.
// hostname is resolved to an IP address and the UPID.Host is set to that address and the
// bindingAddress is passed through to the Transport.
func ForHostname(proc *process.Process, hostname string, bindingAddress net.IP, port uint16) (Messenger, error) { func ForHostname(proc *process.Process, hostname string, bindingAddress net.IP, port uint16) (Messenger, error) {
upid := &upid.UPID{ upid := &upid.UPID{
ID: proc.Label(), ID: proc.Label(),
Port: strconv.Itoa(int(port)), Port: strconv.Itoa(int(port)),
} }
host, err := UPIDBindingAddress(hostname, bindingAddress)
if err != nil {
return nil, err
}
upid.Host = host
return NewHttpWithBindingAddress(upid, bindingAddress), nil
}
// UPIDBindingAddress determines the value of UPID.Host that will be used to build
// a Transport. If a non-nil, non-wildcard bindingAddress is specified then it will be used
// for both the UPID and Transport binding address. Otherwise hostname is resolved to an IP
// address and the UPID.Host is set to that address and the bindingAddress is passed through
// to the Transport.
func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error) {
upidHost := ""
if bindingAddress != nil && "0.0.0.0" != bindingAddress.String() { if bindingAddress != nil && "0.0.0.0" != bindingAddress.String() {
upid.Host = bindingAddress.String() upidHost = bindingAddress.String()
} else { } else {
ips, err := net.LookupIP(hostname) if hostname == "" || hostname == "0.0.0.0" {
if err != nil { return "", fmt.Errorf("invalid hostname (%q) specified with binding address %v", hostname, bindingAddress)
return nil, err
} }
// try to find an ipv4 and use that ip := net.ParseIP(hostname)
ip := net.IP(nil) if ip != nil {
for _, addr := range ips { ip = ip.To4()
if ip = addr.To4(); ip != nil {
break
}
} }
if ip == nil { if ip == nil {
// no ipv4? best guess, just take the first addr ips, err := net.LookupIP(hostname)
if len(ips) > 0 { if err != nil {
ip = ips[0] return "", err
log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip) }
} else { // try to find an ipv4 and use that
return nil, fmt.Errorf("failed to determine IP address for host '%v'", hostname) for _, addr := range ips {
if ip = addr.To4(); ip != nil {
break
}
}
if ip == nil {
// no ipv4? best guess, just take the first addr
if len(ips) > 0 {
ip = ips[0]
log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip)
} else {
return "", fmt.Errorf("failed to determine IP address for host '%v'", hostname)
}
} }
} }
upid.Host = ip.String() upidHost = ip.String()
} }
return NewHttpWithBindingAddress(upid, bindingAddress), nil return upidHost, nil
} }
// NewMesosMessenger creates a new mesos messenger. // NewMesosMessenger creates a new mesos messenger.

View File

@ -3,6 +3,7 @@ package messenger
import ( import (
"fmt" "fmt"
"math/rand" "math/rand"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strconv" "strconv"
@ -431,3 +432,35 @@ func BenchmarkMessengerSendRecvMixedMessage(b *testing.B) {
} }
globalWG.Wait() globalWG.Wait()
} }
func TestUPIDBindingAddress(t *testing.T) {
tt := []struct {
hostname string
binding net.IP
expected string
}{
{"", nil, ""},
{"", net.IPv4(1, 2, 3, 4), "1.2.3.4"},
{"", net.IPv4(0, 0, 0, 0), ""},
{"localhost", nil, "127.0.0.1"},
{"localhost", net.IPv4(5, 6, 7, 8), "5.6.7.8"},
{"localhost", net.IPv4(0, 0, 0, 0), "127.0.0.1"},
{"0.0.0.0", nil, ""},
{"7.8.9.1", nil, "7.8.9.1"},
{"7.8.9.1", net.IPv4(0, 0, 0, 0), "7.8.9.1"},
{"7.8.9.1", net.IPv4(8, 9, 1, 2), "8.9.1.2"},
}
for i, tc := range tt {
actual, err := UPIDBindingAddress(tc.hostname, tc.binding)
if err != nil && tc.expected != "" {
t.Fatalf("test case %d failed; expected %q instead of error %v", i+1, tc.expected, err)
}
if err == nil && actual != tc.expected {
t.Fatalf("test case %d failed; expected %q instead of %q", i+1, tc.expected, actual)
}
if err != nil {
t.Logf("test case %d; received expected error %v", i+1, err)
}
}
}

View File

@ -478,7 +478,7 @@ func (driver *MesosSchedulerDriver) frameworkReregistered(from *upid.UPID, pbMsg
} }
func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg proto.Message) { func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg proto.Message) {
log.V(1).Infoln("Handling resource offers.") log.V(2).Infoln("Handling resource offers.")
msg := pbMsg.(*mesos.ResourceOffersMessage) msg := pbMsg.(*mesos.ResourceOffersMessage)
if driver.Status() == mesos.Status_DRIVER_ABORTED { if driver.Status() == mesos.Status_DRIVER_ABORTED {
@ -500,7 +500,7 @@ func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg prot
for i, offer := range msg.Offers { for i, offer := range msg.Offers {
if pid, err := upid.Parse(pidStrings[i]); err == nil { if pid, err := upid.Parse(pidStrings[i]); err == nil {
driver.cache.putOffer(offer, pid) driver.cache.putOffer(offer, pid)
log.V(1).Infof("Cached offer %s from SlavePID %s", offer.Id.GetValue(), pid) log.V(2).Infof("Cached offer %s from SlavePID %s", offer.Id.GetValue(), pid)
} else { } else {
log.Warningf("Failed to parse offer PID '%v': %v", pid, err) log.Warningf("Failed to parse offer PID '%v': %v", pid, err)
} }
@ -822,11 +822,15 @@ func (driver *MesosSchedulerDriver) Stop(failover bool) (mesos.Status, error) {
return stat, fmt.Errorf("Unable to Stop, expected driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, stat) return stat, fmt.Errorf("Unable to Stop, expected driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, stat)
} }
if driver.connected && failover { if driver.connected && !failover {
// unregister the framework // unregister the framework
log.Infoln("Unregistering the scheduler driver")
message := &mesos.UnregisterFrameworkMessage{ message := &mesos.UnregisterFrameworkMessage{
FrameworkId: driver.FrameworkInfo.Id, FrameworkId: driver.FrameworkInfo.Id,
} }
//TODO(jdef) this is actually a little racy: we send an 'unregister' message but then
// immediately afterward the messenger is stopped in driver.stop(). so the unregister message
// may not actually end up being sent out.
if err := driver.send(driver.MasterPid, message); err != nil { if err := driver.send(driver.MasterPid, message); err != nil {
log.Errorf("Failed to send UnregisterFramework message while stopping driver: %v\n", err) log.Errorf("Failed to send UnregisterFramework message while stopping driver: %v\n", err)
return driver.stop(mesos.Status_DRIVER_ABORTED) return driver.stop(mesos.Status_DRIVER_ABORTED)

View File

@ -20,7 +20,6 @@ package scheduler
import ( import (
"fmt" "fmt"
"os"
"os/user" "os/user"
"sync" "sync"
"testing" "testing"
@ -38,6 +37,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"golang.org/x/net/context"
) )
var ( var (
@ -137,7 +137,7 @@ func TestSchedulerDriverNew(t *testing.T) {
driver := newTestSchedulerDriver(t, NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil) driver := newTestSchedulerDriver(t, NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil)
user, _ := user.Current() user, _ := user.Current()
assert.Equal(t, user.Username, driver.FrameworkInfo.GetUser()) assert.Equal(t, user.Username, driver.FrameworkInfo.GetUser())
host, _ := os.Hostname() host := util.GetHostname("")
assert.Equal(t, host, driver.FrameworkInfo.GetHostname()) assert.Equal(t, host, driver.FrameworkInfo.GetHostname())
} }
@ -335,9 +335,19 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStopUnstarted() {
suite.Equal(mesos.Status_DRIVER_NOT_STARTED, stat) suite.Equal(mesos.Status_DRIVER_NOT_STARTED, stat)
} }
func (suite *SchedulerTestSuite) TestSchdulerDriverStopOK() { type msgTracker struct {
*messenger.MockedMessenger
lastMessage proto.Message
}
func (m *msgTracker) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
m.lastMessage = msg
return m.MockedMessenger.Send(ctx, upid, msg)
}
func (suite *SchedulerTestSuite) TestSchdulerDriverStop_WithoutFailover() {
// Set expections and return values. // Set expections and return values.
messenger := messenger.NewMockedMessenger() messenger := &msgTracker{MockedMessenger: messenger.NewMockedMessenger()}
messenger.On("Start").Return(nil) messenger.On("Start").Return(nil)
messenger.On("UPID").Return(&upid.UPID{}) messenger.On("UPID").Return(&upid.UPID{})
messenger.On("Send").Return(nil) messenger.On("Send").Return(nil)
@ -357,9 +367,54 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverStopOK() {
suite.False(driver.Stopped()) suite.False(driver.Stopped())
suite.Equal(mesos.Status_DRIVER_RUNNING, driver.Status()) suite.Equal(mesos.Status_DRIVER_RUNNING, driver.Status())
driver.connected = true // pretend that we're already registered
driver.Stop(false) driver.Stop(false)
time.Sleep(time.Millisecond * 1)
msg := messenger.lastMessage
suite.NotNil(msg)
_, isUnregMsg := msg.(proto.Message)
suite.True(isUnregMsg, "expected UnregisterFrameworkMessage instead of %+v", msg)
suite.True(driver.Stopped())
suite.Equal(mesos.Status_DRIVER_STOPPED, driver.Status())
}
func (suite *SchedulerTestSuite) TestSchdulerDriverStop_WithFailover() {
// Set expections and return values.
messenger := &msgTracker{MockedMessenger: messenger.NewMockedMessenger()}
messenger.On("Start").Return(nil)
messenger.On("UPID").Return(&upid.UPID{})
messenger.On("Send").Return(nil)
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.True(driver.Stopped())
stat, err := driver.Start()
suite.NoError(err)
suite.Equal(mesos.Status_DRIVER_RUNNING, stat)
suite.False(driver.Stopped())
driver.connected = true // pretend that we're already registered
go func() {
// Run() blocks until the driver is stopped or aborted
stat, err := driver.Join()
suite.NoError(err)
suite.Equal(mesos.Status_DRIVER_STOPPED, stat)
}()
// wait for Join() to begin blocking (so that it has already validated the driver state)
time.Sleep(200 * time.Millisecond)
driver.Stop(true) // true = scheduler failover
msg := messenger.lastMessage
// we're expecting that lastMessage is nil because when failing over there's no
// 'unregister' message sent by the scheduler.
suite.Nil(msg)
suite.True(driver.Stopped()) suite.True(driver.Stopped())
suite.Equal(mesos.Status_DRIVER_STOPPED, driver.Status()) suite.Equal(mesos.Status_DRIVER_STOPPED, driver.Status())
@ -410,7 +465,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLunchTasksUnstarted() {
suite.True(driver.Stopped()) suite.True(driver.Stopped())
stat, err := driver.LaunchTasks( stat, err := driver.LaunchTasks(
[]*mesos.OfferID{&mesos.OfferID{}}, []*mesos.OfferID{{}},
[]*mesos.TaskInfo{}, []*mesos.TaskInfo{},
&mesos.Filters{}, &mesos.Filters{},
) )
@ -514,7 +569,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLaunchTasks() {
tasks := []*mesos.TaskInfo{task} tasks := []*mesos.TaskInfo{task}
stat, err := driver.LaunchTasks( stat, err := driver.LaunchTasks(
[]*mesos.OfferID{&mesos.OfferID{}}, []*mesos.OfferID{{}},
tasks, tasks,
&mesos.Filters{}, &mesos.Filters{},
) )
@ -565,7 +620,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverRequestResources() {
stat, err := driver.RequestResources( stat, err := driver.RequestResources(
[]*mesos.Request{ []*mesos.Request{
&mesos.Request{ {
SlaveId: util.NewSlaveID("test-slave-001"), SlaveId: util.NewSlaveID("test-slave-001"),
Resources: []*mesos.Resource{ Resources: []*mesos.Resource{
util.NewScalarResource("test-res-001", 33.00), util.NewScalarResource("test-res-001", 33.00),

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//Collection of resources for teting mesos artifacts.
package testutil
import (
"bytes"
"fmt"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/upid"
"github.com/stretchr/testify/assert"
"io"
"net"
"net/http"
"net/http/httptest"
"os"
"reflect"
"testing"
)
//MockMesosHttpProcess represents a remote http process: master or slave.
type MockMesosHttpServer struct {
PID *upid.UPID
Addr string
server *httptest.Server
t *testing.T
when map[string]http.HandlerFunc
}
type When interface {
Do(http.HandlerFunc)
}
type WhenFunc func(http.HandlerFunc)
func (w WhenFunc) Do(f http.HandlerFunc) {
w(f)
}
func (m *MockMesosHttpServer) On(uri string) When {
log.V(2).Infof("when %v do something special", uri)
return WhenFunc(func(f http.HandlerFunc) {
log.V(2).Infof("registered callback for %v", uri)
m.when[uri] = f
})
}
func NewMockMasterHttpServer(t *testing.T, handler func(rsp http.ResponseWriter, req *http.Request)) *MockMesosHttpServer {
var server *httptest.Server
when := make(map[string]http.HandlerFunc)
stateHandler := func(rsp http.ResponseWriter, req *http.Request) {
if "/state.json" == req.RequestURI {
state := fmt.Sprintf(`{ "leader": "master@%v" }`, server.Listener.Addr())
log.V(1).Infof("returning JSON %v", state)
io.WriteString(rsp, state)
} else if f, found := when[req.RequestURI]; found {
f(rsp, req)
} else {
handler(rsp, req)
}
}
server = httptest.NewServer(http.HandlerFunc(stateHandler))
assert.NotNil(t, server)
addr := server.Listener.Addr().String()
pid, err := upid.Parse("master@" + addr)
assert.NoError(t, err)
assert.NotNil(t, pid)
log.Infoln("Created test Master http server with PID", pid.String())
return &MockMesosHttpServer{PID: pid, Addr: addr, server: server, t: t, when: when}
}
func NewMockSlaveHttpServer(t *testing.T, handler func(rsp http.ResponseWriter, req *http.Request)) *MockMesosHttpServer {
server := httptest.NewServer(http.HandlerFunc(handler))
assert.NotNil(t, server)
addr := server.Listener.Addr().String()
pid, err := upid.Parse("slave(1)@" + addr)
assert.NoError(t, err)
assert.NotNil(t, pid)
assert.NoError(t, os.Setenv("MESOS_SLAVE_PID", pid.String()))
assert.NoError(t, os.Setenv("MESOS_SLAVE_ID", "test-slave-001"))
log.Infoln("Created test Slave http server with PID", pid.String())
return &MockMesosHttpServer{PID: pid, Addr: addr, server: server, t: t}
}
func (s *MockMesosHttpServer) Close() {
s.server.Close()
}
//MockMesosClient Http client to communicate with mesos processes (master,sched,exec)
type MockMesosClient struct {
pid *upid.UPID
t *testing.T
}
func NewMockMesosClient(t *testing.T, pid *upid.UPID) *MockMesosClient {
return &MockMesosClient{t: t, pid: pid}
}
// sendMessage Mocks sending event messages to a processes such as master, sched or exec.
func (c *MockMesosClient) SendMessage(targetPid *upid.UPID, message proto.Message) {
if c.t == nil {
panic("MockMesosClient needs a testing context.")
}
messageName := reflect.TypeOf(message).Elem().Name()
data, err := proto.Marshal(message)
assert.NoError(c.t, err)
hostport := net.JoinHostPort(targetPid.Host, targetPid.Port)
targetURL := fmt.Sprintf("http://%s/%s/mesos.internal.%s", hostport, targetPid.ID, messageName)
log.Infoln("MockMesosClient Sending message to", targetURL)
req, err := http.NewRequest("POST", targetURL, bytes.NewReader(data))
assert.NoError(c.t, err)
req.Header.Add("Libprocess-From", c.pid.String())
req.Header.Add("Content-Type", "application/x-protobuf")
resp, err := http.DefaultClient.Do(req)
assert.NoError(c.t, err)
assert.Equal(c.t, http.StatusAccepted, resp.StatusCode)
}

View File

@ -1,21 +1,12 @@
package upid package upid
import ( import (
"math/rand"
"strings"
"testing" "testing"
"testing/quick"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func generateRandomString() string {
b := make([]byte, rand.Intn(1024))
for i := range b {
b[i] = byte(rand.Int())
}
return strings.Replace(string(b), "@", "", -1)
}
func TestUPIDParse(t *testing.T) { func TestUPIDParse(t *testing.T) {
u, err := Parse("mesos@foo:bar") u, err := Parse("mesos@foo:bar")
assert.Nil(t, u) assert.Nil(t, u)
@ -29,17 +20,10 @@ func TestUPIDParse(t *testing.T) {
assert.Nil(t, u) assert.Nil(t, u)
assert.Error(t, err) assert.Error(t, err)
// Simple fuzzy test. assert.Nil(t, quick.Check(func(s string) bool {
for i := 0; i < 100000; i++ { u, err := Parse(s)
ra := generateRandomString() return u == nil && err != nil
u, err = Parse(ra) }, &quick.Config{MaxCount: 100000}))
if u != nil {
println(ra)
}
assert.Nil(t, u)
assert.Error(t, err)
}
} }
func TestUPIDString(t *testing.T) { func TestUPIDString(t *testing.T) {