Fix race in informer transformers
This commit is contained in:
		@@ -20,6 +20,7 @@ import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
@@ -575,6 +576,114 @@ func TestTransformingInformer(t *testing.T) {
 | 
			
		||||
	close(stopCh)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestTransformingInformerRace(t *testing.T) {
 | 
			
		||||
	// source simulates an apiserver object endpoint.
 | 
			
		||||
	source := fcache.NewFakeControllerSource()
 | 
			
		||||
 | 
			
		||||
	label := "to-be-transformed"
 | 
			
		||||
	makePod := func(name string) *v1.Pod {
 | 
			
		||||
		return &v1.Pod{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
				Name:      name,
 | 
			
		||||
				Namespace: "namespace",
 | 
			
		||||
				Labels:    map[string]string{label: "true"},
 | 
			
		||||
			},
 | 
			
		||||
			Spec: v1.PodSpec{
 | 
			
		||||
				Hostname: "hostname",
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	badTransform := atomic.Bool{}
 | 
			
		||||
	podTransformer := func(obj interface{}) (interface{}, error) {
 | 
			
		||||
		pod, ok := obj.(*v1.Pod)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return nil, fmt.Errorf("unexpected object type: %T", obj)
 | 
			
		||||
		}
 | 
			
		||||
		if pod.ObjectMeta.Labels[label] != "true" {
 | 
			
		||||
			badTransform.Store(true)
 | 
			
		||||
			return nil, fmt.Errorf("object already transformed: %#v", obj)
 | 
			
		||||
		}
 | 
			
		||||
		pod.ObjectMeta.Labels[label] = "false"
 | 
			
		||||
		return pod, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	numObjs := 5
 | 
			
		||||
	for i := 0; i < numObjs; i++ {
 | 
			
		||||
		source.Add(makePod(fmt.Sprintf("pod-%d", i)))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	type event struct{}
 | 
			
		||||
	events := make(chan event, numObjs)
 | 
			
		||||
	recordEvent := func(eventType watch.EventType, previous, current interface{}) {
 | 
			
		||||
		events <- event{}
 | 
			
		||||
	}
 | 
			
		||||
	checkEvents := func(count int) {
 | 
			
		||||
		for i := 0; i < count; i++ {
 | 
			
		||||
			<-events
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	store, controller := NewTransformingInformer(
 | 
			
		||||
		source,
 | 
			
		||||
		&v1.Pod{},
 | 
			
		||||
		5*time.Millisecond,
 | 
			
		||||
		ResourceEventHandlerDetailedFuncs{
 | 
			
		||||
			AddFunc:    func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) },
 | 
			
		||||
			UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
 | 
			
		||||
			DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
 | 
			
		||||
		},
 | 
			
		||||
		podTransformer,
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	go controller.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	checkEvents(numObjs)
 | 
			
		||||
 | 
			
		||||
	// Periodically fetch objects to ensure no access races.
 | 
			
		||||
	wg := sync.WaitGroup{}
 | 
			
		||||
	errors := make(chan error, numObjs)
 | 
			
		||||
	for i := 0; i < numObjs; i++ {
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go func(index int) {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			key := fmt.Sprintf("namespace/pod-%d", index)
 | 
			
		||||
			for {
 | 
			
		||||
				select {
 | 
			
		||||
				case <-stopCh:
 | 
			
		||||
					return
 | 
			
		||||
				default:
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				obj, ok, err := store.GetByKey(key)
 | 
			
		||||
				if !ok || err != nil {
 | 
			
		||||
					errors <- fmt.Errorf("couldn't get the object for %v", key)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				pod := obj.(*v1.Pod)
 | 
			
		||||
				if pod.ObjectMeta.Labels[label] != "false" {
 | 
			
		||||
					errors <- fmt.Errorf("unexpected object: %#v", pod)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}(i)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Let resyncs to happen for some time.
 | 
			
		||||
	time.Sleep(time.Second)
 | 
			
		||||
 | 
			
		||||
	close(stopCh)
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	close(errors)
 | 
			
		||||
	for err := range errors {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if badTransform.Load() {
 | 
			
		||||
		t.Errorf("unexpected transformation happened")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestDeletionHandlingObjectToName(t *testing.T) {
 | 
			
		||||
	cm := &v1.ConfigMap{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
 
 | 
			
		||||
@@ -139,20 +139,17 @@ type DeltaFIFO struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TransformFunc allows for transforming an object before it will be processed.
 | 
			
		||||
// TransformFunc (similarly to ResourceEventHandler functions) should be able
 | 
			
		||||
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
 | 
			
		||||
//
 | 
			
		||||
// New in v1.27: In such cases, the contained object will already have gone
 | 
			
		||||
// through the transform object separately (when it was added / updated prior
 | 
			
		||||
// to the delete), so the TransformFunc can likely safely ignore such objects
 | 
			
		||||
// (i.e., just return the input object).
 | 
			
		||||
//
 | 
			
		||||
// The most common usage pattern is to clean-up some parts of the object to
 | 
			
		||||
// reduce component memory usage if a given component doesn't care about them.
 | 
			
		||||
//
 | 
			
		||||
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
 | 
			
		||||
// sees the object before any other actor, and it is now safe to mutate the
 | 
			
		||||
// object in place instead of making a copy.
 | 
			
		||||
// New in v1.27: TransformFunc sees the object before any other actor, and it
 | 
			
		||||
// is now safe to mutate the object in place instead of making a copy.
 | 
			
		||||
//
 | 
			
		||||
// It's recommended for the TransformFunc to be idempotent.
 | 
			
		||||
// It MUST be idempotent if objects already present in the cache are passed to
 | 
			
		||||
// the Replace() to avoid re-mutating them. Default informers do not pass
 | 
			
		||||
// existing objects to Replace though.
 | 
			
		||||
//
 | 
			
		||||
// Note that TransformFunc is called while inserting objects into the
 | 
			
		||||
// notification queue and is therefore extremely performance sensitive; please
 | 
			
		||||
@@ -440,22 +437,38 @@ func isDeletionDup(a, b *Delta) *Delta {
 | 
			
		||||
// queueActionLocked appends to the delta list for the object.
 | 
			
		||||
// Caller must lock first.
 | 
			
		||||
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
 | 
			
		||||
	return f.queueActionInternalLocked(actionType, actionType, obj)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// queueActionInternalLocked appends to the delta list for the object.
 | 
			
		||||
// The actionType is emitted and must honor emitDeltaTypeReplaced.
 | 
			
		||||
// The internalActionType is only used within this function and must
 | 
			
		||||
// ignore emitDeltaTypeReplaced.
 | 
			
		||||
// Caller must lock first.
 | 
			
		||||
func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType DeltaType, obj interface{}) error {
 | 
			
		||||
	id, err := f.KeyOf(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return KeyError{obj, err}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Every object comes through this code path once, so this is a good
 | 
			
		||||
	// place to call the transform func.  If obj is a
 | 
			
		||||
	// DeletedFinalStateUnknown tombstone, then the containted inner object
 | 
			
		||||
	// will already have gone through the transformer, but we document that
 | 
			
		||||
	// this can happen. In cases involving Replace(), such an object can
 | 
			
		||||
	// come through multiple times.
 | 
			
		||||
	// place to call the transform func.
 | 
			
		||||
	//
 | 
			
		||||
	// If obj is a DeletedFinalStateUnknown tombstone or the action is a Sync,
 | 
			
		||||
	// then the object have already gone through the transformer.
 | 
			
		||||
	//
 | 
			
		||||
	// If the objects already present in the cache are passed to Replace(),
 | 
			
		||||
	// the transformer must be idempotent to avoid re-mutating them,
 | 
			
		||||
	// or coordinate with all readers from the cache to avoid data races.
 | 
			
		||||
	// Default informers do not pass existing objects to Replace.
 | 
			
		||||
	if f.transformer != nil {
 | 
			
		||||
		var err error
 | 
			
		||||
		obj, err = f.transformer(obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		_, isTombstone := obj.(DeletedFinalStateUnknown)
 | 
			
		||||
		if !isTombstone && internalActionType != Sync {
 | 
			
		||||
			var err error
 | 
			
		||||
			obj, err = f.transformer(obj)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -638,7 +651,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
 | 
			
		||||
			return KeyError{item, err}
 | 
			
		||||
		}
 | 
			
		||||
		keys.Insert(key)
 | 
			
		||||
		if err := f.queueActionLocked(action, item); err != nil {
 | 
			
		||||
		if err := f.queueActionInternalLocked(action, Replaced, item); err != nil {
 | 
			
		||||
			return fmt.Errorf("couldn't enqueue object: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user