Merge pull request #124344 from wojtek-t/fix_transformer
Fix race in informer transformers
This commit is contained in:
		@@ -20,6 +20,7 @@ import (
 | 
				
			|||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"math/rand"
 | 
						"math/rand"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
 | 
						"sync/atomic"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -575,6 +576,114 @@ func TestTransformingInformer(t *testing.T) {
 | 
				
			|||||||
	close(stopCh)
 | 
						close(stopCh)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestTransformingInformerRace(t *testing.T) {
 | 
				
			||||||
 | 
						// source simulates an apiserver object endpoint.
 | 
				
			||||||
 | 
						source := fcache.NewFakeControllerSource()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						label := "to-be-transformed"
 | 
				
			||||||
 | 
						makePod := func(name string) *v1.Pod {
 | 
				
			||||||
 | 
							return &v1.Pod{
 | 
				
			||||||
 | 
								ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
									Name:      name,
 | 
				
			||||||
 | 
									Namespace: "namespace",
 | 
				
			||||||
 | 
									Labels:    map[string]string{label: "true"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								Spec: v1.PodSpec{
 | 
				
			||||||
 | 
									Hostname: "hostname",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						badTransform := atomic.Bool{}
 | 
				
			||||||
 | 
						podTransformer := func(obj interface{}) (interface{}, error) {
 | 
				
			||||||
 | 
							pod, ok := obj.(*v1.Pod)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								return nil, fmt.Errorf("unexpected object type: %T", obj)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if pod.ObjectMeta.Labels[label] != "true" {
 | 
				
			||||||
 | 
								badTransform.Store(true)
 | 
				
			||||||
 | 
								return nil, fmt.Errorf("object already transformed: %#v", obj)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							pod.ObjectMeta.Labels[label] = "false"
 | 
				
			||||||
 | 
							return pod, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						numObjs := 5
 | 
				
			||||||
 | 
						for i := 0; i < numObjs; i++ {
 | 
				
			||||||
 | 
							source.Add(makePod(fmt.Sprintf("pod-%d", i)))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						type event struct{}
 | 
				
			||||||
 | 
						events := make(chan event, numObjs)
 | 
				
			||||||
 | 
						recordEvent := func(eventType watch.EventType, previous, current interface{}) {
 | 
				
			||||||
 | 
							events <- event{}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						checkEvents := func(count int) {
 | 
				
			||||||
 | 
							for i := 0; i < count; i++ {
 | 
				
			||||||
 | 
								<-events
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						store, controller := NewTransformingInformer(
 | 
				
			||||||
 | 
							source,
 | 
				
			||||||
 | 
							&v1.Pod{},
 | 
				
			||||||
 | 
							5*time.Millisecond,
 | 
				
			||||||
 | 
							ResourceEventHandlerDetailedFuncs{
 | 
				
			||||||
 | 
								AddFunc:    func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) },
 | 
				
			||||||
 | 
								UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
 | 
				
			||||||
 | 
								DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							podTransformer,
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
 | 
						go controller.Run(stopCh)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						checkEvents(numObjs)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Periodically fetch objects to ensure no access races.
 | 
				
			||||||
 | 
						wg := sync.WaitGroup{}
 | 
				
			||||||
 | 
						errors := make(chan error, numObjs)
 | 
				
			||||||
 | 
						for i := 0; i < numObjs; i++ {
 | 
				
			||||||
 | 
							wg.Add(1)
 | 
				
			||||||
 | 
							go func(index int) {
 | 
				
			||||||
 | 
								defer wg.Done()
 | 
				
			||||||
 | 
								key := fmt.Sprintf("namespace/pod-%d", index)
 | 
				
			||||||
 | 
								for {
 | 
				
			||||||
 | 
									select {
 | 
				
			||||||
 | 
									case <-stopCh:
 | 
				
			||||||
 | 
										return
 | 
				
			||||||
 | 
									default:
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									obj, ok, err := store.GetByKey(key)
 | 
				
			||||||
 | 
									if !ok || err != nil {
 | 
				
			||||||
 | 
										errors <- fmt.Errorf("couldn't get the object for %v", key)
 | 
				
			||||||
 | 
										return
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									pod := obj.(*v1.Pod)
 | 
				
			||||||
 | 
									if pod.ObjectMeta.Labels[label] != "false" {
 | 
				
			||||||
 | 
										errors <- fmt.Errorf("unexpected object: %#v", pod)
 | 
				
			||||||
 | 
										return
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}(i)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Let resyncs to happen for some time.
 | 
				
			||||||
 | 
						time.Sleep(time.Second)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						close(stopCh)
 | 
				
			||||||
 | 
						wg.Wait()
 | 
				
			||||||
 | 
						close(errors)
 | 
				
			||||||
 | 
						for err := range errors {
 | 
				
			||||||
 | 
							t.Error(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if badTransform.Load() {
 | 
				
			||||||
 | 
							t.Errorf("unexpected transformation happened")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestDeletionHandlingObjectToName(t *testing.T) {
 | 
					func TestDeletionHandlingObjectToName(t *testing.T) {
 | 
				
			||||||
	cm := &v1.ConfigMap{
 | 
						cm := &v1.ConfigMap{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{
 | 
							ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -139,20 +139,17 @@ type DeltaFIFO struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TransformFunc allows for transforming an object before it will be processed.
 | 
					// TransformFunc allows for transforming an object before it will be processed.
 | 
				
			||||||
// TransformFunc (similarly to ResourceEventHandler functions) should be able
 | 
					 | 
				
			||||||
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// New in v1.27: In such cases, the contained object will already have gone
 | 
					 | 
				
			||||||
// through the transform object separately (when it was added / updated prior
 | 
					 | 
				
			||||||
// to the delete), so the TransformFunc can likely safely ignore such objects
 | 
					 | 
				
			||||||
// (i.e., just return the input object).
 | 
					 | 
				
			||||||
//
 | 
					//
 | 
				
			||||||
// The most common usage pattern is to clean-up some parts of the object to
 | 
					// The most common usage pattern is to clean-up some parts of the object to
 | 
				
			||||||
// reduce component memory usage if a given component doesn't care about them.
 | 
					// reduce component memory usage if a given component doesn't care about them.
 | 
				
			||||||
//
 | 
					//
 | 
				
			||||||
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
 | 
					// New in v1.27: TransformFunc sees the object before any other actor, and it
 | 
				
			||||||
// sees the object before any other actor, and it is now safe to mutate the
 | 
					// is now safe to mutate the object in place instead of making a copy.
 | 
				
			||||||
// object in place instead of making a copy.
 | 
					//
 | 
				
			||||||
 | 
					// It's recommended for the TransformFunc to be idempotent.
 | 
				
			||||||
 | 
					// It MUST be idempotent if objects already present in the cache are passed to
 | 
				
			||||||
 | 
					// the Replace() to avoid re-mutating them. Default informers do not pass
 | 
				
			||||||
 | 
					// existing objects to Replace though.
 | 
				
			||||||
//
 | 
					//
 | 
				
			||||||
// Note that TransformFunc is called while inserting objects into the
 | 
					// Note that TransformFunc is called while inserting objects into the
 | 
				
			||||||
// notification queue and is therefore extremely performance sensitive; please
 | 
					// notification queue and is therefore extremely performance sensitive; please
 | 
				
			||||||
@@ -440,22 +437,38 @@ func isDeletionDup(a, b *Delta) *Delta {
 | 
				
			|||||||
// queueActionLocked appends to the delta list for the object.
 | 
					// queueActionLocked appends to the delta list for the object.
 | 
				
			||||||
// Caller must lock first.
 | 
					// Caller must lock first.
 | 
				
			||||||
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
 | 
					func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
 | 
				
			||||||
 | 
						return f.queueActionInternalLocked(actionType, actionType, obj)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// queueActionInternalLocked appends to the delta list for the object.
 | 
				
			||||||
 | 
					// The actionType is emitted and must honor emitDeltaTypeReplaced.
 | 
				
			||||||
 | 
					// The internalActionType is only used within this function and must
 | 
				
			||||||
 | 
					// ignore emitDeltaTypeReplaced.
 | 
				
			||||||
 | 
					// Caller must lock first.
 | 
				
			||||||
 | 
					func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType DeltaType, obj interface{}) error {
 | 
				
			||||||
	id, err := f.KeyOf(obj)
 | 
						id, err := f.KeyOf(obj)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return KeyError{obj, err}
 | 
							return KeyError{obj, err}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Every object comes through this code path once, so this is a good
 | 
						// Every object comes through this code path once, so this is a good
 | 
				
			||||||
	// place to call the transform func.  If obj is a
 | 
						// place to call the transform func.
 | 
				
			||||||
	// DeletedFinalStateUnknown tombstone, then the containted inner object
 | 
						//
 | 
				
			||||||
	// will already have gone through the transformer, but we document that
 | 
						// If obj is a DeletedFinalStateUnknown tombstone or the action is a Sync,
 | 
				
			||||||
	// this can happen. In cases involving Replace(), such an object can
 | 
						// then the object have already gone through the transformer.
 | 
				
			||||||
	// come through multiple times.
 | 
						//
 | 
				
			||||||
 | 
						// If the objects already present in the cache are passed to Replace(),
 | 
				
			||||||
 | 
						// the transformer must be idempotent to avoid re-mutating them,
 | 
				
			||||||
 | 
						// or coordinate with all readers from the cache to avoid data races.
 | 
				
			||||||
 | 
						// Default informers do not pass existing objects to Replace.
 | 
				
			||||||
	if f.transformer != nil {
 | 
						if f.transformer != nil {
 | 
				
			||||||
		var err error
 | 
							_, isTombstone := obj.(DeletedFinalStateUnknown)
 | 
				
			||||||
		obj, err = f.transformer(obj)
 | 
							if !isTombstone && internalActionType != Sync {
 | 
				
			||||||
		if err != nil {
 | 
								var err error
 | 
				
			||||||
			return err
 | 
								obj, err = f.transformer(obj)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return err
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -638,7 +651,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
 | 
				
			|||||||
			return KeyError{item, err}
 | 
								return KeyError{item, err}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		keys.Insert(key)
 | 
							keys.Insert(key)
 | 
				
			||||||
		if err := f.queueActionLocked(action, item); err != nil {
 | 
							if err := f.queueActionInternalLocked(action, Replaced, item); err != nil {
 | 
				
			||||||
			return fmt.Errorf("couldn't enqueue object: %v", err)
 | 
								return fmt.Errorf("couldn't enqueue object: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user