From d2f08c94a99b2e3981aa808ea8b6c1370d230015 Mon Sep 17 00:00:00 2001 From: Renaud Gaubert Date: Mon, 18 Sep 2017 22:29:47 +0200 Subject: [PATCH] Device Plugin now closes client connexion --- pkg/kubelet/deviceplugin/endpoint.go | 29 ++++++++++------------------ pkg/kubelet/deviceplugin/manager.go | 5 +++++ 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/pkg/kubelet/deviceplugin/endpoint.go b/pkg/kubelet/deviceplugin/endpoint.go index f0523471a3..cefffcd433 100644 --- a/pkg/kubelet/deviceplugin/endpoint.go +++ b/pkg/kubelet/deviceplugin/endpoint.go @@ -33,7 +33,8 @@ import ( // for managing gRPC communications with the device plugin and caching // device states reported by the device plugin. type endpoint struct { - client pluginapi.DevicePluginClient + client pluginapi.DevicePluginClient + clientConn *grpc.ClientConn socketPath string resourceName string @@ -42,32 +43,25 @@ type endpoint struct { mutex sync.Mutex callback MonitorCallback - - cancel context.CancelFunc - ctx context.Context } // newEndpoint creates a new endpoint for the given resourceName. func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) { - client, err := dial(socketPath) + client, c, err := dial(socketPath) if err != nil { glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err) return nil, err } - ctx, stop := context.WithCancel(context.Background()) - return &endpoint{ - client: client, + client: client, + clientConn: c, socketPath: socketPath, resourceName: resourceName, devices: nil, callback: callback, - - cancel: stop, - ctx: ctx, }, nil } @@ -80,11 +74,9 @@ func (e *endpoint) getDevices() []*pluginapi.Device { // list initializes ListAndWatch gRPC call for the device plugin and gets the // initial list of the devices. Returns ListAndWatch gRPC stream on success. func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) { - glog.V(3).Infof("Starting List") - stream, err := e.client.ListAndWatch(e.ctx, &pluginapi.Empty{}) + stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{}) if err != nil { glog.Errorf(errListAndWatch, e.resourceName, err) - return nil, err } @@ -184,7 +176,6 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient e.callback(e.resourceName, added, updated, deleted) } - } // allocate issues Allocate gRPC call to the device plugin. @@ -195,11 +186,11 @@ func (e *endpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) } func (e *endpoint) stop() { - e.cancel() + e.clientConn.Close() } // dial establishes the gRPC communication with the registered device plugin. -func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) { +func dial(unixSocketPath string) (pluginapi.DevicePluginClient, *grpc.ClientConn, error) { c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("unix", addr, timeout) @@ -207,8 +198,8 @@ func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) { ) if err != nil { - return nil, fmt.Errorf(errFailedToDialDevicePlugin+" %v", err) + return nil, nil, fmt.Errorf(errFailedToDialDevicePlugin+" %v", err) } - return pluginapi.NewDevicePluginClient(c), nil + return pluginapi.NewDevicePluginClient(c), c, nil } diff --git a/pkg/kubelet/deviceplugin/manager.go b/pkg/kubelet/deviceplugin/manager.go index 5c05a97289..a869ea8131 100644 --- a/pkg/kubelet/deviceplugin/manager.go +++ b/pkg/kubelet/deviceplugin/manager.go @@ -206,9 +206,11 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) return } + stream, err := e.list() if err != nil { glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err) + e.stop() return } @@ -219,12 +221,14 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { m.endpoints[r.ResourceName] = e m.mutex.Unlock() glog.V(2).Infof("Registered endpoint %v", e) + if ok && old != nil { old.stop() } go func() { e.listAndWatch(stream) + e.stop() m.mutex.Lock() if old, ok := m.endpoints[r.ResourceName]; ok && old == e { @@ -233,6 +237,7 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { // Issues callback to delete all of devices. e.callback(e.resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, e.getDevices()) } + glog.V(2).Infof("Unregistered endpoint %v", e) m.mutex.Unlock() }()