Merge pull request #122657 from p0lyn0mial/upstream-client-go-rest-client-watch-list
add watchlist to client-go rest client
This commit is contained in:
		@@ -37,12 +37,15 @@ import (
 | 
				
			|||||||
	"golang.org/x/net/http2"
 | 
						"golang.org/x/net/http2"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
						"k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/api/meta"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/conversion"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						"k8s.io/apimachinery/pkg/runtime/schema"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
 | 
						"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/net"
 | 
						"k8s.io/apimachinery/pkg/util/net"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/watch"
 | 
						"k8s.io/apimachinery/pkg/watch"
 | 
				
			||||||
 | 
						clientfeatures "k8s.io/client-go/features"
 | 
				
			||||||
	restclientwatch "k8s.io/client-go/rest/watch"
 | 
						restclientwatch "k8s.io/client-go/rest/watch"
 | 
				
			||||||
	"k8s.io/client-go/tools/metrics"
 | 
						"k8s.io/client-go/tools/metrics"
 | 
				
			||||||
	"k8s.io/client-go/util/flowcontrol"
 | 
						"k8s.io/client-go/util/flowcontrol"
 | 
				
			||||||
@@ -768,6 +771,142 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type WatchListResult struct {
 | 
				
			||||||
 | 
						// err holds any errors we might have received
 | 
				
			||||||
 | 
						// during streaming.
 | 
				
			||||||
 | 
						err error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// items hold the collected data
 | 
				
			||||||
 | 
						items []runtime.Object
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// initialEventsEndBookmarkRV holds the resource version
 | 
				
			||||||
 | 
						// extracted from the bookmark event that marks
 | 
				
			||||||
 | 
						// the end of the stream.
 | 
				
			||||||
 | 
						initialEventsEndBookmarkRV string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// gv represents the API version
 | 
				
			||||||
 | 
						// it is used to construct the final list response
 | 
				
			||||||
 | 
						// normally this information is filled by the server
 | 
				
			||||||
 | 
						gv schema.GroupVersion
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (r WatchListResult) Into(obj runtime.Object) error {
 | 
				
			||||||
 | 
						if r.err != nil {
 | 
				
			||||||
 | 
							return r.err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						listPtr, err := meta.GetItemsPtr(obj)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						listVal, err := conversion.EnforcePtr(listPtr)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if listVal.Kind() != reflect.Slice {
 | 
				
			||||||
 | 
							return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(r.items) == 0 {
 | 
				
			||||||
 | 
							listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							listVal.Set(reflect.MakeSlice(listVal.Type(), len(r.items), len(r.items)))
 | 
				
			||||||
 | 
							for i, o := range r.items {
 | 
				
			||||||
 | 
								if listVal.Type().Elem() != reflect.TypeOf(o).Elem() {
 | 
				
			||||||
 | 
									return fmt.Errorf("received object type = %v at index = %d, doesn't match the list item type = %v", reflect.TypeOf(o).Elem(), i, listVal.Type().Elem())
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								listVal.Index(i).Set(reflect.ValueOf(o).Elem())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						listMeta, err := meta.ListAccessor(obj)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						typeMeta, err := meta.TypeAccessor(obj)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						version := r.gv.String()
 | 
				
			||||||
 | 
						typeMeta.SetAPIVersion(version)
 | 
				
			||||||
 | 
						typeMeta.SetKind(reflect.TypeOf(obj).Elem().Name())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// WatchList establishes a stream to get a consistent snapshot of data
 | 
				
			||||||
 | 
					// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Note that the watchlist requires properly setting the ListOptions
 | 
				
			||||||
 | 
					// otherwise it just establishes a regular watch with the server.
 | 
				
			||||||
 | 
					// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
 | 
				
			||||||
 | 
					// to see what parameters are currently required.
 | 
				
			||||||
 | 
					func (r *Request) WatchList(ctx context.Context) WatchListResult {
 | 
				
			||||||
 | 
						if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
 | 
				
			||||||
 | 
							return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// TODO(#115478): consider validating request parameters (i.e sendInitialEvents).
 | 
				
			||||||
 | 
						//  Most users use the generated client, which handles the proper setting of parameters.
 | 
				
			||||||
 | 
						//  We don't have validation for other methods (e.g., the Watch)
 | 
				
			||||||
 | 
						//  thus, for symmetry, we haven't added additional checks for the WatchList method.
 | 
				
			||||||
 | 
						w, err := r.Watch(ctx)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return WatchListResult{err: err}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return r.handleWatchList(ctx, w)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// handleWatchList holds the actual logic for easier unit testing.
 | 
				
			||||||
 | 
					// Note that this function will close the passed watch.
 | 
				
			||||||
 | 
					func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchListResult {
 | 
				
			||||||
 | 
						defer w.Stop()
 | 
				
			||||||
 | 
						var lastKey string
 | 
				
			||||||
 | 
						var items []runtime.Object
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case <-ctx.Done():
 | 
				
			||||||
 | 
								return WatchListResult{err: ctx.Err()}
 | 
				
			||||||
 | 
							case event, ok := <-w.ResultChan():
 | 
				
			||||||
 | 
								if !ok {
 | 
				
			||||||
 | 
									return WatchListResult{err: fmt.Errorf("unexpected watch close")}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if event.Type == watch.Error {
 | 
				
			||||||
 | 
									return WatchListResult{err: errors.FromObject(event.Object)}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								meta, err := meta.Accessor(event.Object)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return WatchListResult{err: fmt.Errorf("failed to parse watch event: %#v", event)}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								switch event.Type {
 | 
				
			||||||
 | 
								case watch.Added:
 | 
				
			||||||
 | 
									// the following check ensures that the response is ordered.
 | 
				
			||||||
 | 
									// earlier servers had a bug that caused them to not sort the output.
 | 
				
			||||||
 | 
									// in such cases, return an error which can trigger fallback logic.
 | 
				
			||||||
 | 
									key := objectKeyFromMeta(meta)
 | 
				
			||||||
 | 
									if len(lastKey) > 0 && lastKey > key {
 | 
				
			||||||
 | 
										return WatchListResult{err: fmt.Errorf("cannot add the obj (%#v) with the key = %s, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = %s", event.Object, key, lastKey)}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									items = append(items, event.Object)
 | 
				
			||||||
 | 
									lastKey = key
 | 
				
			||||||
 | 
								case watch.Bookmark:
 | 
				
			||||||
 | 
									if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
 | 
				
			||||||
 | 
										return WatchListResult{
 | 
				
			||||||
 | 
											items:                      items,
 | 
				
			||||||
 | 
											initialEventsEndBookmarkRV: meta.GetResourceVersion(),
 | 
				
			||||||
 | 
											gv:                         r.c.content.GroupVersion,
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								default:
 | 
				
			||||||
 | 
									return WatchListResult{err: fmt.Errorf("unexpected watch event %#v, expected to only receive watch.Added and watch.Bookmark events", event)}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
 | 
					func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
 | 
				
			||||||
	contentType := resp.Header.Get("Content-Type")
 | 
						contentType := resp.Header.Get("Content-Type")
 | 
				
			||||||
	mediaType, params, err := mime.ParseMediaType(contentType)
 | 
						mediaType, params, err := mime.ParseMediaType(contentType)
 | 
				
			||||||
@@ -1470,3 +1609,10 @@ func ValidatePathSegmentName(name string, prefix bool) []string {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return IsValidPathSegmentName(name)
 | 
						return IsValidPathSegmentName(name)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func objectKeyFromMeta(objMeta metav1.Object) string {
 | 
				
			||||||
 | 
						if len(objMeta.GetNamespace()) > 0 {
 | 
				
			||||||
 | 
							return fmt.Sprintf("%s/%s", objMeta.GetNamespace(), objMeta.GetName())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return objMeta.GetName()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										356
									
								
								staging/src/k8s.io/client-go/rest/request_watchlist_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										356
									
								
								staging/src/k8s.io/client-go/rest/request_watchlist_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,356 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2024 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package rest
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"regexp"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/google/go-cmp/cmp"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
 | 
						apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
				
			||||||
 | 
						apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/runtime/schema"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/watch"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestWatchListResult(t *testing.T) {
 | 
				
			||||||
 | 
						scenarios := []struct {
 | 
				
			||||||
 | 
							name   string
 | 
				
			||||||
 | 
							target WatchListResult
 | 
				
			||||||
 | 
							result runtime.Object
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							expectedResult *v1.PodList
 | 
				
			||||||
 | 
							expectedErr    error
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:        "not a pointer",
 | 
				
			||||||
 | 
								result:      fakeObj{},
 | 
				
			||||||
 | 
								expectedErr: fmt.Errorf("rest.fakeObj is not a list: expected pointer, but got rest.fakeObj type"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:        "nil input won't panic",
 | 
				
			||||||
 | 
								result:      nil,
 | 
				
			||||||
 | 
								expectedErr: fmt.Errorf("<nil> is not a list: expected pointer, but got invalid kind"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:        "not a list",
 | 
				
			||||||
 | 
								result:      &v1.Pod{},
 | 
				
			||||||
 | 
								expectedErr: fmt.Errorf("*v1.Pod is not a list: no Items field in this object"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:        "an err is always returned",
 | 
				
			||||||
 | 
								result:      nil,
 | 
				
			||||||
 | 
								target:      WatchListResult{err: fmt.Errorf("dummy err")},
 | 
				
			||||||
 | 
								expectedErr: fmt.Errorf("dummy err"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:   "empty list",
 | 
				
			||||||
 | 
								result: &v1.PodList{},
 | 
				
			||||||
 | 
								expectedResult: &v1.PodList{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{Kind: "PodList"},
 | 
				
			||||||
 | 
									Items:    []v1.Pod{},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:   "gv is applied",
 | 
				
			||||||
 | 
								result: &v1.PodList{},
 | 
				
			||||||
 | 
								target: WatchListResult{gv: schema.GroupVersion{Group: "g", Version: "v"}},
 | 
				
			||||||
 | 
								expectedResult: &v1.PodList{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "g/v"},
 | 
				
			||||||
 | 
									Items:    []v1.Pod{},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:   "gv is applied, empty group",
 | 
				
			||||||
 | 
								result: &v1.PodList{},
 | 
				
			||||||
 | 
								target: WatchListResult{gv: schema.GroupVersion{Version: "v"}},
 | 
				
			||||||
 | 
								expectedResult: &v1.PodList{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "v"},
 | 
				
			||||||
 | 
									Items:    []v1.Pod{},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:   "rv is applied",
 | 
				
			||||||
 | 
								result: &v1.PodList{},
 | 
				
			||||||
 | 
								target: WatchListResult{initialEventsEndBookmarkRV: "100"},
 | 
				
			||||||
 | 
								expectedResult: &v1.PodList{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{Kind: "PodList"},
 | 
				
			||||||
 | 
									ListMeta: metav1.ListMeta{ResourceVersion: "100"},
 | 
				
			||||||
 | 
									Items:    []v1.Pod{},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:   "items are applied",
 | 
				
			||||||
 | 
								result: &v1.PodList{},
 | 
				
			||||||
 | 
								target: WatchListResult{items: []runtime.Object{makePod(1), makePod(2)}},
 | 
				
			||||||
 | 
								expectedResult: &v1.PodList{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{Kind: "PodList"},
 | 
				
			||||||
 | 
									Items:    []v1.Pod{*makePod(1), *makePod(2)},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:        "type mismatch",
 | 
				
			||||||
 | 
								result:      &v1.PodList{},
 | 
				
			||||||
 | 
								target:      WatchListResult{items: []runtime.Object{makeNamespace("1")}},
 | 
				
			||||||
 | 
								expectedErr: fmt.Errorf("received object type = v1.Namespace at index = 0, doesn't match the list item type = v1.Pod"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, scenario := range scenarios {
 | 
				
			||||||
 | 
							t.Run(scenario.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								err := scenario.target.Into(scenario.result)
 | 
				
			||||||
 | 
								if scenario.expectedErr != nil && err == nil {
 | 
				
			||||||
 | 
									t.Fatalf("expected an error = %v, got nil", scenario.expectedErr)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if scenario.expectedErr == nil && err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("didn't expect an error, got =  %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									if scenario.expectedErr.Error() != err.Error() {
 | 
				
			||||||
 | 
										t.Fatalf("unexpected err = %v, expected = %v", err, scenario.expectedErr)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !apiequality.Semantic.DeepEqual(scenario.expectedResult, scenario.result) {
 | 
				
			||||||
 | 
									t.Errorf("diff: %v", cmp.Diff(scenario.expectedResult, scenario.result))
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestWatchListSuccess(t *testing.T) {
 | 
				
			||||||
 | 
						scenarios := []struct {
 | 
				
			||||||
 | 
							name           string
 | 
				
			||||||
 | 
							gv             schema.GroupVersion
 | 
				
			||||||
 | 
							watchEvents    []watch.Event
 | 
				
			||||||
 | 
							expectedResult *v1.PodList
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "happy path",
 | 
				
			||||||
 | 
								// Note that the APIVersion for the core API group is "v1" (not "core/v1").
 | 
				
			||||||
 | 
								// We fake "core/v1" here to test if the Group part is properly
 | 
				
			||||||
 | 
								// recognized and set on the resulting object.
 | 
				
			||||||
 | 
								gv: schema.GroupVersion{Group: "core", Version: "v1"},
 | 
				
			||||||
 | 
								watchEvents: []watch.Event{
 | 
				
			||||||
 | 
									{Type: watch.Added, Object: makePod(1)},
 | 
				
			||||||
 | 
									{Type: watch.Added, Object: makePod(2)},
 | 
				
			||||||
 | 
									{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectedResult: &v1.PodList{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{
 | 
				
			||||||
 | 
										APIVersion: "core/v1",
 | 
				
			||||||
 | 
										Kind:       "PodList",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									ListMeta: metav1.ListMeta{ResourceVersion: "5"},
 | 
				
			||||||
 | 
									Items:    []v1.Pod{*makePod(1), *makePod(2)},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "APIVersion with only version provided is properly set",
 | 
				
			||||||
 | 
								gv:   schema.GroupVersion{Version: "v1"},
 | 
				
			||||||
 | 
								watchEvents: []watch.Event{
 | 
				
			||||||
 | 
									{Type: watch.Added, Object: makePod(1)},
 | 
				
			||||||
 | 
									{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectedResult: &v1.PodList{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{
 | 
				
			||||||
 | 
										APIVersion: "v1",
 | 
				
			||||||
 | 
										Kind:       "PodList",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									ListMeta: metav1.ListMeta{ResourceVersion: "5"},
 | 
				
			||||||
 | 
									Items:    []v1.Pod{*makePod(1)},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "only the bookmark",
 | 
				
			||||||
 | 
								gv:   schema.GroupVersion{Version: "v1"},
 | 
				
			||||||
 | 
								watchEvents: []watch.Event{
 | 
				
			||||||
 | 
									{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectedResult: &v1.PodList{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{
 | 
				
			||||||
 | 
										APIVersion: "v1",
 | 
				
			||||||
 | 
										Kind:       "PodList",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									ListMeta: metav1.ListMeta{ResourceVersion: "5"},
 | 
				
			||||||
 | 
									Items:    []v1.Pod{},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, scenario := range scenarios {
 | 
				
			||||||
 | 
							t.Run(scenario.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								ctx := context.Background()
 | 
				
			||||||
 | 
								fakeWatcher := watch.NewFake()
 | 
				
			||||||
 | 
								target := &Request{
 | 
				
			||||||
 | 
									c: &RESTClient{
 | 
				
			||||||
 | 
										content: ClientContentConfig{
 | 
				
			||||||
 | 
											GroupVersion: scenario.gv,
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								go func(watchEvents []watch.Event) {
 | 
				
			||||||
 | 
									for _, watchEvent := range watchEvents {
 | 
				
			||||||
 | 
										fakeWatcher.Action(watchEvent.Type, watchEvent.Object)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}(scenario.watchEvents)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								res := target.handleWatchList(ctx, fakeWatcher)
 | 
				
			||||||
 | 
								if res.err != nil {
 | 
				
			||||||
 | 
									t.Fatal(res.err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								result := &v1.PodList{}
 | 
				
			||||||
 | 
								if err := res.Into(result); err != nil {
 | 
				
			||||||
 | 
									t.Fatal(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !apiequality.Semantic.DeepEqual(scenario.expectedResult, result) {
 | 
				
			||||||
 | 
									t.Errorf("diff: %v", cmp.Diff(scenario.expectedResult, result))
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !fakeWatcher.IsStopped() {
 | 
				
			||||||
 | 
									t.Fatalf("the watcher wasn't stopped")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestWatchListFailure(t *testing.T) {
 | 
				
			||||||
 | 
						scenarios := []struct {
 | 
				
			||||||
 | 
							name        string
 | 
				
			||||||
 | 
							ctx         context.Context
 | 
				
			||||||
 | 
							watcher     *watch.FakeWatcher
 | 
				
			||||||
 | 
							watchEvents []watch.Event
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							expectedError error
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "request stop",
 | 
				
			||||||
 | 
								ctx: func() context.Context {
 | 
				
			||||||
 | 
									ctx, ctxCancel := context.WithCancel(context.TODO())
 | 
				
			||||||
 | 
									ctxCancel()
 | 
				
			||||||
 | 
									return ctx
 | 
				
			||||||
 | 
								}(),
 | 
				
			||||||
 | 
								watcher:       watch.NewFake(),
 | 
				
			||||||
 | 
								expectedError: fmt.Errorf("context canceled"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "stop watcher",
 | 
				
			||||||
 | 
								ctx:  context.TODO(),
 | 
				
			||||||
 | 
								watcher: func() *watch.FakeWatcher {
 | 
				
			||||||
 | 
									w := watch.NewFake()
 | 
				
			||||||
 | 
									w.Stop()
 | 
				
			||||||
 | 
									return w
 | 
				
			||||||
 | 
								}(),
 | 
				
			||||||
 | 
								expectedError: fmt.Errorf("unexpected watch close"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:          "stop on watch.Error",
 | 
				
			||||||
 | 
								ctx:           context.TODO(),
 | 
				
			||||||
 | 
								watcher:       watch.NewFake(),
 | 
				
			||||||
 | 
								watchEvents:   []watch.Event{{Type: watch.Error, Object: &apierrors.NewInternalError(fmt.Errorf("dummy errror")).ErrStatus}},
 | 
				
			||||||
 | 
								expectedError: fmt.Errorf("Internal error occurred: dummy errror"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:          "incorrect watch type (Deleted)",
 | 
				
			||||||
 | 
								ctx:           context.TODO(),
 | 
				
			||||||
 | 
								watcher:       watch.NewFake(),
 | 
				
			||||||
 | 
								watchEvents:   []watch.Event{{Type: watch.Deleted, Object: makePod(1)}},
 | 
				
			||||||
 | 
								expectedError: fmt.Errorf("unexpected watch event .*, expected to only receive watch.Added and watch.Bookmark events"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:          "incorrect watch type (Modified)",
 | 
				
			||||||
 | 
								ctx:           context.TODO(),
 | 
				
			||||||
 | 
								watcher:       watch.NewFake(),
 | 
				
			||||||
 | 
								watchEvents:   []watch.Event{{Type: watch.Modified, Object: makePod(1)}},
 | 
				
			||||||
 | 
								expectedError: fmt.Errorf("unexpected watch event .*, expected to only receive watch.Added and watch.Bookmark events"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:          "unordered input returns an error",
 | 
				
			||||||
 | 
								ctx:           context.TODO(),
 | 
				
			||||||
 | 
								watcher:       watch.NewFake(),
 | 
				
			||||||
 | 
								watchEvents:   []watch.Event{{Type: watch.Added, Object: makePod(3)}, {Type: watch.Added, Object: makePod(1)}},
 | 
				
			||||||
 | 
								expectedError: fmt.Errorf("cannot add the obj .* with the key = ns/pod-1, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = ns/pod-3"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, scenario := range scenarios {
 | 
				
			||||||
 | 
							t.Run(scenario.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								target := &Request{}
 | 
				
			||||||
 | 
								go func(w *watch.FakeWatcher, watchEvents []watch.Event) {
 | 
				
			||||||
 | 
									for _, event := range watchEvents {
 | 
				
			||||||
 | 
										w.Action(event.Type, event.Object)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}(scenario.watcher, scenario.watchEvents)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								res := target.handleWatchList(scenario.ctx, scenario.watcher)
 | 
				
			||||||
 | 
								resErr := res.Into(nil)
 | 
				
			||||||
 | 
								if resErr == nil {
 | 
				
			||||||
 | 
									t.Fatal("expected to get an error, got nil")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								matched, err := regexp.MatchString(scenario.expectedError.Error(), resErr.Error())
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatal(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !matched {
 | 
				
			||||||
 | 
									t.Fatalf("unexpected err = %v, expected = %v", resErr, scenario.expectedError)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !scenario.watcher.IsStopped() {
 | 
				
			||||||
 | 
									t.Fatalf("the watcher wasn't stopped")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func makePod(rv uint64) *v1.Pod {
 | 
				
			||||||
 | 
						return &v1.Pod{
 | 
				
			||||||
 | 
							ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
								Name:            fmt.Sprintf("pod-%d", rv),
 | 
				
			||||||
 | 
								Namespace:       "ns",
 | 
				
			||||||
 | 
								ResourceVersion: fmt.Sprintf("%d", rv),
 | 
				
			||||||
 | 
								Annotations:     map[string]string{},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func makeNamespace(name string) *v1.Namespace {
 | 
				
			||||||
 | 
						return &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func makeBookmarkEvent(rv uint64) *v1.Pod {
 | 
				
			||||||
 | 
						return &v1.Pod{
 | 
				
			||||||
 | 
							ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
								ResourceVersion: fmt.Sprintf("%d", rv),
 | 
				
			||||||
 | 
								Annotations:     map[string]string{metav1.InitialEventsAnnotationKey: "true"},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type fakeObj struct {
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f fakeObj) GetObjectKind() schema.ObjectKind {
 | 
				
			||||||
 | 
						return schema.EmptyObjectKind
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f fakeObj) DeepCopyObject() runtime.Object {
 | 
				
			||||||
 | 
						return fakeObj{}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user