Add a generic filter that blocks certain write requests before
StorageVersions are updated during apiserver bootstrap. Also add a poststarthook to the aggregator which updates the StorageVersions via the storageversion.Manager
This commit is contained in:
parent
3694756816
commit
7218978716
@ -34,6 +34,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
genericfeatures "k8s.io/apiserver/pkg/features"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
genericoptions "k8s.io/apiserver/pkg/server/options"
|
||||
@ -67,6 +68,13 @@ func createAggregatorConfig(
|
||||
genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
|
||||
genericConfig.RESTOptionsGetter = nil
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
|
||||
// Add StorageVersionPrecondition handler to aggregator-apiserver.
|
||||
// The handler will block write requests to built-in resources until the
|
||||
// target resources' storage versions are up-to-date.
|
||||
genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
|
||||
}
|
||||
|
||||
// override genericConfig.AdmissionControl with kube-aggregator's scheme,
|
||||
// because aggregator apiserver should use its own scheme to convert its own resources.
|
||||
err := commandOptions.Admission.ApplyTo(
|
||||
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
Copyright 2020 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package filters
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/storageversion"
|
||||
_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// WithStorageVersionPrecondition checks if the storage version barrier has
|
||||
// completed, if not, it only passes the following API requests:
|
||||
// 1. non-resource requests,
|
||||
// 2. read requests,
|
||||
// 3. write requests to the storageversion API,
|
||||
// 4. resources whose StorageVersion is not pending update, including non-persisted resources.
|
||||
func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Manager) http.Handler {
|
||||
if svm == nil {
|
||||
// TODO(roycaihw): switch to warning after the feature graduate to beta/GA
|
||||
klog.V(2).Infof("Storage Version barrier is disabled")
|
||||
return handler
|
||||
}
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
if svm.Completed() {
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
ctx := req.Context()
|
||||
requestInfo, found := request.RequestInfoFrom(ctx)
|
||||
if !found {
|
||||
responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context"))
|
||||
return
|
||||
}
|
||||
// Allow non-resource requests
|
||||
if !requestInfo.IsResourceRequest {
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
// Allow read requests
|
||||
if requestInfo.Verb == "get" || requestInfo.Verb == "list" || requestInfo.Verb == "watch" {
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
// Allow writes to the storage version API
|
||||
if requestInfo.APIGroup == "internal.apiserver.k8s.io" && requestInfo.Resource == "storageversions" {
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
// If the resource's StorageVersion is not in the to-be-updated list, let it pass.
|
||||
// Non-persisted resources are not in the to-be-updated list, so they will pass.
|
||||
gr := schema.GroupResource{requestInfo.APIGroup, requestInfo.Resource}
|
||||
if !svm.PendingUpdate(gr) {
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
responsewriters.ServiceUnavailabeError(w, req, errors.New(fmt.Sprintf("wait for storage version registration to complete for resource: %v, last seen error: %v", gr, svm.LastUpdateError(gr))))
|
||||
})
|
||||
}
|
@ -76,3 +76,10 @@ func InternalError(w http.ResponseWriter, req *http.Request, err error) {
|
||||
http.StatusInternalServerError)
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
|
||||
// ServiceUnavailabeError renders a simple internal error
|
||||
func ServiceUnavailabeError(w http.ResponseWriter, req *http.Request, err error) {
|
||||
http.Error(w, sanitizer.Replace(fmt.Sprintf("Service Unavailable: %q: %v", req.RequestURI, err)),
|
||||
http.StatusServiceUnavailable)
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
|
@ -62,6 +62,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/routes"
|
||||
serverstore "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/storageversion"
|
||||
"k8s.io/apiserver/pkg/util/feature"
|
||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||
"k8s.io/client-go/informers"
|
||||
@ -227,6 +228,9 @@ type Config struct {
|
||||
|
||||
// APIServerID is the ID of this API server
|
||||
APIServerID string
|
||||
|
||||
// StorageVersionManager holds the storage versions of the API resources installed by this server.
|
||||
StorageVersionManager storageversion.Manager
|
||||
}
|
||||
|
||||
type RecommendedConfig struct {
|
||||
@ -333,6 +337,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
|
||||
// Generic API servers have no inherent long-running subresources
|
||||
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
|
||||
APIServerID: id,
|
||||
StorageVersionManager: storageversion.NewDefaultManager(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -582,7 +587,9 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
||||
|
||||
maxRequestBodyBytes: c.MaxRequestBodyBytes,
|
||||
livezClock: clock.RealClock{},
|
||||
|
||||
APIServerID: c.APIServerID,
|
||||
StorageVersionManager: c.StorageVersionManager,
|
||||
}
|
||||
|
||||
for {
|
||||
@ -703,6 +710,12 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c *Config) http.Handler {
|
||||
// WithStorageVersionPrecondition needs the WithRequestInfo to run first
|
||||
handler := genericapifilters.WithStorageVersionPrecondition(apiHandler, c.StorageVersionManager)
|
||||
return DefaultBuildHandlerChain(handler, c)
|
||||
}
|
||||
|
||||
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
||||
handler := filterlatency.TrackCompleted(apiHandler)
|
||||
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
|
||||
|
@ -24,9 +24,12 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
genericfeatures "k8s.io/apiserver/pkg/features"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/egressselector"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/pkg/version"
|
||||
openapicommon "k8s.io/kube-openapi/pkg/common"
|
||||
|
||||
@ -264,6 +267,34 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
return nil
|
||||
})
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
|
||||
// Spawn a goroutine in aggregator apiserver to update storage version for
|
||||
// all built-in resources
|
||||
s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(context genericapiserver.PostStartHookContext) error {
|
||||
// Technically an apiserver only needs to update storage version once during bootstrap.
|
||||
// Reconcile StorageVersion objects every 10 minutes will help in the case that the
|
||||
// StorageVersion objects get accidentally modified/deleted by a different agent. In that
|
||||
// case, the reconciliation ensures future storage migration still works. If nothing gets
|
||||
// changed, the reconciliation update is a noop and gets short-circuited by the apiserver,
|
||||
// therefore won't change the resource version and trigger storage migration.
|
||||
go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
|
||||
// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
|
||||
// share the same generic apiserver config. The same StorageVersion manager is used
|
||||
// to register all built-in resources when the generic apiservers install APIs.
|
||||
s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(context.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
|
||||
return false, nil
|
||||
}, context.StopCh)
|
||||
// Once the storage version updater finishes the first round of update,
|
||||
// the PostStartHook will return to unblock /healthz. The handler chain
|
||||
// won't block write requests anymore. Check every second since it's not
|
||||
// expensive.
|
||||
wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
|
||||
return s.GenericAPIServer.StorageVersionManager.Completed(), nil
|
||||
}, context.StopCh)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user