diff --git a/contrib/mesos/pkg/election/master.go b/contrib/mesos/pkg/election/master.go index d5f1a76a7c..8ab199634b 100644 --- a/contrib/mesos/pkg/election/master.go +++ b/contrib/mesos/pkg/election/master.go @@ -46,14 +46,14 @@ type Service interface { } type notifier struct { - lock sync.Mutex - cond *sync.Cond + changed chan struct{} // to notify the service loop about changed state // desired is updated with every change, current is updated after // Start()/Stop() finishes. 'cond' is used to signal that a change // might be needed. This handles the case where mastership flops // around without calling Start()/Stop() excessively. desired, current Master + lock sync.Mutex // to protect the desired variable // for comparison, to see if we are master. id Master @@ -65,7 +65,7 @@ type notifier struct { // elected master starts/stops matching 'id'. Never returns. func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) { n := ¬ifier{id: Master(id), service: s} - n.cond = sync.NewCond(&n.lock) + n.changed = make(chan struct{}) finished := runtime.After(func() { runtime.Until(func() { for { @@ -86,14 +86,16 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) glog.Errorf("Unexpected object from election channel: %v", event.Object) break } - func() { - n.lock.Lock() - defer n.lock.Unlock() - n.desired = electedMaster - if n.desired != n.current { - n.cond.Signal() - } - }() + + n.lock.Lock() + n.desired = electedMaster + n.lock.Unlock() + + // notify serviceLoop, but don't block. If a change + // is queued already it will see the new n.desired. + select { + case n.changed <- struct{}{}: + } } } } @@ -104,31 +106,22 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) // serviceLoop waits for changes, and calls Start()/Stop() as needed. func (n *notifier) serviceLoop(abort <-chan struct{}) { - n.lock.Lock() - defer n.lock.Unlock() for { select { case <-abort: return - default: - for n.desired == n.current { - ch := runtime.After(n.cond.Wait) - select { - case <-abort: - n.cond.Signal() // ensure that Wait() returns - <-ch - return - case <-ch: - // we were notified and have the lock, proceed.. - } - } - if n.current != n.id && n.desired == n.id { - n.service.Validate(n.desired, n.current) + case <-n.changed: + n.lock.Lock() + newDesired := n.desired // copy value to avoid race below + n.lock.Unlock() + + if n.current != n.id && newDesired == n.id { + n.service.Validate(newDesired, n.current) n.service.Start() - } else if n.current == n.id && n.desired != n.id { + } else if n.current == n.id && newDesired != n.id { n.service.Stop() } - n.current = n.desired + n.current = newDesired } } } diff --git a/contrib/mesos/pkg/election/master_test.go b/contrib/mesos/pkg/election/master_test.go index 5584ab25fa..307428e84b 100644 --- a/contrib/mesos/pkg/election/master_test.go +++ b/contrib/mesos/pkg/election/master_test.go @@ -69,8 +69,24 @@ func Test(t *testing.T) { changes := make(chan bool, 1500) done := make(chan struct{}) s := &slowService{t: t, changes: changes, done: done} + + // change master to "notme" such that the initial m.Elect call inside Notify + // will trigger an obversable event. We will wait for it to make sure the + // Notify loop will see those master changes triggered by the go routine below. + m.ChangeMaster(Master("me")) + temporaryWatch := m.mux.Watch() + ch := temporaryWatch.ResultChan() + notifyDone := runtime.After(func() { Notify(m, "", "me", s, done) }) + // wait for the event triggered by the initial m.Elect of Notify. Then drain + // the channel to not block anything. + <-ch + temporaryWatch.Stop() + for i := 0; i < len(ch); i += 1 { // go 1.3 and 1.4 compatible loop + <-ch + } + go func() { defer close(done) for i := 0; i < 500; i++ { @@ -83,16 +99,8 @@ func Test(t *testing.T) { <-notifyDone close(changes) - changeList := []bool{} - for { - change, ok := <-changes - if !ok { - break - } - changeList = append(changeList, change) - } - - if len(changeList) > 1000 { - t.Errorf("unexpected number of changes: %v", len(changeList)) + changesNum := len(changes) + if changesNum > 1000 || changesNum == 0 { + t.Errorf("unexpected number of changes: %v", changesNum) } }