Merge pull request #124931 from pohly/dra-scheduler-prebind-fix
DRA: fix scheduler/resource claim controller race
@@ -18,7 +18,6 @@ package dynamicresources
 
 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"slices"
@@ -40,6 +39,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/retry"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/klog/v2"
@@ -1460,7 +1460,9 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 		// The allocation would be enough. The full object is useful for
 		// debugging and testing, so let's make it realistic.
 		claim = claim.DeepCopy()
+		if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
 			claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
+		}
 		claim.Status.DriverName = driverName
 		claim.Status.Allocation = allocation
 		pl.inFlightAllocations.Store(claim.UID, claim)
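The Reserve hunk above only touches the scheduler's in-memory copy of the claim: the finalizer is appended at most once, even if the claim already carries it (for example after a scheduler restart). A minimal standalone sketch of that guarded-append pattern, using a plain string slice and an illustrative finalizer name rather than the real ResourceClaim type:

```go
// Sketch only, not code from this PR: idempotently add a finalizer to a list,
// mirroring the slices.Contains guard added in Reserve above.
package main

import (
	"fmt"
	"slices"
)

// Illustrative finalizer value; the real one is resourcev1alpha2.Finalizer.
const finalizer = "example.com/delete-protection"

func addFinalizer(finalizers []string) []string {
	if !slices.Contains(finalizers, finalizer) {
		finalizers = append(finalizers, finalizer)
	}
	return finalizers
}

func main() {
	fmt.Println(addFinalizer(nil))                 // [example.com/delete-protection]
	fmt.Println(addFinalizer([]string{finalizer})) // unchanged, no duplicate
}
```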
@@ -1619,72 +1621,88 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat
 // and reservation are recorded. This finishes the work started in Reserve.
 func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourcev1alpha2.ResourceClaim, finalErr error) {
 	logger := klog.FromContext(ctx)
-	claim := state.claims[index]
-	allocationPatch := ""
-
-	allocation := state.informationsForClaim[index].allocation
-	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
-
-	// Do we need to store an allocation result from Reserve?
-	if allocation != nil {
-		buffer, err := json.Marshal(allocation)
-		if err != nil {
-			return nil, fmt.Errorf("marshaling AllocationResult failed: %v", err)
-		}
-		allocationPatch = fmt.Sprintf(`"driverName": %q, "allocation": %s, `, state.informationsForClaim[index].allocationDriverName, string(buffer))
-
-		// The finalizer needs to be added in a normal update. Using a simple update is fine
-		// because we don't expect concurrent modifications while the claim is not allocated
-		// yet. If there are any, we want to fail.
-		//
-		// If we were interrupted in the past, it might already be set and we simply continue.
-		if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
-			claim := state.claims[index].DeepCopy()
-			claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
-			if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
-				return nil, fmt.Errorf("add finalizer: %v", err)
-			}
-		}
-	}
-
-	// The claim might be stale, for example because the claim can get shared and some
-	// other goroutine has updated it in the meantime. We therefore cannot use
-	// SSA here to add the pod because then we would have to send the entire slice
-	// or use different field manager strings for each entry.
-	//
-	// With a strategic-merge-patch, we can simply send one new entry. The apiserver
-	// validation will catch if two goroutines try to do that at the same time and
-	// the claim cannot be shared.
-	//
-	// Note that this also works when the allocation result gets added twice because
-	// two pods both started using a shared claim: the first pod to get here adds the
-	// allocation result. The second pod then only adds itself to reservedFor.
-	patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": {%s "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`,
-		claim.UID,
-		allocationPatch,
-		pod.Name,
-		pod.UID,
-	)
-	if loggerV := logger.V(6); loggerV.Enabled() {
-		logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim), "patch", patch)
-	} else {
-		logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
-	}
-	claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
-	logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim), "err", err)
-	if allocationPatch != "" {
-		// The scheduler was handling allocation. Now that has
-		// completed, either successfully or with a failure.
-		if err == nil {
-			// This can fail, but only for reasons that are okay (concurrent delete or update).
-			// Shouldn't happen in this case.
-			if err := pl.claimAssumeCache.Assume(claim); err != nil {
-				logger.V(5).Info("Claim not stored in assume cache", "err", err)
-			}
-		}
-		pl.inFlightAllocations.Delete(claim.UID)
-	}
-	return claim, err
+	claim := state.claims[index].DeepCopy()
+	allocation := state.informationsForClaim[index].allocation
+	defer func() {
+		if allocation != nil {
+			// The scheduler was handling allocation. Now that has
+			// completed, either successfully or with a failure.
+			if finalErr == nil {
+				// This can fail, but only for reasons that are okay (concurrent delete or update).
+				// Shouldn't happen in this case.
+				if err := pl.claimAssumeCache.Assume(claim); err != nil {
+					logger.V(5).Info("Claim not stored in assume cache", "err", finalErr)
+				}
+			}
+			pl.inFlightAllocations.Delete(claim.UID)
+		}
+	}()
+
+	logger.V(5).Info("preparing claim status update", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
+
+	// We may run into a ResourceVersion conflict because there may be some
+	// benign concurrent changes. In that case we get the latest claim and
+	// try again.
+	refreshClaim := false
+	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		if refreshClaim {
+			updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
+			if err != nil {
+				return fmt.Errorf("get updated claim %s after conflict: %w", klog.KObj(claim), err)
+			}
+			logger.V(5).Info("retrying update after conflict", "claim", klog.KObj(claim))
+			claim = updatedClaim
+		} else {
+			// All future retries must get a new claim first.
+			refreshClaim = true
+		}
+
+		if claim.DeletionTimestamp != nil {
+			return fmt.Errorf("claim %s got deleted in the meantime", klog.KObj(claim))
+		}
+
+		// Do we need to store an allocation result from Reserve?
+		if allocation != nil {
+			if claim.Status.Allocation != nil {
+				return fmt.Errorf("claim %s got allocated elsewhere in the meantime", klog.KObj(claim))
+			}
+
+			// The finalizer needs to be added in a normal update.
+			// If we were interrupted in the past, it might already be set and we simply continue.
+			if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
+				claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
+				updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
+				if err != nil {
+					return fmt.Errorf("add finalizer to claim %s: %w", klog.KObj(claim), err)
+				}
+				claim = updatedClaim
+			}
+
+			claim.Status.DriverName = state.informationsForClaim[index].allocationDriverName
+			claim.Status.Allocation = allocation
+		}
+
+		// We can simply try to add the pod here without checking
+		// preconditions. The apiserver will tell us with a
+		// non-conflict error if this isn't possible.
+		claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: pod.Name, UID: pod.UID})
+		updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
+		if err != nil {
+			if allocation != nil {
+				return fmt.Errorf("add allocation and reservation to claim %s: %w", klog.KObj(claim), err)
+			}
+			return fmt.Errorf("add reservation to claim %s: %w", klog.KObj(claim), err)
+		}
+		claim = updatedClaim
+		return nil
+	})
+
+	if retryErr != nil {
+		return nil, retryErr
+	}
+
+	logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim))
+	return claim, nil
 }
 
 // PostBind is called after a pod is successfully bound to a node. Now we are
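The rewritten bindClaim leans on client-go's conflict-retry helper: the closure handed to retry.RetryOnConflict is re-run whenever the apiserver rejects the write with a ResourceVersion conflict, and every retry refreshes the claim before mutating it again. A minimal, self-contained sketch of that pattern follows; the simulated conflict and the group/resource strings are illustrative stand-ins, not code from this PR:

```go
// Sketch of the retry.RetryOnConflict pattern used above. The fake update
// below returns two ResourceVersion conflicts before succeeding; a real
// caller (like bindClaim) would re-GET and re-mutate the object on each retry.
package main

import (
	"errors"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/retry"
)

func main() {
	claims := schema.GroupResource{Group: "resource.k8s.io", Resource: "resourceclaims"}
	attempts := 0

	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		attempts++
		if attempts < 3 {
			// Simulated "object has been modified" conflict.
			return apierrors.NewConflict(claims, "my-claim", errors.New("the object has been modified"))
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}
```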
@@ -580,6 +580,115 @@ func TestPlugin(t *testing.T) {
 				},
 			},
 		},
+		"delayed-allocation-structured-with-resources-has-finalizer": {
+			// As before, but the finalizer is already set. Could happen if
+			// the scheduler got interrupted.
+			pod: podWithClaimName,
+			claims: func() []*resourcev1alpha2.ResourceClaim {
+				claim := pendingDelayedClaim.DeepCopy()
+				claim.Finalizers = structuredAllocatedClaim.Finalizers
+				return []*resourcev1alpha2.ResourceClaim{claim}
+			}(),
+			classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
+			objs:    []apiruntime.Object{workerNodeSlice},
+			want: want{
+				reserve: result{
+					inFlightClaim: structuredAllocatedClaim,
+				},
+				prebind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+					changes: change{
+						claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+							if claim.Name == claimName {
+								claim = claim.DeepCopy()
+								claim.Status = structuredInUseClaim.Status
+							}
+							return claim
+						},
+					},
+				},
+				postbind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+				},
+			},
+		},
+		"delayed-allocation-structured-with-resources-finalizer-gets-removed": {
+			// As before, but the finalizer is already set. Then it gets
+			// removed before the scheduler reaches PreBind.
+			pod: podWithClaimName,
+			claims: func() []*resourcev1alpha2.ResourceClaim {
+				claim := pendingDelayedClaim.DeepCopy()
+				claim.Finalizers = structuredAllocatedClaim.Finalizers
+				return []*resourcev1alpha2.ResourceClaim{claim}
+			}(),
+			classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
+			objs:    []apiruntime.Object{workerNodeSlice},
+			prepare: prepare{
+				prebind: change{
+					claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+						claim.Finalizers = nil
+						return claim
+					},
+				},
+			},
+			want: want{
+				reserve: result{
+					inFlightClaim: structuredAllocatedClaim,
+				},
+				prebind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+					changes: change{
+						claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+							if claim.Name == claimName {
+								claim = claim.DeepCopy()
+								claim.Finalizers = structuredAllocatedClaim.Finalizers
+								claim.Status = structuredInUseClaim.Status
+							}
+							return claim
+						},
+					},
+				},
+				postbind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+				},
+			},
+		},
+		"delayed-allocation-structured-with-resources-finalizer-gets-added": {
+			// No finalizer initially, then it gets added before
+			// the scheduler reaches PreBind. Shouldn't happen?
+			pod:     podWithClaimName,
+			claims:  []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
+			classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
+			objs:    []apiruntime.Object{workerNodeSlice},
+			prepare: prepare{
+				prebind: change{
+					claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+						claim.Finalizers = structuredAllocatedClaim.Finalizers
+						return claim
+					},
+				},
+			},
+			want: want{
+				reserve: result{
+					inFlightClaim: structuredAllocatedClaim,
+				},
+				prebind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+					changes: change{
+						claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+							if claim.Name == claimName {
+								claim = claim.DeepCopy()
+								claim.Status = structuredInUseClaim.Status
+							}
+							return claim
+						},
+					},
+				},
+				postbind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+				},
+			},
+		},
 		"delayed-allocation-structured-skip-bind": {
 			pod:    podWithClaimName,
 			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
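The new cases above simulate finalizer races through the test harness's prepare hooks. A related, hypothetical (not from this PR) way to drive the conflict-retry path itself in an isolated unit test is to register a reactor on a fake clientset that rejects the first update with a ResourceVersion conflict and then falls through to the default tracker:

```go
// Hypothetical test sketch, not the PR's harness: inject one ResourceVersion
// conflict so code built on retry.RetryOnConflict has to refresh and retry.
package example

import (
	"errors"
	"testing"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/kubernetes/fake"
	k8stesting "k8s.io/client-go/testing"
)

func TestRetryAfterConflict(t *testing.T) {
	client := fake.NewSimpleClientset()
	conflicted := false
	client.PrependReactor("update", "resourceclaims", func(action k8stesting.Action) (bool, runtime.Object, error) {
		if !conflicted {
			conflicted = true
			gr := schema.GroupResource{Group: "resource.k8s.io", Resource: "resourceclaims"}
			return true, nil, apierrors.NewConflict(gr, "my-claim", errors.New("the object has been modified"))
		}
		// Not handled: let the default object tracker process the update.
		return false, nil, nil
	})
	_ = client // hand this clientset to the code under test
}
```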