DRA: add DRAControlPlaneController feature gate for "classic DRA"
In the API, the effect of the feature gate is that alpha fields get dropped on
create and are preserved during updates if they were already set. The
PodSchedulingContext registration is *not* restricted by the feature gate.
This enables deleting stale PodSchedulingContext objects after disabling
the feature gate.
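
A rough sketch of that create/update asymmetry, following the usual pattern for
gating alpha fields (the helper name, package, and call sites are illustrative,
not the literal registry code from this commit):

    package resourceclaim // hypothetical strategy package

    import (
    	resourceapi "k8s.io/api/resource/v1alpha3"
    	utilfeature "k8s.io/apiserver/pkg/util/feature"
    	"k8s.io/kubernetes/pkg/features"
    )

    // dropDisabledDRAFields clears the "classic DRA" alpha field unless the
    // feature gate is enabled or the stored object already uses it. A strategy
    // would call it from PrepareForCreate (oldClaim == nil) and from
    // PrepareForUpdate (oldClaim is the object read from storage).
    func dropDisabledDRAFields(newClaim, oldClaim *resourceapi.ResourceClaim) {
    	if utilfeature.DefaultFeatureGate.Enabled(features.DRAControlPlaneController) {
    		return
    	}
    	if oldClaim != nil && oldClaim.Spec.Controller != "" {
    		// Field was already set before the update: preserve it.
    		return
    	}
    	newClaim.Spec.Controller = ""
    }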
The scheduler checks the new feature gate before setting up an informer for
PodSchedulingContext objects and when deciding whether it can schedule a
pod. If any claim depends on a control plane controller, the scheduler bails
out, leading to:
    Status: Pending
    ...
    Warning FailedScheduling 73s default-scheduler 0/1 nodes are available: resourceclaim depends on disabled DRAControlPlaneController feature. no new claims to deallocate, preemption: 0/1 nodes are available: 1 Preemption is not helpful for scheduling.
The rest of the changes prepare for testing the new feature separately from
"structured parameters". The goal is to have base "dra" jobs which enable and
test only structured parameters, plus "classic-dra" jobs which additionally
enable DRAControlPlaneController.
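
For illustration only (the job definitions themselves are not part of this
commit), a "classic-dra" job would presumably start the cluster components with
both gates switched on, and with the resource.k8s.io API group enabled:

    --feature-gates=DynamicResourceAllocation=true,DRAControlPlaneController=true
    --runtime-config=resource.k8s.io/v1alpha3=true    # kube-apiserver only

The base "dra" jobs would leave DRAControlPlaneController at its default
(disabled).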
@@ -146,6 +146,9 @@ func (p *podSchedulingState) isDirty() bool {
 // init checks whether there is already a PodSchedulingContext object.
 // Must not be called concurrently,
 func (p *podSchedulingState) init(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcelisters.PodSchedulingContextLister) error {
+	if podSchedulingContextLister == nil {
+		return nil
+	}
 	schedulingCtx, err := podSchedulingContextLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
 	switch {
 	case apierrors.IsNotFound(err):
@@ -267,11 +270,13 @@ func statusForClaim(schedulingCtx *resourceapi.PodSchedulingContext, podClaimNam
 
 // dynamicResources is a plugin that ensures that ResourceClaims are allocated.
 type dynamicResources struct {
 	enabled                       bool
+	controlPlaneControllerEnabled bool
+
 	fh                            framework.Handle
 	clientset                     kubernetes.Interface
 	classLister                   resourcelisters.DeviceClassLister
-	podSchedulingContextLister    resourcelisters.PodSchedulingContextLister
+	podSchedulingContextLister    resourcelisters.PodSchedulingContextLister // nil if and only if DRAControlPlaneController is disabled
 	sliceLister                   resourcelisters.ResourceSliceLister
 
 	// claimAssumeCache enables temporarily storing a newer claim object
@@ -338,13 +343,17 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 	}
 
 	pl := &dynamicResources{
-		enabled:                    true,
-		fh:                         fh,
-		clientset:                  fh.ClientSet(),
-		classLister:                fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(),
-		podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha3().PodSchedulingContexts().Lister(),
-		sliceLister:                fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(),
-		claimAssumeCache:           fh.ResourceClaimCache(),
+		enabled:                       true,
+		controlPlaneControllerEnabled: fts.EnableDRAControlPlaneController,
+
+		fh:               fh,
+		clientset:        fh.ClientSet(),
+		classLister:      fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(),
+		sliceLister:      fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(),
+		claimAssumeCache: fh.ResourceClaimCache(),
 	}
+	if pl.controlPlaneControllerEnabled {
+		pl.podSchedulingContextLister = fh.SharedInformerFactory().Resource().V1alpha3().PodSchedulingContexts().Lister()
+	}
 
 	return pl, nil
@@ -375,9 +384,6 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
 	events := []framework.ClusterEventWithHint{
 		// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
 		{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
-		// When a driver has provided additional information, a pod waiting for that information
-		// may be schedulable.
-		{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange},
 		// A resource might depend on node labels for topology filtering.
 		// A new or updated node may make pods schedulable.
 		//
@@ -393,6 +399,15 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
 		// A pod might be waiting for a class to get created or modified.
 		{Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}},
 	}
 
+	if pl.podSchedulingContextLister != nil {
+		events = append(events,
+			// When a driver has provided additional information, a pod waiting for that information
+			// may be schedulable.
+			framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange},
+		)
+	}
+
 	return events, nil
 }
@@ -400,6 +415,10 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
 // scheduled. When this fails, one of the registered events can trigger another
 // attempt.
 func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status *framework.Status) {
+	if !pl.enabled {
+		return nil
+	}
+
 	if err := pl.foreachPodResourceClaim(pod, nil); err != nil {
 		return statusUnschedulable(klog.FromContext(ctx), err.Error())
 	}
@@ -679,6 +698,7 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 	}
 
 	// Fetch PodSchedulingContext, it's going to be needed when checking claims.
+	// Doesn't do anything when DRAControlPlaneController is disabled.
 	if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil {
 		return nil, statusError(logger, err)
 	}
@@ -688,6 +708,16 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 
 	s.informationsForClaim = make([]informationForClaim, len(claims))
 	for index, claim := range claims {
+		if claim.Spec.Controller != "" &&
+			!pl.controlPlaneControllerEnabled {
+			// This keeps the pod as unschedulable until the
+			// scheduler gets restarted with "classic DRA" enabled
+			// or the claim gets replaced with one which doesn't
+			// need the feature. That is a cluster event that
+			// re-enqueues the pod.
+			return nil, statusUnschedulable(logger, "resourceclaim depends on disabled DRAControlPlaneController feature", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
+		}
+
 		if claim.Status.DeallocationRequested {
 			// This will get resolved by the resource driver.
 			return nil, statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
@@ -316,7 +316,11 @@ func TestPlugin(t *testing.T) {
 
 		prepare prepare
 		want    want
-		disable bool
+
+		// Feature gates. False is chosen so that the uncommon case
+		// doesn't need to be set.
+		disableDRA        bool
+		disableClassicDRA bool
 	}{
 		"empty": {
 			pod: st.MakePod().Name("foo").Namespace("default").Obj(),
@@ -912,7 +916,7 @@ func TestPlugin(t *testing.T) {
 			pod:    podWithClaimName,
 			claims: []*resourceapi.ResourceClaim{inUseClaim},
 		},
-		"disable": {
+		"DRA-disabled": {
 			pod:    podWithClaimName,
 			claims: []*resourceapi.ResourceClaim{inUseClaim},
 			want: want{
@@ -920,7 +924,7 @@ func TestPlugin(t *testing.T) {
 					status: framework.NewStatus(framework.Skip),
 				},
 			},
-			disable: true,
+			disableDRA: true,
 		},
 	}
 
@@ -933,8 +937,11 @@ func TestPlugin(t *testing.T) {
 			if nodes == nil {
 				nodes = []*v1.Node{workerNode}
 			}
-			testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings, tc.objs)
-			testCtx.p.enabled = !tc.disable
+			features := feature.Features{
+				EnableDynamicResourceAllocation: !tc.disableDRA,
+				EnableDRAControlPlaneController: !tc.disableClassicDRA,
+			}
+			testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings, tc.objs, features)
 			initialObjects := testCtx.listAll(t)
 
 			status := testCtx.p.PreEnqueue(testCtx.ctx, tc.pod)
@@ -1136,6 +1143,9 @@ func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) {
 }
 
 func (tc *testContext) listAssumedClaims() []metav1.Object {
+	if tc.p.claimAssumeCache == nil {
+		return nil
+	}
 	var assumedClaims []metav1.Object
 	for _, obj := range tc.p.claimAssumeCache.List(nil) {
 		claim := obj.(*resourceapi.ResourceClaim)
@@ -1219,7 +1229,7 @@ func update(t *testing.T, objects []metav1.Object, updates change) []metav1.Obje
 	return updated
 }
 
-func setup(t *testing.T, nodes []*v1.Node, claims []*resourceapi.ResourceClaim, classes []*resourceapi.DeviceClass, schedulings []*resourceapi.PodSchedulingContext, objs []apiruntime.Object) (result *testContext) {
+func setup(t *testing.T, nodes []*v1.Node, claims []*resourceapi.ResourceClaim, classes []*resourceapi.DeviceClass, schedulings []*resourceapi.PodSchedulingContext, objs []apiruntime.Object, features feature.Features) (result *testContext) {
 	t.Helper()
 
 	tc := &testContext{}
@@ -1242,7 +1252,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourceapi.ResourceClaim,
 		t.Fatal(err)
 	}
 
-	pl, err := New(tCtx, nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
+	pl, err := New(tCtx, nil, fh, features)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1436,7 +1446,10 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 	for name, tc := range testcases {
 		t.Run(name, func(t *testing.T) {
 			logger, tCtx := ktesting.NewTestContext(t)
-			testCtx := setup(t, nil, tc.claims, nil, nil, nil)
+			features := feature.Features{
+				EnableDynamicResourceAllocation: true,
+			}
+			testCtx := setup(t, nil, tc.claims, nil, nil, nil, features)
 			oldObj := tc.oldObj
 			newObj := tc.newObj
 			if claim, ok := tc.newObj.(*resourceapi.ResourceClaim); ok {
@@ -1604,7 +1617,11 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			t.Parallel()
 			logger, _ := ktesting.NewTestContext(t)
-			testCtx := setup(t, nil, tc.claims, nil, tc.schedulings, nil)
+			features := feature.Features{
+				EnableDynamicResourceAllocation: true,
+				EnableDRAControlPlaneController: true,
+			}
+			testCtx := setup(t, nil, tc.claims, nil, tc.schedulings, nil, features)
 			actualHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj)
 			if tc.expectedErr {
 				require.Error(t, err)
@@ -20,6 +20,7 @@ package feature
 // This struct allows us to break the dependency of the plugins on
 // the internal k8s features pkg.
 type Features struct {
+	EnableDRAControlPlaneController              bool
 	EnableDynamicResourceAllocation              bool
 	EnableVolumeCapacityPriority                 bool
 	EnableNodeInclusionPolicyInPodTopologySpread bool
@@ -46,6 +46,7 @@ import (
 // through the WithFrameworkOutOfTreeRegistry option.
 func NewInTreeRegistry() runtime.Registry {
 	fts := plfeature.Features{
+		EnableDRAControlPlaneController:              feature.DefaultFeatureGate.Enabled(features.DRAControlPlaneController),
 		EnableDynamicResourceAllocation:              feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
 		EnableVolumeCapacityPriority:                 feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
 		EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),