QueueingHint for CSILimit when deleting pods (#121508)
Signed-off-by: utam0k <k0ma@utam0k.jp>
This commit is contained in:
		@@ -34,6 +34,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/util"
 | 
			
		||||
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -73,18 +74,42 @@ func (pl *CSILimits) Name() string {
 | 
			
		||||
	return CSIName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// EventsToRegister returns the possible events that may make a Pod
 | 
			
		||||
// EventsToRegister returns the possible events that may make a Pod.
 | 
			
		||||
// failed by this plugin schedulable.
 | 
			
		||||
func (pl *CSILimits) EventsToRegister() []framework.ClusterEventWithHint {
 | 
			
		||||
	return []framework.ClusterEventWithHint{
 | 
			
		||||
		// We don't register any `QueueingHintFn` intentionally
 | 
			
		||||
		// because any new CSINode could make pods that were rejected by CSI volumes schedulable.
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}},
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *CSILimits) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
 | 
			
		||||
	deletedPod, _, err := util.As[*v1.Pod](oldObj, newObj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPodDeleted: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(deletedPod.Spec.Volumes) == 0 {
 | 
			
		||||
		return framework.QueueSkip, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if deletedPod.Spec.NodeName == "" {
 | 
			
		||||
		return framework.QueueSkip, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, vol := range deletedPod.Spec.Volumes {
 | 
			
		||||
		if vol.PersistentVolumeClaim != nil || vol.Ephemeral != nil || pl.translator.IsInlineMigratable(&vol) {
 | 
			
		||||
			return framework.Queue, nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	logger.V(5).Info("The deleted pod does not impact the scheduling of the unscheduled pod", "deletedPod", klog.KObj(pod), "pod", klog.KObj(deletedPod))
 | 
			
		||||
	return framework.QueueSkip, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PreFilter invoked at the prefilter extension point
 | 
			
		||||
//
 | 
			
		||||
// If the pod haven't those types of volumes, we'll skip the Filter phase
 | 
			
		||||
 
 | 
			
		||||
@@ -641,6 +641,71 @@ func TestCSILimits(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCSILimitsQHint(t *testing.T) {
 | 
			
		||||
	podEbs := st.MakePod().PVC("csi-ebs.csi.aws.com-2")
 | 
			
		||||
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		newPod                 *v1.Pod
 | 
			
		||||
		deletedPod             *v1.Pod
 | 
			
		||||
		deletedPodNotScheduled bool
 | 
			
		||||
		test                   string
 | 
			
		||||
		wantQHint              framework.QueueingHint
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			newPod:     podEbs.Obj(),
 | 
			
		||||
			deletedPod: st.MakePod().PVC("placeholder").Obj(),
 | 
			
		||||
			test:       "return a Queue when a deleted pod has a PVC",
 | 
			
		||||
			wantQHint:  framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			newPod:     podEbs.Obj(),
 | 
			
		||||
			deletedPod: st.MakePod().Volume(v1.Volume{VolumeSource: v1.VolumeSource{AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{}}}).Obj(),
 | 
			
		||||
			test:       "return a Queue when a deleted pod has a inline migratable volume",
 | 
			
		||||
			wantQHint:  framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			newPod:     podEbs.Obj(),
 | 
			
		||||
			deletedPod: st.MakePod().Obj(),
 | 
			
		||||
			test:       "return a QueueSkip when a deleted pod doesn't have any volume",
 | 
			
		||||
			wantQHint:  framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			newPod:                 podEbs.Obj(),
 | 
			
		||||
			deletedPod:             st.MakePod().PVC("csi-ebs.csi.aws.com-0").Obj(),
 | 
			
		||||
			deletedPodNotScheduled: true,
 | 
			
		||||
			test:                   "return a QueueSkip when a deleted pod is not scheduled.",
 | 
			
		||||
			wantQHint:              framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		t.Run(test.test, func(t *testing.T) {
 | 
			
		||||
			node, csiNode := getNodeWithPodAndVolumeLimits("csiNode", []*v1.Pod{}, 1, "")
 | 
			
		||||
			if csiNode != nil {
 | 
			
		||||
				enableMigrationOnNode(csiNode, csilibplugins.AWSEBSDriverName)
 | 
			
		||||
			}
 | 
			
		||||
			if !test.deletedPodNotScheduled {
 | 
			
		||||
				test.deletedPod.Spec.NodeName = node.Node().Name
 | 
			
		||||
			} else {
 | 
			
		||||
				test.deletedPod.Spec.NodeName = ""
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			p := &CSILimits{
 | 
			
		||||
				randomVolumeIDPrefix: rand.String(32),
 | 
			
		||||
				translator:           csitrans.New(),
 | 
			
		||||
			}
 | 
			
		||||
			logger, _ := ktesting.NewTestContext(t)
 | 
			
		||||
			qhint, err := p.isSchedulableAfterPodDeleted(logger, test.newPod, test.deletedPod, nil)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("isSchedulableAfterPodDeleted failed: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			if qhint != test.wantQHint {
 | 
			
		||||
				t.Errorf("QHint does not match: %v, want: %v", qhint, test.wantQHint)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getFakeCSIPVLister(volumeName string, driverNames ...string) tf.PersistentVolumeLister {
 | 
			
		||||
	pvLister := tf.PersistentVolumeLister{}
 | 
			
		||||
	for _, driver := range driverNames {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user