diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go b/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go index 7bd1a3a6a5b..f46a24cc6c3 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go @@ -257,3 +257,26 @@ func (d WithoutVersionDecoder) Decode(data []byte, defaults *schema.GroupVersion } return obj, gvk, err } + +type encoderWithAllocator struct { + encoder EncoderWithAllocator + memAllocator MemoryAllocator +} + +// NewEncoderWithAllocator returns a new encoder +func NewEncoderWithAllocator(e EncoderWithAllocator, a MemoryAllocator) Encoder { + return &encoderWithAllocator{ + encoder: e, + memAllocator: a, + } +} + +// Encode writes the provided object to the nested writer +func (e *encoderWithAllocator) Encode(obj Object, w io.Writer) error { + return e.encoder.EncodeWithAllocator(obj, w, e.memAllocator) +} + +// Identifier returns identifier of this encoder. +func (e *encoderWithAllocator) Identifier() Identifier { + return e.encoder.Identifier() +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go index 87b3fec3f2d..971c46d496a 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go @@ -134,23 +134,3 @@ func (e *encoder) Encode(obj runtime.Object) error { e.buf.Reset() return err } - -type encoderWithAllocator struct { - writer io.Writer - encoder runtime.EncoderWithAllocator - memAllocator runtime.MemoryAllocator -} - -// NewEncoderWithAllocator returns a new streaming encoder -func NewEncoderWithAllocator(w io.Writer, e runtime.EncoderWithAllocator, a runtime.MemoryAllocator) Encoder { - return &encoderWithAllocator{ - writer: w, - encoder: e, - memAllocator: a, - } -} - -// Encode writes the provided object to the nested writer -func (e *encoderWithAllocator) Encode(obj runtime.Object) error { - return e.encoder.EncodeWithAllocator(obj, e.writer, e.memAllocator) -} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go index 4780c59fd42..02111d9b0e7 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go @@ -18,8 +18,11 @@ package handlers import ( "context" + "encoding/json" "fmt" + "io" "net/http" + "reflect" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -29,48 +32,119 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" + + klog "k8s.io/klog/v2" ) -// transformObject takes the object as returned by storage and ensures it is in -// the client's desired form, as well as ensuring any API level fields like self-link -// are properly set. -func transformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope *RequestScope, req *http.Request) (runtime.Object, error) { - if co, ok := obj.(runtime.CacheableObject); ok { - if mediaType.Convert != nil { - // Non-nil mediaType.Convert means that some conversion of the object - // has to happen. Currently conversion may potentially modify the - // object or assume something about it (e.g. asTable operates on - // reflection, which won't work for any wrapper). - // To ensure it will work correctly, let's operate on base objects - // and not cache it for now. - // - // TODO: Long-term, transformObject should be changed so that it - // implements runtime.Encoder interface. - return doTransformObject(ctx, co.GetObject(), opts, mediaType, scope, req) - } - } - return doTransformObject(ctx, obj, opts, mediaType, scope, req) +// watchEmbeddedEncoder performs encoding of the embedded object. +// +// NOTE: watchEmbeddedEncoder is NOT thread-safe. +type watchEmbeddedEncoder struct { + encoder runtime.Encoder + + ctx context.Context + + // target, if non-nil, configures transformation type. + // The other options are ignored if target is nil. + target *schema.GroupVersionKind + tableOptions *metav1.TableOptions + scope *RequestScope + + // identifier of the encoder, computed lazily + identifier runtime.Identifier } -func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope *RequestScope, req *http.Request) (runtime.Object, error) { +func newWatchEmbeddedEncoder(ctx context.Context, encoder runtime.Encoder, target *schema.GroupVersionKind, tableOptions *metav1.TableOptions, scope *RequestScope) *watchEmbeddedEncoder { + return &watchEmbeddedEncoder{ + encoder: encoder, + ctx: ctx, + target: target, + tableOptions: tableOptions, + scope: scope, + } +} + +// Encode implements runtime.Encoder interface. +func (e *watchEmbeddedEncoder) Encode(obj runtime.Object, w io.Writer) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(e.Identifier(), e.doEncode, w) + } + return e.doEncode(obj, w) +} + +func (e *watchEmbeddedEncoder) doEncode(obj runtime.Object, w io.Writer) error { + result, err := doTransformObject(e.ctx, obj, e.tableOptions, e.target, e.scope) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err)) + result = obj + } + + // When we are tranforming to a table, use the original table options when + // we should print headers only on the first object - headers should be + // omitted on subsequent events. + if e.tableOptions != nil && !e.tableOptions.NoHeaders { + e.tableOptions.NoHeaders = true + // With options change, we should recompute the identifier. + // Clearing this will trigger lazy recompute when needed. + e.identifier = "" + } + + return e.encoder.Encode(result, w) +} + +// Identifier implements runtime.Encoder interface. +func (e *watchEmbeddedEncoder) Identifier() runtime.Identifier { + if e.identifier == "" { + e.identifier = e.embeddedIdentifier() + } + return e.identifier +} + +type watchEmbeddedEncoderIdentifier struct { + Name string `json:"name,omitempty"` + Encoder string `json:"encoder,omitempty"` + Target string `json:"target,omitempty"` + Options metav1.TableOptions `json:"options,omitempty"` + NoHeaders bool `json:"noHeaders,omitempty"` +} + +func (e *watchEmbeddedEncoder) embeddedIdentifier() runtime.Identifier { + if e.target == nil { + // If no conversion is performed, we effective only use + // the embedded identifier. + return e.encoder.Identifier() + } + identifier := watchEmbeddedEncoderIdentifier{ + Name: "watch-embedded", + Encoder: string(e.encoder.Identifier()), + Target: e.target.String(), + } + if e.target.Kind == "Table" && e.tableOptions != nil { + identifier.Options = *e.tableOptions + identifier.NoHeaders = e.tableOptions.NoHeaders + } + + result, err := json.Marshal(identifier) + if err != nil { + klog.Fatalf("Failed marshaling identifier for watchEmbeddedEncoder: %v", err) + } + return runtime.Identifier(result) +} + +// doTransformResponseObject is used for handling all requests, including watch. +func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, target *schema.GroupVersionKind, scope *RequestScope) (runtime.Object, error) { if _, ok := obj.(*metav1.Status); ok { return obj, nil } - // ensure that for empty lists we don't return items. - // This is safe to modify without deep-copying the object, as - // List objects themselves are never cached. - if meta.IsListType(obj) && meta.LenList(obj) == 0 { - if err := meta.SetList(obj, []runtime.Object{}); err != nil { - return nil, err - } - } - - switch target := mediaType.Convert; { + switch { case target == nil: + // If we ever change that from a no-op, the identifier of + // the watchEmbeddedEncoder has to be adjusted accordingly. return obj, nil case target.Kind == "PartialObjectMetadata": @@ -128,6 +202,7 @@ func targetEncodingForTransform(scope *RequestScope, mediaType negotiation.Media // transformResponseObject takes an object loaded from storage and performs any necessary transformations. // Will write the complete response object. +// transformResponseObject is used only for handling non-streaming requests. func transformResponseObject(ctx context.Context, scope *RequestScope, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) { options, err := optionsForTransform(mediaType, req) if err != nil { @@ -135,9 +210,19 @@ func transformResponseObject(ctx context.Context, scope *RequestScope, req *http return } + // ensure that for empty lists we don't return items. + // This is safe to modify without deep-copying the object, as + // List objects themselves are never cached. + if meta.IsListType(result) && meta.LenList(result) == 0 { + if err := meta.SetList(result, []runtime.Object{}); err != nil { + scope.err(err, w, req) + return + } + } + var obj runtime.Object do := func() { - obj, err = transformObject(ctx, result, options, mediaType, scope, req) + obj, err = doTransformObject(ctx, result, options, mediaType.Convert, scope) } endpointsrequest.TrackTransformResponseObjectLatency(ctx, do) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go index 70ce483682e..69f9d44f328 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" - "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" ) @@ -64,7 +63,7 @@ func (m *mockCacheableObject) SetGroupVersionKind(gvk schema.GroupVersionKind) { // CacheEncode implements runtime.CacheableObject interface. func (m *mockCacheableObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error { - return fmt.Errorf("unimplemented") + return encode(m.obj.DeepCopyObject(), w) } // GetObject implements runtime.CacheableObject interface. @@ -78,6 +77,19 @@ func (*mockNamer) Namespace(_ *http.Request) (string, error) { return func (*mockNamer) Name(_ *http.Request) (string, string, error) { return "", "", nil } func (*mockNamer) ObjectName(_ runtime.Object) (string, string, error) { return "", "", nil } +type mockEncoder struct { + obj runtime.Object +} + +func (e *mockEncoder) Encode(obj runtime.Object, _ io.Writer) error { + e.obj = obj + return nil +} + +func (e *mockEncoder) Identifier() runtime.Identifier { + return runtime.Identifier("") +} + func TestCacheableObject(t *testing.T) { pomGVK := metav1.SchemeGroupVersion.WithKind("PartialObjectMetadata") tableGVK := metav1.SchemeGroupVersion.WithKind("Table") @@ -107,10 +119,10 @@ func TestCacheableObject(t *testing.T) { tableConvertor := rest.NewDefaultTableConvertor(examplev1.Resource("Pod")) testCases := []struct { - desc string - object runtime.Object - opts *metav1beta1.TableOptions - mediaType negotiation.MediaTypeOptions + desc string + object runtime.Object + opts *metav1beta1.TableOptions + target *schema.GroupVersionKind expectedUnwrap bool expectedObj runtime.Object @@ -125,14 +137,14 @@ func TestCacheableObject(t *testing.T) { { desc: "cacheableObject nil convert", object: &mockCacheableObject{obj: pod}, - mediaType: negotiation.MediaTypeOptions{}, - expectedObj: &mockCacheableObject{obj: pod}, + target: nil, + expectedObj: pod, expectedErr: nil, }, { desc: "cacheableObject as PartialObjectMeta", object: &mockCacheableObject{obj: pod}, - mediaType: negotiation.MediaTypeOptions{Convert: &pomGVK}, + target: &pomGVK, expectedObj: podMeta, expectedErr: nil, }, @@ -140,7 +152,7 @@ func TestCacheableObject(t *testing.T) { desc: "cacheableObject as Table", object: &mockCacheableObject{obj: pod}, opts: &metav1beta1.TableOptions{NoHeaders: true, IncludeObject: metav1.IncludeNone}, - mediaType: negotiation.MediaTypeOptions{Convert: &tableGVK}, + target: &tableGVK, expectedObj: podTable, expectedErr: nil, }, @@ -148,20 +160,22 @@ func TestCacheableObject(t *testing.T) { for _, test := range testCases { t.Run(test.desc, func(t *testing.T) { - result, err := transformObject( + internalEncoder := &mockEncoder{} + watchEncoder := newWatchEmbeddedEncoder( request.WithRequestInfo(context.TODO(), &request.RequestInfo{}), - test.object, test.opts, test.mediaType, + internalEncoder, test.target, test.opts, &RequestScope{ Namer: &mockNamer{}, TableConvertor: tableConvertor, }, - nil) + ) + err := watchEncoder.Encode(test.object, nil) if err != test.expectedErr { t.Errorf("unexpected error: %v, expected: %v", err, test.expectedErr) } - if a, e := result, test.expectedObj; !reflect.DeepEqual(a, e) { - t.Errorf("unexpected result: %v, expected: %v", a, e) + if a, e := internalEncoder.obj, test.expectedObj; !reflect.DeepEqual(a, e) { + t.Errorf("unexpected result: %#v, expected: %#v", a, e) } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 79cb11ca600..d15819f1145 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -19,9 +19,7 @@ package handlers import ( "bytes" "fmt" - "io" "net/http" - "reflect" "time" "golang.org/x/net/websocket" @@ -92,6 +90,8 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n mediaType += ";stream=watch" } + ctx := req.Context() + // locate the appropriate embedded encoder based on the transform var embeddedEncoder runtime.Encoder contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req) @@ -106,13 +106,41 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion()) } + var memoryAllocator runtime.MemoryAllocator + + if encoderWithAllocator, supportsAllocator := embeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator { + // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. + // instead, we allocate the buffer for the entire watch session and release it when we close the connection. + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + defer runtime.AllocatorPool.Put(memoryAllocator) + embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) + } + var tableOptions *metav1.TableOptions + if options != nil { + if passedOptions, ok := options.(*metav1.TableOptions); ok { + tableOptions = passedOptions + } else { + scope.err(fmt.Errorf("unexpected options type: %T", options), w, req) + return + } + } + embeddedEncoder = newWatchEmbeddedEncoder(ctx, embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope) + + if encoderWithAllocator, supportsAllocator := encoder.(runtime.EncoderWithAllocator); supportsAllocator { + if memoryAllocator == nil { + // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. + // instead, we allocate the buffer for the entire watch session and release it when we close the connection. + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + defer runtime.AllocatorPool.Put(memoryAllocator) + } + encoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) + } + var serverShuttingDownCh <-chan struct{} if signals := apirequest.ServerShutdownSignalFrom(req.Context()); signals != nil { serverShuttingDownCh = signals.ShuttingDown() } - ctx := req.Context() - server := &WatchServer{ Watching: watcher, Scope: scope, @@ -123,21 +151,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n Encoder: encoder, EmbeddedEncoder: embeddedEncoder, - Fixup: func(obj runtime.Object) runtime.Object { - result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err)) - return obj - } - // When we are transformed to a table, use the table options as the state for whether we - // should print headers - on watch, we only want to print table headers on the first object - // and omit them on subsequent events. - if tableOptions, ok := options.(*metav1.TableOptions); ok { - tableOptions.NoHeaders = true - } - return result - }, - TimeoutFactory: &realTimeoutFactory{timeout}, ServerShuttingDownCh: serverShuttingDownCh, } @@ -160,8 +173,6 @@ type WatchServer struct { Encoder runtime.Encoder // used to encode the nested object in the watch stream EmbeddedEncoder runtime.Encoder - // used to correct the object before we send it to the serializer - Fixup func(runtime.Object) runtime.Object TimeoutFactory TimeoutFactory ServerShuttingDownCh <-chan struct{} @@ -196,15 +207,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } var e streaming.Encoder - var memoryAllocator runtime.MemoryAllocator - - if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator { - memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) - defer runtime.AllocatorPool.Put(memoryAllocator) - e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator) - } else { - e = streaming.NewEncoder(framer, s.Encoder) - } + e = streaming.NewEncoder(framer, s.Encoder) // ensure the connection times out timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh() @@ -223,19 +226,6 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { ch := s.Watching.ResultChan() done := req.Context().Done() - embeddedEncodeFn := s.EmbeddedEncoder.Encode - if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator { - if memoryAllocator == nil { - // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. - // instead, we allocate the buffer for the entire watch session and release it when we close the connection. - memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) - defer runtime.AllocatorPool.Put(memoryAllocator) - } - embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error { - return encoder.EncodeWithAllocator(obj, w, memoryAllocator) - } - } - for { select { case <-s.ServerShuttingDownCh: @@ -258,10 +248,9 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() - obj := s.Fixup(event.Object) - if err := embeddedEncodeFn(obj, buf); err != nil { + if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil { // unexpected error - utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err)) return } @@ -326,10 +315,10 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) { // End of results. return } - obj := s.Fixup(event.Object) - if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil { + + if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil { // unexpected error - utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err)) return } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go index e3d650a94a0..e18e0c4f0a7 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go @@ -647,7 +647,6 @@ func TestWatchHTTPErrors(t *testing.T) { Encoder: newCodec, EmbeddedEncoder: newCodec, - Fixup: func(obj runtime.Object) runtime.Object { return obj }, TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done}, } @@ -712,7 +711,6 @@ func TestWatchHTTPErrorsBeforeServe(t *testing.T) { Encoder: newCodec, EmbeddedEncoder: newCodec, - Fixup: func(obj runtime.Object) runtime.Object { return obj }, TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done}, } @@ -771,7 +769,6 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) { Encoder: newCodec, EmbeddedEncoder: newCodec, - Fixup: func(obj runtime.Object) runtime.Object { return obj }, TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done}, } @@ -814,7 +811,6 @@ func TestWatchHTTPTimeout(t *testing.T) { Encoder: newCodec, EmbeddedEncoder: newCodec, - Fixup: func(obj runtime.Object) runtime.Object { return obj }, TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done}, } diff --git a/test/integration/apiserver/apiserver_test.go b/test/integration/apiserver/apiserver_test.go index 13a46da2cb5..816220ee3e9 100644 --- a/test/integration/apiserver/apiserver_test.go +++ b/test/integration/apiserver/apiserver_test.go @@ -37,6 +37,7 @@ import ( apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apiextensions-apiserver/test/integration/fixtures" + apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" @@ -2307,6 +2308,142 @@ func TestTransform(t *testing.T) { } } +func TestWatchTransformCaching(t *testing.T) { + ctx, clientSet, _, tearDownFn := setup(t) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(clientSet, "watch-transform", t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) + + list, err := clientSet.CoreV1().ConfigMaps(ns.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list objects: %v", err) + } + + timeout := 30 * time.Second + listOptions := &metav1.ListOptions{ + ResourceVersion: list.ResourceVersion, + Watch: true, + } + + wMeta, err := clientSet.CoreV1().RESTClient().Get(). + AbsPath("/api/v1/namespaces/watch-transform/configmaps"). + SetHeader("Accept", "application/vnd.kubernetes.protobuf;as=PartialObjectMetadata;g=meta.k8s.io;v=v1beta1"). + VersionedParams(listOptions, metav1.ParameterCodec). + Timeout(timeout). + Stream(ctx) + if err != nil { + t.Fatalf("Failed to start meta watch: %v", err) + } + defer wMeta.Close() + + wTableIncludeMeta, err := clientSet.CoreV1().RESTClient().Get(). + AbsPath("/api/v1/namespaces/watch-transform/configmaps"). + SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1"). + VersionedParams(listOptions, metav1.ParameterCodec). + Param("includeObject", string(metav1.IncludeMetadata)). + Timeout(timeout). + Stream(ctx) + if err != nil { + t.Fatalf("Failed to start table meta watch: %v", err) + } + defer wTableIncludeMeta.Close() + + wTableIncludeObject, err := clientSet.CoreV1().RESTClient().Get(). + AbsPath("/api/v1/namespaces/watch-transform/configmaps"). + SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1"). + VersionedParams(listOptions, metav1.ParameterCodec). + Param("includeObject", string(metav1.IncludeObject)). + Timeout(timeout). + Stream(ctx) + if err != nil { + t.Fatalf("Failed to start table object watch: %v", err) + } + defer wTableIncludeObject.Close() + + configMap, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test1"}, + Data: map[string]string{ + "foo": "bar", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create a configMap: %v", err) + } + + listOptionsDelayed := &metav1.ListOptions{ + ResourceVersion: configMap.ResourceVersion, + Watch: true, + } + wTableIncludeObjectDelayed, err := clientSet.CoreV1().RESTClient().Get(). + AbsPath("/api/v1/namespaces/watch-transform/configmaps"). + SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1"). + VersionedParams(listOptionsDelayed, metav1.ParameterCodec). + Param("includeObject", string(metav1.IncludeObject)). + Timeout(timeout). + Stream(ctx) + if err != nil { + t.Fatalf("Failed to start table object watch: %v", err) + } + defer wTableIncludeObjectDelayed.Close() + + configMap2, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test2"}, + Data: map[string]string{ + "foo": "bar", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create a second configMap: %v", err) + } + + metaChecks := []partialObjectMetadataCheck{ + func(res *metav1beta1.PartialObjectMetadata) { + if !apiequality.Semantic.DeepEqual(configMap.ObjectMeta, res.ObjectMeta) { + t.Errorf("expected object: %#v, got: %#v", configMap.ObjectMeta, res.ObjectMeta) + } + }, + func(res *metav1beta1.PartialObjectMetadata) { + if !apiequality.Semantic.DeepEqual(configMap2.ObjectMeta, res.ObjectMeta) { + t.Errorf("expected object: %#v, got: %#v", configMap2.ObjectMeta, res.ObjectMeta) + } + }, + } + expectPartialObjectMetaEventsProtobufChecks(t, wMeta, metaChecks) + + tableMetaCheck := func(expected *v1.ConfigMap, got []byte) { + var obj metav1.PartialObjectMetadata + if err := json.Unmarshal(got, &obj); err != nil { + t.Fatal(err) + } + if !apiequality.Semantic.DeepEqual(expected.ObjectMeta, obj.ObjectMeta) { + t.Errorf("expected object: %#v, got: %#v", expected, obj) + } + } + + objectMetas := expectTableWatchEvents(t, 2, 3, metav1.IncludeMetadata, json.NewDecoder(wTableIncludeMeta)) + tableMetaCheck(configMap, objectMetas[0]) + tableMetaCheck(configMap2, objectMetas[1]) + + tableObjectCheck := func(expected *v1.ConfigMap, got []byte) { + var obj *v1.ConfigMap + if err := json.Unmarshal(got, &obj); err != nil { + t.Fatal(err) + } + obj.TypeMeta = metav1.TypeMeta{} + if !apiequality.Semantic.DeepEqual(expected, obj) { + t.Errorf("expected object: %#v, got: %#v", expected, obj) + } + } + + objects := expectTableWatchEvents(t, 2, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObject)) + tableObjectCheck(configMap, objects[0]) + tableObjectCheck(configMap2, objects[1]) + + delayedObjects := expectTableWatchEvents(t, 1, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectDelayed)) + tableObjectCheck(configMap2, delayedObjects[0]) +} + func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) [][]byte { t.Helper() @@ -2347,6 +2484,7 @@ func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.Incl if meta.TypeMeta != partialObj { t.Fatalf("expected partial object: %#v", meta) } + objects = append(objects, row.Object.Raw) case metav1.IncludeNone: if len(row.Object.Raw) != 0 { t.Fatalf("Expected no object: %s", string(row.Object.Raw)) @@ -2383,7 +2521,22 @@ func expectPartialObjectMetaEvents(t *testing.T, d *json.Decoder, values ...stri } } +type partialObjectMetadataCheck func(*metav1beta1.PartialObjectMetadata) + func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ...string) { + checks := []partialObjectMetadataCheck{} + for i, value := range values { + i, value := i, value + checks = append(checks, func(meta *metav1beta1.PartialObjectMetadata) { + if meta.Annotations["test"] != value { + t.Fatalf("expected event %d to have value %q instead of %q", i+1, value, meta.Annotations["test"]) + } + }) + } + expectPartialObjectMetaEventsProtobufChecks(t, r, checks) +} + +func expectPartialObjectMetaEventsProtobufChecks(t *testing.T, r io.Reader, checks []partialObjectMetadataCheck) { scheme := runtime.NewScheme() metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"}) rs := protobuf.NewRawSerializer(scheme, scheme) @@ -2393,7 +2546,7 @@ func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ... ) ds := metainternalversionscheme.Codecs.UniversalDeserializer() - for i, value := range values { + for _, check := range checks { var evt metav1.WatchEvent if _, _, err := d.Decode(nil, &evt); err != nil { t.Fatal(err) @@ -2410,9 +2563,7 @@ func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ... if !reflect.DeepEqual(expected, gvk) { t.Fatalf("expected partial object: %#v", meta) } - if meta.Annotations["test"] != value { - t.Fatalf("expected event %d to have value %q instead of %q", i+1, value, meta.Annotations["test"]) - } + check(meta) } }