From 639f86915b2a75f0839a0e57434a244af1427263 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 1 Apr 2024 14:50:58 +0200 Subject: [PATCH 1/5] scheduler: add FIFO queue This is a basic implementation of a first-in-first-out queue with unbounded size. It's useful for cases where a channel with fixed size might deadlock. The caller is responsible for locking. --- pkg/scheduler/util/queue/fifo.go | 110 ++++++++++++++++++++++++ pkg/scheduler/util/queue/fifo_test.go | 117 ++++++++++++++++++++++++++ 2 files changed, 227 insertions(+) create mode 100644 pkg/scheduler/util/queue/fifo.go create mode 100644 pkg/scheduler/util/queue/fifo_test.go diff --git a/pkg/scheduler/util/queue/fifo.go b/pkg/scheduler/util/queue/fifo.go new file mode 100644 index 00000000000..ee66733fe43 --- /dev/null +++ b/pkg/scheduler/util/queue/fifo.go @@ -0,0 +1,110 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +const ( + // normalSize limits the size of the buffer that is kept + // for reuse. + normalSize = 4 +) + +// FIFO implements a first-in-first-out queue with unbounded size. +// The null FIFO is a valid empty queue. +// +// Access must be protected by the caller when used concurrently by +// different goroutines, the queue itself implements no locking. +type FIFO[T any] struct { + // elements contains a buffer for elements which have been + // pushed and not popped yet. Two scenarios are possible: + // - one chunk in the middle (start <= end) + // - one chunk at the end, followed by one chunk at the + // beginning (end <= start) + // + // start == end can be either an empty queue or a completely + // full one (with two chunks). + elements []T + + // len counts the number of elements which have been pushed and + // not popped yet. + len int + + // start is the index of the first valid element. + start int + + // end is the index after the last valid element. + end int +} + +func (q *FIFO[T]) Len() int { + return q.len +} + +func (q *FIFO[T]) Push(element T) { + size := len(q.elements) + if q.len == size { + // Need larger buffer. + newSize := size * 2 + if newSize == 0 { + newSize = normalSize + } + elements := make([]T, newSize) + if q.start == 0 { + copy(elements, q.elements) + } else { + copy(elements, q.elements[q.start:]) + copy(elements[len(q.elements)-q.start:], q.elements[0:q.end]) + } + q.start = 0 + q.end = q.len + q.elements = elements + size = newSize + } + if q.end == size { + // Wrap around. + q.elements[0] = element + q.end = 1 + q.len++ + return + } + q.elements[q.end] = element + q.end++ + q.len++ +} + +func (q *FIFO[T]) Pop() (element T, ok bool) { + if q.len == 0 { + return + } + element = q.elements[q.start] + q.start++ + if q.start == len(q.elements) { + // Wrap around. + q.start = 0 + } + q.len-- + + // Once it is empty, shrink down to avoid hanging onto + // a large buffer forever. 
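+	// Replacing the slice also drops any references still held in the old
+	// backing array, so the garbage collector can reclaim it.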
+ if q.len == 0 && len(q.elements) > normalSize { + q.elements = make([]T, normalSize) + q.start = 0 + q.end = 0 + } + + ok = true + return +} diff --git a/pkg/scheduler/util/queue/fifo_test.go b/pkg/scheduler/util/queue/fifo_test.go new file mode 100644 index 00000000000..3d272a90248 --- /dev/null +++ b/pkg/scheduler/util/queue/fifo_test.go @@ -0,0 +1,117 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func verifyPop(t *testing.T, expectedValue int, expectedOk bool, queue *FIFO[int]) { + t.Helper() + actual, ok := queue.Pop() + require.Equal(t, expectedOk, ok) + require.Equal(t, expectedValue, actual) +} + +func verifyEmpty(t *testing.T, queue *FIFO[int]) { + t.Helper() + require.Equal(t, 0, queue.Len()) + verifyPop(t, 0, false, queue) +} + +func TestNull(t *testing.T) { + var queue FIFO[int] + verifyEmpty(t, &queue) +} + +func TestOnePushPop(t *testing.T) { + var queue FIFO[int] + + expected := 10 + queue.Push(10) + require.Equal(t, 1, queue.Len()) + verifyPop(t, expected, true, &queue) + verifyEmpty(t, &queue) +} + +// Pushes some elements, pops all of them, then the same again. +func TestWrapAroundEmpty(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < 5; i++ { + queue.Push(i) + } + require.Equal(t, 5, queue.Len()) + for i := 0; i < 5; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) + + for i := 5; i < 10; i++ { + queue.Push(i) + } + for i := 5; i < 10; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) +} + +// Pushes some elements, pops one, adds more, then pops all. +func TestWrapAroundPartial(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < 5; i++ { + queue.Push(i) + } + require.Equal(t, 5, queue.Len()) + verifyPop(t, 0, true, &queue) + + for i := 5; i < 10; i++ { + queue.Push(i) + } + for i := 1; i < 10; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) +} + +// Push an unusual amount of elements, pop all, and verify that +// the FIFO shrinks back again. +func TestShrink(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < normalSize*2; i++ { + queue.Push(i) + } + require.Equal(t, normalSize*2, queue.Len()) + require.LessOrEqual(t, 2*normalSize, len(queue.elements)) + + // Pop all, should be shrunken when done. + for i := 0; i < normalSize*2; i++ { + verifyPop(t, i, true, &queue) + } + require.Equal(t, 0, queue.Len()) + require.Equal(t, normalSize, len(queue.elements)) + + // Still usable after shrinking? + queue.Push(42) + verifyPop(t, 42, true, &queue) + require.Equal(t, 0, queue.Len()) + require.Equal(t, normalSize, len(queue.elements)) +} From 171620765e7a319888eb1b324ada7f64bfc69b17 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Sun, 28 Apr 2024 14:38:29 +0200 Subject: [PATCH 2/5] ktesting: add Step Step simplifies using WithStep because it creates a local scope where the same tCtx variable is the one with the step name. 
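A minimal usage sketch (illustrative only: the test name, step name, and log
messages are made up, and ktesting.Init is assumed to be the usual way to
obtain a TContext in a unit test):

    import (
        "testing"

        "k8s.io/kubernetes/test/utils/ktesting"
    )

    func TestExample(t *testing.T) {
        tCtx := ktesting.Init(t)

        // With WithStep alone, a second context variable is needed and it
        // is easy to keep using the outer tCtx by mistake:
        sCtx := ktesting.WithStep(tCtx, "create pod")
        sCtx.Log("creating pod")

        // With Step, the callback parameter shadows the outer variable, so
        // inside the callback the step context is the only tCtx in scope:
        ktesting.Step(tCtx, "create pod", func(tCtx ktesting.TContext) {
            tCtx.Log("creating pod")
        })
    }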
--- test/utils/ktesting/stepcontext.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/utils/ktesting/stepcontext.go b/test/utils/ktesting/stepcontext.go index 7271f3da789..b9ff6b1fc3d 100644 --- a/test/utils/ktesting/stepcontext.go +++ b/test/utils/ktesting/stepcontext.go @@ -41,6 +41,24 @@ func WithStep(tCtx TContext, what string) TContext { return WithLogger(sCtx, klog.LoggerWithName(sCtx.Logger(), what)) } +// Step is useful when the context with the step information is +// used more than once: +// +// ktesting.Step(tCtx, "step 1", func(tCtx ktesting.TContext) { +// tCtx.Log(...) +// if (... ) { +// tCtx.Failf(...) +// } +// )} +// +// Inside the callback, the tCtx variable is the one where the step +// has been added. This avoids the need to introduce multiple different +// context variables and risk of using the wrong one. +func Step(tCtx TContext, what string, cb func(tCtx TContext)) { + tCtx.Helper() + cb(WithStep(tCtx, what)) +} + type stepContext struct { TContext what string From dea16757ef6c8b79805f76a35ca20274a34d53a3 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 1 Apr 2024 14:55:10 +0200 Subject: [PATCH 3/5] scheduler: AddEventHandler for assume cache This enables using the assume cache for cluster events. --- .../util/assumecache/assume_cache.go | 120 +++++++- .../util/assumecache/assume_cache_test.go | 289 ++++++++++++++++-- 2 files changed, 379 insertions(+), 30 deletions(-) diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index 69ec1175f03..a517db97443 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -25,7 +25,9 @@ import ( "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/api/meta" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/scheduler/util/queue" ) // Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon. @@ -119,9 +121,33 @@ type AssumeCache struct { // Will be used for all operations. logger klog.Logger - // Synchronizes updates to store + // Synchronizes updates to all fields below. rwMutex sync.RWMutex + // All registered event handlers. + eventHandlers []cache.ResourceEventHandler + + // The eventQueue contains functions which deliver an event to one + // event handler. + // + // These functions must be invoked while *not locking* rwMutex because + // the event handlers are allowed to access the assume cache. Holding + // rwMutex then would cause a deadlock. + // + // New functions get added as part of processing a cache update while + // the rwMutex is locked. Each function which adds something to the queue + // also drains the queue before returning, therefore it is guaranteed + // that all event handlers get notified immediately (useful for unit + // testing). + // + // A channel cannot be used here because it cannot have an unbounded + // capacity. This could lead to a deadlock (writer holds rwMutex, + // gets blocked because capacity is exhausted, reader is in a handler + // which tries to lock the rwMutex). Writing into such a channel + // while not holding the rwMutex doesn't work because in-order delivery + // of events would no longer be guaranteed. 
+ eventQueue queue.FIFO[func()] + // describes the object stored description string @@ -199,9 +225,11 @@ func (c *AssumeCache) add(obj interface{}) { return } + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() + var oldObj interface{} if objInfo, _ := c.getObjInfo(name); objInfo != nil { newVersion, err := c.getObjVersion(name, obj) if err != nil { @@ -221,6 +249,7 @@ func (c *AssumeCache) add(obj interface{}) { c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion) return } + oldObj = objInfo.latestObj } objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj} @@ -228,6 +257,18 @@ func (c *AssumeCache) add(obj interface{}) { c.logger.Info("Error occurred while updating stored object", "err", err) } else { c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) + for _, handler := range c.eventHandlers { + handler := handler + if oldObj == nil { + c.eventQueue.Push(func() { + handler.OnAdd(obj, false) + }) + } else { + c.eventQueue.Push(func() { + handler.OnUpdate(oldObj, obj) + }) + } + } } } @@ -246,14 +287,29 @@ func (c *AssumeCache) delete(obj interface{}) { return } + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() + var oldObj interface{} + if len(c.eventHandlers) > 0 { + if objInfo, _ := c.getObjInfo(name); objInfo != nil { + oldObj = objInfo.latestObj + } + } + objInfo := &objInfo{name: name} err = c.store.Delete(objInfo) if err != nil { c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name) } + + for _, handler := range c.eventHandlers { + handler := handler + c.eventQueue.Push(func() { + handler.OnDelete(oldObj) + }) + } } func (c *AssumeCache) getObjVersion(name string, obj interface{}) (int64, error) { @@ -315,6 +371,10 @@ func (c *AssumeCache) List(indexObj interface{}) []interface{} { c.rwMutex.RLock() defer c.rwMutex.RUnlock() + return c.listLocked(indexObj) +} + +func (c *AssumeCache) listLocked(indexObj interface{}) []interface{} { allObjs := []interface{}{} var objs []interface{} if c.indexName != "" { @@ -358,6 +418,7 @@ func (c *AssumeCache) Assume(obj interface{}) error { return &ObjectNameError{err} } + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() @@ -380,6 +441,14 @@ func (c *AssumeCache) Assume(obj interface{}) error { return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion) } + for _, handler := range c.eventHandlers { + handler := handler + oldObj := objInfo.latestObj + c.eventQueue.Push(func() { + handler.OnUpdate(oldObj, obj) + }) + } + // Only update the cached object objInfo.latestObj = obj c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion) @@ -388,6 +457,7 @@ func (c *AssumeCache) Assume(obj interface{}) error { // Restore the informer cache's version of the object. 
func (c *AssumeCache) Restore(objName string) { + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() @@ -396,7 +466,53 @@ func (c *AssumeCache) Restore(objName string) { // This could be expected if object got deleted c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err) } else { - objInfo.latestObj = objInfo.apiObj + if objInfo.latestObj != objInfo.apiObj { + for _, handler := range c.eventHandlers { + handler := handler + oldObj, obj := objInfo.latestObj, objInfo.apiObj + c.eventQueue.Push(func() { + handler.OnUpdate(oldObj, obj) + }) + } + + objInfo.latestObj = objInfo.apiObj + } c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) } } + +// AddEventHandler adds an event handler to the cache. Events to a +// single handler are delivered sequentially, but there is no +// coordination between different handlers. A handler may use the +// cache. +func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) { + defer c.emitEvents() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + + c.eventHandlers = append(c.eventHandlers, handler) + allObjs := c.listLocked(nil) + for _, obj := range allObjs { + c.eventQueue.Push(func() { + handler.OnAdd(obj, true) + }) + } +} + +// emitEvents delivers all pending events that are in the queue, in the order +// in which they were stored there (FIFO). +func (c *AssumeCache) emitEvents() { + for { + c.rwMutex.Lock() + deliver, ok := c.eventQueue.Pop() + c.rwMutex.Unlock() + + if !ok { + return + } + func() { + defer utilruntime.HandleCrash() + deliver() + }() + } +} diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index 6c11ac275fa..8e4336730aa 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -19,6 +19,8 @@ package assumecache import ( "fmt" "slices" + "sort" + "sync" "testing" "github.com/google/go-cmp/cmp" @@ -118,6 +120,79 @@ func verifyList(tCtx ktesting.TContext, assumeCache *AssumeCache, expectedObjs [ } } +type mockEventHandler struct { + mutex sync.Mutex + events []event + cache *AssumeCache + block <-chan struct{} +} + +type event struct { + What string + OldObj, Obj interface{} + InitialList bool +} + +func (m *mockEventHandler) OnAdd(obj interface{}, initialList bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.events = append(m.events, event{ + What: "add", + Obj: obj, + InitialList: initialList, + }) + + if m.cache != nil { + // Must not deadlock! 
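+		// OnAdd is invoked from emitEvents, which does not hold the
+		// cache's rwMutex while delivering, so taking the read lock
+		// via List here is safe.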
+ m.cache.List(nil) + } + if m.block != nil { + <-m.block + } +} + +func (m *mockEventHandler) OnUpdate(oldObj, obj interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.events = append(m.events, event{ + What: "update", + OldObj: oldObj, + Obj: obj, + }) +} + +func (m *mockEventHandler) OnDelete(obj interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.events = append(m.events, event{ + What: "delete", + Obj: obj, + }) +} + +func (m *mockEventHandler) verifyAndFlush(tCtx ktesting.TContext, expectedEvents []event) { + m.mutex.Lock() + defer m.mutex.Unlock() + + tCtx.Helper() + if diff := cmp.Diff(expectedEvents, m.events); diff != "" { + tCtx.Fatalf("unexpected events (- expected, + actual):\n%s", diff) + } + m.events = nil +} + +func (m *mockEventHandler) sortEvents(cmp func(objI, objJ interface{}) bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + sort.Slice(m.events, func(i, j int) bool { + return cmp(m.events[i].Obj, m.events[j].Obj) + }) +} + func TestAssume(t *testing.T) { scenarios := map[string]struct { oldObj metav1.Object @@ -162,6 +237,8 @@ func TestAssume(t *testing.T) { for name, scenario := range scenarios { t.Run(name, func(t *testing.T) { tCtx, cache, informer := newTest(t) + var events mockEventHandler + cache.AddEventHandler(&events) // Add old object to cache. informer.add(scenario.oldObj) @@ -173,18 +250,25 @@ func TestAssume(t *testing.T) { t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff) } - // Check that Get returns correct object. + // Check that Get returns correct object and + // that events were delivered correctly. + expectEvents := []event{{What: "add", Obj: scenario.oldObj}} expectedObj := scenario.newObj if scenario.expectErr != nil { expectedObj = scenario.oldObj + } else { + expectEvents = append(expectEvents, event{What: "update", OldObj: scenario.oldObj, Obj: scenario.newObj}) } verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj) + events.verifyAndFlush(tCtx, expectEvents) }) } } func TestRestore(t *testing.T) { tCtx, cache, informer := newTest(t) + var events mockEventHandler + cache.AddEventHandler(&events) // This test assumes an object with the same version as the API object. // The assume cache supports that, but doing so in real code suffers from @@ -194,25 +278,40 @@ func TestRestore(t *testing.T) { newObj := makeObj("pvc1", "5", "") // Restore object that doesn't exist - cache.Restore("nothing") + ktesting.Step(tCtx, "empty cache", func(tCtx ktesting.TContext) { + cache.Restore("nothing") + events.verifyAndFlush(tCtx, nil) + }) // Add old object to cache. - informer.add(oldObj) - verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj) + ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) { + informer.add(oldObj) + verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) + events.verifyAndFlush(tCtx, []event{{What: "add", Obj: oldObj}}) + }) // Restore object. - cache.Restore(oldObj.GetName()) - verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj) + ktesting.Step(tCtx, "initial Restore", func(tCtx ktesting.TContext) { + cache.Restore(oldObj.GetName()) + verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) + events.verifyAndFlush(tCtx, nil) + }) // Assume new object. 
- if err := cache.Assume(newObj); err != nil { - t.Fatalf("Assume() returned error %v", err) - } - verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj) + ktesting.Step(tCtx, "Assume", func(tCtx ktesting.TContext) { + if err := cache.Assume(newObj); err != nil { + tCtx.Fatalf("Assume() returned error %v", err) + } + verify(tCtx, cache, oldObj.GetName(), newObj, oldObj) + events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) + }) // Restore object. - cache.Restore(oldObj.GetName()) - verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj) + ktesting.Step(tCtx, "second Restore", func(tCtx ktesting.TContext) { + cache.Restore(oldObj.GetName()) + verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) + events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: newObj, Obj: oldObj}}) + }) } func TestEvents(t *testing.T) { @@ -226,27 +325,161 @@ func TestEvents(t *testing.T) { informer.add(oldObj) verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj) + // Receive initial list. + var events mockEventHandler + cache.AddEventHandler(&events) + events.verifyAndFlush(ktesting.WithStep(tCtx, "initial list"), []event{{What: "add", Obj: oldObj, InitialList: true}}) + // Update object. - informer.update(newObj) - verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, newObj, newObj) + ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) { + informer.update(newObj) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) + }) // Some error cases (don't occur in practice). - informer.add(1) - verify(ktesting.WithStep(tCtx, "after nop add"), cache, key, newObj, newObj) - informer.add(nil) - verify(ktesting.WithStep(tCtx, "after nil add"), cache, key, newObj, newObj) - informer.update(oldObj) - verify(ktesting.WithStep(tCtx, "after nop update"), cache, key, newObj, newObj) - informer.update(nil) - verify(ktesting.WithStep(tCtx, "after nil update"), cache, key, newObj, newObj) - informer.delete(nil) - verify(ktesting.WithStep(tCtx, "after nop delete"), cache, key, newObj, newObj) + ktesting.Step(tCtx, "nop add", func(tCtx ktesting.TContext) { + informer.add(1) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) + ktesting.Step(tCtx, "nil add", func(tCtx ktesting.TContext) { + informer.add(nil) + verify(tCtx, cache, key, newObj, newObj) + }) + ktesting.Step(tCtx, "nop update", func(tCtx ktesting.TContext) { + informer.update(oldObj) + events.verifyAndFlush(tCtx, nil) + verify(tCtx, cache, key, newObj, newObj) + }) + ktesting.Step(tCtx, "nil update", func(tCtx ktesting.TContext) { + informer.update(nil) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) + ktesting.Step(tCtx, "nop delete", func(tCtx ktesting.TContext) { + informer.delete(nil) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) // Delete object. 
- informer.delete(oldObj) - _, err := cache.Get(key) - if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" { - t.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff) + ktesting.Step(tCtx, "delete", func(tCtx ktesting.TContext) { + informer.delete(oldObj) + events.verifyAndFlush(tCtx, []event{{What: "delete", Obj: newObj}}) + _, err := cache.Get(key) + if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" { + tCtx.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff) + } + }) +} + +func TestEventHandlers(t *testing.T) { + tCtx, cache, informer := newTest(t) + handlers := make([]mockEventHandler, 5) + + objs := make([]metav1.Object, 0, 20) + for i := 0; i < 5; i++ { + objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) + informer.add(objs[i]) + } + + // Accessing cache during OnAdd must not deadlock! + handlers[0].cache = cache + + // Order of delivered events is random, we must ensure + // increasing order by name ourselves. + var expectedEvents []event + for _, obj := range objs { + expectedEvents = append(expectedEvents, + event{ + What: "add", + Obj: obj, + InitialList: true, + }, + ) + } + for i := range handlers { + cache.AddEventHandler(&handlers[i]) + handlers[i].sortEvents(func(objI, objJ interface{}) bool { + return objI.(*metav1.ObjectMeta).Name < + objJ.(*metav1.ObjectMeta).Name + }) + handlers[i].verifyAndFlush(tCtx, expectedEvents) + } + + for i := 5; i < 7; i++ { + objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) + informer.add(objs[i]) + for e := range handlers { + handlers[e].verifyAndFlush(tCtx, []event{{What: "add", Obj: objs[i]}}) + } + } + + for i, oldObj := range objs { + newObj := makeObj(fmt.Sprintf("test-pvc%v", i), "2", "") + objs[i] = newObj + informer.update(newObj) + for e := range handlers { + handlers[e].verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) + } + } + + for _, obj := range objs { + informer.delete(obj) + for e := range handlers { + handlers[e].verifyAndFlush(tCtx, []event{{What: "delete", Obj: obj}}) + } + } +} + +func TestEventHandlerConcurrency(t *testing.T) { + tCtx, cache, informer := newTest(t) + handlers := make([]mockEventHandler, 5) + + objs := make([]metav1.Object, 0, 20) + for i := 0; i < 5; i++ { + objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) + } + + // Accessing cache during OnAdd must not deadlock! + handlers[0].cache = cache + + // Each add blocks until this gets cancelled. + tCancelCtx := ktesting.WithCancel(tCtx) + var wg sync.WaitGroup + + for i := range handlers { + handlers[i].block = tCancelCtx.Done() + cache.AddEventHandler(&handlers[i]) + } + + // Execution of the add calls is random, therefore + // we have to sort again. 
+ var expectedEvents []event + for _, obj := range objs { + wg.Add(1) + go func() { + defer wg.Done() + informer.add(obj) + }() + expectedEvents = append(expectedEvents, + event{ + What: "add", + Obj: obj, + }, + ) + } + + tCancelCtx.Cancel("proceed") + wg.Wait() + + for i := range handlers { + handlers[i].sortEvents(func(objI, objJ interface{}) bool { + return objI.(*metav1.ObjectMeta).Name < + objJ.(*metav1.ObjectMeta).Name + }) + handlers[i].verifyAndFlush(tCtx, expectedEvents) } } From 9a6f3b9388efeca7ba0da2d3aa5a0b7d5b8e31f9 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 1 Apr 2024 15:46:48 +0200 Subject: [PATCH 4/5] scheduler: central ResourceClaim assume cache This enables connecting the event handler for ResourceClaim to the assume cache, which addresses a theoretic race condition. It may also be useful for implementing the autoscaler support, because now the autoscaler can modify the content of the cache. --- pkg/scheduler/eventhandlers.go | 8 +-- pkg/scheduler/eventhandlers_test.go | 52 ++++++++++++++++++- pkg/scheduler/framework/interface.go | 6 +++ .../dynamicresources/dynamicresources.go | 18 +------ .../dynamicresources/dynamicresources_test.go | 4 +- pkg/scheduler/framework/runtime/framework.go | 25 +++++++-- pkg/scheduler/scheduler.go | 10 +++- 7 files changed, 94 insertions(+), 29 deletions(-) diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index b2a0f331f53..44311f5b247 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/profile" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) func (sched *Scheduler) onStorageClassAdd(obj interface{}) { @@ -288,6 +289,7 @@ func addAllEventHandlers( sched *Scheduler, informerFactory informers.SharedInformerFactory, dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, + resourceClaimCache *assumecache.AssumeCache, gvkMap map[framework.GVK]framework.ActionType, ) error { var ( @@ -456,11 +458,9 @@ func addAllEventHandlers( } case framework.ResourceClaim: if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { - if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler( + handlerRegistration = resourceClaimCache.AddEventHandler( buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"), - ); err != nil { - return err - } + ) handlers = append(handlers, handlerRegistration) } case framework.ResourceClass: diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index f0254df4095..a99146cf567 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -26,9 +26,12 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2/ktesting" "k8s.io/apimachinery/pkg/runtime" @@ -38,6 +41,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" 
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename" @@ -46,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/internal/queue" st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) func TestNodeAllocatableChanged(t *testing.T) { @@ -362,6 +367,7 @@ func TestAddAllEventHandlers(t *testing.T) { tests := []struct { name string gvkMap map[framework.GVK]framework.ActionType + enableDRA bool expectStaticInformers map[reflect.Type]bool expectDynamicInformers map[schema.GroupVersionResource]bool }{ @@ -375,6 +381,44 @@ func TestAddAllEventHandlers(t *testing.T) { }, expectDynamicInformers: map[schema.GroupVersionResource]bool{}, }, + { + name: "DRA events disabled", + gvkMap: map[framework.GVK]framework.ActionType{ + framework.PodSchedulingContext: framework.Add, + framework.ResourceClaim: framework.Add, + framework.ResourceClass: framework.Add, + framework.ResourceClaimParameters: framework.Add, + framework.ResourceClassParameters: framework.Add, + }, + expectStaticInformers: map[reflect.Type]bool{ + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Namespace{}): true, + }, + expectDynamicInformers: map[schema.GroupVersionResource]bool{}, + }, + { + name: "DRA events enabled", + gvkMap: map[framework.GVK]framework.ActionType{ + framework.PodSchedulingContext: framework.Add, + framework.ResourceClaim: framework.Add, + framework.ResourceClass: framework.Add, + framework.ResourceClaimParameters: framework.Add, + framework.ResourceClassParameters: framework.Add, + }, + enableDRA: true, + expectStaticInformers: map[reflect.Type]bool{ + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Namespace{}): true, + reflect.TypeOf(&resourcev1alpha2.PodSchedulingContext{}): true, + reflect.TypeOf(&resourcev1alpha2.ResourceClaim{}): true, + reflect.TypeOf(&resourcev1alpha2.ResourceClaimParameters{}): true, + reflect.TypeOf(&resourcev1alpha2.ResourceClass{}): true, + reflect.TypeOf(&resourcev1alpha2.ResourceClassParameters{}): true, + }, + expectDynamicInformers: map[schema.GroupVersionResource]bool{}, + }, { name: "add GVKs handlers defined in framework dynamically", gvkMap: map[framework.GVK]framework.ActionType{ @@ -433,6 +477,7 @@ func TestAddAllEventHandlers(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA) logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -447,8 +492,13 @@ func TestAddAllEventHandlers(t *testing.T) { dynclient := dyfake.NewSimpleDynamicClient(scheme) dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0) + var resourceClaimCache *assumecache.AssumeCache + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer() + resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil) + } - if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil { + if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, tt.gvkMap); err != nil { t.Fatalf("Add event handlers failed, error = %v", err) } diff --git a/pkg/scheduler/framework/interface.go 
b/pkg/scheduler/framework/interface.go index 5fd8bd86fcf..ff117be8095 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -38,6 +38,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) // NodeScoreList declares a list of nodes and their scores. @@ -701,6 +702,11 @@ type Handle interface { SharedInformerFactory() informers.SharedInformerFactory + // ResourceClaimInfos returns an assume cache of ResourceClaim objects + // which gets populated by the shared informer factory and the dynamic resources + // plugin. + ResourceClaimCache() *assumecache.AssumeCache + // RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node. RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index f54cf0d2bea..715f3f9b1c6 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -343,7 +343,6 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe return &dynamicResources{}, nil } - logger := klog.FromContext(ctx) pl := &dynamicResources{ enabled: true, fh: fh, @@ -355,7 +354,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(), resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(), claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()), - claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), + claimAssumeCache: fh.ResourceClaimCache(), } return pl, nil @@ -597,21 +596,6 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po // // TODO (https://github.com/kubernetes/kubernetes/issues/123697): // check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO). - // - // There is a small race here: - // - The dynamicresources plugin allocates claim A and updates the assume cache. - // - A second pod gets marked as unschedulable based on that assume cache. - // - Before the informer cache here catches up, the pod runs, terminates and - // the claim gets deallocated without ever sending the claim status with - // allocation to the scheduler. - // - The comparison below is for a *very* old claim with no allocation and the - // new claim where the allocation is already removed again, so no - // RemovedClaimAllocation event gets emitted. - // - // This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30. - // TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache - // into the event mechanism. This can be tackled together with adding autoscaler - // support, which also needs to do something with the assume cache. 
logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) return framework.Queue, nil } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 025a3478383..90f08a1aa7c 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/ptr" ) @@ -1319,10 +1320,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters")) tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0) - + assumeCache := assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil) opts := []runtime.Option{ runtime.WithClientSet(tc.client), runtime.WithInformerFactory(tc.informerFactory), + runtime.WithResourceClaimCache(assumeCache), } fh, err := runtime.NewFramework(tCtx, nil, nil, opts...) if err != nil { diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 44a28e3c991..257f0e20439 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/metrics" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/pkg/util/slice" ) @@ -71,11 +72,12 @@ type frameworkImpl struct { // pluginsMap contains all plugins, by name. pluginsMap map[string]framework.Plugin - clientSet clientset.Interface - kubeConfig *restclient.Config - eventRecorder events.EventRecorder - informerFactory informers.SharedInformerFactory - logger klog.Logger + clientSet clientset.Interface + kubeConfig *restclient.Config + eventRecorder events.EventRecorder + informerFactory informers.SharedInformerFactory + resourceClaimCache *assumecache.AssumeCache + logger klog.Logger metricsRecorder *metrics.MetricAsyncRecorder profileName string @@ -126,6 +128,7 @@ type frameworkOptions struct { kubeConfig *restclient.Config eventRecorder events.EventRecorder informerFactory informers.SharedInformerFactory + resourceClaimCache *assumecache.AssumeCache snapshotSharedLister framework.SharedLister metricsRecorder *metrics.MetricAsyncRecorder podNominator framework.PodNominator @@ -176,6 +179,13 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option } } +// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl. +func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option { + return func(o *frameworkOptions) { + o.resourceClaimCache = resourceClaimCache + } +} + // WithSnapshotSharedLister sets the SharedLister of the snapshot. 
func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option { return func(o *frameworkOptions) { @@ -259,6 +269,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler kubeConfig: options.kubeConfig, eventRecorder: options.eventRecorder, informerFactory: options.informerFactory, + resourceClaimCache: options.resourceClaimCache, metricsRecorder: options.metricsRecorder, extenders: options.extenders, PodNominator: options.podNominator, @@ -1598,6 +1609,10 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory return f.informerFactory } +func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache { + return f.resourceClaimCache +} + func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] { pgSet := sets.Set[string]{} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index df007aa82fc..291830c642e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -48,6 +48,7 @@ import ( internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/profile" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) const ( @@ -293,11 +294,18 @@ func New(ctx context.Context, snapshot := internalcache.NewEmptySnapshot() metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything) + var resourceClaimCache *assumecache.AssumeCache + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer() + resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil) + } + profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory, frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), frameworkruntime.WithClientSet(client), frameworkruntime.WithKubeConfig(options.kubeConfig), frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithResourceClaimCache(resourceClaimCache), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)), frameworkruntime.WithParallelism(int(options.parallelism)), @@ -356,7 +364,7 @@ func New(ctx context.Context, sched.NextPod = podQueue.Pop sched.applyDefaultHandlers() - if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil { + if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil { return nil, fmt.Errorf("adding event handlers: %w", err) } From 1b63639d31f40c5d0953bb67b590bd84eae256a7 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Sat, 15 Jun 2024 18:15:13 +0200 Subject: [PATCH 5/5] DRA scheduler: use assume cache to list claims This finishes the transition to the assume cache as source of truth for the current set of claims. The tests have to be adapted. It's not enough anymore to directly put objects into the informer store because that doesn't change the assume cache content. Instead, normal Create/Update calls and waiting for the cache update are needed. 
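For illustration, a sketch of the wait that the adapted tests rely on; the
helper name is made up here, while require.EventuallyWithT and the assume
cache key format (namespace/name) match what the test changes below use:

    import (
        "testing"
        "time"

        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"

        resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
        "k8s.io/kubernetes/pkg/scheduler/util/assumecache"
    )

    // waitForClaim polls the assume cache until it has caught up with a
    // claim that was just created or updated through the API client.
    func waitForClaim(t *testing.T, cache *assumecache.AssumeCache, claim *resourcev1alpha2.ResourceClaim) {
        t.Helper()
        key := claim.Namespace + "/" + claim.Name
        require.EventuallyWithT(t, func(t *assert.CollectT) {
            obj, err := cache.Get(key)
            if err != nil {
                t.Errorf("retrieve claim: %v", err)
                return
            }
            if obj.(*resourcev1alpha2.ResourceClaim).ResourceVersion != claim.ResourceVersion {
                t.Errorf("assume cache not caught up yet")
            }
        }, time.Minute, time.Second, "assume cache must contain the stored claim")
    }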
--- .../dynamicresources/dynamicresources.go | 9 ++- .../dynamicresources/dynamicresources_test.go | 71 +++++++++++------ .../util/assumecache/assume_cache.go | 79 +++++++++++-------- .../util/assumecache/assume_cache_test.go | 13 +-- 4 files changed, 108 insertions(+), 64 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 715f3f9b1c6..e701a55d9c2 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -272,7 +272,6 @@ type dynamicResources struct { enabled bool fh framework.Handle clientset kubernetes.Interface - claimLister resourcev1alpha2listers.ResourceClaimLister classLister resourcev1alpha2listers.ResourceClassLister podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister claimParametersLister resourcev1alpha2listers.ResourceClaimParametersLister @@ -347,7 +346,6 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe enabled: true, fh: fh, clientset: fh.ClientSet(), - claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(), classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(), podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(), claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(), @@ -791,11 +789,16 @@ func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso if claimName == nil { continue } - claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName) + obj, err := pl.claimAssumeCache.Get(pod.Namespace + "/" + *claimName) if err != nil { return err } + claim, ok := obj.(*resourcev1alpha2.ResourceClaim) + if !ok { + return fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, pod.Namespace, *claimName) + } + if claim.DeletionTimestamp != nil { return fmt.Errorf("resourceclaim %q is being deleted", claim.Name) } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 90f08a1aa7c..562a8de7eda 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -1149,12 +1149,13 @@ func TestPlugin(t *testing.T) { } type testContext struct { - ctx context.Context - client *fake.Clientset - informerFactory informers.SharedInformerFactory - p *dynamicResources - nodeInfos []*framework.NodeInfo - state *framework.CycleState + ctx context.Context + client *fake.Clientset + informerFactory informers.SharedInformerFactory + claimAssumeCache *assumecache.AssumeCache + p *dynamicResources + nodeInfos []*framework.NodeInfo + state *framework.CycleState } func (tc *testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) { @@ -1320,11 +1321,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters")) tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0) - assumeCache := assumecache.NewAssumeCache(tCtx.Logger(), 
tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil) + tc.claimAssumeCache = assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil) opts := []runtime.Option{ runtime.WithClientSet(tc.client), runtime.WithInformerFactory(tc.informerFactory), - runtime.WithResourceClaimCache(assumeCache), + runtime.WithResourceClaimCache(tc.claimAssumeCache), } fh, err := runtime.NewFramework(tCtx, nil, nil, opts...) if err != nil { @@ -1491,6 +1492,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { }, "backoff-wrong-old-object": { pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, oldObj: "not-a-claim", newObj: pendingImmediateClaim, expectedErr: true, @@ -1519,15 +1521,10 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { }, "structured-claim-deallocate": { pod: podWithClaimName, - claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, - oldObj: func() *resourcev1alpha2.ResourceClaim { - claim := structuredAllocatedClaim.DeepCopy() - claim.Name += "-other" - return claim - }(), + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, otherStructuredAllocatedClaim}, + oldObj: otherStructuredAllocatedClaim, newObj: func() *resourcev1alpha2.ResourceClaim { - claim := structuredAllocatedClaim.DeepCopy() - claim.Name += "-other" + claim := otherStructuredAllocatedClaim.DeepCopy() claim.Status.Allocation = nil return claim }(), @@ -1539,18 +1536,48 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { for name, tc := range testcases { t.Run(name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + logger, tCtx := ktesting.NewTestContext(t) testCtx := setup(t, nil, tc.claims, nil, nil, nil) + oldObj := tc.oldObj + newObj := tc.newObj if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok { - // Update the informer because the lister gets called and must have the claim. - store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore() + // Add or update through the client and wait until the event is processed. + claimKey := claim.Namespace + "/" + claim.Name if tc.oldObj == nil { - require.NoError(t, store.Add(claim)) + // Some test claims already have it. Clear for create. + createClaim := claim.DeepCopy() + createClaim.UID = "" + storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(createClaim.Namespace).Create(tCtx, createClaim, metav1.CreateOptions{}) + require.NoError(t, err, "create claim") + claim = storedClaim } else { - require.NoError(t, store.Update(claim)) + cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey) + require.NoError(t, err, "retrieve old claim") + updateClaim := claim.DeepCopy() + // The test claim doesn't have those (generated dynamically), so copy them. + updateClaim.UID = cachedClaim.(*resourcev1alpha2.ResourceClaim).UID + updateClaim.ResourceVersion = cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion + + storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(updateClaim.Namespace).Update(tCtx, updateClaim, metav1.UpdateOptions{}) + require.NoError(t, err, "update claim") + claim = storedClaim } + + // Eventually the assume cache will have it, too. 
+ require.EventuallyWithT(t, func(t *assert.CollectT) { + cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey) + require.NoError(t, err, "retrieve claim") + if cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion != claim.ResourceVersion { + t.Errorf("cached claim not updated yet") + } + }, time.Minute, time.Second, "claim assume cache must have new or updated claim") + + // This has the actual UID and ResourceVersion, + // which is relevant for + // isSchedulableAfterClaimChange. + newObj = claim } - actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj) + actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj) if tc.expectedErr { require.Error(t, err) return diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index a517db97443..c7392129cbd 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -125,7 +125,8 @@ type AssumeCache struct { rwMutex sync.RWMutex // All registered event handlers. - eventHandlers []cache.ResourceEventHandler + eventHandlers []cache.ResourceEventHandler + handlerRegistration cache.ResourceEventHandlerRegistration // The eventQueue contains functions which deliver an event to one // event handler. @@ -203,7 +204,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam // Unit tests don't use informers if informer != nil { // Cannot fail in practice?! No-one bothers checking the error. - _, _ = informer.AddEventHandler( + c.handlerRegistration, _ = informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: c.add, UpdateFunc: c.update, @@ -257,18 +258,7 @@ func (c *AssumeCache) add(obj interface{}) { c.logger.Info("Error occurred while updating stored object", "err", err) } else { c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) - for _, handler := range c.eventHandlers { - handler := handler - if oldObj == nil { - c.eventQueue.Push(func() { - handler.OnAdd(obj, false) - }) - } else { - c.eventQueue.Push(func() { - handler.OnUpdate(oldObj, obj) - }) - } - } + c.pushEvent(oldObj, obj) } } @@ -304,11 +294,32 @@ func (c *AssumeCache) delete(obj interface{}) { c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name) } + c.pushEvent(oldObj, nil) +} + +// pushEvent gets called while the mutex is locked for writing. +// It ensures that all currently registered event handlers get +// notified about a change when the caller starts delivering +// those with emitEvents. +// +// For a delete event, newObj is nil. For an add, oldObj is nil. +// An update has both as non-nil. 
+func (c *AssumeCache) pushEvent(oldObj, newObj interface{}) { for _, handler := range c.eventHandlers { handler := handler - c.eventQueue.Push(func() { - handler.OnDelete(oldObj) - }) + if oldObj == nil { + c.eventQueue.Push(func() { + handler.OnAdd(newObj, false) + }) + } else if newObj == nil { + c.eventQueue.Push(func() { + handler.OnDelete(oldObj) + }) + } else { + c.eventQueue.Push(func() { + handler.OnUpdate(oldObj, newObj) + }) + } } } @@ -441,13 +452,7 @@ func (c *AssumeCache) Assume(obj interface{}) error { return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion) } - for _, handler := range c.eventHandlers { - handler := handler - oldObj := objInfo.latestObj - c.eventQueue.Push(func() { - handler.OnUpdate(oldObj, obj) - }) - } + c.pushEvent(objInfo.latestObj, obj) // Only update the cached object objInfo.latestObj = obj @@ -467,14 +472,7 @@ func (c *AssumeCache) Restore(objName string) { c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err) } else { if objInfo.latestObj != objInfo.apiObj { - for _, handler := range c.eventHandlers { - handler := handler - oldObj, obj := objInfo.latestObj, objInfo.apiObj - c.eventQueue.Push(func() { - handler.OnUpdate(oldObj, obj) - }) - } - + c.pushEvent(objInfo.latestObj, objInfo.apiObj) objInfo.latestObj = objInfo.apiObj } c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) @@ -485,7 +483,9 @@ func (c *AssumeCache) Restore(objName string) { // single handler are delivered sequentially, but there is no // coordination between different handlers. A handler may use the // cache. -func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) { +// +// The return value can be used to wait for cache synchronization. +func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.ResourceEventHandlerRegistration { defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() @@ -497,6 +497,13 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) { handler.OnAdd(obj, true) }) } + + if c.handlerRegistration == nil { + // No informer, so immediately synced. + return syncedHandlerRegistration{} + } + + return c.handlerRegistration } // emitEvents delivers all pending events that are in the queue, in the order @@ -516,3 +523,9 @@ func (c *AssumeCache) emitEvents() { }() } } + +// syncedHandlerRegistration is an implementation of ResourceEventHandlerRegistration +// which always returns true. +type syncedHandlerRegistration struct{} + +func (syncedHandlerRegistration) HasSynced() bool { return true } diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index 8e4336730aa..a18d51e0a87 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -290,7 +290,7 @@ func TestRestore(t *testing.T) { events.verifyAndFlush(tCtx, []event{{What: "add", Obj: oldObj}}) }) - // Restore object. + // Restore the same object. ktesting.Step(tCtx, "initial Restore", func(tCtx ktesting.TContext) { cache.Restore(oldObj.GetName()) verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) @@ -306,7 +306,7 @@ func TestRestore(t *testing.T) { events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) }) - // Restore object. + // Restore the same object. 
ktesting.Step(tCtx, "second Restore", func(tCtx ktesting.TContext) { cache.Restore(oldObj.GetName()) verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) @@ -346,6 +346,7 @@ func TestEvents(t *testing.T) { ktesting.Step(tCtx, "nil add", func(tCtx ktesting.TContext) { informer.add(nil) verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) }) ktesting.Step(tCtx, "nop update", func(tCtx ktesting.TContext) { informer.update(oldObj) @@ -378,7 +379,7 @@ func TestEventHandlers(t *testing.T) { tCtx, cache, informer := newTest(t) handlers := make([]mockEventHandler, 5) - objs := make([]metav1.Object, 0, 20) + var objs []metav1.Object for i := 0; i < 5; i++ { objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) informer.add(objs[i]) @@ -437,7 +438,7 @@ func TestEventHandlerConcurrency(t *testing.T) { tCtx, cache, informer := newTest(t) handlers := make([]mockEventHandler, 5) - objs := make([]metav1.Object, 0, 20) + var objs []metav1.Object for i := 0; i < 5; i++ { objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) } @@ -487,7 +488,7 @@ func TestListNoIndexer(t *testing.T) { tCtx, cache, informer := newTest(t) // Add a bunch of objects. - objs := make([]interface{}, 0, 10) + var objs []interface{} for i := 0; i < 10; i++ { obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "") objs = append(objs, obj) @@ -526,7 +527,7 @@ func TestListWithIndexer(t *testing.T) { // Add a bunch of objects. ns := "ns1" - objs := make([]interface{}, 0, 10) + var objs []interface{} for i := 0; i < 10; i++ { obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns) objs = append(objs, obj)