Merge pull request #73313 from pivotal-k8s/csi-drivers-list

Refactor csiDriversStore
pull/564/head
Kubernetes Prow Robot 2019-02-06 09:31:06 -08:00 committed by GitHub
commit 2d956389dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 167 additions and 53 deletions

View File

@ -6,6 +6,7 @@ go_library(
"csi_attacher.go",
"csi_block.go",
"csi_client.go",
"csi_drivers_store.go",
"csi_mounter.go",
"csi_plugin.go",
"csi_util.go",
@ -44,6 +45,7 @@ go_test(
"csi_attacher_test.go",
"csi_block_test.go",
"csi_client_test.go",
"csi_drivers_store_test.go",
"csi_mounter_test.go",
"csi_plugin_test.go",
],

View File

@ -147,19 +147,12 @@ func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
addr := fmt.Sprintf(csiAddrTemplate, driverName)
requiresV0Client := true
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher) {
var existingDriver csiDriver
driverExists := false
func() {
csiDrivers.RLock()
defer csiDrivers.RUnlock()
existingDriver, driverExists = csiDrivers.driversMap[string(driverName)]
}()
existingDriver, driverExists := csiDrivers.Get(string(driverName))
if !driverExists {
return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
}
addr = existingDriver.driverEndpoint
addr = existingDriver.endpoint
requiresV0Client = versionRequiresV0Client(existingDriver.highestSupportedVersion)
}

View File

@ -0,0 +1,79 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"sync"
utilversion "k8s.io/apimachinery/pkg/util/version"
)
// Driver is a description of a CSI Driver, defined by an enpoint and the
// highest CSI version supported
type Driver struct {
endpoint string
highestSupportedVersion *utilversion.Version
}
// DriversStore holds a list of CSI Drivers
type DriversStore struct {
store
sync.RWMutex
}
type store map[string]Driver
// Get lets you retrieve a CSI Driver by name.
// This method is protected by a mutex.
func (s *DriversStore) Get(driverName string) (Driver, bool) {
s.RLock()
defer s.RUnlock()
driver, ok := s.store[driverName]
return driver, ok
}
// Set lets you save a CSI Driver to the list and give it a specific name.
// This method is protected by a mutex.
func (s *DriversStore) Set(driverName string, driver Driver) {
s.Lock()
defer s.Unlock()
if s.store == nil {
s.store = store{}
}
s.store[driverName] = driver
}
// Delete lets you delete a CSI Driver by name.
// This method is protected by a mutex.
func (s *DriversStore) Delete(driverName string) {
s.Lock()
defer s.Unlock()
delete(s.store, driverName)
}
// Clear deletes all entries in the store.
// This methiod is protected by a mutex.
func (s *DriversStore) Clear() {
s.Lock()
defer s.Unlock()
s.store = store{}
}

View File

@ -0,0 +1,64 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi_test
import (
"reflect"
"testing"
"k8s.io/kubernetes/pkg/volume/csi"
)
func TestDriversStore(t *testing.T) {
store := &csi.DriversStore{}
someDriver := csi.Driver{}
expectAbsent(t, store, "does-not-exist")
store.Set("some-driver", someDriver)
expectPresent(t, store, "some-driver", someDriver)
store.Delete("some-driver")
expectAbsent(t, store, "some-driver")
store.Set("some-driver", someDriver)
store.Clear()
expectAbsent(t, store, "some-driver")
}
func expectPresent(t *testing.T, store *csi.DriversStore, name string, expected csi.Driver) {
t.Helper()
retrieved, ok := store.Get(name)
if !ok {
t.Fatalf("expected driver '%s' to exist", name)
}
if !reflect.DeepEqual(retrieved, expected) {
t.Fatalf("expected driver '%s' to be equal to %v", name, expected)
}
}
func expectAbsent(t *testing.T, store *csi.DriversStore, name string) {
t.Helper()
if _, ok := store.Get(name); ok {
t.Fatalf("expected driver '%s' not to exist in store", name)
}
}

View File

@ -23,7 +23,6 @@ import (
"path"
"sort"
"strings"
"sync"
"time"
"context"
@ -84,17 +83,6 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
// volume.VolumePlugin methods
var _ volume.VolumePlugin = &csiPlugin{}
type csiDriver struct {
driverName string
driverEndpoint string
highestSupportedVersion *utilversion.Version
}
type csiDriversStore struct {
driversMap map[string]csiDriver
sync.RWMutex
}
// RegistrationHandler is the handler which is fed to the pluginwatcher API.
type RegistrationHandler struct {
}
@ -102,7 +90,7 @@ type RegistrationHandler struct {
// TODO (verult) consider using a struct instead of global variables
// csiDrivers map keep track of all registered CSI drivers on the node and their
// corresponding sockets
var csiDrivers csiDriversStore
var csiDrivers = &DriversStore{}
var nim nodeinfomanager.Interface
@ -141,17 +129,12 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
return err
}
func() {
// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
// all other CSI components will be able to get the actual socket of CSI drivers by its name.
// It's not necessary to lock the entire RegistrationCallback() function because only the CSI
// client depends on this driver map, and the CSI client does not depend on node information
// updated in the rest of the function.
csiDrivers.Lock()
defer csiDrivers.Unlock()
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint, highestSupportedVersion: highestSupportedVersion}
}()
// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
// all other CSI components will be able to get the actual socket of CSI drivers by its name.
csiDrivers.Set(pluginName, Driver{
endpoint: endpoint,
highestSupportedVersion: highestSupportedVersion,
})
// Get node info from the driver.
csi, err := newCsiDriverClient(csiDriverName(pluginName))
@ -200,15 +183,7 @@ func (h *RegistrationHandler) validateVersions(callerName, pluginName string, en
return nil, err
}
// Check for existing drivers with the same name
var existingDriver csiDriver
driverExists := false
func() {
csiDrivers.RLock()
defer csiDrivers.RUnlock()
existingDriver, driverExists = csiDrivers.driversMap[pluginName]
}()
existingDriver, driverExists := csiDrivers.Get(pluginName)
if driverExists {
if !existingDriver.highestSupportedVersion.LessThan(newDriverHighestVersion) {
err := fmt.Errorf("%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q", callerName, pluginName, existingDriver.highestSupportedVersion)
@ -245,8 +220,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
}
}
// Initializing csiDrivers map and label management channels
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
// Initializing the label management channels
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)
// TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver
@ -657,11 +631,7 @@ func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver
}
func unregisterDriver(driverName string) error {
func() {
csiDrivers.Lock()
defer csiDrivers.Unlock()
delete(csiDrivers.driversMap, driverName)
}()
csiDrivers.Delete(driverName)
if err := nim.UninstallCSIDriver(driverName); err != nil {
klog.Errorf("Error uninstalling CSI driver: %v", err)

View File

@ -105,13 +105,16 @@ func makeTestPV(name string, sizeGig int, driverName, volID string) *api.Persist
}
func registerFakePlugin(pluginName, endpoint string, versions []string, t *testing.T) {
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
highestSupportedVersions, err := highestSupportedVersion(versions)
if err != nil {
t.Fatalf("unexpected error parsing versions (%v) for pluginName % q endpoint %q: %#v", versions, pluginName, endpoint, err)
}
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint, highestSupportedVersion: highestSupportedVersions}
csiDrivers.Clear()
csiDrivers.Set(pluginName, Driver{
endpoint: endpoint,
highestSupportedVersion: highestSupportedVersions,
})
}
func TestPluginGetPluginName(t *testing.T) {
@ -839,13 +842,16 @@ func TestValidatePluginExistingDriver(t *testing.T) {
for _, tc := range testCases {
// Arrange & Act
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
highestSupportedVersions1, err := highestSupportedVersion(tc.versions1)
if err != nil {
t.Fatalf("unexpected error parsing version for testcase: %#v", tc)
}
csiDrivers.driversMap[tc.pluginName1] = csiDriver{driverName: tc.pluginName1, driverEndpoint: tc.endpoint1, highestSupportedVersion: highestSupportedVersions1}
csiDrivers.Clear()
csiDrivers.Set(tc.pluginName1, Driver{
endpoint: tc.endpoint1,
highestSupportedVersion: highestSupportedVersions1,
})
// Arrange & Act
err = PluginHandler.ValidatePlugin(tc.pluginName2, tc.endpoint2, tc.versions2, tc.foundInDeprecatedDir2)