Merge pull request #75849 from jpbetz/pagination-podgc
Add ListPager.EachListItem util
This commit is contained in:
		| @@ -17,6 +17,7 @@ go_library( | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", | ||||
|     ], | ||||
| ) | ||||
|  | ||||
|   | ||||
| @@ -25,9 +25,11 @@ import ( | ||||
| 	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||||
| ) | ||||
|  | ||||
| const defaultPageSize = 500 | ||||
| const defaultPageBufferSize = 10 | ||||
|  | ||||
| // ListPageFunc returns a list object for the given list options. | ||||
| type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) | ||||
| @@ -48,6 +50,9 @@ type ListPager struct { | ||||
| 	PageFn   ListPageFunc | ||||
|  | ||||
| 	FullListIfExpired bool | ||||
|  | ||||
| 	// Number of pages to buffer | ||||
| 	PageBufferSize int32 | ||||
| } | ||||
|  | ||||
| // New creates a new pager from the provided pager function using the default | ||||
| @@ -58,6 +63,7 @@ func New(fn ListPageFunc) *ListPager { | ||||
| 		PageSize:          defaultPageSize, | ||||
| 		PageFn:            fn, | ||||
| 		FullListIfExpired: true, | ||||
| 		PageBufferSize:    defaultPageBufferSize, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -73,6 +79,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti | ||||
| 	} | ||||
| 	var list *metainternalversion.List | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return nil, ctx.Err() | ||||
| 		default: | ||||
| 		} | ||||
|  | ||||
| 		obj, err := p.PageFn(ctx, options) | ||||
| 		if err != nil { | ||||
| 			if !errors.IsResourceExpired(err) || !p.FullListIfExpired { | ||||
| @@ -115,3 +127,105 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti | ||||
| 		options.Continue = m.GetContinue() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If | ||||
| // fn returns an error, processing stops and that error is returned. If fn does not return an error, | ||||
| // any error encountered while retrieving the list from the server is returned. If the context | ||||
| // cancels or times out, the context error is returned. Since the list is retrieved in paginated | ||||
| // chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list | ||||
| // requests exceed the expiration limit of the apiserver being called. | ||||
| // | ||||
| // Items are retrieved in chunks from the server to reduce the impact on the server with up to | ||||
| // ListPager.PageBufferSize chunks buffered concurrently in the background. | ||||
| func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { | ||||
| 	return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error { | ||||
| 		return meta.EachListItem(obj, fn) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on | ||||
| // each list chunk.  If fn returns an error, processing stops and that error is returned. If fn does | ||||
| // not return an error, any error encountered while retrieving the list from the server is | ||||
| // returned. If the context cancels or times out, the context error is returned. Since the list is | ||||
| // retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if | ||||
| // the pagination list requests exceed the expiration limit of the apiserver being called. | ||||
| // | ||||
| // Up to ListPager.PageBufferSize chunks are buffered concurrently in the background. | ||||
| func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { | ||||
| 	if p.PageBufferSize < 0 { | ||||
| 		return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize) | ||||
| 	} | ||||
|  | ||||
| 	// Ensure background goroutine is stopped if this call exits before all list items are | ||||
| 	// processed. Cancelation error from this deferred cancel call is never returned to caller; | ||||
| 	// either the list result has already been sent to bgResultC or the fn error is returned and | ||||
| 	// the cancelation error is discarded. | ||||
| 	ctx, cancel := context.WithCancel(ctx) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	chunkC := make(chan runtime.Object, p.PageBufferSize) | ||||
| 	bgResultC := make(chan error, 1) | ||||
| 	go func() { | ||||
| 		defer utilruntime.HandleCrash() | ||||
|  | ||||
| 		var err error | ||||
| 		defer func() { | ||||
| 			close(chunkC) | ||||
| 			bgResultC <- err | ||||
| 		}() | ||||
| 		err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error { | ||||
| 			select { | ||||
| 			case chunkC <- chunk: // buffer the chunk, this can block | ||||
| 			case <-ctx.Done(): | ||||
| 				return ctx.Err() | ||||
| 			} | ||||
| 			return nil | ||||
| 		}) | ||||
| 	}() | ||||
|  | ||||
| 	for o := range chunkC { | ||||
| 		err := fn(o) | ||||
| 		if err != nil { | ||||
| 			return err // any fn error should be returned immediately | ||||
| 		} | ||||
| 	} | ||||
| 	// promote the results of our background goroutine to the foreground | ||||
| 	return <-bgResultC | ||||
| } | ||||
|  | ||||
| // eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list | ||||
| // chunk. If fn returns an error, processing stops and that error is returned. If fn does not return | ||||
| // an error, any error encountered while retrieving the list from the server is returned. If the | ||||
| // context cancels or times out, the context error is returned. Since the list is retrieved in | ||||
| // paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the | ||||
| // pagination list requests exceed the expiration limit of the apiserver being called. | ||||
| func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { | ||||
| 	if options.Limit == 0 { | ||||
| 		options.Limit = p.PageSize | ||||
| 	} | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return ctx.Err() | ||||
| 		default: | ||||
| 		} | ||||
|  | ||||
| 		obj, err := p.PageFn(ctx, options) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		m, err := meta.ListAccessor(obj) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("returned object must be a list: %v", err) | ||||
| 		} | ||||
| 		if err := fn(obj); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		// if we have no more items, return. | ||||
| 		if len(m.GetContinue()) == 0 { | ||||
| 			return nil | ||||
| 		} | ||||
| 		// set the next loop up | ||||
| 		options.Continue = m.GetContinue() | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -21,6 +21,7 @@ import ( | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" | ||||
| @@ -115,7 +116,6 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options | ||||
| 	} | ||||
| 	return p.PagedList(ctx, options) | ||||
| } | ||||
|  | ||||
| func TestListPager_List(t *testing.T) { | ||||
| 	type fields struct { | ||||
| 		PageSize          int64 | ||||
| @@ -189,7 +189,11 @@ func TestListPager_List(t *testing.T) { | ||||
| 				PageFn:            tt.fields.PageFn, | ||||
| 				FullListIfExpired: tt.fields.FullListIfExpired, | ||||
| 			} | ||||
| 			got, err := p.List(tt.args.ctx, tt.args.options) | ||||
| 			ctx := tt.args.ctx | ||||
| 			if ctx == nil { | ||||
| 				ctx = context.Background() | ||||
| 			} | ||||
| 			got, err := p.List(ctx, tt.args.options) | ||||
| 			if (err != nil) != tt.wantErr { | ||||
| 				t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr) | ||||
| 				return | ||||
| @@ -204,3 +208,237 @@ func TestListPager_List(t *testing.T) { | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestListPager_EachListItem(t *testing.T) { | ||||
| 	type fields struct { | ||||
| 		PageSize int64 | ||||
| 		PageFn   ListPageFunc | ||||
| 	} | ||||
| 	tests := []struct { | ||||
| 		name                 string | ||||
| 		fields               fields | ||||
| 		want                 runtime.Object | ||||
| 		wantErr              bool | ||||
| 		wantPanic            bool | ||||
| 		isExpired            bool | ||||
| 		processorErrorOnItem int | ||||
| 		processorPanicOnItem int | ||||
| 		cancelContextOnItem  int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:   "empty page", | ||||
| 			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, | ||||
| 			want:   list(0, "rv:20"), | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:   "one page", | ||||
| 			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, | ||||
| 			want:   list(9, "rv:20"), | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:   "one full page", | ||||
| 			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, | ||||
| 			want:   list(10, "rv:20"), | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:   "two pages", | ||||
| 			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, | ||||
| 			want:   list(11, "rv:20"), | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:   "three pages", | ||||
| 			fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, | ||||
| 			want:   list(21, "rv:20"), | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "expires on second page", | ||||
| 			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage}, | ||||
| 			want:      list(10, "rv:20"), // all items on the first page should have been visited | ||||
| 			wantErr:   true, | ||||
| 			isExpired: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                 "error processing item", | ||||
| 			fields:               fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList}, | ||||
| 			want:                 list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited | ||||
| 			wantPanic:            true, | ||||
| 			processorPanicOnItem: 3, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                "cancel context while processing", | ||||
| 			fields:              fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList}, | ||||
| 			want:                list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited | ||||
| 			wantErr:             true, | ||||
| 			cancelContextOnItem: 3, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "panic processing item", | ||||
| 			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList}, | ||||
| 			want:      list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited | ||||
| 			wantPanic: true, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	processorErr := fmt.Errorf("processor error") | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			ctx, cancel := context.WithCancel(context.Background()) | ||||
| 			p := &ListPager{ | ||||
| 				PageSize: tt.fields.PageSize, | ||||
| 				PageFn:   tt.fields.PageFn, | ||||
| 			} | ||||
| 			var items []runtime.Object | ||||
|  | ||||
| 			fn := func(obj runtime.Object) error { | ||||
| 				items = append(items, obj) | ||||
| 				if tt.processorErrorOnItem > 0 && len(items) == tt.processorErrorOnItem { | ||||
| 					return processorErr | ||||
| 				} | ||||
| 				if tt.processorPanicOnItem > 0 && len(items) == tt.processorPanicOnItem { | ||||
| 					panic(processorErr) | ||||
| 				} | ||||
| 				if tt.cancelContextOnItem > 0 && len(items) == tt.cancelContextOnItem { | ||||
| 					cancel() | ||||
| 				} | ||||
| 				return nil | ||||
| 			} | ||||
| 			var err error | ||||
| 			var panic interface{} | ||||
| 			func() { | ||||
| 				defer func() { | ||||
| 					panic = recover() | ||||
| 				}() | ||||
| 				err = p.EachListItem(ctx, metav1.ListOptions{}, fn) | ||||
| 			}() | ||||
| 			if (panic != nil) && !tt.wantPanic { | ||||
| 				t.Fatalf(".EachListItem() panic = %v, wantPanic %v", panic, tt.wantPanic) | ||||
| 			} else { | ||||
| 				return | ||||
| 			} | ||||
| 			if (err != nil) != tt.wantErr { | ||||
| 				t.Errorf("ListPager.EachListItem() error = %v, wantErr %v", err, tt.wantErr) | ||||
| 				return | ||||
| 			} | ||||
| 			if tt.isExpired != errors.IsResourceExpired(err) { | ||||
| 				t.Errorf("ListPager.EachListItem() error = %v, isExpired %v", err, tt.isExpired) | ||||
| 				return | ||||
| 			} | ||||
| 			if tt.processorErrorOnItem > 0 && err != processorErr { | ||||
| 				t.Errorf("ListPager.EachListItem() error = %v, processorErrorOnItem %d", err, tt.processorErrorOnItem) | ||||
| 				return | ||||
| 			} | ||||
| 			l := tt.want.(*metainternalversion.List) | ||||
| 			if !reflect.DeepEqual(items, l.Items) { | ||||
| 				t.Errorf("ListPager.EachListItem() = %v, want %v", items, l.Items) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestListPager_eachListPageBuffered(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name           string | ||||
| 		totalPages     int | ||||
| 		pagesProcessed int | ||||
| 		wantPageLists  int | ||||
| 		pageBufferSize int32 | ||||
| 		pageSize       int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:           "no buffer, one total page", | ||||
| 			totalPages:     1, | ||||
| 			pagesProcessed: 1, | ||||
| 			wantPageLists:  1, | ||||
| 			pageBufferSize: 0, | ||||
| 		}, { | ||||
| 			name:           "no buffer, 1/5 pages processed", | ||||
| 			totalPages:     5, | ||||
| 			pagesProcessed: 1, | ||||
| 			wantPageLists:  2, // 1 received for processing, 1 listed | ||||
| 			pageBufferSize: 0, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:           "no buffer, 2/5 pages processed", | ||||
| 			totalPages:     5, | ||||
| 			pagesProcessed: 2, | ||||
| 			wantPageLists:  3, | ||||
| 			pageBufferSize: 0, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:           "no buffer, 5/5 pages processed", | ||||
| 			totalPages:     5, | ||||
| 			pagesProcessed: 5, | ||||
| 			wantPageLists:  5, | ||||
| 			pageBufferSize: 0, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:           "size 1 buffer, 1/5 pages processed", | ||||
| 			totalPages:     5, | ||||
| 			pagesProcessed: 1, | ||||
| 			wantPageLists:  3, | ||||
| 			pageBufferSize: 1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:           "size 1 buffer, 5/5 pages processed", | ||||
| 			totalPages:     5, | ||||
| 			pagesProcessed: 5, | ||||
| 			wantPageLists:  5, | ||||
| 			pageBufferSize: 1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:           "size 10 buffer, 1/5 page processed", | ||||
| 			totalPages:     5, | ||||
| 			pagesProcessed: 1, | ||||
| 			wantPageLists:  5, | ||||
| 			pageBufferSize: 10, // buffer is larger than list | ||||
| 		}, | ||||
| 	} | ||||
| 	processorErr := fmt.Errorf("processor error") | ||||
| 	pageSize := 10 | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			pgr := &testPager{t: t, expectPage: int64(pageSize), remaining: tt.totalPages * pageSize, rv: "rv:20"} | ||||
| 			pageLists := 0 | ||||
| 			wantedPageListsDone := make(chan struct{}) | ||||
| 			listFn := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { | ||||
| 				pageLists++ | ||||
| 				if pageLists == tt.wantPageLists { | ||||
| 					close(wantedPageListsDone) | ||||
| 				} | ||||
| 				return pgr.PagedList(ctx, options) | ||||
| 			} | ||||
| 			p := &ListPager{ | ||||
| 				PageSize:       int64(pageSize), | ||||
| 				PageBufferSize: tt.pageBufferSize, | ||||
| 				PageFn:         listFn, | ||||
| 			} | ||||
|  | ||||
| 			pagesProcessed := 0 | ||||
| 			fn := func(obj runtime.Object) error { | ||||
| 				pagesProcessed++ | ||||
| 				if tt.pagesProcessed == pagesProcessed && tt.wantPageLists > 0 { | ||||
| 					// wait for buffering to catch up | ||||
| 					select { | ||||
| 					case <-time.After(time.Second): | ||||
| 						return fmt.Errorf("Timed out waiting for %d page lists", tt.wantPageLists) | ||||
| 					case <-wantedPageListsDone: | ||||
| 					} | ||||
| 					return processorErr | ||||
| 				} | ||||
| 				return nil | ||||
| 			} | ||||
| 			err := p.eachListChunkBuffered(context.Background(), metav1.ListOptions{}, fn) | ||||
| 			if tt.pagesProcessed > 0 && err == processorErr { | ||||
| 				// expected | ||||
| 			} else if err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 			if tt.wantPageLists > 0 && pageLists != tt.wantPageLists { | ||||
| 				t.Errorf("expected %d page lists, got %d", tt.wantPageLists, pageLists) | ||||
| 			} | ||||
| 			if pagesProcessed != tt.pagesProcessed { | ||||
| 				t.Errorf("expected %d pages processed, got %d", tt.pagesProcessed, pagesProcessed) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot