From 732775ee7d1270b6056c371c4298a6b3380bec02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 9 Feb 2023 20:59:18 +0100 Subject: [PATCH 1/2] Paginate within DeleteCollection call. --- .../pkg/registry/generic/registry/store.go | 111 +++++++++++++----- .../registry/generic/registry/store_test.go | 49 +++++--- 2 files changed, 118 insertions(+), 42 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 55f06f7972b..353e5532c1e 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1143,6 +1143,11 @@ func (e *Store) DeleteReturnsDeletedObject() bool { return e.ReturnDeletedObject } +// deleteCollectionPageSize is the size of the page used when +// listing objects from storage during DeleteCollection calls. +// It's a variable to make allow overwriting in tests. +var deleteCollectionPageSize = int64(10000) + // DeleteCollection removes all items returned by List with a given ListOptions from storage. // // DeleteCollection is currently NOT atomic. It can happen that only subset of objects @@ -1155,27 +1160,11 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali listOptions = listOptions.DeepCopy() } - listObj, err := e.List(ctx, listOptions) - if err != nil { - return nil, err - } - items, err := meta.ExtractList(listObj) - if err != nil { - return nil, err - } - if len(items) == 0 { - // Nothing to delete, return now - return listObj, nil - } - // Spawn a number of goroutines, so that we can issue requests to storage - // in parallel to speed up deletion. - // It is proportional to the number of items to delete, up to - // DeleteCollectionWorkers (it doesn't make much sense to spawn 16 - // workers to delete 10 items). + itemsLock := sync.RWMutex{} + var items []runtime.Object + + // TODO(wojtek-t): Decide if we don't want to start workers more opportunistically. workersNumber := e.DeleteCollectionWorkers - if workersNumber > len(items) { - workersNumber = len(items) - } if workersNumber < 1 { workersNumber = 1 } @@ -1194,7 +1183,9 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali defer wg.Done() for index := range toProcess { + itemsLock.RLock() accessor, err := meta.Accessor(items[index]) + itemsLock.RUnlock() if err != nil { errs <- err return @@ -1220,20 +1211,86 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali close(workersExited) }() - func() { + hasLimit := listOptions.Limit > 0 + if listOptions.Limit == 0 { + listOptions.Limit = deleteCollectionPageSize + } + + // Paginate the list request and throw all items into workers. + listObj, err := func() (runtime.Object, error) { defer close(toProcess) - for i := 0; i < len(items); i++ { + processedItems := 0 + var originalList runtime.Object + for { select { - case toProcess <- i: - case <-workersExited: - klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "finished", i, "total", len(items)) - return + case <-ctx.Done(): + return nil, ctx.Err() + default: } + + listObj, err := e.List(ctx, listOptions) + if err != nil { + return nil, err + } + + newItems, err := meta.ExtractList(listObj) + if err != nil { + return nil, err + } + itemsLock.Lock() + items = append(items, newItems...) + itemsLock.Unlock() + + for i := 0; i < len(newItems); i++ { + select { + case toProcess <- processedItems + i: + case <-workersExited: + klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "queued/finished", i, "total", processedItems+len(newItems)) + // Try to propagate an error from the workers if possible. + select { + case err := <-errs: + return nil, err + default: + return nil, fmt.Errorf("all DeleteCollection workers exited") + } + } + } + processedItems += len(newItems) + + // If the original request was setting the limit, finish after running it. + if hasLimit { + return listObj, nil + } + + if originalList == nil { + originalList = listObj + meta.SetList(originalList, nil) + } + + // If there are no more items, return the list. + m, err := meta.ListAccessor(listObj) + if err != nil { + return nil, err + } + if len(m.GetContinue()) == 0 { + itemsLock.Lock() + meta.SetList(originalList, items) + itemsLock.Unlock() + return originalList, nil + } + + // Set up the next loop. + listOptions.Continue = m.GetContinue() + listOptions.ResourceVersion = "" + listOptions.ResourceVersionMatch = "" } }() + if err != nil { + return nil, err + } - // Wait for all workers to exist. + // Wait for all workers to exit. <-workersExited select { diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index 390f794065e..c05940a6f2d 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -2020,19 +2021,34 @@ func TestStoreDeletionPropagation(t *testing.T) { } } -func TestStoreDeleteCollection(t *testing.T) { - podA := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - podB := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} +type storageWithCounter struct { + storage.Interface + listCounter int64 +} + +func (s *storageWithCounter) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + atomic.AddInt64(&s.listCounter, 1) + return s.Interface.GetList(ctx, key, opts, listObj) +} + +func TestStoreDeleteCollection(t *testing.T) { testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") destroyFunc, registry := NewTestGenericStoreRegistry(t) defer destroyFunc() - if _, err := registry.Create(testContext, podA, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { - t.Errorf("Unexpected error: %v", err) - } - if _, err := registry.Create(testContext, podB, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { - t.Errorf("Unexpected error: %v", err) + // Overwrite the underlying storage interface so that it counts GetList calls + // and reduce the default page size to 2. + storeWithCounter := &storageWithCounter{Interface: registry.Storage.Storage} + registry.Storage.Storage = storeWithCounter + deleteCollectionPageSize = 2 + + numPods := 10 + for i := 0; i < numPods; i++ { + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}} + if _, err := registry.Create(testContext, pod, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Errorf("Unexpected error: %v", err) + } } // Delete all pods. @@ -2041,15 +2057,18 @@ func TestStoreDeleteCollection(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } deletedPods := deleted.(*example.PodList) - if len(deletedPods.Items) != 2 { - t.Errorf("Unexpected number of pods deleted: %d, expected: 3", len(deletedPods.Items)) + if len(deletedPods.Items) != numPods { + t.Errorf("Unexpected number of pods deleted: %d, expected: %d", len(deletedPods.Items), numPods) + } + expectedCalls := (int64(numPods) + deleteCollectionPageSize - 1) / deleteCollectionPageSize + if listCalls := atomic.LoadInt64(&storeWithCounter.listCounter); listCalls != expectedCalls { + t.Errorf("Unexpected number of list calls: %d, expected: %d", listCalls, expectedCalls) } - if _, err := registry.Get(testContext, podA.Name, &metav1.GetOptions{}); !errors.IsNotFound(err) { - t.Errorf("Unexpected error: %v", err) - } - if _, err := registry.Get(testContext, podB.Name, &metav1.GetOptions{}); !errors.IsNotFound(err) { - t.Errorf("Unexpected error: %v", err) + for i := 0; i < numPods; i++ { + if _, err := registry.Get(testContext, fmt.Sprintf("foo-%d", i), &metav1.GetOptions{}); !errors.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } } } From e08bd3bc263615f3a26bd17c66a3222e4b6ff7ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 3 Jul 2023 16:56:53 +0200 Subject: [PATCH 2/2] Simplify DeleteCollection --- .../pkg/registry/generic/registry/store.go | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 353e5532c1e..028053952a3 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1160,7 +1160,6 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali listOptions = listOptions.DeepCopy() } - itemsLock := sync.RWMutex{} var items []runtime.Object // TODO(wojtek-t): Decide if we don't want to start workers more opportunistically. @@ -1169,7 +1168,14 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali workersNumber = 1 } wg := sync.WaitGroup{} - toProcess := make(chan int, 2*workersNumber) + // Ensure that chanSize is not too high (to avoid wasted work) but + // at the same time high enough to start listing before we process + // the whole page. + chanSize := 2 * workersNumber + if chanSize < 256 { + chanSize = 256 + } + toProcess := make(chan runtime.Object, chanSize) errs := make(chan error, workersNumber+1) workersExited := make(chan struct{}) @@ -1182,10 +1188,8 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali }) defer wg.Done() - for index := range toProcess { - itemsLock.RLock() - accessor, err := meta.Accessor(items[index]) - itemsLock.RUnlock() + for item := range toProcess { + accessor, err := meta.Accessor(item) if err != nil { errs <- err return @@ -1238,13 +1242,11 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali if err != nil { return nil, err } - itemsLock.Lock() items = append(items, newItems...) - itemsLock.Unlock() for i := 0; i < len(newItems); i++ { select { - case toProcess <- processedItems + i: + case toProcess <- newItems[i]: case <-workersExited: klog.V(4).InfoS("workers already exited, and there are some items waiting to be processed", "queued/finished", i, "total", processedItems+len(newItems)) // Try to propagate an error from the workers if possible. @@ -1274,9 +1276,7 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali return nil, err } if len(m.GetContinue()) == 0 { - itemsLock.Lock() meta.SetList(originalList, items) - itemsLock.Unlock() return originalList, nil }