Merge pull request #125000 from Gekko0114/vz_pvc
volumezone: scheduler queueing hints: pvc
This commit is contained in:
		@@ -33,6 +33,7 @@ import (
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/util"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// VolumeZone is a plugin that checks volume zone.
 | 
			
		||||
@@ -105,7 +106,8 @@ func (pl *VolumeZone) Name() string {
 | 
			
		||||
// Currently, this is only supported with PersistentVolumeClaims,
 | 
			
		||||
// and only looks for the bound PersistentVolume.
 | 
			
		||||
func (pl *VolumeZone) PreFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
 | 
			
		||||
	podPVTopologies, status := pl.getPVbyPod(ctx, pod)
 | 
			
		||||
	logger := klog.FromContext(ctx)
 | 
			
		||||
	podPVTopologies, status := pl.getPVbyPod(logger, pod)
 | 
			
		||||
	if !status.IsSuccess() {
 | 
			
		||||
		return nil, status
 | 
			
		||||
	}
 | 
			
		||||
@@ -116,16 +118,12 @@ func (pl *VolumeZone) PreFilter(ctx context.Context, cs *framework.CycleState, p
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *VolumeZone) getPVbyPod(ctx context.Context, pod *v1.Pod) ([]pvTopology, *framework.Status) {
 | 
			
		||||
	logger := klog.FromContext(ctx)
 | 
			
		||||
// getPVbyPod gets PVTopology from pod
 | 
			
		||||
func (pl *VolumeZone) getPVbyPod(logger klog.Logger, pod *v1.Pod) ([]pvTopology, *framework.Status) {
 | 
			
		||||
	podPVTopologies := make([]pvTopology, 0)
 | 
			
		||||
 | 
			
		||||
	for i := range pod.Spec.Volumes {
 | 
			
		||||
		volume := pod.Spec.Volumes[i]
 | 
			
		||||
		if volume.PersistentVolumeClaim == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		pvcName := volume.PersistentVolumeClaim.ClaimName
 | 
			
		||||
	pvcNames := pl.getPersistentVolumeClaimNameFromPod(pod)
 | 
			
		||||
	for _, pvcName := range pvcNames {
 | 
			
		||||
		if pvcName == "" {
 | 
			
		||||
			return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no name")
 | 
			
		||||
		}
 | 
			
		||||
@@ -212,7 +210,7 @@ func (pl *VolumeZone) Filter(ctx context.Context, cs *framework.CycleState, pod
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		// Fallback to calculate pv list here
 | 
			
		||||
		var status *framework.Status
 | 
			
		||||
		podPVTopologies, status = pl.getPVbyPod(ctx, pod)
 | 
			
		||||
		podPVTopologies, status = pl.getPVbyPod(logger, pod)
 | 
			
		||||
		if !status.IsSuccess() {
 | 
			
		||||
			return status
 | 
			
		||||
		}
 | 
			
		||||
@@ -291,13 +289,59 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
 | 
			
		||||
		// See: https://github.com/kubernetes/kubernetes/issues/110175
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
 | 
			
		||||
		// A new pvc may make a pod schedulable.
 | 
			
		||||
		// Due to fields are immutable except `spec.resources`, pvc update events are ignored.
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
 | 
			
		||||
		// Also, if pvc's VolumeName is filled, that also could make a pod schedulable.
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
 | 
			
		||||
		// A new pv or updating a pv's volume zone labels may make a pod schedulable.
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getPersistentVolumeClaimNameFromPod gets pvc names bound to a pod.
 | 
			
		||||
func (pl *VolumeZone) getPersistentVolumeClaimNameFromPod(pod *v1.Pod) []string {
 | 
			
		||||
	var pvcNames []string
 | 
			
		||||
	for i := range pod.Spec.Volumes {
 | 
			
		||||
		volume := pod.Spec.Volumes[i]
 | 
			
		||||
		if volume.PersistentVolumeClaim == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		pvcName := volume.PersistentVolumeClaim.ClaimName
 | 
			
		||||
		pvcNames = append(pvcNames, pvcName)
 | 
			
		||||
	}
 | 
			
		||||
	return pvcNames
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// isSchedulableAfterPersistentVolumeClaimChange is invoked whenever a PersistentVolumeClaim added or updated.
 | 
			
		||||
// It checks whether the change of PVC has made a previously unschedulable pod schedulable.
 | 
			
		||||
func (pl *VolumeZone) isSchedulableAfterPersistentVolumeClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
 | 
			
		||||
	_, modifiedPVC, err := util.As[*v1.PersistentVolumeClaim](oldObj, newObj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeClaimChange: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
	if pl.isPVCRequestedFromPod(logger, modifiedPVC, pod) {
 | 
			
		||||
		logger.V(5).Info("PVC that is referred from the pod was created or updated, which might make this pod schedulable", "pod", klog.KObj(pod), "PVC", klog.KObj(modifiedPVC))
 | 
			
		||||
		return framework.Queue, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	logger.V(5).Info("PVC irrelevant to the Pod was created or updated, which doesn't make this pod schedulable", "pod", klog.KObj(pod), "PVC", klog.KObj(modifiedPVC))
 | 
			
		||||
	return framework.QueueSkip, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// isPVCRequestedFromPod verifies if the PVC is requested from a given Pod.
 | 
			
		||||
func (pl *VolumeZone) isPVCRequestedFromPod(logger klog.Logger, pvc *v1.PersistentVolumeClaim, pod *v1.Pod) bool {
 | 
			
		||||
	if (pvc == nil) || (pod.Namespace != pvc.Namespace) {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	pvcNames := pl.getPersistentVolumeClaimNameFromPod(pod)
 | 
			
		||||
	for _, pvcName := range pvcNames {
 | 
			
		||||
		if pvc.Name == pvcName {
 | 
			
		||||
			logger.V(5).Info("PVC is referred from the pod", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc))
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	logger.V(5).Info("PVC is not referred from the pod", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc))
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New initializes a new plugin and returns it.
 | 
			
		||||
func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
 | 
			
		||||
	informerFactory := handle.SharedInformerFactory()
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@ import (
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/google/go-cmp/cmp"
 | 
			
		||||
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	storagev1 "k8s.io/api/storage/v1"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
@@ -539,6 +540,93 @@ func TestWithBinding(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestIsSchedulableAfterPersistentVolumeClaimAdded(t *testing.T) {
 | 
			
		||||
	testcases := map[string]struct {
 | 
			
		||||
		pod            *v1.Pod
 | 
			
		||||
		oldObj, newObj interface{}
 | 
			
		||||
		expectedHint   framework.QueueingHint
 | 
			
		||||
		expectedErr    bool
 | 
			
		||||
	}{
 | 
			
		||||
		"error-wrong-new-object": {
 | 
			
		||||
			pod:          createPodWithVolume("pod_1", "PVC_1"),
 | 
			
		||||
			newObj:       "not-a-pvc",
 | 
			
		||||
			expectedHint: framework.Queue,
 | 
			
		||||
			expectedErr:  true,
 | 
			
		||||
		},
 | 
			
		||||
		"pvc-was-added-but-pod-refers-no-pvc": {
 | 
			
		||||
			pod: st.MakePod().Name("pod_1").Namespace("default").Obj(),
 | 
			
		||||
			newObj: &v1.PersistentVolumeClaim{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "PVC_1", Namespace: "default"},
 | 
			
		||||
				Spec:       v1.PersistentVolumeClaimSpec{VolumeName: "Vol_1"},
 | 
			
		||||
			},
 | 
			
		||||
			expectedHint: framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
		"pvc-was-added-and-pod-was-bound-to-different-pvc": {
 | 
			
		||||
			pod: createPodWithVolume("pod_1", "PVC_2"),
 | 
			
		||||
			newObj: &v1.PersistentVolumeClaim{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "PVC_1", Namespace: "default"},
 | 
			
		||||
				Spec:       v1.PersistentVolumeClaimSpec{VolumeName: "Vol_1"},
 | 
			
		||||
			},
 | 
			
		||||
			expectedHint: framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
		"pvc-was-added-and-pod-was-bound-to-pvc-but-different-ns": {
 | 
			
		||||
			pod: createPodWithVolume("pod_1", "PVC_1"),
 | 
			
		||||
			newObj: &v1.PersistentVolumeClaim{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "PVC_1", Namespace: "ns1"},
 | 
			
		||||
				Spec:       v1.PersistentVolumeClaimSpec{VolumeName: "Vol_1"},
 | 
			
		||||
			},
 | 
			
		||||
			expectedHint: framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
		"pvc-was-added-and-pod-was-bound-to-the-pvc": {
 | 
			
		||||
			pod: createPodWithVolume("pod_1", "PVC_1"),
 | 
			
		||||
			newObj: &v1.PersistentVolumeClaim{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "PVC_1", Namespace: "default"},
 | 
			
		||||
				Spec:       v1.PersistentVolumeClaimSpec{VolumeName: "Vol_1"},
 | 
			
		||||
			},
 | 
			
		||||
			expectedHint: framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		"pvc-was-updated-and-pod-was-bound-to-the-pvc": {
 | 
			
		||||
			pod: createPodWithVolume("pod_1", "PVC_1"),
 | 
			
		||||
			oldObj: &v1.PersistentVolumeClaim{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "PVC_1", Namespace: "default"},
 | 
			
		||||
				Spec:       v1.PersistentVolumeClaimSpec{VolumeName: ""},
 | 
			
		||||
			},
 | 
			
		||||
			newObj: &v1.PersistentVolumeClaim{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "PVC_1", Namespace: "default"},
 | 
			
		||||
				Spec:       v1.PersistentVolumeClaimSpec{VolumeName: "Vol_1"},
 | 
			
		||||
			},
 | 
			
		||||
			expectedHint: framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		"pvc-was-updated-but-pod-refers-no-pvc": {
 | 
			
		||||
			pod: st.MakePod().Name("pod_1").Namespace(metav1.NamespaceDefault).Obj(),
 | 
			
		||||
			oldObj: &v1.PersistentVolumeClaim{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "PVC_1", Namespace: "default"},
 | 
			
		||||
				Spec:       v1.PersistentVolumeClaimSpec{VolumeName: ""},
 | 
			
		||||
			},
 | 
			
		||||
			newObj: &v1.PersistentVolumeClaim{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "PVC_1", Namespace: "default"},
 | 
			
		||||
				Spec:       v1.PersistentVolumeClaimSpec{VolumeName: "Vol_1"},
 | 
			
		||||
			},
 | 
			
		||||
			expectedHint: framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for name, tc := range testcases {
 | 
			
		||||
		t.Run(name, func(t *testing.T) {
 | 
			
		||||
			logger, _ := ktesting.NewTestContext(t)
 | 
			
		||||
			p := &VolumeZone{}
 | 
			
		||||
 | 
			
		||||
			got, err := p.isSchedulableAfterPersistentVolumeClaimChange(logger, tc.pod, tc.oldObj, tc.newObj)
 | 
			
		||||
			if err != nil && !tc.expectedErr {
 | 
			
		||||
				t.Errorf("unexpected error: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			if got != tc.expectedHint {
 | 
			
		||||
				t.Errorf("isSchedulableAfterPersistentVolumeClaimChange() = %v, want %v", got, tc.expectedHint)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkVolumeZone(b *testing.B) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		Name      string
 | 
			
		||||
@@ -572,7 +660,7 @@ func BenchmarkVolumeZone(b *testing.B) {
 | 
			
		||||
			defer cancel()
 | 
			
		||||
			nodes := makeNodesWithTopologyZone(tt.NumNodes)
 | 
			
		||||
			pl := newPluginWithListers(ctx, b, []*v1.Pod{tt.Pod}, nodes, makePVCsWithPV(tt.NumPVC), makePVsWithZoneLabel(tt.NumPV))
 | 
			
		||||
			nodeInfos := make([]*framework.NodeInfo, len(nodes), len(nodes))
 | 
			
		||||
			nodeInfos := make([]*framework.NodeInfo, len(nodes))
 | 
			
		||||
			for i := 0; i < len(nodes); i++ {
 | 
			
		||||
				nodeInfo := &framework.NodeInfo{}
 | 
			
		||||
				nodeInfo.SetNode(nodes[i])
 | 
			
		||||
@@ -609,7 +697,7 @@ func newPluginWithListers(ctx context.Context, tb testing.TB, pods []*v1.Pod, no
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makePVsWithZoneLabel(num int) []*v1.PersistentVolume {
 | 
			
		||||
	pvList := make([]*v1.PersistentVolume, num, num)
 | 
			
		||||
	pvList := make([]*v1.PersistentVolume, num)
 | 
			
		||||
	for i := 0; i < len(pvList); i++ {
 | 
			
		||||
		pvName := fmt.Sprintf("Vol_Stable_%d", i)
 | 
			
		||||
		zone := fmt.Sprintf("us-west-%d", i)
 | 
			
		||||
@@ -621,7 +709,7 @@ func makePVsWithZoneLabel(num int) []*v1.PersistentVolume {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makePVCsWithPV(num int) []*v1.PersistentVolumeClaim {
 | 
			
		||||
	pvcList := make([]*v1.PersistentVolumeClaim, num, num)
 | 
			
		||||
	pvcList := make([]*v1.PersistentVolumeClaim, num)
 | 
			
		||||
	for i := 0; i < len(pvcList); i++ {
 | 
			
		||||
		pvcName := fmt.Sprintf("PVC_Stable_%d", i)
 | 
			
		||||
		pvName := fmt.Sprintf("Vol_Stable_%d", i)
 | 
			
		||||
@@ -634,10 +722,10 @@ func makePVCsWithPV(num int) []*v1.PersistentVolumeClaim {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makeNodesWithTopologyZone(num int) []*v1.Node {
 | 
			
		||||
	nodeList := make([]*v1.Node, num, num)
 | 
			
		||||
	nodeList := make([]*v1.Node, num)
 | 
			
		||||
	for i := 0; i < len(nodeList); i++ {
 | 
			
		||||
		nodeName := fmt.Sprintf("host_%d", i)
 | 
			
		||||
		zone := fmt.Sprintf("us-west-0")
 | 
			
		||||
		zone := "us-west-0"
 | 
			
		||||
		nodeList[i] = &v1.Node{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
				Name:   nodeName,
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user