volumebinding: scheduler queueing hints - CSIStorageCapacity (#124961)
* volumebinding: scheduler queueing hints - CSIStorageCapacity * Fixed points mentioned in the review * Fixed points mentioned in the review * Update pkg/scheduler/framework/plugins/volumebinding/volume_binding.go Co-authored-by: Kensei Nakada <handbomusic@gmail.com> * Update pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go Co-authored-by: Kensei Nakada <handbomusic@gmail.com> * Fixed points mentioned in the review * volume_binding.go を更新 Co-authored-by: Kensei Nakada <handbomusic@gmail.com> --------- Co-authored-by: Kensei Nakada <handbomusic@gmail.com>
This commit is contained in:
		@@ -27,6 +27,7 @@ import (
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	storagev1 "k8s.io/api/storage/v1"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/resource"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
@@ -1048,12 +1049,16 @@ func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func capacitySufficient(capacity *storagev1.CSIStorageCapacity, sizeInBytes int64) bool {
 | 
			
		||||
	limit := capacity.Capacity
 | 
			
		||||
	limit := volumeLimit(capacity)
 | 
			
		||||
	return limit != nil && limit.Value() >= sizeInBytes
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func volumeLimit(capacity *storagev1.CSIStorageCapacity) *resource.Quantity {
 | 
			
		||||
	if capacity.MaximumVolumeSize != nil {
 | 
			
		||||
		// Prefer MaximumVolumeSize if available, it is more precise.
 | 
			
		||||
		limit = capacity.MaximumVolumeSize
 | 
			
		||||
		return capacity.MaximumVolumeSize
 | 
			
		||||
	}
 | 
			
		||||
	return limit != nil && limit.Value() >= sizeInBytes
 | 
			
		||||
	return capacity.Capacity
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *volumeBinder) nodeHasAccess(logger klog.Logger, node *v1.Node, capacity *storagev1.CSIStorageCapacity) bool {
 | 
			
		||||
 
 | 
			
		||||
@@ -126,7 +126,7 @@ func (pl *VolumeBinding) EventsToRegister(_ context.Context) ([]framework.Cluste
 | 
			
		||||
		// When CSIStorageCapacity is enabled, pods may become schedulable
 | 
			
		||||
		// on CSI driver & storage capacity changes.
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}},
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}},
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterCSIStorageCapacityChange},
 | 
			
		||||
	}
 | 
			
		||||
	return events, nil
 | 
			
		||||
}
 | 
			
		||||
@@ -228,6 +228,47 @@ func (pl *VolumeBinding) isSchedulableAfterStorageClassChange(logger klog.Logger
 | 
			
		||||
	return framework.QueueSkip, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// isSchedulableAfterCSIStorageCapacityChange checks whether a CSIStorageCapacity event
 | 
			
		||||
// might make a Pod schedulable or not.
 | 
			
		||||
// Any CSIStorageCapacity addition and a CSIStorageCapacity update to volume limit
 | 
			
		||||
// (calculated based on capacity and maximumVolumeSize) might make a Pod schedulable.
 | 
			
		||||
// Note that an update to nodeTopology and storageClassName is not allowed and
 | 
			
		||||
// we don't have to consider while examining the update event.
 | 
			
		||||
func (pl *VolumeBinding) isSchedulableAfterCSIStorageCapacityChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
 | 
			
		||||
	oldCap, newCap, err := util.As[*storagev1.CSIStorageCapacity](oldObj, newObj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return framework.Queue, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if oldCap == nil {
 | 
			
		||||
		logger.V(5).Info(
 | 
			
		||||
			"A new CSIStorageCapacity was created, which could make a Pod schedulable",
 | 
			
		||||
			"Pod", klog.KObj(pod),
 | 
			
		||||
			"CSIStorageCapacity", klog.KObj(newCap),
 | 
			
		||||
		)
 | 
			
		||||
		return framework.Queue, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	oldLimit := volumeLimit(oldCap)
 | 
			
		||||
	newLimit := volumeLimit(newCap)
 | 
			
		||||
 | 
			
		||||
	logger = klog.LoggerWithValues(
 | 
			
		||||
		logger,
 | 
			
		||||
		"Pod", klog.KObj(pod),
 | 
			
		||||
		"CSIStorageCapacity", klog.KObj(newCap),
 | 
			
		||||
		"volumeLimit(new)", newLimit,
 | 
			
		||||
		"volumeLimit(old)", oldLimit,
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	if newLimit != nil && (oldLimit == nil || newLimit.Value() > oldLimit.Value()) {
 | 
			
		||||
		logger.V(5).Info("VolumeLimit was increased, which could make a Pod schedulable")
 | 
			
		||||
		return framework.Queue, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	logger.V(5).Info("CSIStorageCapacity was updated, but it doesn't make this pod schedulable")
 | 
			
		||||
	return framework.QueueSkip, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// podHasPVCs returns 2 values:
 | 
			
		||||
// - the first one to denote if the given "pod" has any PVC defined.
 | 
			
		||||
// - the second one to return any error if the requested PVC is illegal.
 | 
			
		||||
 
 | 
			
		||||
@@ -1181,3 +1181,232 @@ func TestIsSchedulableAfterStorageClassChange(t *testing.T) {
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestIsSchedulableAfterCSIStorageCapacityChange(t *testing.T) {
 | 
			
		||||
	table := []struct {
 | 
			
		||||
		name    string
 | 
			
		||||
		pod     *v1.Pod
 | 
			
		||||
		oldCap  interface{}
 | 
			
		||||
		newCap  interface{}
 | 
			
		||||
		wantErr bool
 | 
			
		||||
		expect  framework.QueueingHint
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:   "When a new CSIStorageCapacity is created, it returns Queue",
 | 
			
		||||
			pod:    makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: nil,
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When the volume limit is increase(Capacity set), it returns Queue",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity: resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When the volume limit is increase(MaximumVolumeSize set), it returns Queue",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				MaximumVolumeSize: resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When the volume limit is increase(Capacity increase), it returns Queue",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity: resource.NewQuantity(50, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity: resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When the volume limit is increase(MaximumVolumeSize unset), it returns Queue",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity:          resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
				MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity: resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When the volume limit is increase(MaximumVolumeSize increase), it returns Queue",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity:          resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
				MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity:          resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
				MaximumVolumeSize: resource.NewQuantity(60, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When there are no changes to the CSIStorageCapacity, it returns QueueSkip",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity:          resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
				MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity:          resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
				MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When the volume limit is equal(Capacity), it returns QueueSkip",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity: resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity: resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When the volume limit is equal(MaximumVolumeSize unset), it returns QueueSkip",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity:          resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
				MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity: resource.NewQuantity(50, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When the volume limit is decrease(Capacity), it returns QueueSkip",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity: resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				Capacity: resource.NewQuantity(90, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "When the volume limit is decrease(MaximumVolumeSize), it returns QueueSkip",
 | 
			
		||||
			pod:  makePod("pod-a").Pod,
 | 
			
		||||
			oldCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				MaximumVolumeSize: resource.NewQuantity(100, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			newCap: &storagev1.CSIStorageCapacity{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "cap-a",
 | 
			
		||||
				},
 | 
			
		||||
				MaximumVolumeSize: resource.NewQuantity(90, resource.DecimalSI),
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
			expect:  framework.QueueSkip,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:    "type conversion error",
 | 
			
		||||
			oldCap:  new(struct{}),
 | 
			
		||||
			newCap:  new(struct{}),
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
			expect:  framework.Queue,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, item := range table {
 | 
			
		||||
		t.Run(item.name, func(t *testing.T) {
 | 
			
		||||
			pl := &VolumeBinding{}
 | 
			
		||||
			logger, _ := ktesting.NewTestContext(t)
 | 
			
		||||
			qhint, err := pl.isSchedulableAfterCSIStorageCapacityChange(logger, item.pod, item.oldCap, item.newCap)
 | 
			
		||||
			if (err != nil) != item.wantErr {
 | 
			
		||||
				t.Errorf("error is unexpectedly returned or not returned from isSchedulableAfterCSIStorageCapacityChange. wantErr: %v actual error: %q", item.wantErr, err)
 | 
			
		||||
			}
 | 
			
		||||
			if qhint != item.expect {
 | 
			
		||||
				t.Errorf("QHint does not match: %v, want: %v", qhint, item.expect)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user