Merge pull request #124595 from pohly/dra-scheduler-assume-cache-eventhandlers

DRA: scheduler event handlers via assume cache
Kubernetes Prow Robot
2024-06-25 11:56:28 -07:00
committed by GitHub
12 changed files with 790 additions and 87 deletions

View File

@@ -43,6 +43,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
 	"k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/profile"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )

 func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@@ -288,6 +289,7 @@ func addAllEventHandlers(
 	sched *Scheduler,
 	informerFactory informers.SharedInformerFactory,
 	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
+	resourceClaimCache *assumecache.AssumeCache,
 	gvkMap map[framework.GVK]framework.ActionType,
 ) error {
 	var (
@@ -456,11 +458,9 @@ func addAllEventHandlers(
 			}
 		case framework.ResourceClaim:
 			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-				if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
+				handlerRegistration = resourceClaimCache.AddEventHandler(
 					buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
-				); err != nil {
-					return err
-				}
+				)
 				handlers = append(handlers, handlerRegistration)
 			}
 		case framework.ResourceClass:
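
Editor's note: the handler passed to AddEventHandler above is a plain cache.ResourceEventHandler, so callers can treat the assume cache much like an informer. A minimal sketch of a registration (the callback bodies and variable names are illustrative and not taken from this commit):

    handlerRegistration := resourceClaimCache.AddEventHandler(cache.ResourceEventHandlerFuncs{
        // Placeholder callbacks; a real caller would enqueue work here.
        AddFunc:    func(obj interface{}) { fmt.Println("claim added") },
        UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("claim updated") },
        DeleteFunc: func(obj interface{}) { fmt.Println("claim deleted") },
    })
    // The returned registration behaves like an informer registration and can be
    // used to wait for the initial sync.
    cache.WaitForCacheSync(ctx.Done(), handlerRegistration.HasSynced)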

View File

@@ -26,9 +26,12 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
+	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	storagev1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/klog/v2/ktesting"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -38,6 +41,7 @@ import (
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
@@ -46,6 +50,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	"k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )

 func TestNodeAllocatableChanged(t *testing.T) {
@@ -362,6 +367,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 	tests := []struct {
 		name string
 		gvkMap map[framework.GVK]framework.ActionType
+		enableDRA bool
 		expectStaticInformers map[reflect.Type]bool
 		expectDynamicInformers map[schema.GroupVersionResource]bool
 	}{
@@ -375,6 +381,44 @@ func TestAddAllEventHandlers(t *testing.T) {
 			},
 			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
 		},
+		{
+			name: "DRA events disabled",
+			gvkMap: map[framework.GVK]framework.ActionType{
+				framework.PodSchedulingContext: framework.Add,
+				framework.ResourceClaim: framework.Add,
+				framework.ResourceClass: framework.Add,
+				framework.ResourceClaimParameters: framework.Add,
+				framework.ResourceClassParameters: framework.Add,
+			},
+			expectStaticInformers: map[reflect.Type]bool{
+				reflect.TypeOf(&v1.Pod{}): true,
+				reflect.TypeOf(&v1.Node{}): true,
+				reflect.TypeOf(&v1.Namespace{}): true,
+			},
+			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
+		},
+		{
+			name: "DRA events enabled",
+			gvkMap: map[framework.GVK]framework.ActionType{
+				framework.PodSchedulingContext: framework.Add,
+				framework.ResourceClaim: framework.Add,
+				framework.ResourceClass: framework.Add,
+				framework.ResourceClaimParameters: framework.Add,
+				framework.ResourceClassParameters: framework.Add,
+			},
+			enableDRA: true,
+			expectStaticInformers: map[reflect.Type]bool{
+				reflect.TypeOf(&v1.Pod{}): true,
+				reflect.TypeOf(&v1.Node{}): true,
+				reflect.TypeOf(&v1.Namespace{}): true,
+				reflect.TypeOf(&resourcev1alpha2.PodSchedulingContext{}): true,
+				reflect.TypeOf(&resourcev1alpha2.ResourceClaim{}): true,
+				reflect.TypeOf(&resourcev1alpha2.ResourceClaimParameters{}): true,
+				reflect.TypeOf(&resourcev1alpha2.ResourceClass{}): true,
+				reflect.TypeOf(&resourcev1alpha2.ResourceClassParameters{}): true,
+			},
+			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
+		},
 		{
 			name: "add GVKs handlers defined in framework dynamically",
 			gvkMap: map[framework.GVK]framework.ActionType{
@@ -433,6 +477,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA)
 			logger, ctx := ktesting.NewTestContext(t)
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
@@ -447,8 +492,13 @@ func TestAddAllEventHandlers(t *testing.T) {
 			dynclient := dyfake.NewSimpleDynamicClient(scheme)
 			dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)

+			var resourceClaimCache *assumecache.AssumeCache
+			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+				resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
+				resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
+			}
-			if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil {
+			if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, tt.gvkMap); err != nil {
 				t.Fatalf("Add event handlers failed, error = %v", err)
 			}

View File

@@ -38,6 +38,7 @@ import (
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )

 // NodeScoreList declares a list of nodes and their scores.
@@ -703,6 +704,11 @@ type Handle interface {
 	SharedInformerFactory() informers.SharedInformerFactory

+	// ResourceClaimCache returns an assume cache of ResourceClaim objects
+	// which gets populated by the shared informer factory and the dynamic resources
+	// plugin.
+	ResourceClaimCache() *assumecache.AssumeCache
+
 	// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
 	RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
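
Editor's note: the new Handle method gives plugins access to the same cache instance that feeds the scheduler's event handlers. A hypothetical plugin constructor (not part of this commit, names are made up) might use it like this:

    // myPlugin is an illustrative example type.
    type myPlugin struct {
        claimCache *assumecache.AssumeCache
    }

    func newMyPlugin(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
        return &myPlugin{
            // Shared with the scheduler's event handlers; no second cache or informer needed.
            claimCache: fh.ResourceClaimCache(),
        }, nil
    }

    func (pl *myPlugin) Name() string { return "MyPlugin" }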

View File

@@ -276,7 +276,6 @@ type dynamicResources struct {
 	enabled bool
 	fh framework.Handle
 	clientset kubernetes.Interface
-	claimLister resourcev1alpha2listers.ResourceClaimLister
 	classLister resourcev1alpha2listers.ResourceClassLister
 	podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
 	claimParametersLister resourcev1alpha2listers.ResourceClaimParametersLister
@@ -354,12 +353,10 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		return &dynamicResources{}, nil
 	}

-	logger := klog.FromContext(ctx)
 	pl := &dynamicResources{
 		enabled: true,
 		fh: fh,
 		clientset: fh.ClientSet(),
-		claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
 		classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
 		podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
 		claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(),
@@ -368,7 +365,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		classParametersIndexer: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Informer().GetIndexer(),
 		resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
 		claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()),
-		claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
+		claimAssumeCache: fh.ResourceClaimCache(),
 	}

 	if err := pl.claimParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: claimParametersGeneratedFromIndexFunc}); err != nil {
@@ -651,21 +648,6 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
 		//
 		// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
 		// check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO).
-		//
-		// There is a small race here:
-		// - The dynamicresources plugin allocates claim A and updates the assume cache.
-		// - A second pod gets marked as unschedulable based on that assume cache.
-		// - Before the informer cache here catches up, the pod runs, terminates and
-		//   the claim gets deallocated without ever sending the claim status with
-		//   allocation to the scheduler.
-		// - The comparison below is for a *very* old claim with no allocation and the
-		//   new claim where the allocation is already removed again, so no
-		//   RemovedClaimAllocation event gets emitted.
-		//
-		// This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30.
-		// TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache
-		// into the event mechanism. This can be tackled together with adding autoscaler
-		// support, which also needs to do something with the assume cache.
 		logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
 		return framework.Queue, nil
 	}
@@ -852,11 +834,16 @@ func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso
 		if claimName == nil {
 			continue
 		}
-		claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
+		obj, err := pl.claimAssumeCache.Get(pod.Namespace + "/" + *claimName)
 		if err != nil {
 			return err
 		}
+		claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
+		if !ok {
+			return fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, pod.Namespace, *claimName)
+		}
 		if claim.DeletionTimestamp != nil {
 			return fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
 		}
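
Editor's note: reading claims through the assume cache instead of the lister means that a claim assumed by the plugin (for example right after a successful allocation) is visible immediately, before the informer catches up. A rough sketch of those semantics, with made-up objects and the "<namespace>/<name>" key format used above:

    // obj comes from the assume cache, e.g. with ResourceVersion "1".
    claim := obj.(*resourcev1alpha2.ResourceClaim)

    // Typically the assumed object is the claim returned by an API status update,
    // so it carries a newer ResourceVersion.
    assumed := claim.DeepCopy()
    assumed.ResourceVersion = "2"
    assumed.Status.Allocation = &resourcev1alpha2.AllocationResult{}
    if err := claimAssumeCache.Assume(assumed); err != nil {
        return err
    }

    // Get now returns the assumed object until the informer delivers something
    // newer or Restore drops the assumption again.
    latest, err := claimAssumeCache.Get(claim.Namespace + "/" + claim.Name)
    _, _ = latest, err
    claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)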

View File

@@ -44,6 +44,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 	"k8s.io/kubernetes/test/utils/ktesting"
 	"k8s.io/utils/ptr"
 )
@@ -1220,6 +1221,7 @@ type testContext struct {
 	ctx context.Context
 	client *fake.Clientset
 	informerFactory informers.SharedInformerFactory
+	claimAssumeCache *assumecache.AssumeCache
 	p *dynamicResources
 	nodeInfos []*framework.NodeInfo
 	state *framework.CycleState
@@ -1388,10 +1390,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
 	tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters"))

 	tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
+	tc.claimAssumeCache = assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil)
 	opts := []runtime.Option{
 		runtime.WithClientSet(tc.client),
 		runtime.WithInformerFactory(tc.informerFactory),
+		runtime.WithResourceClaimCache(tc.claimAssumeCache),
 	}
 	fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
 	if err != nil {
@@ -1558,6 +1561,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 		},
 		"backoff-wrong-old-object": {
 			pod: podWithClaimName,
+			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
 			oldObj: "not-a-claim",
 			newObj: pendingImmediateClaim,
 			expectedErr: true,
@@ -1586,15 +1590,10 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 		},
 		"structured-claim-deallocate": {
 			pod: podWithClaimName,
-			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
-			oldObj: func() *resourcev1alpha2.ResourceClaim {
-				claim := structuredAllocatedClaim.DeepCopy()
-				claim.Name += "-other"
-				return claim
-			}(),
+			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, otherStructuredAllocatedClaim},
+			oldObj: otherStructuredAllocatedClaim,
 			newObj: func() *resourcev1alpha2.ResourceClaim {
-				claim := structuredAllocatedClaim.DeepCopy()
-				claim.Name += "-other"
+				claim := otherStructuredAllocatedClaim.DeepCopy()
 				claim.Status.Allocation = nil
 				return claim
 			}(),
@@ -1606,18 +1605,48 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 	for name, tc := range testcases {
 		t.Run(name, func(t *testing.T) {
-			logger, _ := ktesting.NewTestContext(t)
+			logger, tCtx := ktesting.NewTestContext(t)
 			testCtx := setup(t, nil, tc.claims, nil, nil, nil)
+			oldObj := tc.oldObj
+			newObj := tc.newObj
 			if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok {
-				// Update the informer because the lister gets called and must have the claim.
-				store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore()
+				// Add or update through the client and wait until the event is processed.
+				claimKey := claim.Namespace + "/" + claim.Name
 				if tc.oldObj == nil {
-					require.NoError(t, store.Add(claim))
+					// Some test claims already have it. Clear for create.
+					createClaim := claim.DeepCopy()
+					createClaim.UID = ""
+					storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(createClaim.Namespace).Create(tCtx, createClaim, metav1.CreateOptions{})
+					require.NoError(t, err, "create claim")
+					claim = storedClaim
 				} else {
-					require.NoError(t, store.Update(claim))
+					cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
+					require.NoError(t, err, "retrieve old claim")
+					updateClaim := claim.DeepCopy()
+					// The test claim doesn't have those (generated dynamically), so copy them.
+					updateClaim.UID = cachedClaim.(*resourcev1alpha2.ResourceClaim).UID
+					updateClaim.ResourceVersion = cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion
+					storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(updateClaim.Namespace).Update(tCtx, updateClaim, metav1.UpdateOptions{})
+					require.NoError(t, err, "update claim")
+					claim = storedClaim
 				}
+				// Eventually the assume cache will have it, too.
+				require.EventuallyWithT(t, func(t *assert.CollectT) {
+					cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
+					require.NoError(t, err, "retrieve claim")
+					if cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion != claim.ResourceVersion {
+						t.Errorf("cached claim not updated yet")
+					}
+				}, time.Minute, time.Second, "claim assume cache must have new or updated claim")
+				// This has the actual UID and ResourceVersion,
+				// which is relevant for
+				// isSchedulableAfterClaimChange.
+				newObj = claim
 			}
-			actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj)
+			actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj)
 			if tc.expectedErr {
 				require.Error(t, err)
 				return

View File

@@ -39,6 +39,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 	"k8s.io/kubernetes/pkg/util/slice"
 )
@@ -75,6 +76,7 @@ type frameworkImpl struct {
 	kubeConfig *restclient.Config
 	eventRecorder events.EventRecorder
 	informerFactory informers.SharedInformerFactory
+	resourceClaimCache *assumecache.AssumeCache
 	logger klog.Logger
 	metricsRecorder *metrics.MetricAsyncRecorder
@@ -126,6 +128,7 @@ type frameworkOptions struct {
 	kubeConfig *restclient.Config
 	eventRecorder events.EventRecorder
 	informerFactory informers.SharedInformerFactory
+	resourceClaimCache *assumecache.AssumeCache
 	snapshotSharedLister framework.SharedLister
 	metricsRecorder *metrics.MetricAsyncRecorder
 	podNominator framework.PodNominator
@@ -177,6 +180,13 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
 	}
 }

+// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl.
+func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option {
+	return func(o *frameworkOptions) {
+		o.resourceClaimCache = resourceClaimCache
+	}
+}
+
 // WithSnapshotSharedLister sets the SharedLister of the snapshot.
 func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option {
 	return func(o *frameworkOptions) {
@@ -267,6 +277,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
 		kubeConfig: options.kubeConfig,
 		eventRecorder: options.eventRecorder,
 		informerFactory: options.informerFactory,
+		resourceClaimCache: options.resourceClaimCache,
 		metricsRecorder: options.metricsRecorder,
 		extenders: options.extenders,
 		PodNominator: options.podNominator,
@@ -1606,6 +1617,10 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory
 	return f.informerFactory
 }

+func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache {
+	return f.resourceClaimCache
+}
+
 func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {
 	pgSet := sets.Set[string]{}

View File

@@ -48,6 +48,7 @@ import (
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
 	"k8s.io/kubernetes/pkg/scheduler/profile"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )

 const (
@@ -294,11 +295,18 @@ func New(ctx context.Context,
 	// waitingPods holds all the pods that are in the scheduler and waiting in the permit stage
 	waitingPods := frameworkruntime.NewWaitingPodsMap()

+	var resourceClaimCache *assumecache.AssumeCache
+	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+		resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
+		resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
+	}
+
 	profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
 		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
 		frameworkruntime.WithClientSet(client),
 		frameworkruntime.WithKubeConfig(options.kubeConfig),
 		frameworkruntime.WithInformerFactory(informerFactory),
+		frameworkruntime.WithResourceClaimCache(resourceClaimCache),
 		frameworkruntime.WithSnapshotSharedLister(snapshot),
 		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
 		frameworkruntime.WithParallelism(int(options.parallelism)),
@@ -358,7 +366,7 @@ func New(ctx context.Context,
 	sched.NextPod = podQueue.Pop
 	sched.applyDefaultHandlers()

-	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
+	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil {
 		return nil, fmt.Errorf("adding event handlers: %w", err)
 	}

View File

@@ -25,7 +25,9 @@ import (
 	"k8s.io/klog/v2"

 	"k8s.io/apimachinery/pkg/api/meta"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/kubernetes/pkg/scheduler/util/queue"
 )

 // Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon.
@@ -119,9 +121,34 @@ type AssumeCache struct {
 	// Will be used for all operations.
 	logger klog.Logger

-	// Synchronizes updates to store
+	// Synchronizes updates to all fields below.
 	rwMutex sync.RWMutex

+	// All registered event handlers.
+	eventHandlers []cache.ResourceEventHandler
+	handlerRegistration cache.ResourceEventHandlerRegistration
+
+	// The eventQueue contains functions which deliver an event to one
+	// event handler.
+	//
+	// These functions must be invoked while *not locking* rwMutex because
+	// the event handlers are allowed to access the assume cache. Holding
+	// rwMutex then would cause a deadlock.
+	//
+	// New functions get added as part of processing a cache update while
+	// the rwMutex is locked. Each function which adds something to the queue
+	// also drains the queue before returning, therefore it is guaranteed
+	// that all event handlers get notified immediately (useful for unit
+	// testing).
+	//
+	// A channel cannot be used here because it cannot have an unbounded
+	// capacity. This could lead to a deadlock (writer holds rwMutex,
+	// gets blocked because capacity is exhausted, reader is in a handler
+	// which tries to lock the rwMutex). Writing into such a channel
+	// while not holding the rwMutex doesn't work because in-order delivery
+	// of events would no longer be guaranteed.
+	eventQueue queue.FIFO[func()]
+
 	// describes the object stored
 	description string
@@ -177,7 +204,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam
 	// Unit tests don't use informers
 	if informer != nil {
 		// Cannot fail in practice?! No-one bothers checking the error.
-		_, _ = informer.AddEventHandler(
+		c.handlerRegistration, _ = informer.AddEventHandler(
 			cache.ResourceEventHandlerFuncs{
 				AddFunc:    c.add,
 				UpdateFunc: c.update,
@@ -199,9 +226,11 @@ func (c *AssumeCache) add(obj interface{}) {
 		return
 	}

+	defer c.emitEvents()
 	c.rwMutex.Lock()
 	defer c.rwMutex.Unlock()

+	var oldObj interface{}
 	if objInfo, _ := c.getObjInfo(name); objInfo != nil {
 		newVersion, err := c.getObjVersion(name, obj)
 		if err != nil {
@@ -221,6 +250,7 @@ func (c *AssumeCache) add(obj interface{}) {
 			c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion)
 			return
 		}
+		oldObj = objInfo.latestObj
 	}

 	objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
@@ -228,6 +258,7 @@ func (c *AssumeCache) add(obj interface{}) {
 		c.logger.Info("Error occurred while updating stored object", "err", err)
 	} else {
 		c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
+		c.pushEvent(oldObj, obj)
 	}
 }
@@ -246,14 +277,50 @@ func (c *AssumeCache) delete(obj interface{}) {
 		return
 	}

+	defer c.emitEvents()
 	c.rwMutex.Lock()
 	defer c.rwMutex.Unlock()

+	var oldObj interface{}
+	if len(c.eventHandlers) > 0 {
+		if objInfo, _ := c.getObjInfo(name); objInfo != nil {
+			oldObj = objInfo.latestObj
+		}
+	}
+
 	objInfo := &objInfo{name: name}
 	err = c.store.Delete(objInfo)
 	if err != nil {
 		c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
 	}
+
+	c.pushEvent(oldObj, nil)
+}
+
+// pushEvent gets called while the mutex is locked for writing.
+// It ensures that all currently registered event handlers get
+// notified about a change when the caller starts delivering
+// those with emitEvents.
+//
+// For a delete event, newObj is nil. For an add, oldObj is nil.
+// An update has both as non-nil.
+func (c *AssumeCache) pushEvent(oldObj, newObj interface{}) {
+	for _, handler := range c.eventHandlers {
+		handler := handler
+		if oldObj == nil {
+			c.eventQueue.Push(func() {
+				handler.OnAdd(newObj, false)
+			})
+		} else if newObj == nil {
+			c.eventQueue.Push(func() {
+				handler.OnDelete(oldObj)
+			})
+		} else {
+			c.eventQueue.Push(func() {
+				handler.OnUpdate(oldObj, newObj)
+			})
+		}
+	}
 }

 func (c *AssumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
@@ -315,6 +382,10 @@ func (c *AssumeCache) List(indexObj interface{}) []interface{} {
 	c.rwMutex.RLock()
 	defer c.rwMutex.RUnlock()

+	return c.listLocked(indexObj)
+}
+
+func (c *AssumeCache) listLocked(indexObj interface{}) []interface{} {
 	allObjs := []interface{}{}
 	var objs []interface{}
 	if c.indexName != "" {
@@ -358,6 +429,7 @@ func (c *AssumeCache) Assume(obj interface{}) error {
 		return &ObjectNameError{err}
 	}

+	defer c.emitEvents()
 	c.rwMutex.Lock()
 	defer c.rwMutex.Unlock()
@@ -380,6 +452,8 @@ func (c *AssumeCache) Assume(obj interface{}) error {
 		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
 	}

+	c.pushEvent(objInfo.latestObj, obj)
+
 	// Only update the cached object
 	objInfo.latestObj = obj
 	c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion)
@@ -388,6 +462,7 @@ func (c *AssumeCache) Assume(obj interface{}) error {

 // Restore the informer cache's version of the object.
 func (c *AssumeCache) Restore(objName string) {
+	defer c.emitEvents()
 	c.rwMutex.Lock()
 	defer c.rwMutex.Unlock()
@@ -396,7 +471,61 @@ func (c *AssumeCache) Restore(objName string) {
 		// This could be expected if object got deleted
 		c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
 	} else {
+		if objInfo.latestObj != objInfo.apiObj {
+			c.pushEvent(objInfo.latestObj, objInfo.apiObj)
 			objInfo.latestObj = objInfo.apiObj
+		}
 		c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
 	}
 }
+
+// AddEventHandler adds an event handler to the cache. Events to a
+// single handler are delivered sequentially, but there is no
+// coordination between different handlers. A handler may use the
+// cache.
+//
+// The return value can be used to wait for cache synchronization.
+func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.ResourceEventHandlerRegistration {
+	defer c.emitEvents()
+	c.rwMutex.Lock()
+	defer c.rwMutex.Unlock()
+
+	c.eventHandlers = append(c.eventHandlers, handler)
+	allObjs := c.listLocked(nil)
+	for _, obj := range allObjs {
+		c.eventQueue.Push(func() {
+			handler.OnAdd(obj, true)
+		})
+	}
+
+	if c.handlerRegistration == nil {
+		// No informer, so immediately synced.
+		return syncedHandlerRegistration{}
+	}
+
+	return c.handlerRegistration
+}
+
+// emitEvents delivers all pending events that are in the queue, in the order
+// in which they were stored there (FIFO).
+func (c *AssumeCache) emitEvents() {
+	for {
+		c.rwMutex.Lock()
+		deliver, ok := c.eventQueue.Pop()
+		c.rwMutex.Unlock()
+		if !ok {
+			return
+		}
+		func() {
+			defer utilruntime.HandleCrash()
+			deliver()
+		}()
+	}
+}
+
+// syncedHandlerRegistration is an implementation of ResourceEventHandlerRegistration
+// which always returns true.
+type syncedHandlerRegistration struct{}
+
+func (syncedHandlerRegistration) HasSynced() bool { return true }
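
Editor's note: the queue-based delivery above means events are pushed while rwMutex is held but delivered only after it has been released, so a handler may safely call back into the cache. A small sketch of such a handler (illustrative only, not from this commit):

    // countingHandler logs the cache size whenever an object is added.
    type countingHandler struct {
        cache *assumecache.AssumeCache
    }

    func (h *countingHandler) OnAdd(obj interface{}, isInInitialList bool) {
        // Safe: emitEvents invokes this callback without holding the cache's rwMutex.
        fmt.Println("objects in cache:", len(h.cache.List(nil)))
    }

    func (h *countingHandler) OnUpdate(oldObj, newObj interface{}) {}
    func (h *countingHandler) OnDelete(obj interface{})            {}

    // Registration delivers the current cache content as initial "add" events
    // before AddEventHandler returns:
    // reg := claimCache.AddEventHandler(&countingHandler{cache: claimCache})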

View File

@@ -19,6 +19,8 @@ package assumecache
 import (
 	"fmt"
 	"slices"
+	"sort"
+	"sync"
 	"testing"

 	"github.com/google/go-cmp/cmp"
@@ -118,6 +120,79 @@ func verifyList(tCtx ktesting.TContext, assumeCache *AssumeCache, expectedObjs [
 	}
 }

+type mockEventHandler struct {
+	mutex  sync.Mutex
+	events []event
+	cache  *AssumeCache
+	block  <-chan struct{}
+}
+
+type event struct {
+	What        string
+	OldObj, Obj interface{}
+	InitialList bool
+}
+
+func (m *mockEventHandler) OnAdd(obj interface{}, initialList bool) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	m.events = append(m.events, event{
+		What:        "add",
+		Obj:         obj,
+		InitialList: initialList,
+	})
+
+	if m.cache != nil {
+		// Must not deadlock!
+		m.cache.List(nil)
+	}
+	if m.block != nil {
+		<-m.block
+	}
+}
+
+func (m *mockEventHandler) OnUpdate(oldObj, obj interface{}) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	m.events = append(m.events, event{
+		What:   "update",
+		OldObj: oldObj,
+		Obj:    obj,
+	})
+}
+
+func (m *mockEventHandler) OnDelete(obj interface{}) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	m.events = append(m.events, event{
+		What: "delete",
+		Obj:  obj,
+	})
+}
+
+func (m *mockEventHandler) verifyAndFlush(tCtx ktesting.TContext, expectedEvents []event) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	tCtx.Helper()
+	if diff := cmp.Diff(expectedEvents, m.events); diff != "" {
+		tCtx.Fatalf("unexpected events (- expected, + actual):\n%s", diff)
+	}
+	m.events = nil
+}
+
+func (m *mockEventHandler) sortEvents(cmp func(objI, objJ interface{}) bool) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	sort.Slice(m.events, func(i, j int) bool {
+		return cmp(m.events[i].Obj, m.events[j].Obj)
+	})
+}
+
 func TestAssume(t *testing.T) {
 	scenarios := map[string]struct {
 		oldObj metav1.Object
@@ -162,6 +237,8 @@ func TestAssume(t *testing.T) {
 	for name, scenario := range scenarios {
 		t.Run(name, func(t *testing.T) {
 			tCtx, cache, informer := newTest(t)
+			var events mockEventHandler
+			cache.AddEventHandler(&events)

 			// Add old object to cache.
 			informer.add(scenario.oldObj)
@@ -173,18 +250,25 @@ func TestAssume(t *testing.T) {
 				t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff)
 			}

-			// Check that Get returns correct object.
+			// Check that Get returns correct object and
+			// that events were delivered correctly.
+			expectEvents := []event{{What: "add", Obj: scenario.oldObj}}
 			expectedObj := scenario.newObj
 			if scenario.expectErr != nil {
 				expectedObj = scenario.oldObj
+			} else {
+				expectEvents = append(expectEvents, event{What: "update", OldObj: scenario.oldObj, Obj: scenario.newObj})
 			}
 			verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj)
+			events.verifyAndFlush(tCtx, expectEvents)
 		})
 	}
 }

 func TestRestore(t *testing.T) {
 	tCtx, cache, informer := newTest(t)
+	var events mockEventHandler
+	cache.AddEventHandler(&events)

 	// This test assumes an object with the same version as the API object.
 	// The assume cache supports that, but doing so in real code suffers from
@@ -194,25 +278,40 @@ func TestRestore(t *testing.T) {
 	newObj := makeObj("pvc1", "5", "")

 	// Restore object that doesn't exist
+	ktesting.Step(tCtx, "empty cache", func(tCtx ktesting.TContext) {
 		cache.Restore("nothing")
+		events.verifyAndFlush(tCtx, nil)
+	})

 	// Add old object to cache.
+	ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) {
 		informer.add(oldObj)
-	verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj)
+		verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj)
+		events.verifyAndFlush(tCtx, []event{{What: "add", Obj: oldObj}})
+	})

-	// Restore object.
+	// Restore the same object.
+	ktesting.Step(tCtx, "initial Restore", func(tCtx ktesting.TContext) {
 		cache.Restore(oldObj.GetName())
-	verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj)
+		verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj)
+		events.verifyAndFlush(tCtx, nil)
+	})

 	// Assume new object.
+	ktesting.Step(tCtx, "Assume", func(tCtx ktesting.TContext) {
 		if err := cache.Assume(newObj); err != nil {
-		t.Fatalf("Assume() returned error %v", err)
+			tCtx.Fatalf("Assume() returned error %v", err)
 		}
-	verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj)
+		verify(tCtx, cache, oldObj.GetName(), newObj, oldObj)
+		events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}})
+	})

-	// Restore object.
+	// Restore the same object.
+	ktesting.Step(tCtx, "second Restore", func(tCtx ktesting.TContext) {
 		cache.Restore(oldObj.GetName())
-	verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj)
+		verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj)
+		events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: newObj, Obj: oldObj}})
+	})
 }

 func TestEvents(t *testing.T) {
@@ -226,27 +325,162 @@ func TestEvents(t *testing.T) {
 	informer.add(oldObj)
 	verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj)

+	// Receive initial list.
+	var events mockEventHandler
+	cache.AddEventHandler(&events)
+	events.verifyAndFlush(ktesting.WithStep(tCtx, "initial list"), []event{{What: "add", Obj: oldObj, InitialList: true}})

 	// Update object.
+	ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) {
 		informer.update(newObj)
-	verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, newObj, newObj)
+		verify(tCtx, cache, key, newObj, newObj)
+		events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}})
+	})

 	// Some error cases (don't occur in practice).
+	ktesting.Step(tCtx, "nop add", func(tCtx ktesting.TContext) {
 		informer.add(1)
-	verify(ktesting.WithStep(tCtx, "after nop add"), cache, key, newObj, newObj)
+		verify(tCtx, cache, key, newObj, newObj)
+		events.verifyAndFlush(tCtx, nil)
+	})
+	ktesting.Step(tCtx, "nil add", func(tCtx ktesting.TContext) {
 		informer.add(nil)
-	verify(ktesting.WithStep(tCtx, "after nil add"), cache, key, newObj, newObj)
+		verify(tCtx, cache, key, newObj, newObj)
+		events.verifyAndFlush(tCtx, nil)
+	})
+	ktesting.Step(tCtx, "nop update", func(tCtx ktesting.TContext) {
 		informer.update(oldObj)
-	verify(ktesting.WithStep(tCtx, "after nop update"), cache, key, newObj, newObj)
+		events.verifyAndFlush(tCtx, nil)
+		verify(tCtx, cache, key, newObj, newObj)
+	})
+	ktesting.Step(tCtx, "nil update", func(tCtx ktesting.TContext) {
 		informer.update(nil)
-	verify(ktesting.WithStep(tCtx, "after nil update"), cache, key, newObj, newObj)
+		verify(tCtx, cache, key, newObj, newObj)
+		events.verifyAndFlush(tCtx, nil)
+	})
+	ktesting.Step(tCtx, "nop delete", func(tCtx ktesting.TContext) {
 		informer.delete(nil)
-	verify(ktesting.WithStep(tCtx, "after nop delete"), cache, key, newObj, newObj)
+		verify(tCtx, cache, key, newObj, newObj)
+		events.verifyAndFlush(tCtx, nil)
+	})

 	// Delete object.
+	ktesting.Step(tCtx, "delete", func(tCtx ktesting.TContext) {
 		informer.delete(oldObj)
+		events.verifyAndFlush(tCtx, []event{{What: "delete", Obj: newObj}})
 		_, err := cache.Get(key)
 		if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" {
-		t.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff)
+			tCtx.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff)
 		}
+	})
+}
+
+func TestEventHandlers(t *testing.T) {
+	tCtx, cache, informer := newTest(t)
+	handlers := make([]mockEventHandler, 5)
+
+	var objs []metav1.Object
+	for i := 0; i < 5; i++ {
+		objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", ""))
+		informer.add(objs[i])
+	}
+
+	// Accessing cache during OnAdd must not deadlock!
+	handlers[0].cache = cache
+
+	// Order of delivered events is random, we must ensure
+	// increasing order by name ourselves.
+	var expectedEvents []event
+	for _, obj := range objs {
+		expectedEvents = append(expectedEvents,
+			event{
+				What:        "add",
+				Obj:         obj,
+				InitialList: true,
+			},
+		)
+	}
+	for i := range handlers {
+		cache.AddEventHandler(&handlers[i])
+		handlers[i].sortEvents(func(objI, objJ interface{}) bool {
+			return objI.(*metav1.ObjectMeta).Name <
+				objJ.(*metav1.ObjectMeta).Name
+		})
+		handlers[i].verifyAndFlush(tCtx, expectedEvents)
+	}
+
+	for i := 5; i < 7; i++ {
+		objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", ""))
+		informer.add(objs[i])
+		for e := range handlers {
+			handlers[e].verifyAndFlush(tCtx, []event{{What: "add", Obj: objs[i]}})
+		}
+	}
+
+	for i, oldObj := range objs {
+		newObj := makeObj(fmt.Sprintf("test-pvc%v", i), "2", "")
+		objs[i] = newObj
+		informer.update(newObj)
+		for e := range handlers {
+			handlers[e].verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}})
+		}
+	}
+
+	for _, obj := range objs {
+		informer.delete(obj)
+		for e := range handlers {
+			handlers[e].verifyAndFlush(tCtx, []event{{What: "delete", Obj: obj}})
+		}
+	}
+}
+
+func TestEventHandlerConcurrency(t *testing.T) {
+	tCtx, cache, informer := newTest(t)
+	handlers := make([]mockEventHandler, 5)
+
+	var objs []metav1.Object
+	for i := 0; i < 5; i++ {
+		objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", ""))
+	}
+
+	// Accessing cache during OnAdd must not deadlock!
+	handlers[0].cache = cache
+
+	// Each add blocks until this gets cancelled.
+	tCancelCtx := ktesting.WithCancel(tCtx)
+	var wg sync.WaitGroup
+
+	for i := range handlers {
+		handlers[i].block = tCancelCtx.Done()
+		cache.AddEventHandler(&handlers[i])
+	}
+
+	// Execution of the add calls is random, therefore
+	// we have to sort again.
+	var expectedEvents []event
+	for _, obj := range objs {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			informer.add(obj)
+		}()
+		expectedEvents = append(expectedEvents,
+			event{
+				What: "add",
+				Obj:  obj,
+			},
+		)
+	}
+	tCancelCtx.Cancel("proceed")
+	wg.Wait()
+
+	for i := range handlers {
+		handlers[i].sortEvents(func(objI, objJ interface{}) bool {
+			return objI.(*metav1.ObjectMeta).Name <
+				objJ.(*metav1.ObjectMeta).Name
+		})
+		handlers[i].verifyAndFlush(tCtx, expectedEvents)
 	}
 }

@@ -254,7 +488,7 @@ func TestListNoIndexer(t *testing.T) {
 	tCtx, cache, informer := newTest(t)

 	// Add a bunch of objects.
-	objs := make([]interface{}, 0, 10)
+	var objs []interface{}
 	for i := 0; i < 10; i++ {
 		obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")
 		objs = append(objs, obj)
@@ -293,7 +527,7 @@ func TestListWithIndexer(t *testing.T) {
 	// Add a bunch of objects.
 	ns := "ns1"
-	objs := make([]interface{}, 0, 10)
+	var objs []interface{}
 	for i := 0; i < 10; i++ {
 		obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns)
 		objs = append(objs, obj)

View File

@@ -0,0 +1,110 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
const (
// normalSize limits the size of the buffer that is kept
// for reuse.
normalSize = 4
)
// FIFO implements a first-in-first-out queue with unbounded size.
// The null FIFO is a valid empty queue.
//
// Access must be protected by the caller when used concurrently by
// different goroutines, the queue itself implements no locking.
type FIFO[T any] struct {
// elements contains a buffer for elements which have been
// pushed and not popped yet. Two scenarios are possible:
// - one chunk in the middle (start <= end)
// - one chunk at the end, followed by one chunk at the
// beginning (end <= start)
//
// start == end can be either an empty queue or a completely
// full one (with two chunks).
elements []T
// len counts the number of elements which have been pushed and
// not popped yet.
len int
// start is the index of the first valid element.
start int
// end is the index after the last valid element.
end int
}
func (q *FIFO[T]) Len() int {
return q.len
}
func (q *FIFO[T]) Push(element T) {
size := len(q.elements)
if q.len == size {
// Need larger buffer.
newSize := size * 2
if newSize == 0 {
newSize = normalSize
}
elements := make([]T, newSize)
if q.start == 0 {
copy(elements, q.elements)
} else {
copy(elements, q.elements[q.start:])
copy(elements[len(q.elements)-q.start:], q.elements[0:q.end])
}
q.start = 0
q.end = q.len
q.elements = elements
size = newSize
}
if q.end == size {
// Wrap around.
q.elements[0] = element
q.end = 1
q.len++
return
}
q.elements[q.end] = element
q.end++
q.len++
}
func (q *FIFO[T]) Pop() (element T, ok bool) {
if q.len == 0 {
return
}
element = q.elements[q.start]
q.start++
if q.start == len(q.elements) {
// Wrap around.
q.start = 0
}
q.len--
// Once it is empty, shrink down to avoid hanging onto
// a large buffer forever.
if q.len == 0 && len(q.elements) > normalSize {
q.elements = make([]T, normalSize)
q.start = 0
q.end = 0
}
ok = true
return
}
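
Editor's note: a short usage sketch for the ring-buffer FIFO above (values are arbitrary). The zero value is a valid empty queue, and Pop reports via its second return value whether an element was available:

    var q FIFO[string]
    q.Push("first")
    q.Push("second")
    for {
        element, ok := q.Pop()
        if !ok {
            break // queue is drained
        }
        fmt.Println(element) // prints "first", then "second"
    }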

View File

@@ -0,0 +1,117 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"testing"
"github.com/stretchr/testify/require"
)
func verifyPop(t *testing.T, expectedValue int, expectedOk bool, queue *FIFO[int]) {
t.Helper()
actual, ok := queue.Pop()
require.Equal(t, expectedOk, ok)
require.Equal(t, expectedValue, actual)
}
func verifyEmpty(t *testing.T, queue *FIFO[int]) {
t.Helper()
require.Equal(t, 0, queue.Len())
verifyPop(t, 0, false, queue)
}
func TestNull(t *testing.T) {
var queue FIFO[int]
verifyEmpty(t, &queue)
}
func TestOnePushPop(t *testing.T) {
var queue FIFO[int]
expected := 10
queue.Push(10)
require.Equal(t, 1, queue.Len())
verifyPop(t, expected, true, &queue)
verifyEmpty(t, &queue)
}
// Pushes some elements, pops all of them, then the same again.
func TestWrapAroundEmpty(t *testing.T) {
var queue FIFO[int]
for i := 0; i < 5; i++ {
queue.Push(i)
}
require.Equal(t, 5, queue.Len())
for i := 0; i < 5; i++ {
verifyPop(t, i, true, &queue)
}
verifyEmpty(t, &queue)
for i := 5; i < 10; i++ {
queue.Push(i)
}
for i := 5; i < 10; i++ {
verifyPop(t, i, true, &queue)
}
verifyEmpty(t, &queue)
}
// Pushes some elements, pops one, adds more, then pops all.
func TestWrapAroundPartial(t *testing.T) {
var queue FIFO[int]
for i := 0; i < 5; i++ {
queue.Push(i)
}
require.Equal(t, 5, queue.Len())
verifyPop(t, 0, true, &queue)
for i := 5; i < 10; i++ {
queue.Push(i)
}
for i := 1; i < 10; i++ {
verifyPop(t, i, true, &queue)
}
verifyEmpty(t, &queue)
}
// Push an unusual amount of elements, pop all, and verify that
// the FIFO shrinks back again.
func TestShrink(t *testing.T) {
var queue FIFO[int]
for i := 0; i < normalSize*2; i++ {
queue.Push(i)
}
require.Equal(t, normalSize*2, queue.Len())
require.LessOrEqual(t, 2*normalSize, len(queue.elements))
// Pop all, should be shrunken when done.
for i := 0; i < normalSize*2; i++ {
verifyPop(t, i, true, &queue)
}
require.Equal(t, 0, queue.Len())
require.Equal(t, normalSize, len(queue.elements))
// Still usable after shrinking?
queue.Push(42)
verifyPop(t, 42, true, &queue)
require.Equal(t, 0, queue.Len())
require.Equal(t, normalSize, len(queue.elements))
}

View File

@@ -41,6 +41,24 @@ func WithStep(tCtx TContext, what string) TContext {
 	return WithLogger(sCtx, klog.LoggerWithName(sCtx.Logger(), what))
 }

+// Step is useful when the context with the step information is
+// used more than once:
+//
+//	ktesting.Step(tCtx, "step 1", func(tCtx ktesting.TContext) {
+//	    tCtx.Log(...)
+//	    if (...) {
+//	        tCtx.Failf(...)
+//	    }
+//	})
+//
+// Inside the callback, the tCtx variable is the one where the step
+// has been added. This avoids the need to introduce multiple different
+// context variables and risk of using the wrong one.
+func Step(tCtx TContext, what string, cb func(tCtx TContext)) {
+	tCtx.Helper()
+	cb(WithStep(tCtx, what))
+}
+
 type stepContext struct {
 	TContext
 	what string
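
Editor's note: a hedged example of the new helper in a test; ktesting.Init and the step name are illustrative assumptions, not part of this commit:

    func TestWithSteps(t *testing.T) {
        tCtx := ktesting.Init(t)
        ktesting.Step(tCtx, "prepare fixtures", func(tCtx ktesting.TContext) {
            // Failures and log output inside the callback are attributed to this step.
            tCtx.Log("running inside the step-scoped context")
        })
    }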