Merge pull request #55957 from jsafrane/protection-predicate
Automatic merge from submit-queue (batch tested with PRs 57211, 56150, 56368, 56271, 55957). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Skip pods that refer to PVCs that are being deleted **What this PR does / why we need it**: A new check was added to `Schedule()` to make sure that a pod being scheduled refers only to existing PVCs that are not being deleted. In 1.9 we plan to add a new feature that uses a finalizer on PVCs to protect PVCs that are used by a running pod from being deleted. This finalizer will be removed when all pods that use a PVC are finished or deleted. See https://github.com/kubernetes/community/blob/master/contributors/design-proposals/storage/postpone-pvc-deletion-if-used-in-a-pod.md for details. I needed to pass `pvcLister` to `GenericScheduler`. UX: ``` $ kubectl describe pod ... Type Reason Age From Message ---- ------ ---- ---- ------- Warning FailedScheduling 5s (x4 over 8s) default-scheduler persistentvolumeclaim "myclaim" is being deleted Warning FailedScheduling 1s (x2 over 1s) default-scheduler persistentvolumeclaim "myclaim" not found ``` **Release note**: ```release-note Scheduler skips pods that use a PVC that either does not exist or is being deleted. ``` /sig scheduling /kind feature
This commit is contained in:
		@@ -64,6 +64,7 @@ go_library(
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/rest:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/cache:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/util/workqueue:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -317,7 +317,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 | 
			
		||||
		}
 | 
			
		||||
		queue := NewSchedulingQueue()
 | 
			
		||||
		scheduler := NewGenericScheduler(
 | 
			
		||||
			cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil)
 | 
			
		||||
			cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
 | 
			
		||||
		podIgnored := &v1.Pod{}
 | 
			
		||||
		machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
 | 
			
		||||
		if test.expectsErr {
 | 
			
		||||
 
 | 
			
		||||
@@ -31,6 +31,7 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/errors"
 | 
			
		||||
	utiltrace "k8s.io/apiserver/pkg/util/trace"
 | 
			
		||||
	corelisters "k8s.io/client-go/listers/core/v1"
 | 
			
		||||
	"k8s.io/client-go/util/workqueue"
 | 
			
		||||
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 | 
			
		||||
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 | 
			
		||||
@@ -101,6 +102,7 @@ type genericScheduler struct {
 | 
			
		||||
 | 
			
		||||
	cachedNodeInfoMap map[string]*schedulercache.NodeInfo
 | 
			
		||||
	volumeBinder      *volumebinder.VolumeBinder
 | 
			
		||||
	pvcLister         corelisters.PersistentVolumeClaimLister
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Schedule tries to schedule the given pod to one of node in the node list.
 | 
			
		||||
@@ -110,6 +112,10 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 | 
			
		||||
	trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
 | 
			
		||||
	defer trace.LogIfLong(100 * time.Millisecond)
 | 
			
		||||
 | 
			
		||||
	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	nodes, err := nodeLister.List()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
@@ -995,6 +1001,32 @@ func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedule
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// podPassesBasicChecks makes sanity checks on the pod if it can be scheduled.
 | 
			
		||||
func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
 | 
			
		||||
	// Check PVCs used by the pod
 | 
			
		||||
	namespace := pod.Namespace
 | 
			
		||||
	manifest := &(pod.Spec)
 | 
			
		||||
	for i := range manifest.Volumes {
 | 
			
		||||
		volume := &manifest.Volumes[i]
 | 
			
		||||
		if volume.PersistentVolumeClaim == nil {
 | 
			
		||||
			// Volume is not a PVC, ignore
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		pvcName := volume.PersistentVolumeClaim.ClaimName
 | 
			
		||||
		pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			// The error has already enough context ("persistentvolumeclaim "myclaim" not found")
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if pvc.DeletionTimestamp != nil {
 | 
			
		||||
			return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewGenericScheduler(
 | 
			
		||||
	cache schedulercache.Cache,
 | 
			
		||||
	eCache *EquivalenceCache,
 | 
			
		||||
@@ -1004,7 +1036,8 @@ func NewGenericScheduler(
 | 
			
		||||
	prioritizers []algorithm.PriorityConfig,
 | 
			
		||||
	priorityMetaProducer algorithm.MetadataProducer,
 | 
			
		||||
	extenders []algorithm.SchedulerExtender,
 | 
			
		||||
	volumeBinder *volumebinder.VolumeBinder) algorithm.ScheduleAlgorithm {
 | 
			
		||||
	volumeBinder *volumebinder.VolumeBinder,
 | 
			
		||||
	pvcLister corelisters.PersistentVolumeClaimLister) algorithm.ScheduleAlgorithm {
 | 
			
		||||
	return &genericScheduler{
 | 
			
		||||
		cache:                 cache,
 | 
			
		||||
		equivalenceCache:      eCache,
 | 
			
		||||
@@ -1016,5 +1049,6 @@ func NewGenericScheduler(
 | 
			
		||||
		extenders:             extenders,
 | 
			
		||||
		cachedNodeInfoMap:     make(map[string]*schedulercache.NodeInfo),
 | 
			
		||||
		volumeBinder:          volumeBinder,
 | 
			
		||||
		pvcLister:             pvcLister,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -186,6 +186,7 @@ func TestGenericScheduler(t *testing.T) {
 | 
			
		||||
		predicates    map[string]algorithm.FitPredicate
 | 
			
		||||
		prioritizers  []algorithm.PriorityConfig
 | 
			
		||||
		nodes         []string
 | 
			
		||||
		pvcs          []*v1.PersistentVolumeClaim
 | 
			
		||||
		pod           *v1.Pod
 | 
			
		||||
		pods          []*v1.Pod
 | 
			
		||||
		expectedHosts sets.String
 | 
			
		||||
@@ -300,6 +301,77 @@ func TestGenericScheduler(t *testing.T) {
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// Pod with existing PVC
 | 
			
		||||
			predicates:   map[string]algorithm.FitPredicate{"true": truePredicate},
 | 
			
		||||
			prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
 | 
			
		||||
			nodes:        []string{"machine1", "machine2"},
 | 
			
		||||
			pvcs:         []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC"}}},
 | 
			
		||||
			pod: &v1.Pod{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "ignore"},
 | 
			
		||||
				Spec: v1.PodSpec{
 | 
			
		||||
					Volumes: []v1.Volume{
 | 
			
		||||
						{
 | 
			
		||||
							VolumeSource: v1.VolumeSource{
 | 
			
		||||
								PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
 | 
			
		||||
									ClaimName: "existingPVC",
 | 
			
		||||
								},
 | 
			
		||||
							},
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expectedHosts: sets.NewString("machine1", "machine2"),
 | 
			
		||||
			name:          "existing PVC",
 | 
			
		||||
			wErr:          nil,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// Pod with non existing PVC
 | 
			
		||||
			predicates:   map[string]algorithm.FitPredicate{"true": truePredicate},
 | 
			
		||||
			prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
 | 
			
		||||
			nodes:        []string{"machine1", "machine2"},
 | 
			
		||||
			pod: &v1.Pod{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "ignore"},
 | 
			
		||||
				Spec: v1.PodSpec{
 | 
			
		||||
					Volumes: []v1.Volume{
 | 
			
		||||
						{
 | 
			
		||||
							VolumeSource: v1.VolumeSource{
 | 
			
		||||
								PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
 | 
			
		||||
									ClaimName: "unknownPVC",
 | 
			
		||||
								},
 | 
			
		||||
							},
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			name:       "unknown PVC",
 | 
			
		||||
			expectsErr: true,
 | 
			
		||||
			wErr:       fmt.Errorf("persistentvolumeclaim \"unknownPVC\" not found"),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// Pod with deleting PVC
 | 
			
		||||
			predicates:   map[string]algorithm.FitPredicate{"true": truePredicate},
 | 
			
		||||
			prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
 | 
			
		||||
			nodes:        []string{"machine1", "machine2"},
 | 
			
		||||
			pvcs:         []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", DeletionTimestamp: &metav1.Time{}}}},
 | 
			
		||||
			pod: &v1.Pod{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Name: "ignore"},
 | 
			
		||||
				Spec: v1.PodSpec{
 | 
			
		||||
					Volumes: []v1.Volume{
 | 
			
		||||
						{
 | 
			
		||||
							VolumeSource: v1.VolumeSource{
 | 
			
		||||
								PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
 | 
			
		||||
									ClaimName: "existingPVC",
 | 
			
		||||
								},
 | 
			
		||||
							},
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			name:       "deleted PVC",
 | 
			
		||||
			expectsErr: true,
 | 
			
		||||
			wErr:       fmt.Errorf("persistentvolumeclaim \"existingPVC\" is being deleted"),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		cache := schedulercache.New(time.Duration(0), wait.NeverStop)
 | 
			
		||||
@@ -309,9 +381,14 @@ func TestGenericScheduler(t *testing.T) {
 | 
			
		||||
		for _, name := range test.nodes {
 | 
			
		||||
			cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
 | 
			
		||||
		}
 | 
			
		||||
		pvcs := []*v1.PersistentVolumeClaim{}
 | 
			
		||||
		for _, pvc := range test.pvcs {
 | 
			
		||||
			pvcs = append(pvcs, pvc)
 | 
			
		||||
		}
 | 
			
		||||
		pvcLister := schedulertesting.FakePersistentVolumeClaimLister(pvcs)
 | 
			
		||||
 | 
			
		||||
		scheduler := NewGenericScheduler(
 | 
			
		||||
			cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil)
 | 
			
		||||
			cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil, pvcLister)
 | 
			
		||||
		machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
 | 
			
		||||
 | 
			
		||||
		if !reflect.DeepEqual(err, test.wErr) {
 | 
			
		||||
@@ -1190,7 +1267,7 @@ func TestPreempt(t *testing.T) {
 | 
			
		||||
			extenders = append(extenders, extender)
 | 
			
		||||
		}
 | 
			
		||||
		scheduler := NewGenericScheduler(
 | 
			
		||||
			cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil)
 | 
			
		||||
			cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
 | 
			
		||||
		// Call Preempt and check the expected results.
 | 
			
		||||
		node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -933,7 +933,7 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 | 
			
		||||
		glog.Info("Created equivalence class cache")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder)
 | 
			
		||||
	algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder, f.pVCLister)
 | 
			
		||||
 | 
			
		||||
	podBackoff := util.CreateDefaultPodBackoff()
 | 
			
		||||
	return &scheduler.Config{
 | 
			
		||||
 
 | 
			
		||||
@@ -532,7 +532,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
 | 
			
		||||
		[]algorithm.PriorityConfig{},
 | 
			
		||||
		algorithm.EmptyMetadataProducer,
 | 
			
		||||
		[]algorithm.SchedulerExtender{},
 | 
			
		||||
		nil)
 | 
			
		||||
		nil,
 | 
			
		||||
		schedulertesting.FakePersistentVolumeClaimLister{})
 | 
			
		||||
	bindingChan := make(chan *v1.Binding, 1)
 | 
			
		||||
	errChan := make(chan error, 1)
 | 
			
		||||
	configurator := &FakeConfigurator{
 | 
			
		||||
@@ -575,7 +576,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 | 
			
		||||
		[]algorithm.PriorityConfig{},
 | 
			
		||||
		algorithm.EmptyMetadataProducer,
 | 
			
		||||
		[]algorithm.SchedulerExtender{},
 | 
			
		||||
		nil)
 | 
			
		||||
		nil,
 | 
			
		||||
		schedulertesting.FakePersistentVolumeClaimLister{})
 | 
			
		||||
	bindingChan := make(chan *v1.Binding, 2)
 | 
			
		||||
	configurator := &FakeConfigurator{
 | 
			
		||||
		Config: &Config{
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@ go_library(
 | 
			
		||||
        "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,7 @@ import (
 | 
			
		||||
	extensions "k8s.io/api/extensions/v1beta1"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	corelisters "k8s.io/client-go/listers/core/v1"
 | 
			
		||||
	. "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 | 
			
		||||
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 | 
			
		||||
)
 | 
			
		||||
@@ -176,3 +177,38 @@ func (f FakeStatefulSetLister) GetPodStatefulSets(pod *v1.Pod) (sss []*apps.Stat
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// FakePersistentVolumeClaimLister implements PersistentVolumeClaimLister on []*v1.PersistentVolumeClaim for test purposes.
 | 
			
		||||
type FakePersistentVolumeClaimLister []*v1.PersistentVolumeClaim
 | 
			
		||||
 | 
			
		||||
var _ corelisters.PersistentVolumeClaimLister = FakePersistentVolumeClaimLister{}
 | 
			
		||||
 | 
			
		||||
func (f FakePersistentVolumeClaimLister) List(selector labels.Selector) (ret []*v1.PersistentVolumeClaim, err error) {
 | 
			
		||||
	return nil, fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f FakePersistentVolumeClaimLister) PersistentVolumeClaims(namespace string) corelisters.PersistentVolumeClaimNamespaceLister {
 | 
			
		||||
	return &fakePersistentVolumeClaimNamespaceLister{
 | 
			
		||||
		pvcs:      f,
 | 
			
		||||
		namespace: namespace,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// fakePersistentVolumeClaimNamespaceLister is implementation of PersistentVolumeClaimNamespaceLister returned by List() above.
 | 
			
		||||
type fakePersistentVolumeClaimNamespaceLister struct {
 | 
			
		||||
	pvcs      []*v1.PersistentVolumeClaim
 | 
			
		||||
	namespace string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakePersistentVolumeClaimNamespaceLister) Get(name string) (*v1.PersistentVolumeClaim, error) {
 | 
			
		||||
	for _, pvc := range f.pvcs {
 | 
			
		||||
		if pvc.Name == name && pvc.Namespace == f.namespace {
 | 
			
		||||
			return pvc, nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil, fmt.Errorf("persistentvolumeclaim %q not found", name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f fakePersistentVolumeClaimNamespaceLister) List(selector labels.Selector) (ret []*v1.PersistentVolumeClaim, err error) {
 | 
			
		||||
	return nil, fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user