Merge pull request #119801 from wojtek-t/refactor_watch_encoders
Refactor apiserver endpoint transformers to more natively use Encoders
This commit is contained in:
commit
def694bbe0
@ -257,3 +257,26 @@ func (d WithoutVersionDecoder) Decode(data []byte, defaults *schema.GroupVersion
|
|||||||
}
|
}
|
||||||
return obj, gvk, err
|
return obj, gvk, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type encoderWithAllocator struct {
|
||||||
|
encoder EncoderWithAllocator
|
||||||
|
memAllocator MemoryAllocator
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEncoderWithAllocator returns a new encoder
|
||||||
|
func NewEncoderWithAllocator(e EncoderWithAllocator, a MemoryAllocator) Encoder {
|
||||||
|
return &encoderWithAllocator{
|
||||||
|
encoder: e,
|
||||||
|
memAllocator: a,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode writes the provided object to the nested writer
|
||||||
|
func (e *encoderWithAllocator) Encode(obj Object, w io.Writer) error {
|
||||||
|
return e.encoder.EncodeWithAllocator(obj, w, e.memAllocator)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Identifier returns identifier of this encoder.
|
||||||
|
func (e *encoderWithAllocator) Identifier() Identifier {
|
||||||
|
return e.encoder.Identifier()
|
||||||
|
}
|
||||||
|
@ -134,23 +134,3 @@ func (e *encoder) Encode(obj runtime.Object) error {
|
|||||||
e.buf.Reset()
|
e.buf.Reset()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
type encoderWithAllocator struct {
|
|
||||||
writer io.Writer
|
|
||||||
encoder runtime.EncoderWithAllocator
|
|
||||||
memAllocator runtime.MemoryAllocator
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewEncoderWithAllocator returns a new streaming encoder
|
|
||||||
func NewEncoderWithAllocator(w io.Writer, e runtime.EncoderWithAllocator, a runtime.MemoryAllocator) Encoder {
|
|
||||||
return &encoderWithAllocator{
|
|
||||||
writer: w,
|
|
||||||
encoder: e,
|
|
||||||
memAllocator: a,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode writes the provided object to the nested writer
|
|
||||||
func (e *encoderWithAllocator) Encode(obj runtime.Object) error {
|
|
||||||
return e.encoder.EncodeWithAllocator(obj, e.writer, e.memAllocator)
|
|
||||||
}
|
|
||||||
|
@ -18,8 +18,11 @@ package handlers
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
@ -29,48 +32,119 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation"
|
"k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||||
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
|
||||||
|
klog "k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// transformObject takes the object as returned by storage and ensures it is in
|
// watchEmbeddedEncoder performs encoding of the embedded object.
|
||||||
// the client's desired form, as well as ensuring any API level fields like self-link
|
//
|
||||||
// are properly set.
|
// NOTE: watchEmbeddedEncoder is NOT thread-safe.
|
||||||
func transformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope *RequestScope, req *http.Request) (runtime.Object, error) {
|
type watchEmbeddedEncoder struct {
|
||||||
if co, ok := obj.(runtime.CacheableObject); ok {
|
encoder runtime.Encoder
|
||||||
if mediaType.Convert != nil {
|
|
||||||
// Non-nil mediaType.Convert means that some conversion of the object
|
ctx context.Context
|
||||||
// has to happen. Currently conversion may potentially modify the
|
|
||||||
// object or assume something about it (e.g. asTable operates on
|
// target, if non-nil, configures transformation type.
|
||||||
// reflection, which won't work for any wrapper).
|
// The other options are ignored if target is nil.
|
||||||
// To ensure it will work correctly, let's operate on base objects
|
target *schema.GroupVersionKind
|
||||||
// and not cache it for now.
|
tableOptions *metav1.TableOptions
|
||||||
//
|
scope *RequestScope
|
||||||
// TODO: Long-term, transformObject should be changed so that it
|
|
||||||
// implements runtime.Encoder interface.
|
// identifier of the encoder, computed lazily
|
||||||
return doTransformObject(ctx, co.GetObject(), opts, mediaType, scope, req)
|
identifier runtime.Identifier
|
||||||
}
|
|
||||||
}
|
|
||||||
return doTransformObject(ctx, obj, opts, mediaType, scope, req)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope *RequestScope, req *http.Request) (runtime.Object, error) {
|
func newWatchEmbeddedEncoder(ctx context.Context, encoder runtime.Encoder, target *schema.GroupVersionKind, tableOptions *metav1.TableOptions, scope *RequestScope) *watchEmbeddedEncoder {
|
||||||
|
return &watchEmbeddedEncoder{
|
||||||
|
encoder: encoder,
|
||||||
|
ctx: ctx,
|
||||||
|
target: target,
|
||||||
|
tableOptions: tableOptions,
|
||||||
|
scope: scope,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode implements runtime.Encoder interface.
|
||||||
|
func (e *watchEmbeddedEncoder) Encode(obj runtime.Object, w io.Writer) error {
|
||||||
|
if co, ok := obj.(runtime.CacheableObject); ok {
|
||||||
|
return co.CacheEncode(e.Identifier(), e.doEncode, w)
|
||||||
|
}
|
||||||
|
return e.doEncode(obj, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *watchEmbeddedEncoder) doEncode(obj runtime.Object, w io.Writer) error {
|
||||||
|
result, err := doTransformObject(e.ctx, obj, e.tableOptions, e.target, e.scope)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
|
||||||
|
result = obj
|
||||||
|
}
|
||||||
|
|
||||||
|
// When we are tranforming to a table, use the original table options when
|
||||||
|
// we should print headers only on the first object - headers should be
|
||||||
|
// omitted on subsequent events.
|
||||||
|
if e.tableOptions != nil && !e.tableOptions.NoHeaders {
|
||||||
|
e.tableOptions.NoHeaders = true
|
||||||
|
// With options change, we should recompute the identifier.
|
||||||
|
// Clearing this will trigger lazy recompute when needed.
|
||||||
|
e.identifier = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.encoder.Encode(result, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Identifier implements runtime.Encoder interface.
|
||||||
|
func (e *watchEmbeddedEncoder) Identifier() runtime.Identifier {
|
||||||
|
if e.identifier == "" {
|
||||||
|
e.identifier = e.embeddedIdentifier()
|
||||||
|
}
|
||||||
|
return e.identifier
|
||||||
|
}
|
||||||
|
|
||||||
|
type watchEmbeddedEncoderIdentifier struct {
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
Encoder string `json:"encoder,omitempty"`
|
||||||
|
Target string `json:"target,omitempty"`
|
||||||
|
Options metav1.TableOptions `json:"options,omitempty"`
|
||||||
|
NoHeaders bool `json:"noHeaders,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *watchEmbeddedEncoder) embeddedIdentifier() runtime.Identifier {
|
||||||
|
if e.target == nil {
|
||||||
|
// If no conversion is performed, we effective only use
|
||||||
|
// the embedded identifier.
|
||||||
|
return e.encoder.Identifier()
|
||||||
|
}
|
||||||
|
identifier := watchEmbeddedEncoderIdentifier{
|
||||||
|
Name: "watch-embedded",
|
||||||
|
Encoder: string(e.encoder.Identifier()),
|
||||||
|
Target: e.target.String(),
|
||||||
|
}
|
||||||
|
if e.target.Kind == "Table" && e.tableOptions != nil {
|
||||||
|
identifier.Options = *e.tableOptions
|
||||||
|
identifier.NoHeaders = e.tableOptions.NoHeaders
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := json.Marshal(identifier)
|
||||||
|
if err != nil {
|
||||||
|
klog.Fatalf("Failed marshaling identifier for watchEmbeddedEncoder: %v", err)
|
||||||
|
}
|
||||||
|
return runtime.Identifier(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// doTransformResponseObject is used for handling all requests, including watch.
|
||||||
|
func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, target *schema.GroupVersionKind, scope *RequestScope) (runtime.Object, error) {
|
||||||
if _, ok := obj.(*metav1.Status); ok {
|
if _, ok := obj.(*metav1.Status); ok {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure that for empty lists we don't return <nil> items.
|
switch {
|
||||||
// This is safe to modify without deep-copying the object, as
|
|
||||||
// List objects themselves are never cached.
|
|
||||||
if meta.IsListType(obj) && meta.LenList(obj) == 0 {
|
|
||||||
if err := meta.SetList(obj, []runtime.Object{}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
switch target := mediaType.Convert; {
|
|
||||||
case target == nil:
|
case target == nil:
|
||||||
|
// If we ever change that from a no-op, the identifier of
|
||||||
|
// the watchEmbeddedEncoder has to be adjusted accordingly.
|
||||||
return obj, nil
|
return obj, nil
|
||||||
|
|
||||||
case target.Kind == "PartialObjectMetadata":
|
case target.Kind == "PartialObjectMetadata":
|
||||||
@ -128,6 +202,7 @@ func targetEncodingForTransform(scope *RequestScope, mediaType negotiation.Media
|
|||||||
|
|
||||||
// transformResponseObject takes an object loaded from storage and performs any necessary transformations.
|
// transformResponseObject takes an object loaded from storage and performs any necessary transformations.
|
||||||
// Will write the complete response object.
|
// Will write the complete response object.
|
||||||
|
// transformResponseObject is used only for handling non-streaming requests.
|
||||||
func transformResponseObject(ctx context.Context, scope *RequestScope, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
|
func transformResponseObject(ctx context.Context, scope *RequestScope, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
|
||||||
options, err := optionsForTransform(mediaType, req)
|
options, err := optionsForTransform(mediaType, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -135,9 +210,19 @@ func transformResponseObject(ctx context.Context, scope *RequestScope, req *http
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensure that for empty lists we don't return <nil> items.
|
||||||
|
// This is safe to modify without deep-copying the object, as
|
||||||
|
// List objects themselves are never cached.
|
||||||
|
if meta.IsListType(result) && meta.LenList(result) == 0 {
|
||||||
|
if err := meta.SetList(result, []runtime.Object{}); err != nil {
|
||||||
|
scope.err(err, w, req)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var obj runtime.Object
|
var obj runtime.Object
|
||||||
do := func() {
|
do := func() {
|
||||||
obj, err = transformObject(ctx, result, options, mediaType, scope, req)
|
obj, err = doTransformObject(ctx, result, options, mediaType.Convert, scope)
|
||||||
}
|
}
|
||||||
endpointsrequest.TrackTransformResponseObjectLatency(ctx, do)
|
endpointsrequest.TrackTransformResponseObjectLatency(ctx, do)
|
||||||
|
|
||||||
|
@ -30,7 +30,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
)
|
)
|
||||||
@ -64,7 +63,7 @@ func (m *mockCacheableObject) SetGroupVersionKind(gvk schema.GroupVersionKind) {
|
|||||||
|
|
||||||
// CacheEncode implements runtime.CacheableObject interface.
|
// CacheEncode implements runtime.CacheableObject interface.
|
||||||
func (m *mockCacheableObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
|
func (m *mockCacheableObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
|
||||||
return fmt.Errorf("unimplemented")
|
return encode(m.obj.DeepCopyObject(), w)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObject implements runtime.CacheableObject interface.
|
// GetObject implements runtime.CacheableObject interface.
|
||||||
@ -78,6 +77,19 @@ func (*mockNamer) Namespace(_ *http.Request) (string, error) { return
|
|||||||
func (*mockNamer) Name(_ *http.Request) (string, string, error) { return "", "", nil }
|
func (*mockNamer) Name(_ *http.Request) (string, string, error) { return "", "", nil }
|
||||||
func (*mockNamer) ObjectName(_ runtime.Object) (string, string, error) { return "", "", nil }
|
func (*mockNamer) ObjectName(_ runtime.Object) (string, string, error) { return "", "", nil }
|
||||||
|
|
||||||
|
type mockEncoder struct {
|
||||||
|
obj runtime.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *mockEncoder) Encode(obj runtime.Object, _ io.Writer) error {
|
||||||
|
e.obj = obj
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *mockEncoder) Identifier() runtime.Identifier {
|
||||||
|
return runtime.Identifier("")
|
||||||
|
}
|
||||||
|
|
||||||
func TestCacheableObject(t *testing.T) {
|
func TestCacheableObject(t *testing.T) {
|
||||||
pomGVK := metav1.SchemeGroupVersion.WithKind("PartialObjectMetadata")
|
pomGVK := metav1.SchemeGroupVersion.WithKind("PartialObjectMetadata")
|
||||||
tableGVK := metav1.SchemeGroupVersion.WithKind("Table")
|
tableGVK := metav1.SchemeGroupVersion.WithKind("Table")
|
||||||
@ -107,10 +119,10 @@ func TestCacheableObject(t *testing.T) {
|
|||||||
tableConvertor := rest.NewDefaultTableConvertor(examplev1.Resource("Pod"))
|
tableConvertor := rest.NewDefaultTableConvertor(examplev1.Resource("Pod"))
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
desc string
|
desc string
|
||||||
object runtime.Object
|
object runtime.Object
|
||||||
opts *metav1beta1.TableOptions
|
opts *metav1beta1.TableOptions
|
||||||
mediaType negotiation.MediaTypeOptions
|
target *schema.GroupVersionKind
|
||||||
|
|
||||||
expectedUnwrap bool
|
expectedUnwrap bool
|
||||||
expectedObj runtime.Object
|
expectedObj runtime.Object
|
||||||
@ -125,14 +137,14 @@ func TestCacheableObject(t *testing.T) {
|
|||||||
{
|
{
|
||||||
desc: "cacheableObject nil convert",
|
desc: "cacheableObject nil convert",
|
||||||
object: &mockCacheableObject{obj: pod},
|
object: &mockCacheableObject{obj: pod},
|
||||||
mediaType: negotiation.MediaTypeOptions{},
|
target: nil,
|
||||||
expectedObj: &mockCacheableObject{obj: pod},
|
expectedObj: pod,
|
||||||
expectedErr: nil,
|
expectedErr: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "cacheableObject as PartialObjectMeta",
|
desc: "cacheableObject as PartialObjectMeta",
|
||||||
object: &mockCacheableObject{obj: pod},
|
object: &mockCacheableObject{obj: pod},
|
||||||
mediaType: negotiation.MediaTypeOptions{Convert: &pomGVK},
|
target: &pomGVK,
|
||||||
expectedObj: podMeta,
|
expectedObj: podMeta,
|
||||||
expectedErr: nil,
|
expectedErr: nil,
|
||||||
},
|
},
|
||||||
@ -140,7 +152,7 @@ func TestCacheableObject(t *testing.T) {
|
|||||||
desc: "cacheableObject as Table",
|
desc: "cacheableObject as Table",
|
||||||
object: &mockCacheableObject{obj: pod},
|
object: &mockCacheableObject{obj: pod},
|
||||||
opts: &metav1beta1.TableOptions{NoHeaders: true, IncludeObject: metav1.IncludeNone},
|
opts: &metav1beta1.TableOptions{NoHeaders: true, IncludeObject: metav1.IncludeNone},
|
||||||
mediaType: negotiation.MediaTypeOptions{Convert: &tableGVK},
|
target: &tableGVK,
|
||||||
expectedObj: podTable,
|
expectedObj: podTable,
|
||||||
expectedErr: nil,
|
expectedErr: nil,
|
||||||
},
|
},
|
||||||
@ -148,20 +160,22 @@ func TestCacheableObject(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range testCases {
|
for _, test := range testCases {
|
||||||
t.Run(test.desc, func(t *testing.T) {
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
result, err := transformObject(
|
internalEncoder := &mockEncoder{}
|
||||||
|
watchEncoder := newWatchEmbeddedEncoder(
|
||||||
request.WithRequestInfo(context.TODO(), &request.RequestInfo{}),
|
request.WithRequestInfo(context.TODO(), &request.RequestInfo{}),
|
||||||
test.object, test.opts, test.mediaType,
|
internalEncoder, test.target, test.opts,
|
||||||
&RequestScope{
|
&RequestScope{
|
||||||
Namer: &mockNamer{},
|
Namer: &mockNamer{},
|
||||||
TableConvertor: tableConvertor,
|
TableConvertor: tableConvertor,
|
||||||
},
|
},
|
||||||
nil)
|
)
|
||||||
|
|
||||||
|
err := watchEncoder.Encode(test.object, nil)
|
||||||
if err != test.expectedErr {
|
if err != test.expectedErr {
|
||||||
t.Errorf("unexpected error: %v, expected: %v", err, test.expectedErr)
|
t.Errorf("unexpected error: %v, expected: %v", err, test.expectedErr)
|
||||||
}
|
}
|
||||||
if a, e := result, test.expectedObj; !reflect.DeepEqual(a, e) {
|
if a, e := internalEncoder.obj, test.expectedObj; !reflect.DeepEqual(a, e) {
|
||||||
t.Errorf("unexpected result: %v, expected: %v", a, e)
|
t.Errorf("unexpected result: %#v, expected: %#v", a, e)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -19,9 +19,7 @@ package handlers
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
@ -92,6 +90,8 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
|
|||||||
mediaType += ";stream=watch"
|
mediaType += ";stream=watch"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := req.Context()
|
||||||
|
|
||||||
// locate the appropriate embedded encoder based on the transform
|
// locate the appropriate embedded encoder based on the transform
|
||||||
var embeddedEncoder runtime.Encoder
|
var embeddedEncoder runtime.Encoder
|
||||||
contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req)
|
contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req)
|
||||||
@ -106,13 +106,41 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
|
|||||||
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
|
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var memoryAllocator runtime.MemoryAllocator
|
||||||
|
|
||||||
|
if encoderWithAllocator, supportsAllocator := embeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
|
||||||
|
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
|
||||||
|
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
|
||||||
|
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
|
||||||
|
defer runtime.AllocatorPool.Put(memoryAllocator)
|
||||||
|
embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
|
||||||
|
}
|
||||||
|
var tableOptions *metav1.TableOptions
|
||||||
|
if options != nil {
|
||||||
|
if passedOptions, ok := options.(*metav1.TableOptions); ok {
|
||||||
|
tableOptions = passedOptions
|
||||||
|
} else {
|
||||||
|
scope.err(fmt.Errorf("unexpected options type: %T", options), w, req)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
embeddedEncoder = newWatchEmbeddedEncoder(ctx, embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope)
|
||||||
|
|
||||||
|
if encoderWithAllocator, supportsAllocator := encoder.(runtime.EncoderWithAllocator); supportsAllocator {
|
||||||
|
if memoryAllocator == nil {
|
||||||
|
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
|
||||||
|
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
|
||||||
|
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
|
||||||
|
defer runtime.AllocatorPool.Put(memoryAllocator)
|
||||||
|
}
|
||||||
|
encoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
|
||||||
|
}
|
||||||
|
|
||||||
var serverShuttingDownCh <-chan struct{}
|
var serverShuttingDownCh <-chan struct{}
|
||||||
if signals := apirequest.ServerShutdownSignalFrom(req.Context()); signals != nil {
|
if signals := apirequest.ServerShutdownSignalFrom(req.Context()); signals != nil {
|
||||||
serverShuttingDownCh = signals.ShuttingDown()
|
serverShuttingDownCh = signals.ShuttingDown()
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := req.Context()
|
|
||||||
|
|
||||||
server := &WatchServer{
|
server := &WatchServer{
|
||||||
Watching: watcher,
|
Watching: watcher,
|
||||||
Scope: scope,
|
Scope: scope,
|
||||||
@ -123,21 +151,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
|
|||||||
Encoder: encoder,
|
Encoder: encoder,
|
||||||
EmbeddedEncoder: embeddedEncoder,
|
EmbeddedEncoder: embeddedEncoder,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object {
|
|
||||||
result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
|
|
||||||
if err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
|
|
||||||
return obj
|
|
||||||
}
|
|
||||||
// When we are transformed to a table, use the table options as the state for whether we
|
|
||||||
// should print headers - on watch, we only want to print table headers on the first object
|
|
||||||
// and omit them on subsequent events.
|
|
||||||
if tableOptions, ok := options.(*metav1.TableOptions); ok {
|
|
||||||
tableOptions.NoHeaders = true
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
},
|
|
||||||
|
|
||||||
TimeoutFactory: &realTimeoutFactory{timeout},
|
TimeoutFactory: &realTimeoutFactory{timeout},
|
||||||
ServerShuttingDownCh: serverShuttingDownCh,
|
ServerShuttingDownCh: serverShuttingDownCh,
|
||||||
}
|
}
|
||||||
@ -160,8 +173,6 @@ type WatchServer struct {
|
|||||||
Encoder runtime.Encoder
|
Encoder runtime.Encoder
|
||||||
// used to encode the nested object in the watch stream
|
// used to encode the nested object in the watch stream
|
||||||
EmbeddedEncoder runtime.Encoder
|
EmbeddedEncoder runtime.Encoder
|
||||||
// used to correct the object before we send it to the serializer
|
|
||||||
Fixup func(runtime.Object) runtime.Object
|
|
||||||
|
|
||||||
TimeoutFactory TimeoutFactory
|
TimeoutFactory TimeoutFactory
|
||||||
ServerShuttingDownCh <-chan struct{}
|
ServerShuttingDownCh <-chan struct{}
|
||||||
@ -196,15 +207,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var e streaming.Encoder
|
var e streaming.Encoder
|
||||||
var memoryAllocator runtime.MemoryAllocator
|
e = streaming.NewEncoder(framer, s.Encoder)
|
||||||
|
|
||||||
if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
|
|
||||||
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
|
|
||||||
defer runtime.AllocatorPool.Put(memoryAllocator)
|
|
||||||
e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
|
|
||||||
} else {
|
|
||||||
e = streaming.NewEncoder(framer, s.Encoder)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ensure the connection times out
|
// ensure the connection times out
|
||||||
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
|
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
|
||||||
@ -223,19 +226,6 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
ch := s.Watching.ResultChan()
|
ch := s.Watching.ResultChan()
|
||||||
done := req.Context().Done()
|
done := req.Context().Done()
|
||||||
|
|
||||||
embeddedEncodeFn := s.EmbeddedEncoder.Encode
|
|
||||||
if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
|
|
||||||
if memoryAllocator == nil {
|
|
||||||
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
|
|
||||||
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
|
|
||||||
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
|
|
||||||
defer runtime.AllocatorPool.Put(memoryAllocator)
|
|
||||||
}
|
|
||||||
embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
|
|
||||||
return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.ServerShuttingDownCh:
|
case <-s.ServerShuttingDownCh:
|
||||||
@ -258,10 +248,9 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
||||||
|
|
||||||
obj := s.Fixup(event.Object)
|
if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
|
||||||
if err := embeddedEncodeFn(obj, buf); err != nil {
|
|
||||||
// unexpected error
|
// unexpected error
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
|
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -326,10 +315,10 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
|
|||||||
// End of results.
|
// End of results.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
obj := s.Fixup(event.Object)
|
|
||||||
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
|
if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
|
||||||
// unexpected error
|
// unexpected error
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
|
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -647,7 +647,6 @@ func TestWatchHTTPErrors(t *testing.T) {
|
|||||||
Encoder: newCodec,
|
Encoder: newCodec,
|
||||||
EmbeddedEncoder: newCodec,
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object { return obj },
|
|
||||||
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -712,7 +711,6 @@ func TestWatchHTTPErrorsBeforeServe(t *testing.T) {
|
|||||||
Encoder: newCodec,
|
Encoder: newCodec,
|
||||||
EmbeddedEncoder: newCodec,
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object { return obj },
|
|
||||||
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -771,7 +769,6 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) {
|
|||||||
Encoder: newCodec,
|
Encoder: newCodec,
|
||||||
EmbeddedEncoder: newCodec,
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object { return obj },
|
|
||||||
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -814,7 +811,6 @@ func TestWatchHTTPTimeout(t *testing.T) {
|
|||||||
Encoder: newCodec,
|
Encoder: newCodec,
|
||||||
EmbeddedEncoder: newCodec,
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object { return obj },
|
|
||||||
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,6 +37,7 @@ import (
|
|||||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||||
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||||
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
|
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
|
||||||
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
|
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
|
||||||
@ -2307,6 +2308,142 @@ func TestTransform(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchTransformCaching(t *testing.T) {
|
||||||
|
ctx, clientSet, _, tearDownFn := setup(t)
|
||||||
|
defer tearDownFn()
|
||||||
|
|
||||||
|
ns := framework.CreateNamespaceOrDie(clientSet, "watch-transform", t)
|
||||||
|
defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
|
||||||
|
|
||||||
|
list, err := clientSet.CoreV1().ConfigMaps(ns.Name).List(ctx, metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to list objects: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
timeout := 30 * time.Second
|
||||||
|
listOptions := &metav1.ListOptions{
|
||||||
|
ResourceVersion: list.ResourceVersion,
|
||||||
|
Watch: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
wMeta, err := clientSet.CoreV1().RESTClient().Get().
|
||||||
|
AbsPath("/api/v1/namespaces/watch-transform/configmaps").
|
||||||
|
SetHeader("Accept", "application/vnd.kubernetes.protobuf;as=PartialObjectMetadata;g=meta.k8s.io;v=v1beta1").
|
||||||
|
VersionedParams(listOptions, metav1.ParameterCodec).
|
||||||
|
Timeout(timeout).
|
||||||
|
Stream(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to start meta watch: %v", err)
|
||||||
|
}
|
||||||
|
defer wMeta.Close()
|
||||||
|
|
||||||
|
wTableIncludeMeta, err := clientSet.CoreV1().RESTClient().Get().
|
||||||
|
AbsPath("/api/v1/namespaces/watch-transform/configmaps").
|
||||||
|
SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1").
|
||||||
|
VersionedParams(listOptions, metav1.ParameterCodec).
|
||||||
|
Param("includeObject", string(metav1.IncludeMetadata)).
|
||||||
|
Timeout(timeout).
|
||||||
|
Stream(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to start table meta watch: %v", err)
|
||||||
|
}
|
||||||
|
defer wTableIncludeMeta.Close()
|
||||||
|
|
||||||
|
wTableIncludeObject, err := clientSet.CoreV1().RESTClient().Get().
|
||||||
|
AbsPath("/api/v1/namespaces/watch-transform/configmaps").
|
||||||
|
SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1").
|
||||||
|
VersionedParams(listOptions, metav1.ParameterCodec).
|
||||||
|
Param("includeObject", string(metav1.IncludeObject)).
|
||||||
|
Timeout(timeout).
|
||||||
|
Stream(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to start table object watch: %v", err)
|
||||||
|
}
|
||||||
|
defer wTableIncludeObject.Close()
|
||||||
|
|
||||||
|
configMap, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "test1"},
|
||||||
|
Data: map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
}, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create a configMap: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
listOptionsDelayed := &metav1.ListOptions{
|
||||||
|
ResourceVersion: configMap.ResourceVersion,
|
||||||
|
Watch: true,
|
||||||
|
}
|
||||||
|
wTableIncludeObjectDelayed, err := clientSet.CoreV1().RESTClient().Get().
|
||||||
|
AbsPath("/api/v1/namespaces/watch-transform/configmaps").
|
||||||
|
SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1").
|
||||||
|
VersionedParams(listOptionsDelayed, metav1.ParameterCodec).
|
||||||
|
Param("includeObject", string(metav1.IncludeObject)).
|
||||||
|
Timeout(timeout).
|
||||||
|
Stream(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to start table object watch: %v", err)
|
||||||
|
}
|
||||||
|
defer wTableIncludeObjectDelayed.Close()
|
||||||
|
|
||||||
|
configMap2, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "test2"},
|
||||||
|
Data: map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
}, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create a second configMap: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
metaChecks := []partialObjectMetadataCheck{
|
||||||
|
func(res *metav1beta1.PartialObjectMetadata) {
|
||||||
|
if !apiequality.Semantic.DeepEqual(configMap.ObjectMeta, res.ObjectMeta) {
|
||||||
|
t.Errorf("expected object: %#v, got: %#v", configMap.ObjectMeta, res.ObjectMeta)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
func(res *metav1beta1.PartialObjectMetadata) {
|
||||||
|
if !apiequality.Semantic.DeepEqual(configMap2.ObjectMeta, res.ObjectMeta) {
|
||||||
|
t.Errorf("expected object: %#v, got: %#v", configMap2.ObjectMeta, res.ObjectMeta)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
expectPartialObjectMetaEventsProtobufChecks(t, wMeta, metaChecks)
|
||||||
|
|
||||||
|
tableMetaCheck := func(expected *v1.ConfigMap, got []byte) {
|
||||||
|
var obj metav1.PartialObjectMetadata
|
||||||
|
if err := json.Unmarshal(got, &obj); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !apiequality.Semantic.DeepEqual(expected.ObjectMeta, obj.ObjectMeta) {
|
||||||
|
t.Errorf("expected object: %#v, got: %#v", expected, obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
objectMetas := expectTableWatchEvents(t, 2, 3, metav1.IncludeMetadata, json.NewDecoder(wTableIncludeMeta))
|
||||||
|
tableMetaCheck(configMap, objectMetas[0])
|
||||||
|
tableMetaCheck(configMap2, objectMetas[1])
|
||||||
|
|
||||||
|
tableObjectCheck := func(expected *v1.ConfigMap, got []byte) {
|
||||||
|
var obj *v1.ConfigMap
|
||||||
|
if err := json.Unmarshal(got, &obj); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
obj.TypeMeta = metav1.TypeMeta{}
|
||||||
|
if !apiequality.Semantic.DeepEqual(expected, obj) {
|
||||||
|
t.Errorf("expected object: %#v, got: %#v", expected, obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
objects := expectTableWatchEvents(t, 2, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObject))
|
||||||
|
tableObjectCheck(configMap, objects[0])
|
||||||
|
tableObjectCheck(configMap2, objects[1])
|
||||||
|
|
||||||
|
delayedObjects := expectTableWatchEvents(t, 1, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectDelayed))
|
||||||
|
tableObjectCheck(configMap2, delayedObjects[0])
|
||||||
|
}
|
||||||
|
|
||||||
func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) [][]byte {
|
func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) [][]byte {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
@ -2347,6 +2484,7 @@ func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.Incl
|
|||||||
if meta.TypeMeta != partialObj {
|
if meta.TypeMeta != partialObj {
|
||||||
t.Fatalf("expected partial object: %#v", meta)
|
t.Fatalf("expected partial object: %#v", meta)
|
||||||
}
|
}
|
||||||
|
objects = append(objects, row.Object.Raw)
|
||||||
case metav1.IncludeNone:
|
case metav1.IncludeNone:
|
||||||
if len(row.Object.Raw) != 0 {
|
if len(row.Object.Raw) != 0 {
|
||||||
t.Fatalf("Expected no object: %s", string(row.Object.Raw))
|
t.Fatalf("Expected no object: %s", string(row.Object.Raw))
|
||||||
@ -2383,7 +2521,22 @@ func expectPartialObjectMetaEvents(t *testing.T, d *json.Decoder, values ...stri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type partialObjectMetadataCheck func(*metav1beta1.PartialObjectMetadata)
|
||||||
|
|
||||||
func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ...string) {
|
func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ...string) {
|
||||||
|
checks := []partialObjectMetadataCheck{}
|
||||||
|
for i, value := range values {
|
||||||
|
i, value := i, value
|
||||||
|
checks = append(checks, func(meta *metav1beta1.PartialObjectMetadata) {
|
||||||
|
if meta.Annotations["test"] != value {
|
||||||
|
t.Fatalf("expected event %d to have value %q instead of %q", i+1, value, meta.Annotations["test"])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
expectPartialObjectMetaEventsProtobufChecks(t, r, checks)
|
||||||
|
}
|
||||||
|
|
||||||
|
func expectPartialObjectMetaEventsProtobufChecks(t *testing.T, r io.Reader, checks []partialObjectMetadataCheck) {
|
||||||
scheme := runtime.NewScheme()
|
scheme := runtime.NewScheme()
|
||||||
metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
|
metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
|
||||||
rs := protobuf.NewRawSerializer(scheme, scheme)
|
rs := protobuf.NewRawSerializer(scheme, scheme)
|
||||||
@ -2393,7 +2546,7 @@ func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ...
|
|||||||
)
|
)
|
||||||
ds := metainternalversionscheme.Codecs.UniversalDeserializer()
|
ds := metainternalversionscheme.Codecs.UniversalDeserializer()
|
||||||
|
|
||||||
for i, value := range values {
|
for _, check := range checks {
|
||||||
var evt metav1.WatchEvent
|
var evt metav1.WatchEvent
|
||||||
if _, _, err := d.Decode(nil, &evt); err != nil {
|
if _, _, err := d.Decode(nil, &evt); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -2410,9 +2563,7 @@ func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ...
|
|||||||
if !reflect.DeepEqual(expected, gvk) {
|
if !reflect.DeepEqual(expected, gvk) {
|
||||||
t.Fatalf("expected partial object: %#v", meta)
|
t.Fatalf("expected partial object: %#v", meta)
|
||||||
}
|
}
|
||||||
if meta.Annotations["test"] != value {
|
check(meta)
|
||||||
t.Fatalf("expected event %d to have value %q instead of %q", i+1, value, meta.Annotations["test"])
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user