dynamic client: add support for API streaming
This commit is contained in:
		| @@ -27,6 +27,8 @@ import ( | ||||
| 	"strings" | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/google/go-cmp/cmp" | ||||
|  | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| @@ -34,6 +36,8 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/runtime/serializer/streaming" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/watch" | ||||
| 	clientfeatures "k8s.io/client-go/features" | ||||
| 	clientfeaturestesting "k8s.io/client-go/features/testing" | ||||
| 	restclient "k8s.io/client-go/rest" | ||||
| 	restclientwatch "k8s.io/client-go/rest/watch" | ||||
| ) | ||||
| @@ -148,6 +152,171 @@ func TestList(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestWatchList(t *testing.T) { | ||||
| 	clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) | ||||
|  | ||||
| 	type requestParam struct { | ||||
| 		Path  string | ||||
| 		Query string | ||||
| 	} | ||||
|  | ||||
| 	scenarios := []struct { | ||||
| 		name          string | ||||
| 		namespace     string | ||||
| 		watchResponse []watch.Event | ||||
| 		listResponse  []byte | ||||
|  | ||||
| 		expectedRequestParams []requestParam | ||||
| 		expectedList          *unstructured.UnstructuredList | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name: "watch-list request for cluster wide resource", | ||||
| 			watchResponse: []watch.Event{ | ||||
| 				{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item1")}, | ||||
| 				{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item2")}, | ||||
| 				{Type: watch.Bookmark, Object: func() runtime.Object { | ||||
| 					obj := getObject("gtest/vTest", "rTest", "item2") | ||||
| 					obj.SetResourceVersion("10") | ||||
| 					obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"}) | ||||
| 					return obj | ||||
| 				}()}, | ||||
| 			}, | ||||
| 			expectedRequestParams: []requestParam{ | ||||
| 				{ | ||||
| 					Path:  "/apis/gtest/vtest/rtest", | ||||
| 					Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true", | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedList: &unstructured.UnstructuredList{ | ||||
| 				Object: map[string]interface{}{ | ||||
| 					"apiVersion": "", | ||||
| 					"kind":       "UnstructuredList", | ||||
| 					"metadata": map[string]interface{}{ | ||||
| 						"resourceVersion": "10", | ||||
| 					}, | ||||
| 				}, | ||||
| 				Items: []unstructured.Unstructured{ | ||||
| 					*getObject("gtest/vTest", "rTest", "item1"), | ||||
| 					*getObject("gtest/vTest", "rTest", "item2"), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "watch-list request for namespaced watch resource", | ||||
| 			namespace: "nstest", | ||||
| 			watchResponse: []watch.Event{ | ||||
| 				{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item1")}, | ||||
| 				{Type: watch.Bookmark, Object: func() runtime.Object { | ||||
| 					obj := getObject("gtest/vTest", "rTest", "item2") | ||||
| 					obj.SetResourceVersion("39") | ||||
| 					obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"}) | ||||
| 					return obj | ||||
| 				}()}, | ||||
| 			}, | ||||
| 			expectedRequestParams: []requestParam{ | ||||
| 				{ | ||||
| 					Path:  "/apis/gtest/vtest/namespaces/nstest/rtest", | ||||
| 					Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true", | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedList: &unstructured.UnstructuredList{ | ||||
| 				Object: map[string]interface{}{ | ||||
| 					"apiVersion": "", | ||||
| 					"kind":       "UnstructuredList", | ||||
| 					"metadata": map[string]interface{}{ | ||||
| 						"resourceVersion": "39", | ||||
| 					}, | ||||
| 				}, | ||||
| 				Items: []unstructured.Unstructured{ | ||||
| 					*getObject("gtest/vTest", "rTest", "item1"), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "watch-list request falls back to standard list on any error", | ||||
| 			namespace: "nstest", | ||||
| 			// watchList method in client-go expect only watch.Add and watch.Bookmark events | ||||
| 			// receiving watch.Error will cause this method to report an error which will | ||||
| 			// trigger the fallback logic | ||||
| 			watchResponse: []watch.Event{ | ||||
| 				{Type: watch.Error, Object: getObject("gtest/vTest", "rTest", "item1")}, | ||||
| 			}, | ||||
| 			listResponse: getListJSON("vTest", "UnstructuredList", | ||||
| 				getJSON("gtest/vTest", "rTest", "item1"), | ||||
| 				getJSON("gtest/vTest", "rTest", "item2")), | ||||
| 			expectedRequestParams: []requestParam{ | ||||
| 				// a watch-list request first | ||||
| 				{ | ||||
| 					Path:  "/apis/gtest/vtest/namespaces/nstest/rtest", | ||||
| 					Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true", | ||||
| 				}, | ||||
| 				// a standard list request second | ||||
| 				{ | ||||
| 					Path: "/apis/gtest/vtest/namespaces/nstest/rtest", | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedList: &unstructured.UnstructuredList{ | ||||
| 				Object: map[string]interface{}{ | ||||
| 					"apiVersion": "vTest", | ||||
| 					"kind":       "UnstructuredList", | ||||
| 				}, | ||||
| 				Items: []unstructured.Unstructured{ | ||||
| 					*getObject("gtest/vTest", "rTest", "item1"), | ||||
| 					*getObject("gtest/vTest", "rTest", "item2"), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, scenario := range scenarios { | ||||
| 		t.Run(scenario.name, func(t *testing.T) { | ||||
| 			var actualRequestParams []requestParam | ||||
| 			resource := schema.GroupVersionResource{Group: "gtest", Version: "vtest", Resource: "rtest"} | ||||
| 			cl, srv, err := getClientServer(func(w http.ResponseWriter, r *http.Request) { | ||||
| 				if r.Method != "GET" { | ||||
| 					t.Errorf("unexpected HTTP method %s. expected GET", r.Method) | ||||
| 				} | ||||
| 				actualRequestParams = append(actualRequestParams, requestParam{ | ||||
| 					Path:  r.URL.Path, | ||||
| 					Query: r.URL.RawQuery, | ||||
| 				}) | ||||
|  | ||||
| 				w.Header().Set("Content-Type", runtime.ContentTypeJSON) | ||||
| 				// handle LIST response | ||||
| 				if len(scenario.listResponse) > 0 { | ||||
| 					if _, err := w.Write(scenario.listResponse); err != nil { | ||||
| 						t.Fatal(err) | ||||
| 					} | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				// handle WATCH response | ||||
| 				enc := restclientwatch.NewEncoder(streaming.NewEncoder(w, unstructured.UnstructuredJSONScheme), unstructured.UnstructuredJSONScheme) | ||||
| 				for _, e := range scenario.watchResponse { | ||||
| 					if err := enc.Encode(&e); err != nil { | ||||
| 						t.Fatal(err) | ||||
| 					} | ||||
| 				} | ||||
| 			}) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("unexpected error when creating test client and server: %v", err) | ||||
| 			} | ||||
| 			defer srv.Close() | ||||
|  | ||||
| 			actualList, err := cl.Resource(resource).Namespace(scenario.namespace).List(context.TODO(), metav1.ListOptions{}) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("unexpected error: %v", err) | ||||
| 			} | ||||
|  | ||||
| 			if !cmp.Equal(scenario.expectedRequestParams, actualRequestParams) { | ||||
| 				t.Fatalf("unexpected request params: %v", cmp.Diff(scenario.expectedRequestParams, actualRequestParams)) | ||||
| 			} | ||||
| 			if !cmp.Equal(scenario.expectedList, actualList) { | ||||
| 				t.Errorf("received expected list, diff: %s", cmp.Diff(scenario.expectedList, actualList)) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestGet(t *testing.T) { | ||||
| 	tcs := []struct { | ||||
| 		resource    string | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/api/meta" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| @@ -30,6 +31,8 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/watch" | ||||
| 	"k8s.io/client-go/rest" | ||||
| 	"k8s.io/client-go/util/consistencydetector" | ||||
| 	"k8s.io/client-go/util/watchlist" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| type DynamicClient struct { | ||||
| @@ -293,13 +296,22 @@ func (c *dynamicResourceClient) Get(ctx context.Context, name string, opts metav | ||||
| 	return uncastObj.(*unstructured.Unstructured), nil | ||||
| } | ||||
|  | ||||
| func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (result *unstructured.UnstructuredList, err error) { | ||||
| 	defer func() { | ||||
| func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { | ||||
| 	if watchListOptions, hasWatchListOptionsPrepared, watchListOptionsErr := watchlist.PrepareWatchListOptionsFromListOptions(opts); watchListOptionsErr != nil { | ||||
| 		klog.Warningf("Failed preparing watchlist options for %v, falling back to the standard LIST semantics, err = %v", c.resource, watchListOptionsErr) | ||||
| 	} else if hasWatchListOptionsPrepared { | ||||
| 		result, err := c.watchList(ctx, watchListOptions) | ||||
| 		if err == nil { | ||||
| 			consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result) | ||||
| 			consistencydetector.CheckWatchListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("watchlist request for %v", c.resource), c.list, opts, result) | ||||
| 			return result, nil | ||||
| 		} | ||||
| 	}() | ||||
| 	return c.list(ctx, opts) | ||||
| 		klog.Warningf("The watchlist request for %v ended with an error, falling back to the standard LIST semantics, err = %v", c.resource, err) | ||||
| 	} | ||||
| 	result, err := c.list(ctx, opts) | ||||
| 	if err == nil { | ||||
| 		consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result) | ||||
| 	} | ||||
| 	return result, err | ||||
| } | ||||
|  | ||||
| func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { | ||||
| @@ -329,6 +341,27 @@ func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOption | ||||
| 	return list, nil | ||||
| } | ||||
|  | ||||
| // watchList establishes a watch stream with the server and returns an unstructured list. | ||||
| func (c *dynamicResourceClient) watchList(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { | ||||
| 	if err := validateNamespaceWithOptionalName(c.namespace); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	var timeout time.Duration | ||||
| 	if opts.TimeoutSeconds != nil { | ||||
| 		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second | ||||
| 	} | ||||
|  | ||||
| 	result := &unstructured.UnstructuredList{} | ||||
| 	err := c.client.client.Get().AbsPath(c.makeURLSegments("")...). | ||||
| 		SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1). | ||||
| 		Timeout(timeout). | ||||
| 		WatchList(ctx). | ||||
| 		Into(result) | ||||
|  | ||||
| 	return result, err | ||||
| } | ||||
|  | ||||
| func (c *dynamicResourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { | ||||
| 	opts.Watch = true | ||||
| 	if err := validateNamespaceWithOptionalName(c.namespace); err != nil { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Lukasz Szaszkiewicz
					Lukasz Szaszkiewicz