Merge pull request #60034 from pohly/device-manager-goroutine

Automatic merge from submit-queue (batch tested with PRs 58474, 60034, 62101, 63198). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

avoid race condition in device manager and plugin startup/shutdown: wait for goroutines

**What this PR does / why we need it**:

Commit 1325c2f worked around issue #59488, but it is still worthwhile to fix the underlying root cause properly.

**Which issue(s) this PR fixes**:
Fixes #59488

**Special notes for your reviewer**:

This is an alternative to PR #59861, which used a different approach. Personally I tend to prefer this one now.

**Release note**:
```release-note
NONE
```

/sig node
/area hw-accelerators
/assign vikaschoudhary16
pull/8/head
Kubernetes Submit Queue 2018-04-30 13:24:08 -07:00 committed by GitHub
commit 15cc20630d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 5 deletions

View File

@ -22,6 +22,7 @@ import (
"net"
"os"
"path"
"sync"
"time"
"google.golang.org/grpc"
@ -35,6 +36,7 @@ type Stub struct {
socket string
stop chan interface{}
wg sync.WaitGroup
update chan []*pluginapi.Device
server *grpc.Server
@ -70,7 +72,8 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) {
m.allocFunc = f
}
// Start starts the gRPC server of the device plugin
// Start starts the gRPC server of the device plugin. Can only
// be called once.
func (m *Stub) Start() error {
err := m.cleanup()
if err != nil {
@ -82,10 +85,14 @@ func (m *Stub) Start() error {
return err
}
m.wg.Add(1)
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterDevicePluginServer(m.server, m)
go m.server.Serve(sock)
go func() {
defer m.wg.Done()
m.server.Serve(sock)
}()
_, conn, err := dial(m.socket)
if err != nil {
return err
@ -96,10 +103,17 @@ func (m *Stub) Start() error {
return nil
}
// Stop stops the gRPC server
// Stop stops the gRPC server. Can be called without a prior Start
// and more than once. Not safe to be called concurrently by different
// goroutines!
func (m *Stub) Stop() error {
if m.server == nil {
return nil
}
m.server.Stop()
close(m.stop)
m.wg.Wait()
m.server = nil
close(m.stop) // This prevents re-starting the server.
return m.cleanup()
}

View File

@ -59,6 +59,7 @@ type ManagerImpl struct {
mutex sync.Mutex
server *grpc.Server
wg sync.WaitGroup
// activePods is a method for listing active pods on the node
// so the amount of pluginResources requested by existing pods
@ -224,10 +225,14 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
return err
}
m.wg.Add(1)
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterRegistrationServer(m.server, m)
go m.server.Serve(s)
go func() {
defer m.wg.Done()
m.server.Serve(s)
}()
glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
@ -313,6 +318,8 @@ func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest
}
// Stop is the function that can stop the gRPC server.
// Can be called concurrently, more than once, and is safe to call
// without a prior Start.
func (m *ManagerImpl) Stop() error {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -320,7 +327,12 @@ func (m *ManagerImpl) Stop() error {
e.stop()
}
if m.server == nil {
return nil
}
m.server.Stop()
m.wg.Wait()
m.server = nil
return nil
}

View File

@ -67,6 +67,29 @@ func TestNewManagerImplStart(t *testing.T) {
defer os.RemoveAll(socketDir)
m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName)
cleanup(t, m, p)
// Stop should tolerate being called more than once.
cleanup(t, m, p)
}
func TestNewManagerImplStop(t *testing.T) {
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
require.NoError(t, err)
defer os.RemoveAll(socketDir)
m, err := newManagerImpl(socketName)
require.NoError(t, err)
// No prior Start, but that should be okay.
err = m.Stop()
require.NoError(t, err)
devs := []*pluginapi.Device{
{ID: "Dev1", Health: pluginapi.Healthy},
{ID: "Dev2", Health: pluginapi.Healthy},
}
p := NewDevicePluginStub(devs, pluginSocketName)
// Same here.
err = p.Stop()
require.NoError(t, err)
}
// Tests that the device plugin manager correctly handles registration and re-registration by