dra: handle scheduled pods in kube-controller-manager

When someone decides that a Pod should definitely run on a specific node, they
can create the Pod with spec.nodeName already set. Some custom scheduler might
do that. Then kubelet starts to check the pod and (if DRA is enabled) will
refuse to run it, either because the claims are still waiting for the first
consumer or the pod wasn't added to reservedFor. Both are things the scheduler
normally does.

Also, if a pod got scheduled while the DRA feature was off in the
kube-scheduler, a pod can reach the same state.

The resource claim controller can handle these two cases by taking over for the
kube-scheduler when nodeName is set. Triggering an allocation is simpler than
in the scheduler because all it takes is creating the right
PodSchedulingContext with spec.selectedNode set. There's no need to list nodes
because that choice was already made, permanently. Adding the pod to
reservedFor also isn't hard.

What's currently missing is triggering de-allocation of claims to re-allocate
them for the desired node. This is not important for claims that get created
for the pod from a template and then only get used once, but it might be
worthwhile to add de-allocation in the future.
This commit is contained in:
Patrick Ohly
2023-05-22 19:44:58 +02:00
parent cffbb1f1b2
commit 80ab8f0542
6 changed files with 337 additions and 71 deletions

View File

@@ -40,6 +40,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
"k8s.io/utils/pointer"
)
var (
@@ -50,44 +51,53 @@ var (
podResourceClaimName = "acme-resource"
templateName = "my-template"
className = "my-resource-class"
nodeName = "worker"
testPod = makePod(testPodName, testNamespace, testPodUID)
testPodWithResource = makePod(testPodName, testNamespace, testPodUID, *makePodResourceClaim(podResourceClaimName, templateName))
otherTestPod = makePod(testPodName+"-II", testNamespace, testPodUID+"-II")
testClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, makeOwnerReference(testPodWithResource, true))
generatedTestClaim = makeGeneratedClaim(podResourceClaimName, testPodName+"-"+podResourceClaimName, testNamespace, className, 1, makeOwnerReference(testPodWithResource, true))
testClaimAllocated = func() *resourcev1alpha2.ResourceClaim {
claim := testClaim.DeepCopy()
claim.Status.Allocation = &resourcev1alpha2.AllocationResult{
Shareable: true,
}
return claim
}()
testClaimReserved = func() *resourcev1alpha2.ResourceClaim {
claim := testClaimAllocated.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha2.ResourceClaimConsumerReference{
Resource: "pods",
Name: testPodWithResource.Name,
UID: testPodWithResource.UID,
},
)
return claim
}()
testClaimReservedTwice = func() *resourcev1alpha2.ResourceClaim {
claim := testClaimReserved.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha2.ResourceClaimConsumerReference{
Resource: "pods",
Name: otherTestPod.Name,
UID: otherTestPod.UID,
},
)
return claim
}()
testClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, makeOwnerReference(testPodWithResource, true))
testClaimAllocated = allocateClaim(testClaim)
testClaimReserved = reserveClaim(testClaimAllocated, testPodWithResource)
testClaimReservedTwice = reserveClaim(testClaimReserved, otherTestPod)
generatedTestClaim = makeGeneratedClaim(podResourceClaimName, testPodName+"-"+podResourceClaimName, testNamespace, className, 1, makeOwnerReference(testPodWithResource, true))
generatedTestClaimAllocated = allocateClaim(generatedTestClaim)
generatedTestClaimReserved = reserveClaim(generatedTestClaimAllocated, testPodWithResource)
conflictingClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, nil)
otherNamespaceClaim = makeClaim(testPodName+"-"+podResourceClaimName, otherNamespace, className, nil)
template = makeTemplate(templateName, testNamespace, className)
testPodWithNodeName = func() *v1.Pod {
pod := testPodWithResource.DeepCopy()
pod.Spec.NodeName = nodeName
pod.Status.ResourceClaimStatuses = append(pod.Status.ResourceClaimStatuses, v1.PodResourceClaimStatus{
Name: pod.Spec.ResourceClaims[0].Name,
ResourceClaimName: &generatedTestClaim.Name,
})
return pod
}()
podSchedulingContext = resourcev1alpha2.PodSchedulingContext{
ObjectMeta: metav1.ObjectMeta{
Name: testPodName,
Namespace: testNamespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: testPodName,
UID: testPodUID,
Controller: pointer.Bool(true),
},
},
},
Spec: resourcev1alpha2.PodSchedulingContextSpec{
SelectedNode: nodeName,
},
}
)
func init() {
@@ -96,17 +106,18 @@ func init() {
func TestSyncHandler(t *testing.T) {
tests := []struct {
name string
key string
claims []*resourcev1alpha2.ResourceClaim
claimsInCache []*resourcev1alpha2.ResourceClaim
pods []*v1.Pod
podsLater []*v1.Pod
templates []*resourcev1alpha2.ResourceClaimTemplate
expectedClaims []resourcev1alpha2.ResourceClaim
expectedStatuses map[string][]v1.PodResourceClaimStatus
expectedError bool
expectedMetrics expectedMetrics
name string
key string
claims []*resourcev1alpha2.ResourceClaim
claimsInCache []*resourcev1alpha2.ResourceClaim
pods []*v1.Pod
podsLater []*v1.Pod
templates []*resourcev1alpha2.ResourceClaimTemplate
expectedClaims []resourcev1alpha2.ResourceClaim
expectedPodSchedulingContexts []resourcev1alpha2.PodSchedulingContext
expectedStatuses map[string][]v1.PodResourceClaimStatus
expectedError bool
expectedMetrics expectedMetrics
}{
{
name: "create",
@@ -361,6 +372,35 @@ func TestSyncHandler(t *testing.T) {
expectedClaims: nil,
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "trigger-allocation",
pods: []*v1.Pod{testPodWithNodeName},
key: podKey(testPodWithNodeName),
templates: []*resourcev1alpha2.ResourceClaimTemplate{template},
claims: []*resourcev1alpha2.ResourceClaim{generatedTestClaim},
expectedClaims: []resourcev1alpha2.ResourceClaim{*generatedTestClaim},
expectedStatuses: map[string][]v1.PodResourceClaimStatus{
testPodWithNodeName.Name: {
{Name: testPodWithNodeName.Spec.ResourceClaims[0].Name, ResourceClaimName: &generatedTestClaim.Name},
},
},
expectedPodSchedulingContexts: []resourcev1alpha2.PodSchedulingContext{podSchedulingContext},
expectedMetrics: expectedMetrics{0, 0},
},
{
name: "add-reserved",
pods: []*v1.Pod{testPodWithNodeName},
key: podKey(testPodWithNodeName),
templates: []*resourcev1alpha2.ResourceClaimTemplate{template},
claims: []*resourcev1alpha2.ResourceClaim{generatedTestClaimAllocated},
expectedClaims: []resourcev1alpha2.ResourceClaim{*generatedTestClaimReserved},
expectedStatuses: map[string][]v1.PodResourceClaimStatus{
testPodWithNodeName.Name: {
{Name: testPodWithNodeName.Spec.ResourceClaims[0].Name, ResourceClaimName: &generatedTestClaim.Name},
},
},
expectedMetrics: expectedMetrics{0, 0},
},
}
for _, tc := range tests {
@@ -389,10 +429,11 @@ func TestSyncHandler(t *testing.T) {
setupMetrics()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
podInformer := informerFactory.Core().V1().Pods()
podSchedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts()
claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()
templateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates()
ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, claimInformer, templateInformer)
ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, podSchedulingInformer, claimInformer, templateInformer)
if err != nil {
t.Fatalf("error creating ephemeral controller : %v", err)
}
@@ -451,6 +492,12 @@ func TestSyncHandler(t *testing.T) {
}
assert.Equal(t, tc.expectedStatuses, actualStatuses, "pod resource claim statuses")
scheduling, err := fakeKubeClient.ResourceV1alpha2().PodSchedulingContexts("").List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error while listing claims: %v", err)
}
assert.Equal(t, normalizeScheduling(tc.expectedPodSchedulingContexts), normalizeScheduling(scheduling.Items))
expectMetrics(t, tc.expectedMetrics)
})
}
@@ -481,6 +528,7 @@ func makeGeneratedClaim(podClaimName, generateName, namespace, classname string,
},
Spec: resourcev1alpha2.ResourceClaimSpec{
ResourceClassName: classname,
AllocationMode: resourcev1alpha2.AllocationModeWaitForFirstConsumer,
},
}
if owner != nil {
@@ -490,6 +538,26 @@ func makeGeneratedClaim(podClaimName, generateName, namespace, classname string,
return claim
}
func allocateClaim(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
claim = claim.DeepCopy()
claim.Status.Allocation = &resourcev1alpha2.AllocationResult{
Shareable: true,
}
return claim
}
func reserveClaim(claim *resourcev1alpha2.ResourceClaim, pod *v1.Pod) *resourcev1alpha2.ResourceClaim {
claim = claim.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha2.ResourceClaimConsumerReference{
Resource: "pods",
Name: pod.Name,
UID: pod.UID,
},
)
return claim
}
func makePodResourceClaim(name, templateName string) *v1.PodResourceClaim {
return &v1.PodResourceClaim{
Name: name,
@@ -564,6 +632,14 @@ func normalizeClaims(claims []resourcev1alpha2.ResourceClaim) []resourcev1alpha2
return claims
}
func normalizeScheduling(scheduling []resourcev1alpha2.PodSchedulingContext) []resourcev1alpha2.PodSchedulingContext {
sort.Slice(scheduling, func(i, j int) bool {
return scheduling[i].Namespace < scheduling[j].Namespace ||
scheduling[i].Name < scheduling[j].Name
})
return scheduling
}
func createTestClient(objects ...runtime.Object) *fake.Clientset {
fakeClient := fake.NewSimpleClientset(objects...)
fakeClient.PrependReactor("create", "resourceclaims", createResourceClaimReactor())