dra: store generated ResourceClaims in cache
This addresses the following bad sequence of events: - controller creates ResourceClaim - updating pod status fails - pod gets retried before the informer receives the created ResourceClaim - another ResourceClaim gets created Storing the generated ResourceClaim in a MutationCache ensures that the controller knows about it during the retry. A positive side effect is that ResourceClaims now get index by pod owner and thus iterating over existing ones becomes a bit more efficient.
This commit is contained in:
		@@ -27,7 +27,6 @@ import (
 | 
			
		||||
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
@@ -59,6 +58,10 @@ const (
 | 
			
		||||
	// and not documented as part of the Kubernetes API.
 | 
			
		||||
	podResourceClaimAnnotation = "resource.kubernetes.io/pod-claim-name"
 | 
			
		||||
 | 
			
		||||
	// claimPodOwnerIndex is used to find ResourceClaims which have
 | 
			
		||||
	// a specific pod as owner. Values for this index are the pod UID.
 | 
			
		||||
	claimPodOwnerIndex = "claim-pod-owner-index"
 | 
			
		||||
 | 
			
		||||
	// Field manager used to update the pod status.
 | 
			
		||||
	fieldManager = "ResourceClaimController"
 | 
			
		||||
 | 
			
		||||
@@ -76,6 +79,7 @@ type Controller struct {
 | 
			
		||||
	// therefore the ResourceClaim objects in its store should be treated as immutable.
 | 
			
		||||
	claimLister  resourcev1alpha2listers.ResourceClaimLister
 | 
			
		||||
	claimsSynced cache.InformerSynced
 | 
			
		||||
	claimCache   cache.MutationCache
 | 
			
		||||
 | 
			
		||||
	// podLister is the shared Pod lister used to fetch Pod
 | 
			
		||||
	// objects from the API server. It is shared with other controllers and
 | 
			
		||||
@@ -163,6 +167,28 @@ func NewController(
 | 
			
		||||
		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// The mutation cache acts as an additional layer for the informer
 | 
			
		||||
	// cache and after a create made by the controller returns that
 | 
			
		||||
	// object until the informer catches up. That is necessary
 | 
			
		||||
	// when a ResourceClaim got created, updating the pod status fails,
 | 
			
		||||
	// and then a retry occurs before the informer cache is updated.
 | 
			
		||||
	// In that scenario, the controller would create another claim
 | 
			
		||||
	// instead of continuing with the existing one.
 | 
			
		||||
	claimInformerCache := claimInformer.Informer().GetIndexer()
 | 
			
		||||
	if err := claimInformerCache.AddIndexers(cache.Indexers{claimPodOwnerIndex: claimPodOwnerIndexFunc}); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
	ec.claimCache = cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache,
 | 
			
		||||
		// Very long time to live, unlikely to be needed because
 | 
			
		||||
		// the informer cache should get updated soon.
 | 
			
		||||
		time.Hour,
 | 
			
		||||
		// Allow storing objects not in the underlying cache - that's the point...
 | 
			
		||||
		// It's safe because in case of a race (claim is in mutation cache, claim
 | 
			
		||||
		// gets deleted, controller updates status based on mutation cache) the
 | 
			
		||||
		// "bad" pod status will get detected and fixed when the informer catches up.
 | 
			
		||||
		true,
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	return ec, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -487,6 +513,7 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.
 | 
			
		||||
			metrics.ResourceClaimCreateFailures.Inc()
 | 
			
		||||
			return fmt.Errorf("create ResourceClaim %s: %v", claimName, err)
 | 
			
		||||
		}
 | 
			
		||||
		ec.claimCache.Mutation(claim)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Remember the new ResourceClaim for a batch PodStatus update in our caller.
 | 
			
		||||
@@ -502,14 +529,16 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.
 | 
			
		||||
// annotation (ties it to the pod claim) and the right ownership (ties it to
 | 
			
		||||
// the pod).
 | 
			
		||||
func (ec *Controller) findPodResourceClaim(pod *v1.Pod, podClaim v1.PodResourceClaim) (*resourcev1alpha2.ResourceClaim, error) {
 | 
			
		||||
	claims, err := ec.claimLister.List(labels.Everything())
 | 
			
		||||
	// Only claims owned by the pod will get returned here.
 | 
			
		||||
	claims, err := ec.claimCache.ByIndex(claimPodOwnerIndex, string(pod.UID))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	deterministicName := pod.Name + "-" + podClaim.Name // Kubernetes <= 1.27 behavior.
 | 
			
		||||
	for _, claim := range claims {
 | 
			
		||||
		if err := resourceclaim.IsForPod(pod, claim); err != nil {
 | 
			
		||||
			continue
 | 
			
		||||
	for _, claimObj := range claims {
 | 
			
		||||
		claim, ok := claimObj.(*resourcev1alpha2.ResourceClaim)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return nil, fmt.Errorf("unexpected object of type %T returned by claim cache", claimObj)
 | 
			
		||||
		}
 | 
			
		||||
		podClaimName, ok := claim.Annotations[podResourceClaimAnnotation]
 | 
			
		||||
		if ok && podClaimName != podClaim.Name {
 | 
			
		||||
@@ -715,3 +744,22 @@ func isPodDone(pod *v1.Pod) bool {
 | 
			
		||||
		// Deleted and not scheduled:
 | 
			
		||||
		pod.DeletionTimestamp != nil && pod.Spec.NodeName == ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// claimPodOwnerIndexFunc is an index function that returns the pod UIDs of
 | 
			
		||||
// all pods which own the resource claim. Should only be one, though.
 | 
			
		||||
func claimPodOwnerIndexFunc(obj interface{}) ([]string, error) {
 | 
			
		||||
	claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil, nil
 | 
			
		||||
	}
 | 
			
		||||
	var keys []string
 | 
			
		||||
	for _, owner := range claim.OwnerReferences {
 | 
			
		||||
		if owner.Controller != nil &&
 | 
			
		||||
			*owner.Controller &&
 | 
			
		||||
			owner.APIVersion == "v1" &&
 | 
			
		||||
			owner.Kind == "Pod" {
 | 
			
		||||
			keys = append(keys, string(owner.UID))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return keys, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -92,6 +92,7 @@ func TestSyncHandler(t *testing.T) {
 | 
			
		||||
		name             string
 | 
			
		||||
		key              string
 | 
			
		||||
		claims           []*resourcev1alpha2.ResourceClaim
 | 
			
		||||
		claimsInCache    []*resourcev1alpha2.ResourceClaim
 | 
			
		||||
		pods             []*v1.Pod
 | 
			
		||||
		podsLater        []*v1.Pod
 | 
			
		||||
		templates        []*resourcev1alpha2.ResourceClaimTemplate
 | 
			
		||||
@@ -185,6 +186,18 @@ func TestSyncHandler(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
			expectedMetrics: expectedMetrics{0, 0},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:          "find-created-claim-in-cache",
 | 
			
		||||
			pods:          []*v1.Pod{testPodWithResource},
 | 
			
		||||
			key:           podKey(testPodWithResource),
 | 
			
		||||
			claimsInCache: []*resourcev1alpha2.ResourceClaim{generatedTestClaim},
 | 
			
		||||
			expectedStatuses: map[string][]v1.PodResourceClaimStatus{
 | 
			
		||||
				testPodWithResource.Name: {
 | 
			
		||||
					{Name: testPodWithResource.Spec.ResourceClaims[0].Name, ResourceClaimName: &generatedTestClaim.Name},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expectedMetrics: expectedMetrics{0, 0},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "no-such-pod",
 | 
			
		||||
			key:  podKey(testPodWithResource),
 | 
			
		||||
@@ -345,6 +358,11 @@ func TestSyncHandler(t *testing.T) {
 | 
			
		||||
			informerFactory.WaitForCacheSync(ctx.Done())
 | 
			
		||||
			cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, claimInformer.Informer().HasSynced, templateInformer.Informer().HasSynced)
 | 
			
		||||
 | 
			
		||||
			// Add claims that only exist in the mutation cache.
 | 
			
		||||
			for _, claim := range tc.claimsInCache {
 | 
			
		||||
				ec.claimCache.Mutation(claim)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Simulate race: stop informers, add more pods that the controller doesn't know about.
 | 
			
		||||
			stopInformers()
 | 
			
		||||
			for _, pod := range tc.podsLater {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user