diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index b2a0f331f53..44311f5b247 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/profile" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) func (sched *Scheduler) onStorageClassAdd(obj interface{}) { @@ -288,6 +289,7 @@ func addAllEventHandlers( sched *Scheduler, informerFactory informers.SharedInformerFactory, dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, + resourceClaimCache *assumecache.AssumeCache, gvkMap map[framework.GVK]framework.ActionType, ) error { var ( @@ -456,11 +458,9 @@ func addAllEventHandlers( } case framework.ResourceClaim: if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { - if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler( + handlerRegistration = resourceClaimCache.AddEventHandler( buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"), - ); err != nil { - return err - } + ) handlers = append(handlers, handlerRegistration) } case framework.ResourceClass: diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index f0254df4095..a99146cf567 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -26,9 +26,12 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2/ktesting" "k8s.io/apimachinery/pkg/runtime" @@ -38,6 +41,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename" @@ -46,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/internal/queue" st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) func TestNodeAllocatableChanged(t *testing.T) { @@ -362,6 +367,7 @@ func TestAddAllEventHandlers(t *testing.T) { tests := []struct { name string gvkMap map[framework.GVK]framework.ActionType + enableDRA bool expectStaticInformers map[reflect.Type]bool expectDynamicInformers map[schema.GroupVersionResource]bool }{ @@ -375,6 +381,44 @@ func TestAddAllEventHandlers(t *testing.T) { }, expectDynamicInformers: map[schema.GroupVersionResource]bool{}, }, + { + name: "DRA events disabled", + gvkMap: map[framework.GVK]framework.ActionType{ + framework.PodSchedulingContext: framework.Add, + framework.ResourceClaim: framework.Add, + framework.ResourceClass: framework.Add, + framework.ResourceClaimParameters: framework.Add, + framework.ResourceClassParameters: framework.Add, + }, + expectStaticInformers: map[reflect.Type]bool{ + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Namespace{}): true, + }, + expectDynamicInformers: map[schema.GroupVersionResource]bool{}, + }, + { + name: "DRA events enabled", + gvkMap: 
map[framework.GVK]framework.ActionType{ + framework.PodSchedulingContext:    framework.Add, + framework.ResourceClaim:           framework.Add, + framework.ResourceClass:           framework.Add, + framework.ResourceClaimParameters: framework.Add, + framework.ResourceClassParameters: framework.Add, + }, + enableDRA:              true, + expectStaticInformers: map[reflect.Type]bool{ + reflect.TypeOf(&v1.Pod{}):                                   true, + reflect.TypeOf(&v1.Node{}):                                  true, + reflect.TypeOf(&v1.Namespace{}):                             true, + reflect.TypeOf(&resourcev1alpha2.PodSchedulingContext{}):    true, + reflect.TypeOf(&resourcev1alpha2.ResourceClaim{}):           true, + reflect.TypeOf(&resourcev1alpha2.ResourceClaimParameters{}): true, + reflect.TypeOf(&resourcev1alpha2.ResourceClass{}):           true, + reflect.TypeOf(&resourcev1alpha2.ResourceClassParameters{}): true, + }, + expectDynamicInformers: map[schema.GroupVersionResource]bool{}, + }, { name: "add GVKs handlers defined in framework dynamically", gvkMap: map[framework.GVK]framework.ActionType{ @@ -433,6 +477,7 @@ func TestAddAllEventHandlers(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA) logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -447,8 +492,13 @@ func TestAddAllEventHandlers(t *testing.T) { dynclient := dyfake.NewSimpleDynamicClient(scheme) dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0) + var resourceClaimCache *assumecache.AssumeCache + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer() + resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil) + } - if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil { + if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, tt.gvkMap); err != nil { t.Fatalf("Add event handlers failed, error = %v", err) } diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index c5b7d89ceed..34fb5ed440a 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -38,6 +38,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) // NodeScoreList declares a list of nodes and their scores. @@ -703,6 +704,11 @@ type Handle interface { SharedInformerFactory() informers.SharedInformerFactory + // ResourceClaimCache returns an assume cache of ResourceClaim objects + // which gets populated by the shared informer factory and the dynamic resources + // plugin. + ResourceClaimCache() *assumecache.AssumeCache + // RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index a4da1f7780d..106ce0bdf58 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -276,7 +276,6 @@ type dynamicResources struct { enabled bool fh framework.Handle clientset kubernetes.Interface - claimLister resourcev1alpha2listers.ResourceClaimLister classLister resourcev1alpha2listers.ResourceClassLister podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister claimParametersLister resourcev1alpha2listers.ResourceClaimParametersLister @@ -354,12 +353,10 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe return &dynamicResources{}, nil } - logger := klog.FromContext(ctx) pl := &dynamicResources{ enabled: true, fh: fh, clientset: fh.ClientSet(), - claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(), classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(), podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(), claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(), @@ -368,7 +365,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe classParametersIndexer: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Informer().GetIndexer(), resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(), claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()), - claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), + claimAssumeCache: fh.ResourceClaimCache(), } if err := pl.claimParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: claimParametersGeneratedFromIndexFunc}); err != nil { @@ -651,21 +648,6 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po // // TODO (https://github.com/kubernetes/kubernetes/issues/123697): // check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO). - // - // There is a small race here: - // - The dynamicresources plugin allocates claim A and updates the assume cache. - // - A second pod gets marked as unschedulable based on that assume cache. - // - Before the informer cache here catches up, the pod runs, terminates and - // the claim gets deallocated without ever sending the claim status with - // allocation to the scheduler. - // - The comparison below is for a *very* old claim with no allocation and the - // new claim where the allocation is already removed again, so no - // RemovedClaimAllocation event gets emitted. - // - // This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30. - // TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache - // into the event mechanism. This can be tackled together with adding autoscaler - // support, which also needs to do something with the assume cache. 
logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) return framework.Queue, nil } @@ -852,11 +834,16 @@ func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso if claimName == nil { continue } - claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName) + obj, err := pl.claimAssumeCache.Get(pod.Namespace + "/" + *claimName) if err != nil { return err } + claim, ok := obj.(*resourcev1alpha2.ResourceClaim) + if !ok { + return fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, pod.Namespace, *claimName) + } + if claim.DeletionTimestamp != nil { return fmt.Errorf("resourceclaim %q is being deleted", claim.Name) } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 18b6b832a1b..8b93b7d63cb 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/ptr" ) @@ -1217,12 +1218,13 @@ func TestPlugin(t *testing.T) { } type testContext struct { - ctx context.Context - client *fake.Clientset - informerFactory informers.SharedInformerFactory - p *dynamicResources - nodeInfos []*framework.NodeInfo - state *framework.CycleState + ctx context.Context + client *fake.Clientset + informerFactory informers.SharedInformerFactory + claimAssumeCache *assumecache.AssumeCache + p *dynamicResources + nodeInfos []*framework.NodeInfo + state *framework.CycleState } func (tc *testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) { @@ -1388,10 +1390,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters")) tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0) - + tc.claimAssumeCache = assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil) opts := []runtime.Option{ runtime.WithClientSet(tc.client), runtime.WithInformerFactory(tc.informerFactory), + runtime.WithResourceClaimCache(tc.claimAssumeCache), } fh, err := runtime.NewFramework(tCtx, nil, nil, opts...) 
if err != nil { @@ -1558,6 +1561,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { }, "backoff-wrong-old-object": { pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, oldObj: "not-a-claim", newObj: pendingImmediateClaim, expectedErr: true, @@ -1586,15 +1590,10 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { }, "structured-claim-deallocate": { pod: podWithClaimName, - claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, - oldObj: func() *resourcev1alpha2.ResourceClaim { - claim := structuredAllocatedClaim.DeepCopy() - claim.Name += "-other" - return claim - }(), + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, otherStructuredAllocatedClaim}, + oldObj: otherStructuredAllocatedClaim, newObj: func() *resourcev1alpha2.ResourceClaim { - claim := structuredAllocatedClaim.DeepCopy() - claim.Name += "-other" + claim := otherStructuredAllocatedClaim.DeepCopy() claim.Status.Allocation = nil return claim }(), @@ -1606,18 +1605,48 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { for name, tc := range testcases { t.Run(name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + logger, tCtx := ktesting.NewTestContext(t) testCtx := setup(t, nil, tc.claims, nil, nil, nil) + oldObj := tc.oldObj + newObj := tc.newObj if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok { - // Update the informer because the lister gets called and must have the claim. - store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore() + // Add or update through the client and wait until the event is processed. + claimKey := claim.Namespace + "/" + claim.Name if tc.oldObj == nil { - require.NoError(t, store.Add(claim)) + // Some test claims already have it. Clear for create. + createClaim := claim.DeepCopy() + createClaim.UID = "" + storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(createClaim.Namespace).Create(tCtx, createClaim, metav1.CreateOptions{}) + require.NoError(t, err, "create claim") + claim = storedClaim } else { - require.NoError(t, store.Update(claim)) + cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey) + require.NoError(t, err, "retrieve old claim") + updateClaim := claim.DeepCopy() + // The test claim doesn't have those (generated dynamically), so copy them. + updateClaim.UID = cachedClaim.(*resourcev1alpha2.ResourceClaim).UID + updateClaim.ResourceVersion = cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion + + storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(updateClaim.Namespace).Update(tCtx, updateClaim, metav1.UpdateOptions{}) + require.NoError(t, err, "update claim") + claim = storedClaim } + + // Eventually the assume cache will have it, too. + require.EventuallyWithT(t, func(t *assert.CollectT) { + cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey) + require.NoError(t, err, "retrieve claim") + if cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion != claim.ResourceVersion { + t.Errorf("cached claim not updated yet") + } + }, time.Minute, time.Second, "claim assume cache must have new or updated claim") + + // This has the actual UID and ResourceVersion, + // which is relevant for + // isSchedulableAfterClaimChange. 
+ newObj = claim } - actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj) + actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj) if tc.expectedErr { require.Error(t, err) return diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index ed1b76a585a..f0c31170119 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/metrics" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/pkg/util/slice" ) @@ -71,11 +72,12 @@ type frameworkImpl struct { // pluginsMap contains all plugins, by name. pluginsMap map[string]framework.Plugin - clientSet clientset.Interface - kubeConfig *restclient.Config - eventRecorder events.EventRecorder - informerFactory informers.SharedInformerFactory - logger klog.Logger + clientSet clientset.Interface + kubeConfig *restclient.Config + eventRecorder events.EventRecorder + informerFactory informers.SharedInformerFactory + resourceClaimCache *assumecache.AssumeCache + logger klog.Logger metricsRecorder *metrics.MetricAsyncRecorder profileName string @@ -126,6 +128,7 @@ type frameworkOptions struct { kubeConfig *restclient.Config eventRecorder events.EventRecorder informerFactory informers.SharedInformerFactory + resourceClaimCache *assumecache.AssumeCache snapshotSharedLister framework.SharedLister metricsRecorder *metrics.MetricAsyncRecorder podNominator framework.PodNominator @@ -177,6 +180,13 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option } } +// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl. +func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option { + return func(o *frameworkOptions) { + o.resourceClaimCache = resourceClaimCache + } +} + // WithSnapshotSharedLister sets the SharedLister of the snapshot. 
func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option { return func(o *frameworkOptions) { @@ -267,6 +277,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler kubeConfig: options.kubeConfig, eventRecorder: options.eventRecorder, informerFactory: options.informerFactory, + resourceClaimCache: options.resourceClaimCache, metricsRecorder: options.metricsRecorder, extenders: options.extenders, PodNominator: options.podNominator, @@ -1606,6 +1617,10 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory return f.informerFactory } +func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache { + return f.resourceClaimCache +} + func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] { pgSet := sets.Set[string]{} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index ad302067439..22ac5054d9c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -48,6 +48,7 @@ import ( internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/profile" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) const ( @@ -294,11 +295,18 @@ func New(ctx context.Context, // waitingPods holds all the pods that are in the scheduler and waiting in the permit stage waitingPods := frameworkruntime.NewWaitingPodsMap() + var resourceClaimCache *assumecache.AssumeCache + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer() + resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil) + } + profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory, frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), frameworkruntime.WithClientSet(client), frameworkruntime.WithKubeConfig(options.kubeConfig), frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithResourceClaimCache(resourceClaimCache), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)), frameworkruntime.WithParallelism(int(options.parallelism)), @@ -358,7 +366,7 @@ func New(ctx context.Context, sched.NextPod = podQueue.Pop sched.applyDefaultHandlers() - if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil { + if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil { return nil, fmt.Errorf("adding event handlers: %w", err) } diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index 69ec1175f03..c7392129cbd 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -25,7 +25,9 @@ import ( "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/api/meta" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/scheduler/util/queue" ) // Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon. @@ -119,9 +121,34 @@ type AssumeCache struct { // Will be used for all operations. logger klog.Logger - // Synchronizes updates to store + // Synchronizes updates to all fields below. 
rwMutex sync.RWMutex + // All registered event handlers. + eventHandlers []cache.ResourceEventHandler + handlerRegistration cache.ResourceEventHandlerRegistration + + // The eventQueue contains functions which deliver an event to one + // event handler. + // + // These functions must be invoked while *not locking* rwMutex because + // the event handlers are allowed to access the assume cache. Holding + // rwMutex then would cause a deadlock. + // + // New functions get added as part of processing a cache update while + // the rwMutex is locked. Each function which adds something to the queue + // also drains the queue before returning, therefore it is guaranteed + // that all event handlers get notified immediately (useful for unit + // testing). + // + // A channel cannot be used here because it cannot have an unbounded + // capacity. This could lead to a deadlock (writer holds rwMutex, + // gets blocked because capacity is exhausted, reader is in a handler + // which tries to lock the rwMutex). Writing into such a channel + // while not holding the rwMutex doesn't work because in-order delivery + // of events would no longer be guaranteed. + eventQueue queue.FIFO[func()] + // describes the object stored description string @@ -177,7 +204,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam // Unit tests don't use informers if informer != nil { // Cannot fail in practice?! No-one bothers checking the error. - _, _ = informer.AddEventHandler( + c.handlerRegistration, _ = informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: c.add, UpdateFunc: c.update, @@ -199,9 +226,11 @@ func (c *AssumeCache) add(obj interface{}) { return } + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() + var oldObj interface{} if objInfo, _ := c.getObjInfo(name); objInfo != nil { newVersion, err := c.getObjVersion(name, obj) if err != nil { @@ -221,6 +250,7 @@ func (c *AssumeCache) add(obj interface{}) { c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion) return } + oldObj = objInfo.latestObj } objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj} @@ -228,6 +258,7 @@ func (c *AssumeCache) add(obj interface{}) { c.logger.Info("Error occurred while updating stored object", "err", err) } else { c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) + c.pushEvent(oldObj, obj) } } @@ -246,14 +277,50 @@ func (c *AssumeCache) delete(obj interface{}) { return } + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() + var oldObj interface{} + if len(c.eventHandlers) > 0 { + if objInfo, _ := c.getObjInfo(name); objInfo != nil { + oldObj = objInfo.latestObj + } + } + objInfo := &objInfo{name: name} err = c.store.Delete(objInfo) if err != nil { c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name) } + + c.pushEvent(oldObj, nil) +} + +// pushEvent gets called while the mutex is locked for writing. +// It ensures that all currently registered event handlers get +// notified about a change when the caller starts delivering +// those with emitEvents. +// +// For a delete event, newObj is nil. For an add, oldObj is nil. +// An update has both as non-nil. 
+func (c *AssumeCache) pushEvent(oldObj, newObj interface{}) { + for _, handler := range c.eventHandlers { + handler := handler + if oldObj == nil { + c.eventQueue.Push(func() { + handler.OnAdd(newObj, false) + }) + } else if newObj == nil { + c.eventQueue.Push(func() { + handler.OnDelete(oldObj) + }) + } else { + c.eventQueue.Push(func() { + handler.OnUpdate(oldObj, newObj) + }) + } + } } func (c *AssumeCache) getObjVersion(name string, obj interface{}) (int64, error) { @@ -315,6 +382,10 @@ func (c *AssumeCache) List(indexObj interface{}) []interface{} { c.rwMutex.RLock() defer c.rwMutex.RUnlock() + return c.listLocked(indexObj) +} + +func (c *AssumeCache) listLocked(indexObj interface{}) []interface{} { allObjs := []interface{}{} var objs []interface{} if c.indexName != "" { @@ -358,6 +429,7 @@ func (c *AssumeCache) Assume(obj interface{}) error { return &ObjectNameError{err} } + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() @@ -380,6 +452,8 @@ func (c *AssumeCache) Assume(obj interface{}) error { return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion) } + c.pushEvent(objInfo.latestObj, obj) + // Only update the cached object objInfo.latestObj = obj c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion) @@ -388,6 +462,7 @@ func (c *AssumeCache) Assume(obj interface{}) error { // Restore the informer cache's version of the object. func (c *AssumeCache) Restore(objName string) { + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() @@ -396,7 +471,61 @@ func (c *AssumeCache) Restore(objName string) { // This could be expected if object got deleted c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err) } else { - objInfo.latestObj = objInfo.apiObj + if objInfo.latestObj != objInfo.apiObj { + c.pushEvent(objInfo.latestObj, objInfo.apiObj) + objInfo.latestObj = objInfo.apiObj + } c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) } } + +// AddEventHandler adds an event handler to the cache. Events to a +// single handler are delivered sequentially, but there is no +// coordination between different handlers. A handler may use the +// cache. +// +// The return value can be used to wait for cache synchronization. +func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.ResourceEventHandlerRegistration { + defer c.emitEvents() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + + c.eventHandlers = append(c.eventHandlers, handler) + allObjs := c.listLocked(nil) + for _, obj := range allObjs { + c.eventQueue.Push(func() { + handler.OnAdd(obj, true) + }) + } + + if c.handlerRegistration == nil { + // No informer, so immediately synced. + return syncedHandlerRegistration{} + } + + return c.handlerRegistration +} + +// emitEvents delivers all pending events that are in the queue, in the order +// in which they were stored there (FIFO). +func (c *AssumeCache) emitEvents() { + for { + c.rwMutex.Lock() + deliver, ok := c.eventQueue.Pop() + c.rwMutex.Unlock() + + if !ok { + return + } + func() { + defer utilruntime.HandleCrash() + deliver() + }() + } +} + +// syncedHandlerRegistration is an implementation of ResourceEventHandlerRegistration +// which always returns true. 
+type syncedHandlerRegistration struct{} + +func (syncedHandlerRegistration) HasSynced() bool { return true } diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index 6c11ac275fa..a18d51e0a87 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -19,6 +19,8 @@ package assumecache import ( "fmt" "slices" + "sort" + "sync" "testing" "github.com/google/go-cmp/cmp" @@ -118,6 +120,79 @@ func verifyList(tCtx ktesting.TContext, assumeCache *AssumeCache, expectedObjs [ } } +type mockEventHandler struct { + mutex sync.Mutex + events []event + cache *AssumeCache + block <-chan struct{} +} + +type event struct { + What string + OldObj, Obj interface{} + InitialList bool +} + +func (m *mockEventHandler) OnAdd(obj interface{}, initialList bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.events = append(m.events, event{ + What: "add", + Obj: obj, + InitialList: initialList, + }) + + if m.cache != nil { + // Must not deadlock! + m.cache.List(nil) + } + if m.block != nil { + <-m.block + } +} + +func (m *mockEventHandler) OnUpdate(oldObj, obj interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.events = append(m.events, event{ + What: "update", + OldObj: oldObj, + Obj: obj, + }) +} + +func (m *mockEventHandler) OnDelete(obj interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.events = append(m.events, event{ + What: "delete", + Obj: obj, + }) +} + +func (m *mockEventHandler) verifyAndFlush(tCtx ktesting.TContext, expectedEvents []event) { + m.mutex.Lock() + defer m.mutex.Unlock() + + tCtx.Helper() + if diff := cmp.Diff(expectedEvents, m.events); diff != "" { + tCtx.Fatalf("unexpected events (- expected, + actual):\n%s", diff) + } + m.events = nil +} + +func (m *mockEventHandler) sortEvents(cmp func(objI, objJ interface{}) bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + sort.Slice(m.events, func(i, j int) bool { + return cmp(m.events[i].Obj, m.events[j].Obj) + }) +} + func TestAssume(t *testing.T) { scenarios := map[string]struct { oldObj metav1.Object @@ -162,6 +237,8 @@ func TestAssume(t *testing.T) { for name, scenario := range scenarios { t.Run(name, func(t *testing.T) { tCtx, cache, informer := newTest(t) + var events mockEventHandler + cache.AddEventHandler(&events) // Add old object to cache. informer.add(scenario.oldObj) @@ -173,18 +250,25 @@ func TestAssume(t *testing.T) { t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff) } - // Check that Get returns correct object. + // Check that Get returns correct object and + // that events were delivered correctly. + expectEvents := []event{{What: "add", Obj: scenario.oldObj}} expectedObj := scenario.newObj if scenario.expectErr != nil { expectedObj = scenario.oldObj + } else { + expectEvents = append(expectEvents, event{What: "update", OldObj: scenario.oldObj, Obj: scenario.newObj}) } verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj) + events.verifyAndFlush(tCtx, expectEvents) }) } } func TestRestore(t *testing.T) { tCtx, cache, informer := newTest(t) + var events mockEventHandler + cache.AddEventHandler(&events) // This test assumes an object with the same version as the API object. 
// The assume cache supports that, but doing so in real code suffers from @@ -194,25 +278,40 @@ func TestRestore(t *testing.T) { newObj := makeObj("pvc1", "5", "") // Restore object that doesn't exist - cache.Restore("nothing") + ktesting.Step(tCtx, "empty cache", func(tCtx ktesting.TContext) { + cache.Restore("nothing") + events.verifyAndFlush(tCtx, nil) + }) // Add old object to cache. - informer.add(oldObj) - verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj) + ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) { + informer.add(oldObj) + verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) + events.verifyAndFlush(tCtx, []event{{What: "add", Obj: oldObj}}) + }) - // Restore object. - cache.Restore(oldObj.GetName()) - verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj) + // Restore the same object. + ktesting.Step(tCtx, "initial Restore", func(tCtx ktesting.TContext) { + cache.Restore(oldObj.GetName()) + verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) + events.verifyAndFlush(tCtx, nil) + }) // Assume new object. - if err := cache.Assume(newObj); err != nil { - t.Fatalf("Assume() returned error %v", err) - } - verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj) + ktesting.Step(tCtx, "Assume", func(tCtx ktesting.TContext) { + if err := cache.Assume(newObj); err != nil { + tCtx.Fatalf("Assume() returned error %v", err) + } + verify(tCtx, cache, oldObj.GetName(), newObj, oldObj) + events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) + }) - // Restore object. - cache.Restore(oldObj.GetName()) - verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj) + // Restore the same object. + ktesting.Step(tCtx, "second Restore", func(tCtx ktesting.TContext) { + cache.Restore(oldObj.GetName()) + verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) + events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: newObj, Obj: oldObj}}) + }) } func TestEvents(t *testing.T) { @@ -226,27 +325,162 @@ func TestEvents(t *testing.T) { informer.add(oldObj) verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj) + // Receive initial list. + var events mockEventHandler + cache.AddEventHandler(&events) + events.verifyAndFlush(ktesting.WithStep(tCtx, "initial list"), []event{{What: "add", Obj: oldObj, InitialList: true}}) + // Update object. - informer.update(newObj) - verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, newObj, newObj) + ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) { + informer.update(newObj) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) + }) // Some error cases (don't occur in practice). 
- informer.add(1) - verify(ktesting.WithStep(tCtx, "after nop add"), cache, key, newObj, newObj) - informer.add(nil) - verify(ktesting.WithStep(tCtx, "after nil add"), cache, key, newObj, newObj) - informer.update(oldObj) - verify(ktesting.WithStep(tCtx, "after nop update"), cache, key, newObj, newObj) - informer.update(nil) - verify(ktesting.WithStep(tCtx, "after nil update"), cache, key, newObj, newObj) - informer.delete(nil) - verify(ktesting.WithStep(tCtx, "after nop delete"), cache, key, newObj, newObj) + ktesting.Step(tCtx, "nop add", func(tCtx ktesting.TContext) { + informer.add(1) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) + ktesting.Step(tCtx, "nil add", func(tCtx ktesting.TContext) { + informer.add(nil) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) + ktesting.Step(tCtx, "nop update", func(tCtx ktesting.TContext) { + informer.update(oldObj) + events.verifyAndFlush(tCtx, nil) + verify(tCtx, cache, key, newObj, newObj) + }) + ktesting.Step(tCtx, "nil update", func(tCtx ktesting.TContext) { + informer.update(nil) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) + ktesting.Step(tCtx, "nop delete", func(tCtx ktesting.TContext) { + informer.delete(nil) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) // Delete object. - informer.delete(oldObj) - _, err := cache.Get(key) - if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" { - t.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff) + ktesting.Step(tCtx, "delete", func(tCtx ktesting.TContext) { + informer.delete(oldObj) + events.verifyAndFlush(tCtx, []event{{What: "delete", Obj: newObj}}) + _, err := cache.Get(key) + if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" { + tCtx.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff) + } + }) +} + +func TestEventHandlers(t *testing.T) { + tCtx, cache, informer := newTest(t) + handlers := make([]mockEventHandler, 5) + + var objs []metav1.Object + for i := 0; i < 5; i++ { + objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) + informer.add(objs[i]) + } + + // Accessing cache during OnAdd must not deadlock! + handlers[0].cache = cache + + // Order of delivered events is random, we must ensure + // increasing order by name ourselves. 
+ var expectedEvents []event + for _, obj := range objs { + expectedEvents = append(expectedEvents, + event{ + What: "add", + Obj: obj, + InitialList: true, + }, + ) + } + for i := range handlers { + cache.AddEventHandler(&handlers[i]) + handlers[i].sortEvents(func(objI, objJ interface{}) bool { + return objI.(*metav1.ObjectMeta).Name < + objJ.(*metav1.ObjectMeta).Name + }) + handlers[i].verifyAndFlush(tCtx, expectedEvents) + } + + for i := 5; i < 7; i++ { + objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) + informer.add(objs[i]) + for e := range handlers { + handlers[e].verifyAndFlush(tCtx, []event{{What: "add", Obj: objs[i]}}) + } + } + + for i, oldObj := range objs { + newObj := makeObj(fmt.Sprintf("test-pvc%v", i), "2", "") + objs[i] = newObj + informer.update(newObj) + for e := range handlers { + handlers[e].verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) + } + } + + for _, obj := range objs { + informer.delete(obj) + for e := range handlers { + handlers[e].verifyAndFlush(tCtx, []event{{What: "delete", Obj: obj}}) + } + } +} + +func TestEventHandlerConcurrency(t *testing.T) { + tCtx, cache, informer := newTest(t) + handlers := make([]mockEventHandler, 5) + + var objs []metav1.Object + for i := 0; i < 5; i++ { + objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) + } + + // Accessing cache during OnAdd must not deadlock! + handlers[0].cache = cache + + // Each add blocks until this gets cancelled. + tCancelCtx := ktesting.WithCancel(tCtx) + var wg sync.WaitGroup + + for i := range handlers { + handlers[i].block = tCancelCtx.Done() + cache.AddEventHandler(&handlers[i]) + } + + // Execution of the add calls is random, therefore + // we have to sort again. + var expectedEvents []event + for _, obj := range objs { + wg.Add(1) + go func() { + defer wg.Done() + informer.add(obj) + }() + expectedEvents = append(expectedEvents, + event{ + What: "add", + Obj: obj, + }, + ) + } + + tCancelCtx.Cancel("proceed") + wg.Wait() + + for i := range handlers { + handlers[i].sortEvents(func(objI, objJ interface{}) bool { + return objI.(*metav1.ObjectMeta).Name < + objJ.(*metav1.ObjectMeta).Name + }) + handlers[i].verifyAndFlush(tCtx, expectedEvents) } } @@ -254,7 +488,7 @@ func TestListNoIndexer(t *testing.T) { tCtx, cache, informer := newTest(t) // Add a bunch of objects. - objs := make([]interface{}, 0, 10) + var objs []interface{} for i := 0; i < 10; i++ { obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "") objs = append(objs, obj) @@ -293,7 +527,7 @@ func TestListWithIndexer(t *testing.T) { // Add a bunch of objects. ns := "ns1" - objs := make([]interface{}, 0, 10) + var objs []interface{} for i := 0; i < 10; i++ { obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns) objs = append(objs, obj) diff --git a/pkg/scheduler/util/queue/fifo.go b/pkg/scheduler/util/queue/fifo.go new file mode 100644 index 00000000000..ee66733fe43 --- /dev/null +++ b/pkg/scheduler/util/queue/fifo.go @@ -0,0 +1,110 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +const ( + // normalSize limits the size of the buffer that is kept + // for reuse. + normalSize = 4 +) + +// FIFO implements a first-in-first-out queue with unbounded size. +// The null FIFO is a valid empty queue. +// +// Access must be protected by the caller when used concurrently by +// different goroutines, the queue itself implements no locking. +type FIFO[T any] struct { + // elements contains a buffer for elements which have been + // pushed and not popped yet. Two scenarios are possible: + // - one chunk in the middle (start <= end) + // - one chunk at the end, followed by one chunk at the + // beginning (end <= start) + // + // start == end can be either an empty queue or a completely + // full one (with two chunks). + elements []T + + // len counts the number of elements which have been pushed and + // not popped yet. + len int + + // start is the index of the first valid element. + start int + + // end is the index after the last valid element. + end int +} + +func (q *FIFO[T]) Len() int { + return q.len +} + +func (q *FIFO[T]) Push(element T) { + size := len(q.elements) + if q.len == size { + // Need larger buffer. + newSize := size * 2 + if newSize == 0 { + newSize = normalSize + } + elements := make([]T, newSize) + if q.start == 0 { + copy(elements, q.elements) + } else { + copy(elements, q.elements[q.start:]) + copy(elements[len(q.elements)-q.start:], q.elements[0:q.end]) + } + q.start = 0 + q.end = q.len + q.elements = elements + size = newSize + } + if q.end == size { + // Wrap around. + q.elements[0] = element + q.end = 1 + q.len++ + return + } + q.elements[q.end] = element + q.end++ + q.len++ +} + +func (q *FIFO[T]) Pop() (element T, ok bool) { + if q.len == 0 { + return + } + element = q.elements[q.start] + q.start++ + if q.start == len(q.elements) { + // Wrap around. + q.start = 0 + } + q.len-- + + // Once it is empty, shrink down to avoid hanging onto + // a large buffer forever. + if q.len == 0 && len(q.elements) > normalSize { + q.elements = make([]T, normalSize) + q.start = 0 + q.end = 0 + } + + ok = true + return +} diff --git a/pkg/scheduler/util/queue/fifo_test.go b/pkg/scheduler/util/queue/fifo_test.go new file mode 100644 index 00000000000..3d272a90248 --- /dev/null +++ b/pkg/scheduler/util/queue/fifo_test.go @@ -0,0 +1,117 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package queue + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func verifyPop(t *testing.T, expectedValue int, expectedOk bool, queue *FIFO[int]) { + t.Helper() + actual, ok := queue.Pop() + require.Equal(t, expectedOk, ok) + require.Equal(t, expectedValue, actual) +} + +func verifyEmpty(t *testing.T, queue *FIFO[int]) { + t.Helper() + require.Equal(t, 0, queue.Len()) + verifyPop(t, 0, false, queue) +} + +func TestNull(t *testing.T) { + var queue FIFO[int] + verifyEmpty(t, &queue) +} + +func TestOnePushPop(t *testing.T) { + var queue FIFO[int] + + expected := 10 + queue.Push(10) + require.Equal(t, 1, queue.Len()) + verifyPop(t, expected, true, &queue) + verifyEmpty(t, &queue) +} + +// Pushes some elements, pops all of them, then the same again. +func TestWrapAroundEmpty(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < 5; i++ { + queue.Push(i) + } + require.Equal(t, 5, queue.Len()) + for i := 0; i < 5; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) + + for i := 5; i < 10; i++ { + queue.Push(i) + } + for i := 5; i < 10; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) +} + +// Pushes some elements, pops one, adds more, then pops all. +func TestWrapAroundPartial(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < 5; i++ { + queue.Push(i) + } + require.Equal(t, 5, queue.Len()) + verifyPop(t, 0, true, &queue) + + for i := 5; i < 10; i++ { + queue.Push(i) + } + for i := 1; i < 10; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) +} + +// Push an unusual amount of elements, pop all, and verify that +// the FIFO shrinks back again. +func TestShrink(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < normalSize*2; i++ { + queue.Push(i) + } + require.Equal(t, normalSize*2, queue.Len()) + require.LessOrEqual(t, 2*normalSize, len(queue.elements)) + + // Pop all, should be shrunken when done. + for i := 0; i < normalSize*2; i++ { + verifyPop(t, i, true, &queue) + } + require.Equal(t, 0, queue.Len()) + require.Equal(t, normalSize, len(queue.elements)) + + // Still usable after shrinking? + queue.Push(42) + verifyPop(t, 42, true, &queue) + require.Equal(t, 0, queue.Len()) + require.Equal(t, normalSize, len(queue.elements)) +} diff --git a/test/utils/ktesting/stepcontext.go b/test/utils/ktesting/stepcontext.go index 7271f3da789..b9ff6b1fc3d 100644 --- a/test/utils/ktesting/stepcontext.go +++ b/test/utils/ktesting/stepcontext.go @@ -41,6 +41,24 @@ func WithStep(tCtx TContext, what string) TContext { return WithLogger(sCtx, klog.LoggerWithName(sCtx.Logger(), what)) } +// Step is useful when the context with the step information is +// used more than once: +// +// ktesting.Step(tCtx, "step 1", func(tCtx ktesting.TContext) { +// tCtx.Log(...) +// if (... ) { +// tCtx.Failf(...) +// } +// )} +// +// Inside the callback, the tCtx variable is the one where the step +// has been added. This avoids the need to introduce multiple different +// context variables and risk of using the wrong one. +func Step(tCtx TContext, what string, cb func(tCtx TContext)) { + tCtx.Helper() + cb(WithStep(tCtx, what)) +} + type stepContext struct { TContext what string