Merge pull request #125483 from wojtek-t/storage_readiness_hook
Implement resilient watchcache initialization post-start-hook
This commit is contained in:
		| @@ -139,16 +139,17 @@ func TestAddFlags(t *testing.T) { | ||||
| 	expected := &ServerRunOptions{ | ||||
| 		Options: &controlplaneapiserver.Options{ | ||||
| 			GenericServerRunOptions: &apiserveroptions.ServerRunOptions{ | ||||
| 				AdvertiseAddress:            netutils.ParseIPSloppy("192.168.10.10"), | ||||
| 				CorsAllowedOriginList:       []string{"10.10.10.100", "10.10.10.200"}, | ||||
| 				MaxRequestsInFlight:         400, | ||||
| 				MaxMutatingRequestsInFlight: 200, | ||||
| 				RequestTimeout:              time.Duration(2) * time.Minute, | ||||
| 				MinRequestTimeout:           1800, | ||||
| 				JSONPatchMaxCopyBytes:       int64(3 * 1024 * 1024), | ||||
| 				MaxRequestBodyBytes:         int64(3 * 1024 * 1024), | ||||
| 				ComponentGlobalsRegistry:    componentGlobalsRegistry, | ||||
| 				ComponentName:               utilversion.DefaultKubeComponent, | ||||
| 				AdvertiseAddress:             netutils.ParseIPSloppy("192.168.10.10"), | ||||
| 				CorsAllowedOriginList:        []string{"10.10.10.100", "10.10.10.200"}, | ||||
| 				MaxRequestsInFlight:          400, | ||||
| 				MaxMutatingRequestsInFlight:  200, | ||||
| 				RequestTimeout:               time.Duration(2) * time.Minute, | ||||
| 				MinRequestTimeout:            1800, | ||||
| 				StorageInitializationTimeout: time.Minute, | ||||
| 				JSONPatchMaxCopyBytes:        int64(3 * 1024 * 1024), | ||||
| 				MaxRequestBodyBytes:          int64(3 * 1024 * 1024), | ||||
| 				ComponentGlobalsRegistry:     componentGlobalsRegistry, | ||||
| 				ComponentName:                utilversion.DefaultKubeComponent, | ||||
| 			}, | ||||
| 			Admission: &kubeoptions.AdmissionOptions{ | ||||
| 				GenericAdmission: &apiserveroptions.AdmissionOptions{ | ||||
|   | ||||
| @@ -123,16 +123,17 @@ func TestAddFlags(t *testing.T) { | ||||
| 	// This is a snapshot of expected options parsed by args. | ||||
| 	expected := &Options{ | ||||
| 		GenericServerRunOptions: &apiserveroptions.ServerRunOptions{ | ||||
| 			AdvertiseAddress:            netutils.ParseIPSloppy("192.168.10.10"), | ||||
| 			CorsAllowedOriginList:       []string{"10.10.10.100", "10.10.10.200"}, | ||||
| 			MaxRequestsInFlight:         400, | ||||
| 			MaxMutatingRequestsInFlight: 200, | ||||
| 			RequestTimeout:              time.Duration(2) * time.Minute, | ||||
| 			MinRequestTimeout:           1800, | ||||
| 			JSONPatchMaxCopyBytes:       int64(3 * 1024 * 1024), | ||||
| 			MaxRequestBodyBytes:         int64(3 * 1024 * 1024), | ||||
| 			ComponentGlobalsRegistry:    componentGlobalsRegistry, | ||||
| 			ComponentName:               utilversion.DefaultKubeComponent, | ||||
| 			AdvertiseAddress:             netutils.ParseIPSloppy("192.168.10.10"), | ||||
| 			CorsAllowedOriginList:        []string{"10.10.10.100", "10.10.10.200"}, | ||||
| 			MaxRequestsInFlight:          400, | ||||
| 			MaxMutatingRequestsInFlight:  200, | ||||
| 			RequestTimeout:               time.Duration(2) * time.Minute, | ||||
| 			MinRequestTimeout:            1800, | ||||
| 			StorageInitializationTimeout: time.Minute, | ||||
| 			JSONPatchMaxCopyBytes:        int64(3 * 1024 * 1024), | ||||
| 			MaxRequestBodyBytes:          int64(3 * 1024 * 1024), | ||||
| 			ComponentGlobalsRegistry:     componentGlobalsRegistry, | ||||
| 			ComponentName:                utilversion.DefaultKubeComponent, | ||||
| 		}, | ||||
| 		Admission: &kubeoptions.AdmissionOptions{ | ||||
| 			GenericAdmission: &apiserveroptions.AdmissionOptions{ | ||||
|   | ||||
| @@ -245,6 +245,10 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.WatchCacheInitializationPostStartHook) { | ||||
| 		s.GenericAPIServer.AddPostStartHookOrDie("storage-readiness", s.GenericAPIServer.StorageReadinessHook.Hook) | ||||
| 	} | ||||
|  | ||||
| 	s.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error { | ||||
| 		go legacytokentracking.NewController(client).Run(hookContext.StopCh) | ||||
| 		return nil | ||||
|   | ||||
| @@ -1275,6 +1275,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS | ||||
|  | ||||
| 	genericfeatures.WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, | ||||
|  | ||||
| 	genericfeatures.WatchCacheInitializationPostStartHook: {Default: false, PreRelease: featuregate.Beta}, | ||||
|  | ||||
| 	genericfeatures.WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta}, | ||||
|  | ||||
| 	genericfeatures.WatchList: {Default: true, PreRelease: featuregate.Beta}, | ||||
|   | ||||
| @@ -290,6 +290,12 @@ const ( | ||||
| 	// Enables support for watch bookmark events. | ||||
| 	WatchBookmark featuregate.Feature = "WatchBookmark" | ||||
|  | ||||
| 	// owner: @wojtek-t | ||||
| 	// beta: v1.31 | ||||
| 	// | ||||
| 	// Enables post-start-hook for storage readiness | ||||
| 	WatchCacheInitializationPostStartHook featuregate.Feature = "WatchCacheInitializationPostStartHook" | ||||
|  | ||||
| 	// owner: @serathius | ||||
| 	// beta: 1.30 | ||||
| 	// Enables watches without resourceVersion to be served from storage. | ||||
| @@ -408,6 +414,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS | ||||
|  | ||||
| 	WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, | ||||
|  | ||||
| 	WatchCacheInitializationPostStartHook: {Default: false, PreRelease: featuregate.Beta}, | ||||
|  | ||||
| 	WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta}, | ||||
|  | ||||
| 	InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha}, | ||||
|   | ||||
| @@ -226,6 +226,10 @@ type Store struct { | ||||
| 	// storageVersionHash as empty in the discovery document. | ||||
| 	StorageVersioner runtime.GroupVersioner | ||||
|  | ||||
| 	// ReadinessCheckFunc checks if the storage is ready for accepting requests. | ||||
| 	// The field is optional, if set needs to be thread-safe. | ||||
| 	ReadinessCheckFunc func() error | ||||
|  | ||||
| 	// DestroyFunc cleans up clients used by the underlying Storage; optional. | ||||
| 	// If set, DestroyFunc has to be implemented in thread-safe way and | ||||
| 	// be prepared for being called more than once. | ||||
| @@ -234,6 +238,7 @@ type Store struct { | ||||
|  | ||||
| // Note: the rest.StandardStorage interface aggregates the common REST verbs | ||||
| var _ rest.StandardStorage = &Store{} | ||||
| var _ rest.StorageWithReadiness = &Store{} | ||||
| var _ rest.TableConvertor = &Store{} | ||||
| var _ GenericStore = &Store{} | ||||
|  | ||||
| @@ -292,6 +297,14 @@ func (e *Store) New() runtime.Object { | ||||
| 	return e.NewFunc() | ||||
| } | ||||
|  | ||||
| // ReadinessCheck checks if the storage is ready for accepting requests. | ||||
| func (e *Store) ReadinessCheck() error { | ||||
| 	if e.ReadinessCheckFunc != nil { | ||||
| 		return e.ReadinessCheckFunc() | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Destroy cleans up its resources on shutdown. | ||||
| func (e *Store) Destroy() { | ||||
| 	if e.DestroyFunc != nil { | ||||
| @@ -1614,6 +1627,9 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	if e.Storage.Storage != nil { | ||||
| 		e.ReadinessCheckFunc = e.Storage.Storage.ReadinessCheck | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -52,6 +52,8 @@ import ( | ||||
| // Storage is a generic interface for RESTful storage services. | ||||
| // Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected | ||||
| // that objects may implement any of the below interfaces. | ||||
| // | ||||
| // Consider using StorageWithReadiness whenever possible. | ||||
| type Storage interface { | ||||
| 	// New returns an empty object that can be used with Create and Update after request data has been put into it. | ||||
| 	// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) | ||||
| @@ -63,6 +65,14 @@ type Storage interface { | ||||
| 	Destroy() | ||||
| } | ||||
|  | ||||
| // StorageWithReadiness extends Storage interface with the readiness check. | ||||
| type StorageWithReadiness interface { | ||||
| 	Storage | ||||
|  | ||||
| 	// ReadinessCheck allows for checking storage readiness. | ||||
| 	ReadinessCheck() error | ||||
| } | ||||
|  | ||||
| // Scoper indicates what scope the resource is at. It must be specified. | ||||
| // It is usually provided automatically based on your strategy. | ||||
| type Scoper interface { | ||||
|   | ||||
| @@ -216,6 +216,10 @@ type Config struct { | ||||
| 	// twice this value.  Note that it is up to the request handlers to ignore or honor this timeout. In seconds. | ||||
| 	MinRequestTimeout int | ||||
|  | ||||
| 	// StorageInitializationTimeout defines the maximum amount of time to wait for storage initialization | ||||
| 	// before declaring apiserver ready. | ||||
| 	StorageInitializationTimeout time.Duration | ||||
|  | ||||
| 	// This represents the maximum amount of time it should take for apiserver to complete its startup | ||||
| 	// sequence and become healthy. From apiserver's start time to when this amount of time has | ||||
| 	// elapsed, /livez will assume that unfinished post-start hooks will complete successfully and | ||||
| @@ -426,6 +430,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { | ||||
| 		MaxMutatingRequestsInFlight:    200, | ||||
| 		RequestTimeout:                 time.Duration(60) * time.Second, | ||||
| 		MinRequestTimeout:              1800, | ||||
| 		StorageInitializationTimeout:   time.Minute, | ||||
| 		LivezGracePeriod:               time.Duration(0), | ||||
| 		ShutdownDelayDuration:          time.Duration(0), | ||||
| 		// 1.5MB is the default client request size in bytes | ||||
| @@ -824,6 +829,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G | ||||
| 		ShutdownSendRetryAfter: c.ShutdownSendRetryAfter, | ||||
|  | ||||
| 		APIServerID:           c.APIServerID, | ||||
| 		StorageReadinessHook:  NewStorageReadinessHook(c.StorageInitializationTimeout), | ||||
| 		StorageVersionManager: c.StorageVersionManager, | ||||
|  | ||||
| 		EffectiveVersion: c.EffectiveVersion, | ||||
|   | ||||
| @@ -233,6 +233,10 @@ type GenericAPIServer struct { | ||||
| 	// APIServerID is the ID of this API server | ||||
| 	APIServerID string | ||||
|  | ||||
| 	// StorageReadinessHook implements post-start-hook functionality for checking readiness | ||||
| 	// of underlying storage for registered resources. | ||||
| 	StorageReadinessHook *StorageReadinessHook | ||||
|  | ||||
| 	// StorageVersionManager holds the storage versions of the API resources installed by this server. | ||||
| 	StorageVersionManager storageversion.Manager | ||||
|  | ||||
| @@ -844,6 +848,7 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo | ||||
| 	} else { | ||||
| 		s.Handler.GoRestfulContainer.Add(legacyRootAPIHandler.WebService()) | ||||
| 	} | ||||
| 	s.registerStorageReadinessCheck("", apiGroupInfo) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
| @@ -902,10 +907,28 @@ func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) erro | ||||
|  | ||||
| 		s.DiscoveryGroupManager.AddGroup(apiGroup) | ||||
| 		s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService()) | ||||
| 		s.registerStorageReadinessCheck(apiGroupInfo.PrioritizedVersions[0].Group, apiGroupInfo) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // registerStorageReadinessCheck registers the readiness checks for all underlying storages | ||||
| // for a given APIGroup. | ||||
| func (s *GenericAPIServer) registerStorageReadinessCheck(groupName string, apiGroupInfo *APIGroupInfo) { | ||||
| 	for version, storageMap := range apiGroupInfo.VersionedResourcesStorageMap { | ||||
| 		for resource, storage := range storageMap { | ||||
| 			if withReadiness, ok := storage.(rest.StorageWithReadiness); ok { | ||||
| 				gvr := metav1.GroupVersionResource{ | ||||
| 					Group:    groupName, | ||||
| 					Version:  version, | ||||
| 					Resource: resource, | ||||
| 				} | ||||
| 				s.StorageReadinessHook.RegisterStorage(gvr, withReadiness) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // InstallAPIGroup exposes the given api group in the API. | ||||
| // The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the | ||||
| // underlying storage will be destroyed on this servers shutdown. | ||||
|   | ||||
| @@ -47,16 +47,17 @@ const ( | ||||
| type ServerRunOptions struct { | ||||
| 	AdvertiseAddress net.IP | ||||
|  | ||||
| 	CorsAllowedOriginList       []string | ||||
| 	HSTSDirectives              []string | ||||
| 	ExternalHost                string | ||||
| 	MaxRequestsInFlight         int | ||||
| 	MaxMutatingRequestsInFlight int | ||||
| 	RequestTimeout              time.Duration | ||||
| 	GoawayChance                float64 | ||||
| 	LivezGracePeriod            time.Duration | ||||
| 	MinRequestTimeout           int | ||||
| 	ShutdownDelayDuration       time.Duration | ||||
| 	CorsAllowedOriginList        []string | ||||
| 	HSTSDirectives               []string | ||||
| 	ExternalHost                 string | ||||
| 	MaxRequestsInFlight          int | ||||
| 	MaxMutatingRequestsInFlight  int | ||||
| 	RequestTimeout               time.Duration | ||||
| 	GoawayChance                 float64 | ||||
| 	LivezGracePeriod             time.Duration | ||||
| 	MinRequestTimeout            int | ||||
| 	StorageInitializationTimeout time.Duration | ||||
| 	ShutdownDelayDuration        time.Duration | ||||
| 	// We intentionally did not add a flag for this option. Users of the | ||||
| 	// apiserver library can wire it to a flag. | ||||
| 	JSONPatchMaxCopyBytes int64 | ||||
| @@ -116,6 +117,7 @@ func NewServerRunOptionsForComponent(componentName string, componentGlobalsRegis | ||||
| 		RequestTimeout:                      defaults.RequestTimeout, | ||||
| 		LivezGracePeriod:                    defaults.LivezGracePeriod, | ||||
| 		MinRequestTimeout:                   defaults.MinRequestTimeout, | ||||
| 		StorageInitializationTimeout:        defaults.StorageInitializationTimeout, | ||||
| 		ShutdownDelayDuration:               defaults.ShutdownDelayDuration, | ||||
| 		ShutdownWatchTerminationGracePeriod: defaults.ShutdownWatchTerminationGracePeriod, | ||||
| 		JSONPatchMaxCopyBytes:               defaults.JSONPatchMaxCopyBytes, | ||||
| @@ -140,6 +142,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { | ||||
| 	c.RequestTimeout = s.RequestTimeout | ||||
| 	c.GoawayChance = s.GoawayChance | ||||
| 	c.MinRequestTimeout = s.MinRequestTimeout | ||||
| 	c.StorageInitializationTimeout = s.StorageInitializationTimeout | ||||
| 	c.ShutdownDelayDuration = s.ShutdownDelayDuration | ||||
| 	c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes | ||||
| 	c.MaxRequestBodyBytes = s.MaxRequestBodyBytes | ||||
| @@ -197,6 +200,10 @@ func (s *ServerRunOptions) Validate() []error { | ||||
| 		errors = append(errors, fmt.Errorf("--min-request-timeout can not be negative value")) | ||||
| 	} | ||||
|  | ||||
| 	if s.StorageInitializationTimeout < 0 { | ||||
| 		errors = append(errors, fmt.Errorf("--storage-initialization-timeout can not be negative value")) | ||||
| 	} | ||||
|  | ||||
| 	if s.ShutdownDelayDuration < 0 { | ||||
| 		errors = append(errors, fmt.Errorf("--shutdown-delay-duration can not be negative value")) | ||||
| 	} | ||||
| @@ -350,6 +357,9 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { | ||||
| 		"handler, which picks a randomized value above this number as the connection timeout, "+ | ||||
| 		"to spread out load.") | ||||
|  | ||||
| 	fs.DurationVar(&s.StorageInitializationTimeout, "storage-initialization-timeout", s.StorageInitializationTimeout, | ||||
| 		"Maximum amount of time to wait for storage initialization before declaring apiserver ready. Defaults to 1m.") | ||||
|  | ||||
| 	fs.DurationVar(&s.ShutdownDelayDuration, "shutdown-delay-duration", s.ShutdownDelayDuration, ""+ | ||||
| 		"Time to delay the termination. During that time the server keeps serving requests normally. The endpoints /healthz and /livez "+ | ||||
| 		"will return success, but /readyz immediately returns failure. Graceful termination starts after this delay "+ | ||||
|   | ||||
| @@ -0,0 +1,91 @@ | ||||
| /* | ||||
| Copyright 2024 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package server | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/apiserver/pkg/registry/rest" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| // StorageReadinessHook implements PostStartHook functionality for checking readiness | ||||
| // of underlying storage for registered resources. | ||||
| type StorageReadinessHook struct { | ||||
| 	timeout time.Duration | ||||
|  | ||||
| 	lock   sync.Mutex | ||||
| 	checks map[string]func() error | ||||
| } | ||||
|  | ||||
| // NewStorageReadinessHook created new StorageReadinessHook. | ||||
| func NewStorageReadinessHook(timeout time.Duration) *StorageReadinessHook { | ||||
| 	return &StorageReadinessHook{ | ||||
| 		checks:  make(map[string]func() error), | ||||
| 		timeout: timeout, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (h *StorageReadinessHook) RegisterStorage(gvr metav1.GroupVersionResource, storage rest.StorageWithReadiness) { | ||||
| 	h.lock.Lock() | ||||
| 	defer h.lock.Unlock() | ||||
|  | ||||
| 	if _, ok := h.checks[gvr.String()]; !ok { | ||||
| 		h.checks[gvr.String()] = storage.ReadinessCheck | ||||
| 	} else { | ||||
| 		klog.Errorf("Registering storage readiness hook for %s again: ", gvr.String()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (h *StorageReadinessHook) check() bool { | ||||
| 	h.lock.Lock() | ||||
| 	defer h.lock.Unlock() | ||||
|  | ||||
| 	failedChecks := []string{} | ||||
| 	for gvr, check := range h.checks { | ||||
| 		if err := check(); err != nil { | ||||
| 			failedChecks = append(failedChecks, gvr) | ||||
| 		} | ||||
| 	} | ||||
| 	if len(failedChecks) == 0 { | ||||
| 		klog.Infof("Storage is ready for all registered resources") | ||||
| 		return true | ||||
| 	} | ||||
| 	klog.V(4).Infof("Storage is not ready for: %v", failedChecks) | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| func (h *StorageReadinessHook) Hook(ctx PostStartHookContext) error { | ||||
| 	deadlineCtx, cancel := context.WithTimeout(ctx, h.timeout) | ||||
| 	defer cancel() | ||||
| 	err := wait.PollUntilContextCancel(deadlineCtx, 100*time.Millisecond, true, | ||||
| 		func(_ context.Context) (bool, error) { | ||||
| 			if ok := h.check(); ok { | ||||
| 				return true, nil | ||||
| 			} | ||||
| 			return false, nil | ||||
| 		}) | ||||
| 	if errors.Is(err, context.DeadlineExceeded) { | ||||
| 		klog.Warningf("Deadline exceeded while waiting for storage readiness... ignoring") | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| @@ -0,0 +1,85 @@ | ||||
| /* | ||||
| Copyright 2024 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package server | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| ) | ||||
|  | ||||
| type fakeReadinessStorage struct { | ||||
| 	result error | ||||
| } | ||||
|  | ||||
| func (s *fakeReadinessStorage) New() runtime.Object   { return nil } | ||||
| func (s *fakeReadinessStorage) Destroy()              {} | ||||
| func (s *fakeReadinessStorage) ReadinessCheck() error { return s.result } | ||||
|  | ||||
| func testGVR(index int) metav1.GroupVersionResource { | ||||
| 	return metav1.GroupVersionResource{ | ||||
| 		Group:    "group", | ||||
| 		Version:  "version", | ||||
| 		Resource: fmt.Sprintf("resource-%d", index), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestStorageReadinessHook(t *testing.T) { | ||||
| 	h := NewStorageReadinessHook(time.Second) | ||||
|  | ||||
| 	numChecks := 5 | ||||
| 	storages := make([]*fakeReadinessStorage, numChecks) | ||||
| 	for i := 0; i < numChecks; i++ { | ||||
| 		storages[i] = &fakeReadinessStorage{ | ||||
| 			result: fmt.Errorf("failed"), | ||||
| 		} | ||||
| 		h.RegisterStorage(testGVR(i), storages[i]) | ||||
| 	} | ||||
|  | ||||
| 	for i := 0; i < numChecks; i++ { | ||||
| 		if ok := h.check(); ok { | ||||
| 			t.Errorf("%d: unexpected check pass", i) | ||||
| 		} | ||||
| 		storages[i].result = nil | ||||
| 	} | ||||
| 	if ok := h.check(); !ok { | ||||
| 		t.Errorf("unexpected check failure") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestStorageReadinessHookTimeout(t *testing.T) { | ||||
| 	h := NewStorageReadinessHook(time.Second) | ||||
|  | ||||
| 	storage := &fakeReadinessStorage{ | ||||
| 		result: fmt.Errorf("failed"), | ||||
| 	} | ||||
| 	h.RegisterStorage(testGVR(0), storage) | ||||
|  | ||||
| 	ctx := context.Background() | ||||
| 	hookCtx := PostStartHookContext{ | ||||
| 		LoopbackClientConfig: nil, | ||||
| 		StopCh:               ctx.Done(), | ||||
| 		Context:              ctx, | ||||
| 	} | ||||
| 	if err := h.Hook(hookCtx); err != nil { | ||||
| 		t.Errorf("unexpected hook failure on timeout") | ||||
| 	} | ||||
| } | ||||
| @@ -962,6 +962,14 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) { | ||||
| 	return c.storage.Count(pathPrefix) | ||||
| } | ||||
|  | ||||
| // ReadinessCheck implements storage.Interface. | ||||
| func (c *Cacher) ReadinessCheck() error { | ||||
| 	if !c.ready.check() { | ||||
| 		return storage.ErrStorageNotReady | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // baseObjectThreadUnsafe omits locking for cachingObject. | ||||
| func baseObjectThreadUnsafe(object runtime.Object) runtime.Object { | ||||
| 	if co, ok := object.(*cachingObject); ok { | ||||
|   | ||||
| @@ -181,6 +181,9 @@ func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.O | ||||
| func (d *dummyStorage) Count(_ string) (int64, error) { | ||||
| 	return 0, fmt.Errorf("unimplemented") | ||||
| } | ||||
| func (d *dummyStorage) ReadinessCheck() error { | ||||
| 	return nil | ||||
| } | ||||
| func (d *dummyStorage) injectError(err error) { | ||||
| 	d.Lock() | ||||
| 	defer d.Unlock() | ||||
|   | ||||
| @@ -25,7 +25,10 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/util/validation/field" | ||||
| ) | ||||
|  | ||||
| var ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created") | ||||
| var ( | ||||
| 	ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created") | ||||
| 	ErrStorageNotReady            = errors.New("storage not ready") | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	ErrCodeKeyNotFound int = iota + 1 | ||||
|   | ||||
| @@ -591,6 +591,11 @@ func (s *store) Count(key string) (int64, error) { | ||||
| 	return getResp.Count, nil | ||||
| } | ||||
|  | ||||
| // ReadinessCheck implements storage.Interface. | ||||
| func (s *store) ReadinessCheck() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // resolveGetListRev is used by GetList to resolve the rev to use in the client.KV.Get request. | ||||
| func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts storage.ListOptions) (int64, error) { | ||||
| 	var withRev int64 | ||||
|   | ||||
| @@ -243,6 +243,9 @@ type Interface interface { | ||||
| 	// Count returns number of different entries under the key (generally being path prefix). | ||||
| 	Count(key string) (int64, error) | ||||
|  | ||||
| 	// ReadinessCheck checks if the storage is ready for accepting requests. | ||||
| 	ReadinessCheck() error | ||||
|  | ||||
| 	// RequestWatchProgress requests the a watch stream progress status be sent in the | ||||
| 	// watch response stream as soon as possible. | ||||
| 	// Used for monitor watch progress even if watching resources with no changes. | ||||
|   | ||||
| @@ -24,6 +24,8 @@ import ( | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	apiserverfeatures "k8s.io/apiserver/pkg/features" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| 	restclient "k8s.io/client-go/rest" | ||||
| 	"k8s.io/kubernetes/test/e2e/framework" | ||||
| @@ -126,6 +128,13 @@ var _ = SIGDescribe("health handlers", func() { | ||||
| 	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged | ||||
|  | ||||
| 	ginkgo.It("should contain necessary checks", func(ctx context.Context) { | ||||
| 		if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.WatchCacheInitializationPostStartHook) { | ||||
| 			storageReadinessCheck := "[+]poststarthook/storage-readiness ok" | ||||
| 			requiredHealthzChecks.Insert(storageReadinessCheck) | ||||
| 			requiredLivezChecks.Insert(storageReadinessCheck) | ||||
| 			requiredReadyzChecks.Insert(storageReadinessCheck) | ||||
| 		} | ||||
|  | ||||
| 		ginkgo.By("/health") | ||||
| 		err := testPath(ctx, f.ClientSet, "/healthz?verbose=1", requiredHealthzChecks) | ||||
| 		framework.ExpectNoError(err) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot