Merge pull request #34979 from deads2k/tpr-04-storage-post-hook
Automatic merge from submit-queue convert TPR controller to posthook instead of disable flag Converts the third party resource controller into a posthook using a loopback client instead going direct to etcd. This let's us eliminate more flags and special-casing during initialization. Also, using a client brings us closer to building this without side-effects for downstream composers.
This commit is contained in:
		| @@ -107,8 +107,6 @@ type Config struct { | ||||
| 	Tunneler          genericapiserver.Tunneler | ||||
| 	EnableUISupport   bool | ||||
| 	EnableLogsSupport bool | ||||
|  | ||||
| 	disableThirdPartyControllerForTesting bool | ||||
| } | ||||
|  | ||||
| // EndpointReconcilerConfig holds the endpoint reconciler and endpoint reconciliation interval to be | ||||
| @@ -130,8 +128,6 @@ type Master struct { | ||||
| 	thirdPartyResources map[string]*thirdPartyEntry | ||||
| 	// protects the map | ||||
| 	thirdPartyResourcesLock sync.RWMutex | ||||
| 	// Useful for reliable testing.  Shouldn't be used otherwise. | ||||
| 	disableThirdPartyControllerForTesting bool | ||||
|  | ||||
| 	// nodeClient is used to back the tunneler | ||||
| 	nodeClient coreclient.NodeInterface | ||||
| @@ -205,8 +201,6 @@ func (c completedConfig) New() (*Master, error) { | ||||
| 		GenericAPIServer:        s, | ||||
| 		deleteCollectionWorkers: c.DeleteCollectionWorkers, | ||||
| 		nodeClient:              coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(), | ||||
|  | ||||
| 		disableThirdPartyControllerForTesting: c.disableThirdPartyControllerForTesting, | ||||
| 	} | ||||
|  | ||||
| 	restOptionsFactory := restOptionsFactory{ | ||||
| @@ -246,10 +240,7 @@ func (c completedConfig) New() (*Master, error) { | ||||
| 	c.RESTStorageProviders[autoscaling.GroupName] = autoscalingrest.RESTStorageProvider{} | ||||
| 	c.RESTStorageProviders[batch.GroupName] = batchrest.RESTStorageProvider{} | ||||
| 	c.RESTStorageProviders[certificates.GroupName] = certificatesrest.RESTStorageProvider{} | ||||
| 	c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{ | ||||
| 		ResourceInterface:                     m, | ||||
| 		DisableThirdPartyControllerForTesting: m.disableThirdPartyControllerForTesting, | ||||
| 	} | ||||
| 	c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{ResourceInterface: m} | ||||
| 	c.RESTStorageProviders[policy.GroupName] = policyrest.RESTStorageProvider{} | ||||
| 	c.RESTStorageProviders[rbac.GroupName] = &rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser} | ||||
| 	c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{} | ||||
|   | ||||
| @@ -99,14 +99,6 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. | ||||
| 	config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}} | ||||
| 	config.EnableCoreControllers = false | ||||
|  | ||||
| 	// TODO: this is kind of hacky.  The trouble is that the sync loop | ||||
| 	// runs in a go-routine and there is no way to validate in the test | ||||
| 	// that the sync routine has actually run.  The right answer here | ||||
| 	// is probably to add some sort of callback that we can register | ||||
| 	// to validate that it's actually been run, but for now we don't | ||||
| 	// run the sync routine and register types manually. | ||||
| 	config.disableThirdPartyControllerForTesting = true | ||||
|  | ||||
| 	master, err := config.Complete().New() | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package rest | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
| @@ -25,6 +26,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/api/rest" | ||||
| 	"k8s.io/kubernetes/pkg/apis/extensions" | ||||
| 	extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" | ||||
| 	extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" | ||||
| 	"k8s.io/kubernetes/pkg/genericapiserver" | ||||
| 	horizontalpodautoscaleretcd "k8s.io/kubernetes/pkg/registry/autoscaling/horizontalpodautoscaler/etcd" | ||||
| 	jobetcd "k8s.io/kubernetes/pkg/registry/batch/job/etcd" | ||||
| @@ -36,12 +38,12 @@ import ( | ||||
| 	pspetcd "k8s.io/kubernetes/pkg/registry/extensions/podsecuritypolicy/etcd" | ||||
| 	replicasetetcd "k8s.io/kubernetes/pkg/registry/extensions/replicaset/etcd" | ||||
| 	thirdpartyresourceetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresource/etcd" | ||||
| 	utilruntime "k8s.io/kubernetes/pkg/util/runtime" | ||||
| 	"k8s.io/kubernetes/pkg/util/wait" | ||||
| ) | ||||
|  | ||||
| type RESTStorageProvider struct { | ||||
| 	ResourceInterface                     ResourceInterface | ||||
| 	DisableThirdPartyControllerForTesting bool | ||||
| 	ResourceInterface ResourceInterface | ||||
| } | ||||
|  | ||||
| var _ genericapiserver.RESTStorageProvider = &RESTStorageProvider{} | ||||
| @@ -73,17 +75,6 @@ func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource genericapise | ||||
| 	} | ||||
| 	if apiResourceConfigSource.ResourceEnabled(version.WithResource("thirdpartyresources")) { | ||||
| 		thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(restOptionsGetter(extensions.Resource("thirdpartyresources"))) | ||||
| 		thirdPartyControl := ThirdPartyController{ | ||||
| 			master: p.ResourceInterface, | ||||
| 			thirdPartyResourceRegistry: thirdPartyResourceStorage, | ||||
| 		} | ||||
| 		if !p.DisableThirdPartyControllerForTesting { | ||||
| 			go wait.Forever(func() { | ||||
| 				if err := thirdPartyControl.SyncResources(); err != nil { | ||||
| 					glog.Warningf("third party resource sync failed: %v", err) | ||||
| 				} | ||||
| 			}, 10*time.Second) | ||||
| 		} | ||||
| 		storage["thirdpartyresources"] = thirdPartyResourceStorage | ||||
| 	} | ||||
|  | ||||
| @@ -126,3 +117,26 @@ func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource genericapise | ||||
|  | ||||
| 	return storage | ||||
| } | ||||
|  | ||||
| func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) { | ||||
| 	return "extensions/third-party-resources", p.postStartHookFunc, nil | ||||
| } | ||||
| func (p RESTStorageProvider) postStartHookFunc(hookContext genericapiserver.PostStartHookContext) error { | ||||
| 	clientset, err := extensionsclient.NewForConfig(hookContext.LoopbackClientConfig) | ||||
| 	if err != nil { | ||||
| 		utilruntime.HandleError(fmt.Errorf("unable to initialize clusterroles: %v", err)) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	thirdPartyControl := ThirdPartyController{ | ||||
| 		master: p.ResourceInterface, | ||||
| 		client: clientset, | ||||
| 	} | ||||
| 	go wait.Forever(func() { | ||||
| 		if err := thirdPartyControl.SyncResources(); err != nil { | ||||
| 			glog.Warningf("third party resource sync failed: %v", err) | ||||
| 		} | ||||
| 	}, 10*time.Second) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -22,7 +22,7 @@ import ( | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/apis/extensions" | ||||
| 	thirdpartyresourceetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresource/etcd" | ||||
| 	extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" | ||||
| 	"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata" | ||||
| 	"k8s.io/kubernetes/pkg/runtime" | ||||
| 	"k8s.io/kubernetes/pkg/util/sets" | ||||
| @@ -47,8 +47,8 @@ const thirdpartyprefix = "/apis" | ||||
| // ThirdPartyController is a control loop that knows how to synchronize ThirdPartyResource objects with | ||||
| // RESTful resources which are present in the API server. | ||||
| type ThirdPartyController struct { | ||||
| 	master                     ResourceInterface | ||||
| 	thirdPartyResourceRegistry *thirdpartyresourceetcd.REST | ||||
| 	master ResourceInterface | ||||
| 	client extensionsclient.ThirdPartyResourcesGetter | ||||
| } | ||||
|  | ||||
| // SyncOneResource synchronizes a single resource with RESTful resources on the master | ||||
| @@ -68,7 +68,7 @@ func (t *ThirdPartyController) SyncOneResource(rsrc *extensions.ThirdPartyResour | ||||
|  | ||||
| // Synchronize all resources with RESTful resources on the master | ||||
| func (t *ThirdPartyController) SyncResources() error { | ||||
| 	list, err := t.thirdPartyResourceRegistry.List(api.NewDefaultContext(), nil) | ||||
| 	list, err := t.client.ThirdPartyResources().List(api.ListOptions{}) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue