mirror of https://github.com/k3s-io/k3s
convert TPR controller to posthook instead of disable flag
parent
19e9f7e400
commit
1423654295
|
@ -107,8 +107,6 @@ type Config struct {
|
||||||
Tunneler genericapiserver.Tunneler
|
Tunneler genericapiserver.Tunneler
|
||||||
EnableUISupport bool
|
EnableUISupport bool
|
||||||
EnableLogsSupport bool
|
EnableLogsSupport bool
|
||||||
|
|
||||||
disableThirdPartyControllerForTesting bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EndpointReconcilerConfig holds the endpoint reconciler and endpoint reconciliation interval to be
|
// EndpointReconcilerConfig holds the endpoint reconciler and endpoint reconciliation interval to be
|
||||||
|
@ -130,8 +128,6 @@ type Master struct {
|
||||||
thirdPartyResources map[string]*thirdPartyEntry
|
thirdPartyResources map[string]*thirdPartyEntry
|
||||||
// protects the map
|
// protects the map
|
||||||
thirdPartyResourcesLock sync.RWMutex
|
thirdPartyResourcesLock sync.RWMutex
|
||||||
// Useful for reliable testing. Shouldn't be used otherwise.
|
|
||||||
disableThirdPartyControllerForTesting bool
|
|
||||||
|
|
||||||
// nodeClient is used to back the tunneler
|
// nodeClient is used to back the tunneler
|
||||||
nodeClient coreclient.NodeInterface
|
nodeClient coreclient.NodeInterface
|
||||||
|
@ -205,8 +201,6 @@ func (c completedConfig) New() (*Master, error) {
|
||||||
GenericAPIServer: s,
|
GenericAPIServer: s,
|
||||||
deleteCollectionWorkers: c.DeleteCollectionWorkers,
|
deleteCollectionWorkers: c.DeleteCollectionWorkers,
|
||||||
nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(),
|
nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(),
|
||||||
|
|
||||||
disableThirdPartyControllerForTesting: c.disableThirdPartyControllerForTesting,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
restOptionsFactory := restOptionsFactory{
|
restOptionsFactory := restOptionsFactory{
|
||||||
|
@ -246,10 +240,7 @@ func (c completedConfig) New() (*Master, error) {
|
||||||
c.RESTStorageProviders[autoscaling.GroupName] = autoscalingrest.RESTStorageProvider{}
|
c.RESTStorageProviders[autoscaling.GroupName] = autoscalingrest.RESTStorageProvider{}
|
||||||
c.RESTStorageProviders[batch.GroupName] = batchrest.RESTStorageProvider{}
|
c.RESTStorageProviders[batch.GroupName] = batchrest.RESTStorageProvider{}
|
||||||
c.RESTStorageProviders[certificates.GroupName] = certificatesrest.RESTStorageProvider{}
|
c.RESTStorageProviders[certificates.GroupName] = certificatesrest.RESTStorageProvider{}
|
||||||
c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{
|
c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{ResourceInterface: m}
|
||||||
ResourceInterface: m,
|
|
||||||
DisableThirdPartyControllerForTesting: m.disableThirdPartyControllerForTesting,
|
|
||||||
}
|
|
||||||
c.RESTStorageProviders[policy.GroupName] = policyrest.RESTStorageProvider{}
|
c.RESTStorageProviders[policy.GroupName] = policyrest.RESTStorageProvider{}
|
||||||
c.RESTStorageProviders[rbac.GroupName] = &rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser}
|
c.RESTStorageProviders[rbac.GroupName] = &rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser}
|
||||||
c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{}
|
c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{}
|
||||||
|
|
|
@ -98,14 +98,6 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
|
||||||
config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
|
config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
|
||||||
config.EnableCoreControllers = false
|
config.EnableCoreControllers = false
|
||||||
|
|
||||||
// TODO: this is kind of hacky. The trouble is that the sync loop
|
|
||||||
// runs in a go-routine and there is no way to validate in the test
|
|
||||||
// that the sync routine has actually run. The right answer here
|
|
||||||
// is probably to add some sort of callback that we can register
|
|
||||||
// to validate that it's actually been run, but for now we don't
|
|
||||||
// run the sync routine and register types manually.
|
|
||||||
config.disableThirdPartyControllerForTesting = true
|
|
||||||
|
|
||||||
master, err := config.Complete().New()
|
master, err := config.Complete().New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||||
package rest
|
package rest
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
@ -25,6 +26,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/api/rest"
|
"k8s.io/kubernetes/pkg/api/rest"
|
||||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||||
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||||
|
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/genericapiserver"
|
"k8s.io/kubernetes/pkg/genericapiserver"
|
||||||
horizontalpodautoscaleretcd "k8s.io/kubernetes/pkg/registry/autoscaling/horizontalpodautoscaler/etcd"
|
horizontalpodautoscaleretcd "k8s.io/kubernetes/pkg/registry/autoscaling/horizontalpodautoscaler/etcd"
|
||||||
jobetcd "k8s.io/kubernetes/pkg/registry/batch/job/etcd"
|
jobetcd "k8s.io/kubernetes/pkg/registry/batch/job/etcd"
|
||||||
|
@ -36,12 +38,12 @@ import (
|
||||||
pspetcd "k8s.io/kubernetes/pkg/registry/extensions/podsecuritypolicy/etcd"
|
pspetcd "k8s.io/kubernetes/pkg/registry/extensions/podsecuritypolicy/etcd"
|
||||||
replicasetetcd "k8s.io/kubernetes/pkg/registry/extensions/replicaset/etcd"
|
replicasetetcd "k8s.io/kubernetes/pkg/registry/extensions/replicaset/etcd"
|
||||||
thirdpartyresourceetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresource/etcd"
|
thirdpartyresourceetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresource/etcd"
|
||||||
|
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RESTStorageProvider struct {
|
type RESTStorageProvider struct {
|
||||||
ResourceInterface ResourceInterface
|
ResourceInterface ResourceInterface
|
||||||
DisableThirdPartyControllerForTesting bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ genericapiserver.RESTStorageProvider = &RESTStorageProvider{}
|
var _ genericapiserver.RESTStorageProvider = &RESTStorageProvider{}
|
||||||
|
@ -73,17 +75,6 @@ func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource genericapise
|
||||||
}
|
}
|
||||||
if apiResourceConfigSource.ResourceEnabled(version.WithResource("thirdpartyresources")) {
|
if apiResourceConfigSource.ResourceEnabled(version.WithResource("thirdpartyresources")) {
|
||||||
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(restOptionsGetter(extensions.Resource("thirdpartyresources")))
|
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(restOptionsGetter(extensions.Resource("thirdpartyresources")))
|
||||||
thirdPartyControl := ThirdPartyController{
|
|
||||||
master: p.ResourceInterface,
|
|
||||||
thirdPartyResourceRegistry: thirdPartyResourceStorage,
|
|
||||||
}
|
|
||||||
if !p.DisableThirdPartyControllerForTesting {
|
|
||||||
go wait.Forever(func() {
|
|
||||||
if err := thirdPartyControl.SyncResources(); err != nil {
|
|
||||||
glog.Warningf("third party resource sync failed: %v", err)
|
|
||||||
}
|
|
||||||
}, 10*time.Second)
|
|
||||||
}
|
|
||||||
storage["thirdpartyresources"] = thirdPartyResourceStorage
|
storage["thirdpartyresources"] = thirdPartyResourceStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,3 +117,26 @@ func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource genericapise
|
||||||
|
|
||||||
return storage
|
return storage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
|
||||||
|
return "extensions/third-party-resources", p.postStartHookFunc, nil
|
||||||
|
}
|
||||||
|
func (p RESTStorageProvider) postStartHookFunc(hookContext genericapiserver.PostStartHookContext) error {
|
||||||
|
clientset, err := extensionsclient.NewForConfig(hookContext.LoopbackClientConfig)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unable to initialize clusterroles: %v", err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
thirdPartyControl := ThirdPartyController{
|
||||||
|
master: p.ResourceInterface,
|
||||||
|
client: clientset,
|
||||||
|
}
|
||||||
|
go wait.Forever(func() {
|
||||||
|
if err := thirdPartyControl.SyncResources(); err != nil {
|
||||||
|
glog.Warningf("third party resource sync failed: %v", err)
|
||||||
|
}
|
||||||
|
}, 10*time.Second)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ import (
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||||
thirdpartyresourceetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresource/etcd"
|
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
|
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
|
@ -47,8 +47,8 @@ const thirdpartyprefix = "/apis"
|
||||||
// ThirdPartyController is a control loop that knows how to synchronize ThirdPartyResource objects with
|
// ThirdPartyController is a control loop that knows how to synchronize ThirdPartyResource objects with
|
||||||
// RESTful resources which are present in the API server.
|
// RESTful resources which are present in the API server.
|
||||||
type ThirdPartyController struct {
|
type ThirdPartyController struct {
|
||||||
master ResourceInterface
|
master ResourceInterface
|
||||||
thirdPartyResourceRegistry *thirdpartyresourceetcd.REST
|
client extensionsclient.ThirdPartyResourcesGetter
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncOneResource synchronizes a single resource with RESTful resources on the master
|
// SyncOneResource synchronizes a single resource with RESTful resources on the master
|
||||||
|
@ -68,7 +68,7 @@ func (t *ThirdPartyController) SyncOneResource(rsrc *extensions.ThirdPartyResour
|
||||||
|
|
||||||
// Synchronize all resources with RESTful resources on the master
|
// Synchronize all resources with RESTful resources on the master
|
||||||
func (t *ThirdPartyController) SyncResources() error {
|
func (t *ThirdPartyController) SyncResources() error {
|
||||||
list, err := t.thirdPartyResourceRegistry.List(api.NewDefaultContext(), nil)
|
list, err := t.client.ThirdPartyResources().List(api.ListOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue