Merge pull request #109303 from wojtek-t/clean_storage_shutdown
Cleanup rest storage resources on shutdown
This commit is contained in:
@@ -195,6 +195,12 @@ func (r *StatusREST) New() runtime.Object {
|
||||
return &apiextensions.CustomResourceDefinition{}
|
||||
}
|
||||
|
||||
// Destroy cleans up resources on shutdown.
|
||||
func (r *StatusREST) Destroy() {
|
||||
// Given that underlying store is shared with REST,
|
||||
// we don't destroy it here explicitly.
|
||||
}
|
||||
|
||||
// Get retrieves the object from the storage. It is required to support Patch.
|
||||
func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
|
||||
return r.store.Get(ctx, name, options)
|
||||
|
@@ -462,6 +462,9 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object {
|
||||
return &genericapitesting.SimpleList{}
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) Destroy() {
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
|
||||
storage.checkContext(ctx)
|
||||
storage.created = obj.(*genericapitesting.Simple)
|
||||
@@ -547,6 +550,9 @@ func (s *ConnecterRESTStorage) New() runtime.Object {
|
||||
return &genericapitesting.Simple{}
|
||||
}
|
||||
|
||||
func (s *ConnecterRESTStorage) Destroy() {
|
||||
}
|
||||
|
||||
func (s *ConnecterRESTStorage) Connect(ctx context.Context, id string, options runtime.Object, responder rest.Responder) (http.Handler, error) {
|
||||
s.receivedConnectOptions = options
|
||||
s.receivedID = id
|
||||
@@ -668,6 +674,9 @@ func (storage *SimpleTypedStorage) New() runtime.Object {
|
||||
return storage.baseType
|
||||
}
|
||||
|
||||
func (storage *SimpleTypedStorage) Destroy() {
|
||||
}
|
||||
|
||||
func (storage *SimpleTypedStorage) Get(ctx context.Context, id string, options *metav1.GetOptions) (runtime.Object, error) {
|
||||
storage.checkContext(ctx)
|
||||
return storage.item.DeepCopyObject(), storage.errors["get"]
|
||||
@@ -810,6 +819,9 @@ func (UnimplementedRESTStorage) New() runtime.Object {
|
||||
return &genericapitesting.Simple{}
|
||||
}
|
||||
|
||||
func (UnimplementedRESTStorage) Destroy() {
|
||||
}
|
||||
|
||||
// TestUnimplementedRESTStorage ensures that if a rest.Storage does not implement a given
|
||||
// method, that it is literally not registered with the server. In the past,
|
||||
// we registered everything, and returned method not supported if it didn't support
|
||||
@@ -4322,6 +4334,9 @@ func (storage *SimpleXGSubresourceRESTStorage) New() runtime.Object {
|
||||
return &genericapitesting.SimpleXGSubresource{}
|
||||
}
|
||||
|
||||
func (storage *SimpleXGSubresourceRESTStorage) Destroy() {
|
||||
}
|
||||
|
||||
func (storage *SimpleXGSubresourceRESTStorage) Get(ctx context.Context, id string, options *metav1.GetOptions) (runtime.Object, error) {
|
||||
return storage.item.DeepCopyObject(), nil
|
||||
}
|
||||
|
@@ -70,9 +70,12 @@ func StorageWithCacher() generic.StorageDecorator {
|
||||
if err != nil {
|
||||
return nil, func() {}, err
|
||||
}
|
||||
var once sync.Once
|
||||
destroyFunc := func() {
|
||||
cacher.Stop()
|
||||
d()
|
||||
once.Do(func() {
|
||||
cacher.Stop()
|
||||
d()
|
||||
})
|
||||
}
|
||||
|
||||
// TODO : Remove RegisterStorageCleanup below when PR
|
||||
|
@@ -217,7 +217,10 @@ type Store struct {
|
||||
// If the StorageVersioner is nil, apiserver will leave the
|
||||
// storageVersionHash as empty in the discovery document.
|
||||
StorageVersioner runtime.GroupVersioner
|
||||
// Called to cleanup clients used by the underlying Storage; optional.
|
||||
|
||||
// DestroyFunc cleans up clients used by the underlying Storage; optional.
|
||||
// If set, DestroyFunc has to be implemented in thread-safe way and
|
||||
// be prepared for being called more than once.
|
||||
DestroyFunc func()
|
||||
}
|
||||
|
||||
@@ -279,6 +282,13 @@ func (e *Store) New() runtime.Object {
|
||||
return e.NewFunc()
|
||||
}
|
||||
|
||||
// Destroy cleans up its resources on shutdown.
|
||||
func (e *Store) Destroy() {
|
||||
if e.DestroyFunc != nil {
|
||||
e.DestroyFunc()
|
||||
}
|
||||
}
|
||||
|
||||
// NewList implements rest.Lister.
|
||||
func (e *Store) NewList() runtime.Object {
|
||||
return e.NewListFunc()
|
||||
@@ -1433,11 +1443,14 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
|
||||
if opts.CountMetricPollPeriod > 0 {
|
||||
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker)
|
||||
previousDestroy := e.DestroyFunc
|
||||
var once sync.Once
|
||||
e.DestroyFunc = func() {
|
||||
stopFunc()
|
||||
if previousDestroy != nil {
|
||||
previousDestroy()
|
||||
}
|
||||
once.Do(func() {
|
||||
stopFunc()
|
||||
if previousDestroy != nil {
|
||||
previousDestroy()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -56,6 +56,11 @@ type Storage interface {
|
||||
// New returns an empty object that can be used with Create and Update after request data has been put into it.
|
||||
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
|
||||
New() runtime.Object
|
||||
|
||||
// Destroy cleans up its resources on shutdown.
|
||||
// Destroy has to be implemented in thread-safe way and be prepared
|
||||
// for being called more than once.
|
||||
Destroy()
|
||||
}
|
||||
|
||||
// Scoper indicates what scope the resource is at. It must be specified.
|
||||
@@ -278,6 +283,11 @@ type StandardStorage interface {
|
||||
GracefulDeleter
|
||||
CollectionDeleter
|
||||
Watcher
|
||||
|
||||
// Destroy cleans up its resources on shutdown.
|
||||
// Destroy has to be implemented in thread-safe way and be prepared
|
||||
// for being called more than once.
|
||||
Destroy()
|
||||
}
|
||||
|
||||
// Redirector know how to return a remote resource's location.
|
||||
|
@@ -110,6 +110,9 @@ func (r removedInStorage) New() runtime.Object {
|
||||
return removedInObj{major: r.major, minor: r.minor}
|
||||
}
|
||||
|
||||
func (r removedInStorage) Destroy() {
|
||||
}
|
||||
|
||||
type neverRemovedObj struct {
|
||||
}
|
||||
|
||||
|
@@ -89,6 +89,14 @@ type APIGroupInfo struct {
|
||||
StaticOpenAPISpec *spec.Swagger
|
||||
}
|
||||
|
||||
func (a *APIGroupInfo) destroyStorage() {
|
||||
for _, stores := range a.VersionedResourcesStorageMap {
|
||||
for _, store := range stores {
|
||||
store.Destroy()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GenericAPIServer contains state for a Kubernetes cluster api server.
|
||||
type GenericAPIServer struct {
|
||||
// discoveryAddresses is used to build cluster IPs for discovery.
|
||||
@@ -222,6 +230,9 @@ type GenericAPIServer struct {
|
||||
// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
|
||||
lifecycleSignals lifecycleSignals
|
||||
|
||||
// destroyFns contains a list of functions that should be called on shutdown to clean up resources.
|
||||
destroyFns []func()
|
||||
|
||||
// muxAndDiscoveryCompleteSignals holds signals that indicate all known HTTP paths have been registered.
|
||||
// it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
|
||||
// it is exposed for easier composition of the individual servers.
|
||||
@@ -264,6 +275,11 @@ type DelegationTarget interface {
|
||||
|
||||
// MuxAndDiscoveryCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed.
|
||||
MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{}
|
||||
|
||||
// Destroy cleans up its resources on shutdown.
|
||||
// Destroy has to be implemented in thread-safe way and be prepared
|
||||
// for being called more than once.
|
||||
Destroy()
|
||||
}
|
||||
|
||||
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
||||
@@ -301,6 +317,18 @@ func (s *GenericAPIServer) MuxAndDiscoveryCompleteSignals() map[string]<-chan st
|
||||
return s.muxAndDiscoveryCompleteSignals
|
||||
}
|
||||
|
||||
// Destroy cleans up all its and its delegation target resources on shutdown.
|
||||
// It starts with destroying its own resources and later proceeds with
|
||||
// its delegation target.
|
||||
func (s *GenericAPIServer) Destroy() {
|
||||
for _, destroyFn := range s.destroyFns {
|
||||
destroyFn()
|
||||
}
|
||||
if s.delegationTarget != nil {
|
||||
s.delegationTarget.Destroy()
|
||||
}
|
||||
}
|
||||
|
||||
type emptyDelegate struct {
|
||||
// handler is called at the end of the delegation chain
|
||||
// when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler.
|
||||
@@ -340,6 +368,8 @@ func (s emptyDelegate) PrepareRun() preparedGenericAPIServer {
|
||||
func (s emptyDelegate) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} {
|
||||
return map[string]<-chan struct{}{}
|
||||
}
|
||||
func (s emptyDelegate) Destroy() {
|
||||
}
|
||||
|
||||
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
|
||||
type preparedGenericAPIServer struct {
|
||||
@@ -395,6 +425,9 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
||||
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
|
||||
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
|
||||
|
||||
// Clean up resources on shutdown.
|
||||
defer s.Destroy()
|
||||
|
||||
// spawn a new goroutine for closing the MuxAndDiscoveryComplete signal
|
||||
// registration happens during construction of the generic api server
|
||||
// the last server in the chain aggregates signals from the previous instances
|
||||
@@ -584,6 +617,8 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A
|
||||
resourceInfos = append(resourceInfos, r...)
|
||||
}
|
||||
|
||||
s.destroyFns = append(s.destroyFns, apiGroupInfo.destroyStorage)
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
|
||||
// API installation happens before we start listening on the handlers,
|
||||
@@ -595,6 +630,9 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A
|
||||
return nil
|
||||
}
|
||||
|
||||
// InstallLegacyAPIGroup exposes the given legacy api group in the API.
|
||||
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
|
||||
// underlying storage will be destroyed on this servers shutdown.
|
||||
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
|
||||
if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
|
||||
return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
|
||||
@@ -616,7 +654,9 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo
|
||||
return nil
|
||||
}
|
||||
|
||||
// Exposes given api groups in the API.
|
||||
// InstallAPIGroups exposes given api groups in the API.
|
||||
// The <apiGroupInfos> passed into this function shouldn't be used elsewhere as the
|
||||
// underlying storage will be destroyed on this servers shutdown.
|
||||
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
|
||||
for _, apiGroupInfo := range apiGroupInfos {
|
||||
// Do not register empty group or empty version. Doing so claims /apis/ for the wrong entity to be returned.
|
||||
@@ -669,7 +709,9 @@ func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// Exposes the given api group in the API.
|
||||
// InstallAPIGroup exposes the given api group in the API.
|
||||
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
|
||||
// underlying storage will be destroyed on this servers shutdown.
|
||||
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
|
||||
return s.InstallAPIGroups(apiGroupInfo)
|
||||
}
|
||||
|
@@ -544,6 +544,9 @@ func (p *testGetterStorage) New() runtime.Object {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *testGetterStorage) Destroy() {
|
||||
}
|
||||
|
||||
func (p *testGetterStorage) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -565,6 +568,9 @@ func (p *testNoVerbsStorage) New() runtime.Object {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *testNoVerbsStorage) Destroy() {
|
||||
}
|
||||
|
||||
func fakeVersion() version.Info {
|
||||
return version.Info{
|
||||
Major: "42",
|
||||
|
@@ -147,6 +147,12 @@ func (r *StatusREST) New() runtime.Object {
|
||||
return &apiregistration.APIService{}
|
||||
}
|
||||
|
||||
// Destroy cleans up resources on shutdown.
|
||||
func (r *StatusREST) Destroy() {
|
||||
// Given that underlying store is shared with REST,
|
||||
// we don't destroy it here explicitly.
|
||||
}
|
||||
|
||||
// Get retrieves the object from the storage. It is required to support Patch.
|
||||
func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
|
||||
return r.store.Get(ctx, name, options)
|
||||
|
@@ -20,7 +20,6 @@ import (
|
||||
"fmt"
|
||||
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
)
|
||||
|
||||
// REST implements a RESTStorage for API services against etcd
|
||||
@@ -31,7 +30,7 @@ type REST struct {
|
||||
// RESTInPeace is just a simple function that panics on error.
|
||||
// Otherwise returns the given storage object. It is meant to be
|
||||
// a wrapper for wardle registries.
|
||||
func RESTInPeace(storage rest.StandardStorage, err error) rest.StandardStorage {
|
||||
func RESTInPeace(storage *REST, err error) *REST {
|
||||
if err != nil {
|
||||
err = fmt.Errorf("unable to create REST storage for a resource due to %v, will die", err)
|
||||
panic(err)
|
||||
|
Reference in New Issue
Block a user