Create TransformingInformer
TransformingInfomer is like a regular Informer, but allows for applying custom transform functions on the objects received via list/watch API calls.
This commit is contained in:
		@@ -322,7 +322,7 @@ func NewInformer(
 | 
				
			|||||||
	// This will hold the client state, as we know it.
 | 
						// This will hold the client state, as we know it.
 | 
				
			||||||
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | 
						clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
 | 
						return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewIndexerInformer returns an Indexer and a Controller for populating the index
 | 
					// NewIndexerInformer returns an Indexer and a Controller for populating the index
 | 
				
			||||||
@@ -351,7 +351,38 @@ func NewIndexerInformer(
 | 
				
			|||||||
	// This will hold the client state, as we know it.
 | 
						// This will hold the client state, as we know it.
 | 
				
			||||||
	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
 | 
						clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
 | 
						return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TransformFunc allows for transforming an object before it will be processed
 | 
				
			||||||
 | 
					// and put into the controller cache and before the corresponding handlers will
 | 
				
			||||||
 | 
					// be called on it.
 | 
				
			||||||
 | 
					// TransformFunc (similarly to ResourceEventHandler functions) should be able
 | 
				
			||||||
 | 
					// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// The most common usage pattern is to clean-up some parts of the object to
 | 
				
			||||||
 | 
					// reduce component memory usage if a given component doesn't care about them.
 | 
				
			||||||
 | 
					// given controller doesn't care for them
 | 
				
			||||||
 | 
					type TransformFunc func(interface{}) (interface{}, error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewTransformingInformer returns a Store and a controller for populating
 | 
				
			||||||
 | 
					// the store while also providing event notifications. You should only used
 | 
				
			||||||
 | 
					// the returned Store for Get/List operations; Add/Modify/Deletes will cause
 | 
				
			||||||
 | 
					// the event notifications to be faulty.
 | 
				
			||||||
 | 
					// The given transform function will be called on all objects before they will
 | 
				
			||||||
 | 
					// put put into the Store and corresponding Add/Modify/Delete handlers will
 | 
				
			||||||
 | 
					// be invokved for them.
 | 
				
			||||||
 | 
					func NewTransformingInformer(
 | 
				
			||||||
 | 
						lw ListerWatcher,
 | 
				
			||||||
 | 
						objType runtime.Object,
 | 
				
			||||||
 | 
						resyncPeriod time.Duration,
 | 
				
			||||||
 | 
						h ResourceEventHandler,
 | 
				
			||||||
 | 
						transformer TransformFunc,
 | 
				
			||||||
 | 
					) (Store, Controller) {
 | 
				
			||||||
 | 
						// This will hold the client state, as we know it.
 | 
				
			||||||
 | 
						clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// newInformer returns a controller for populating the store while also
 | 
					// newInformer returns a controller for populating the store while also
 | 
				
			||||||
@@ -374,6 +405,7 @@ func newInformer(
 | 
				
			|||||||
	resyncPeriod time.Duration,
 | 
						resyncPeriod time.Duration,
 | 
				
			||||||
	h ResourceEventHandler,
 | 
						h ResourceEventHandler,
 | 
				
			||||||
	clientState Store,
 | 
						clientState Store,
 | 
				
			||||||
 | 
						transformer TransformFunc,
 | 
				
			||||||
) Controller {
 | 
					) Controller {
 | 
				
			||||||
	// This will hold incoming changes. Note how we pass clientState in as a
 | 
						// This will hold incoming changes. Note how we pass clientState in as a
 | 
				
			||||||
	// KeyLister, that way resync operations will result in the correct set
 | 
						// KeyLister, that way resync operations will result in the correct set
 | 
				
			||||||
@@ -393,24 +425,33 @@ func newInformer(
 | 
				
			|||||||
		Process: func(obj interface{}) error {
 | 
							Process: func(obj interface{}) error {
 | 
				
			||||||
			// from oldest to newest
 | 
								// from oldest to newest
 | 
				
			||||||
			for _, d := range obj.(Deltas) {
 | 
								for _, d := range obj.(Deltas) {
 | 
				
			||||||
 | 
									obj := d.Object
 | 
				
			||||||
 | 
									if transformer != nil {
 | 
				
			||||||
 | 
										var err error
 | 
				
			||||||
 | 
										obj, err = transformer(obj)
 | 
				
			||||||
 | 
										if err != nil {
 | 
				
			||||||
 | 
											return err
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				switch d.Type {
 | 
									switch d.Type {
 | 
				
			||||||
				case Sync, Replaced, Added, Updated:
 | 
									case Sync, Replaced, Added, Updated:
 | 
				
			||||||
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
 | 
										if old, exists, err := clientState.Get(obj); err == nil && exists {
 | 
				
			||||||
						if err := clientState.Update(d.Object); err != nil {
 | 
											if err := clientState.Update(obj); err != nil {
 | 
				
			||||||
							return err
 | 
												return err
 | 
				
			||||||
						}
 | 
											}
 | 
				
			||||||
						h.OnUpdate(old, d.Object)
 | 
											h.OnUpdate(old, obj)
 | 
				
			||||||
					} else {
 | 
										} else {
 | 
				
			||||||
						if err := clientState.Add(d.Object); err != nil {
 | 
											if err := clientState.Add(obj); err != nil {
 | 
				
			||||||
							return err
 | 
												return err
 | 
				
			||||||
						}
 | 
											}
 | 
				
			||||||
						h.OnAdd(d.Object)
 | 
											h.OnAdd(obj)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				case Deleted:
 | 
									case Deleted:
 | 
				
			||||||
					if err := clientState.Delete(d.Object); err != nil {
 | 
										if err := clientState.Delete(obj); err != nil {
 | 
				
			||||||
						return err
 | 
											return err
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
					h.OnDelete(d.Object)
 | 
										h.OnDelete(obj)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			return nil
 | 
								return nil
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -24,6 +24,7 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/api/core/v1"
 | 
						"k8s.io/api/core/v1"
 | 
				
			||||||
 | 
						apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						"k8s.io/apimachinery/pkg/util/sets"
 | 
				
			||||||
@@ -451,3 +452,125 @@ func TestPanicPropagated(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("timeout: the panic failed to propagate from the controller run method!")
 | 
							t.Errorf("timeout: the panic failed to propagate from the controller run method!")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestTransformingInformer(t *testing.T) {
 | 
				
			||||||
 | 
						// source simulates an apiserver object endpoint.
 | 
				
			||||||
 | 
						source := fcache.NewFakeControllerSource()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						makePod := func(name, generation string) *v1.Pod {
 | 
				
			||||||
 | 
							return &v1.Pod{
 | 
				
			||||||
 | 
								ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
									Name:      name,
 | 
				
			||||||
 | 
									Namespace: "namespace",
 | 
				
			||||||
 | 
									Labels:    map[string]string{"generation": generation},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								Spec: v1.PodSpec{
 | 
				
			||||||
 | 
									Hostname:  "hostname",
 | 
				
			||||||
 | 
									Subdomain: "subdomain",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						expectedPod := func(name, generation string) *v1.Pod {
 | 
				
			||||||
 | 
							pod := makePod(name, generation)
 | 
				
			||||||
 | 
							pod.Spec.Hostname = "new-hostname"
 | 
				
			||||||
 | 
							pod.Spec.Subdomain = ""
 | 
				
			||||||
 | 
							pod.Spec.NodeName = "nodename"
 | 
				
			||||||
 | 
							return pod
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						source.Add(makePod("pod1", "1"))
 | 
				
			||||||
 | 
						source.Modify(makePod("pod1", "2"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						type event struct {
 | 
				
			||||||
 | 
							eventType watch.EventType
 | 
				
			||||||
 | 
							previous  interface{}
 | 
				
			||||||
 | 
							current   interface{}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						events := make(chan event, 10)
 | 
				
			||||||
 | 
						recordEvent := func(eventType watch.EventType, previous, current interface{}) {
 | 
				
			||||||
 | 
							events <- event{eventType: eventType, previous: previous, current: current}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						verifyEvent := func(eventType watch.EventType, previous, current interface{}) {
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case event := <-events:
 | 
				
			||||||
 | 
								if event.eventType != eventType {
 | 
				
			||||||
 | 
									t.Errorf("expected type %v, got %v", eventType, event.eventType)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !apiequality.Semantic.DeepEqual(event.previous, previous) {
 | 
				
			||||||
 | 
									t.Errorf("expected previous object %#v, got %#v", previous, event.previous)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !apiequality.Semantic.DeepEqual(event.current, current) {
 | 
				
			||||||
 | 
									t.Errorf("expected object %#v, got %#v", current, event.current)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							case <-time.After(wait.ForeverTestTimeout):
 | 
				
			||||||
 | 
								t.Errorf("failed to get event")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						podTransformer := func(obj interface{}) (interface{}, error) {
 | 
				
			||||||
 | 
							pod, ok := obj.(*v1.Pod)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								return nil, fmt.Errorf("unexpected object type: %T", obj)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							pod.Spec.Hostname = "new-hostname"
 | 
				
			||||||
 | 
							pod.Spec.Subdomain = ""
 | 
				
			||||||
 | 
							pod.Spec.NodeName = "nodename"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Clear out ResourceVersion to simplify comparisons.
 | 
				
			||||||
 | 
							pod.ResourceVersion = ""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							return pod, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						store, controller := NewTransformingInformer(
 | 
				
			||||||
 | 
							source,
 | 
				
			||||||
 | 
							&v1.Pod{},
 | 
				
			||||||
 | 
							0,
 | 
				
			||||||
 | 
							ResourceEventHandlerFuncs{
 | 
				
			||||||
 | 
								AddFunc:    func(obj interface{}) { recordEvent(watch.Added, nil, obj) },
 | 
				
			||||||
 | 
								UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
 | 
				
			||||||
 | 
								DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							podTransformer,
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						verifyStore := func(expectedItems []interface{}) {
 | 
				
			||||||
 | 
							items := store.List()
 | 
				
			||||||
 | 
							if len(items) != len(expectedItems) {
 | 
				
			||||||
 | 
								t.Errorf("unexpected items %v, expected %v", items, expectedItems)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							for _, expectedItem := range expectedItems {
 | 
				
			||||||
 | 
								found := false
 | 
				
			||||||
 | 
								for _, item := range items {
 | 
				
			||||||
 | 
									if apiequality.Semantic.DeepEqual(item, expectedItem) {
 | 
				
			||||||
 | 
										found = true
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !found {
 | 
				
			||||||
 | 
									t.Errorf("expected item %v not found in %v", expectedItem, items)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
 | 
						go controller.Run(stopCh)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						verifyEvent(watch.Added, nil, expectedPod("pod1", "2"))
 | 
				
			||||||
 | 
						verifyStore([]interface{}{expectedPod("pod1", "2")})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						source.Add(makePod("pod2", "1"))
 | 
				
			||||||
 | 
						verifyEvent(watch.Added, nil, expectedPod("pod2", "1"))
 | 
				
			||||||
 | 
						verifyStore([]interface{}{expectedPod("pod1", "2"), expectedPod("pod2", "1")})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						source.Add(makePod("pod3", "1"))
 | 
				
			||||||
 | 
						verifyEvent(watch.Added, nil, expectedPod("pod3", "1"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						source.Modify(makePod("pod2", "2"))
 | 
				
			||||||
 | 
						verifyEvent(watch.Modified, expectedPod("pod2", "1"), expectedPod("pod2", "2"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						source.Delete(makePod("pod1", "2"))
 | 
				
			||||||
 | 
						verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil)
 | 
				
			||||||
 | 
						verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						close(stopCh)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user