make groupVersionResource listing dynamic when third party resources are

enabled.
pull/6/head
Brendan Burns 2016-10-31 22:55:28 -07:00
parent 0042ce5684
commit ef6529bf2f
5 changed files with 55 additions and 18 deletions

View File

@ -369,11 +369,30 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
// Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := client("namespace-controller")
namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
// TODO: consider using a list-watch + cache here rather than polling
var gvrFn func() ([]unversioned.GroupVersionResource, error)
rsrcs, err := namespaceKubeClient.Discovery().ServerResources()
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
glog.Fatalf("Failed to get group version resources: %v", err)
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
for _, rsrcList := range rsrcs {
for ix := range rsrcList.APIResources {
rsrc := &rsrcList.APIResources[ix]
if rsrc.Kind == "ThirdPartyResource" {
gvrFn = namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
}
}
}
if gvrFn == nil {
gvr, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
if err != nil {
glog.Fatalf("Failed to get resources: %v", err)
}
gvrFn = func() ([]unversioned.GroupVersionResource, error) {
return gvr, nil
}
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, gvrFn, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

View File

@ -47,8 +47,8 @@ type NamespaceController struct {
controller *cache.Controller
// namespaces that have been queued up for processing by workers
queue workqueue.RateLimitingInterface
// list of preferred group versions and their corresponding resource set for namespace deletion
groupVersionResources []unversioned.GroupVersionResource
// function to list of preferred group versions and their corresponding resource set for namespace deletion
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error)
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
opCache *operationNotSupportedCache
// finalizerToken is the finalizer token managed by this controller
@ -59,7 +59,7 @@ type NamespaceController struct {
func NewNamespaceController(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
groupVersionResources []unversioned.GroupVersionResource,
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
resyncPeriod time.Duration,
finalizerToken api.FinalizerName) *NamespaceController {
@ -86,9 +86,9 @@ func NewNamespaceController(
kubeClient: kubeClient,
clientPool: clientPool,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
groupVersionResources: groupVersionResources,
opCache: opCache,
finalizerToken: finalizerToken,
groupVersionResourcesFn: groupVersionResourcesFn,
opCache: opCache,
finalizerToken: finalizerToken,
}
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
@ -191,7 +191,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
return err
}
namespace := obj.(*api.Namespace)
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResources, namespace, nm.finalizerToken)
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResourcesFn, namespace, nm.finalizerToken)
}
// Run starts observing the system with the specified number of workers.

View File

@ -129,6 +129,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
testNamespace *api.Namespace
kubeClientActionSet sets.String
dynamicClientActionSet sets.String
gvrError error
}{
"pending-finalize": {
testNamespace: testNamespacePendingFinalize,
@ -148,6 +149,15 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
),
dynamicClientActionSet: sets.NewString(),
},
"groupVersionResourceErr": {
testNamespace: testNamespaceFinalizeComplete,
kubeClientActionSet: sets.NewString(
strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"delete", "namespaces", ""}, "-"),
),
dynamicClientActionSet: sets.NewString(),
gvrError: fmt.Errorf("test error"),
},
}
for scenario, testInput := range scenarios {
@ -158,7 +168,11 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
mockClient := fake.NewSimpleClientset(testInput.testNamespace)
clientPool := dynamic.NewClientPool(clientConfig, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, groupVersionResources, testInput.testNamespace, api.FinalizerKubernetes)
fn := func() ([]unversioned.GroupVersionResource, error) {
return groupVersionResources, nil
}
err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testInput.testNamespace, api.FinalizerKubernetes)
if err != nil {
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
}
@ -227,7 +241,10 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
Phase: api.NamespaceActive,
},
}
err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, testGroupVersionResources(), testNamespace, api.FinalizerKubernetes)
fn := func() ([]unversioned.GroupVersionResource, error) {
return testGroupVersionResources(), nil
}
err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testNamespace, api.FinalizerKubernetes)
if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err)
}

View File

@ -371,7 +371,7 @@ func syncNamespace(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache *operationNotSupportedCache,
groupVersionResources []unversioned.GroupVersionResource,
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
namespace *api.Namespace,
finalizerToken api.FinalizerName,
) error {
@ -422,6 +422,10 @@ func syncNamespace(
}
// there may still be content for us to remove
groupVersionResources, err := groupVersionResourcesFn()
if err != nil {
return err
}
estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp)
if err != nil {
return err

View File

@ -56,11 +56,8 @@ func (n *NamespaceController) Start() error {
return err
}
clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
resources, err := client.Discovery().ServerPreferredNamespacedResources()
if err != nil {
return err
}
nc := namespacecontroller.NewNamespaceController(client, clientPool, resources, ncResyncPeriod, api.FinalizerKubernetes)
gvrFn := client.Discovery().ServerPreferredNamespacedResources
nc := namespacecontroller.NewNamespaceController(client, clientPool, gvrFn, ncResyncPeriod, api.FinalizerKubernetes)
go nc.Run(ncConcurrency, n.stopCh)
return nil
}