Fixes some races in deviceplugin manager_test.go and manager.go.

pull/6/head
Jiaying Zhang 2017-09-15 11:19:46 -07:00
parent 98ed5dd8a2
commit 34dccc5d2a
3 changed files with 16 additions and 4 deletions

View File

@ -70,7 +70,7 @@ func (m *Stub) Start() error {
// Wait till grpc server is ready.
for i := 0; i < 10; i++ {
services := m.server.GetServiceInfo()
if len(services) > 0 {
if len(services) > 1 {
break
}
time.Sleep(1 * time.Second)
@ -83,6 +83,7 @@ func (m *Stub) Start() error {
// Stop stops the gRPC server
func (m *Stub) Stop() error {
m.server.Stop()
close(m.stop)
return m.cleanup()
}

View File

@ -189,9 +189,12 @@ func (m *ManagerImpl) Register(ctx context.Context,
// Stop is the function that can stop the gRPC server.
func (m *ManagerImpl) Stop() error {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, e := range m.endpoints {
e.stop()
}
m.server.Stop()
return nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package deviceplugin
import (
"sync/atomic"
"testing"
"time"
@ -40,7 +41,8 @@ func TestNewManagerImpl(t *testing.T) {
}
func TestNewManagerImplStart(t *testing.T) {
setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {})
m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {})
cleanup(t, m, p)
}
// Tests that the device plugin manager correctly handles registration and re-registration by
@ -54,9 +56,11 @@ func TestDevicePluginReRegistration(t *testing.T) {
callbackCount := 0
callbackChan := make(chan int)
var stopping int32
stopping = 0
callback := func(n string, a, u, r []*pluginapi.Device) {
// Should be called twice, one for each plugin.
if callbackCount > 1 {
// Should be called twice, one for each plugin registration, till we are stopping.
if callbackCount > 1 && atomic.LoadInt32(&stopping) <= 0 {
t.FailNow()
}
callbackCount++
@ -80,6 +84,10 @@ func TestDevicePluginReRegistration(t *testing.T) {
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Wait long enough to catch unexpected callbacks.
time.Sleep(5 * time.Second)
atomic.StoreInt32(&stopping, 1)
cleanup(t, m, p1)
p2.Stop()
}
func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) {