Add a metadata client to client-go that can read PartialObjectMetadata
This client exposes operations on generic metadata (get, list, watch, delete) and allows patch operations. The client always uses protobuf and requests the server transform the response into the appropriate object. Using this client simplifies the work of generic controllers by allowing them to treat all objects the same, and also improves performance both in the amount of data sent as well as allowing protobuf on CRD resources.
This commit is contained in:
@@ -30,6 +30,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
@@ -48,10 +49,12 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/dynamic"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/metadata"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/pager"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
@@ -403,6 +406,338 @@ func TestNameInFieldSelector(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type callWrapper struct {
|
||||
nested http.RoundTripper
|
||||
req *http.Request
|
||||
resp *http.Response
|
||||
err error
|
||||
}
|
||||
|
||||
func (w *callWrapper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
w.req = req
|
||||
resp, err := w.nested.RoundTrip(req)
|
||||
w.resp = resp
|
||||
w.err = err
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func TestMetadataClient(t *testing.T) {
|
||||
tearDown, config, _, err := fixtures.StartDefaultServer(t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer tearDown()
|
||||
|
||||
s, clientset, closeFn := setup(t)
|
||||
defer closeFn()
|
||||
|
||||
apiExtensionClient, err := apiextensionsclient.NewForConfig(config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dynamicClient, err := dynamic.NewForConfig(config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fooCRD := &apiextensionsv1beta1.CustomResourceDefinition{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foos.cr.bar.com",
|
||||
},
|
||||
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
|
||||
Group: "cr.bar.com",
|
||||
Version: "v1",
|
||||
Scope: apiextensionsv1beta1.NamespaceScoped,
|
||||
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
|
||||
Plural: "foos",
|
||||
Kind: "Foo",
|
||||
},
|
||||
},
|
||||
}
|
||||
fooCRD, err = fixtures.CreateNewCustomResourceDefinition(fooCRD, apiExtensionClient, dynamicClient)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
crdGVR := schema.GroupVersionResource{Group: fooCRD.Spec.Group, Version: fooCRD.Spec.Version, Resource: "foos"}
|
||||
|
||||
testcases := []struct {
|
||||
name string
|
||||
want func(*testing.T)
|
||||
}{
|
||||
{
|
||||
name: "list, get, patch, and delete via metadata client",
|
||||
want: func(t *testing.T) {
|
||||
ns := "metadata-builtin"
|
||||
svc, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-1", Annotations: map[string]string{"foo": "bar"}}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create service: %v", err)
|
||||
}
|
||||
|
||||
cfg := metadata.ConfigFor(&restclient.Config{Host: s.URL})
|
||||
wrapper := &callWrapper{}
|
||||
cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
||||
wrapper.nested = rt
|
||||
return wrapper
|
||||
})
|
||||
|
||||
client := metadata.NewConfigOrDie(cfg).Resource(v1.SchemeGroupVersion.WithResource("services"))
|
||||
items, err := client.Namespace(ns).List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if items.ResourceVersion == "" {
|
||||
t.Fatalf("unexpected items: %#v", items)
|
||||
}
|
||||
if len(items.Items) != 1 {
|
||||
t.Fatalf("unexpected list: %#v", items)
|
||||
}
|
||||
if item := items.Items[0]; item.Name != "test-1" || item.UID != svc.UID || item.Annotations["foo"] != "bar" {
|
||||
t.Fatalf("unexpected object: %#v", item)
|
||||
}
|
||||
|
||||
if wrapper.resp == nil || wrapper.resp.Header.Get("Content-Type") != "application/vnd.kubernetes.protobuf" {
|
||||
t.Fatalf("unexpected response: %#v", wrapper.resp)
|
||||
}
|
||||
wrapper.resp = nil
|
||||
|
||||
item, err := client.Namespace(ns).Get("test-1", metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if item.ResourceVersion == "" || item.UID != svc.UID || item.Annotations["foo"] != "bar" {
|
||||
t.Fatalf("unexpected object: %#v", item)
|
||||
}
|
||||
if wrapper.resp == nil || wrapper.resp.Header.Get("Content-Type") != "application/vnd.kubernetes.protobuf" {
|
||||
t.Fatalf("unexpected response: %#v", wrapper.resp)
|
||||
}
|
||||
|
||||
item, err = client.Namespace(ns).Patch("test-1", types.MergePatchType, []byte(`{"metadata":{"annotations":{"foo":"baz"}}}`), metav1.PatchOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if item.Annotations["foo"] != "baz" {
|
||||
t.Fatalf("unexpected object: %#v", item)
|
||||
}
|
||||
|
||||
if err := client.Namespace(ns).Delete("test-1", &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &item.UID}}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if _, err := client.Namespace(ns).Get("test-1", metav1.GetOptions{}); !apierrors.IsNotFound(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "list, get, patch, and delete via metadata client on a CRD",
|
||||
want: func(t *testing.T) {
|
||||
ns := "metadata-crd"
|
||||
crclient := dynamicClient.Resource(crdGVR).Namespace(ns)
|
||||
cr, err := crclient.Create(&unstructured.Unstructured{
|
||||
Object: map[string]interface{}{
|
||||
"apiVersion": "cr.bar.com/v1",
|
||||
"kind": "Foo",
|
||||
"spec": map[string]interface{}{"field": 1},
|
||||
"metadata": map[string]interface{}{
|
||||
"name": "test-1",
|
||||
"annotations": map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create cr: %v", err)
|
||||
}
|
||||
|
||||
cfg := metadata.ConfigFor(config)
|
||||
wrapper := &callWrapper{}
|
||||
cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
||||
wrapper.nested = rt
|
||||
return wrapper
|
||||
})
|
||||
|
||||
client := metadata.NewConfigOrDie(cfg).Resource(crdGVR)
|
||||
items, err := client.Namespace(ns).List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if items.ResourceVersion == "" {
|
||||
t.Fatalf("unexpected items: %#v", items)
|
||||
}
|
||||
if len(items.Items) != 1 {
|
||||
t.Fatalf("unexpected list: %#v", items)
|
||||
}
|
||||
if item := items.Items[0]; item.Name != "test-1" || item.UID != cr.GetUID() || item.Annotations["foo"] != "bar" {
|
||||
t.Fatalf("unexpected object: %#v", item)
|
||||
}
|
||||
|
||||
if wrapper.resp == nil || wrapper.resp.Header.Get("Content-Type") != "application/vnd.kubernetes.protobuf" {
|
||||
t.Fatalf("unexpected response: %#v", wrapper.resp)
|
||||
}
|
||||
wrapper.resp = nil
|
||||
|
||||
item, err := client.Namespace(ns).Get("test-1", metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if item.ResourceVersion == "" || item.UID != cr.GetUID() || item.Annotations["foo"] != "bar" {
|
||||
t.Fatalf("unexpected object: %#v", item)
|
||||
}
|
||||
if wrapper.resp == nil || wrapper.resp.Header.Get("Content-Type") != "application/vnd.kubernetes.protobuf" {
|
||||
t.Fatalf("unexpected response: %#v", wrapper.resp)
|
||||
}
|
||||
|
||||
item, err = client.Namespace(ns).Patch("test-1", types.MergePatchType, []byte(`{"metadata":{"annotations":{"foo":"baz"}}}`), metav1.PatchOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if item.Annotations["foo"] != "baz" {
|
||||
t.Fatalf("unexpected object: %#v", item)
|
||||
}
|
||||
|
||||
if err := client.Namespace(ns).Delete("test-1", &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &item.UID}}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := client.Namespace(ns).Get("test-1", metav1.GetOptions{}); !apierrors.IsNotFound(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "watch via metadata client",
|
||||
want: func(t *testing.T) {
|
||||
ns := "metadata-watch"
|
||||
svc, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-2", Annotations: map[string]string{"foo": "bar"}}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create service: %v", err)
|
||||
}
|
||||
if _, err := clientset.CoreV1().Services(ns).Patch("test-2", types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`)); err != nil {
|
||||
t.Fatalf("unable to patch cr: %v", err)
|
||||
}
|
||||
|
||||
cfg := metadata.ConfigFor(&restclient.Config{Host: s.URL})
|
||||
wrapper := &callWrapper{}
|
||||
cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
||||
wrapper.nested = rt
|
||||
return wrapper
|
||||
})
|
||||
|
||||
client := metadata.NewConfigOrDie(cfg).Resource(v1.SchemeGroupVersion.WithResource("services"))
|
||||
w, err := client.Namespace(ns).Watch(metav1.ListOptions{ResourceVersion: svc.ResourceVersion, Watch: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Stop()
|
||||
var r watch.Event
|
||||
select {
|
||||
case evt, ok := <-w.ResultChan():
|
||||
if !ok {
|
||||
t.Fatal("watch closed")
|
||||
}
|
||||
r = evt
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("no watch event in 5 seconds, bug")
|
||||
}
|
||||
if r.Type != watch.Modified {
|
||||
t.Fatalf("unexpected watch: %#v", r)
|
||||
}
|
||||
item, ok := r.Object.(*metav1.PartialObjectMetadata)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected object: %T", item)
|
||||
}
|
||||
if item.ResourceVersion == "" || item.Name != "test-2" || item.UID != svc.UID || item.Annotations["test"] != "1" {
|
||||
t.Fatalf("unexpected object: %#v", item)
|
||||
}
|
||||
|
||||
if wrapper.resp == nil || wrapper.resp.Header.Get("Content-Type") != "application/vnd.kubernetes.protobuf;stream=watch" {
|
||||
t.Fatalf("unexpected response: %#v", wrapper.resp)
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
name: "watch via metadata client on a CRD",
|
||||
want: func(t *testing.T) {
|
||||
ns := "metadata-watch-crd"
|
||||
crclient := dynamicClient.Resource(crdGVR).Namespace(ns)
|
||||
cr, err := crclient.Create(&unstructured.Unstructured{
|
||||
Object: map[string]interface{}{
|
||||
"apiVersion": "cr.bar.com/v1",
|
||||
"kind": "Foo",
|
||||
"spec": map[string]interface{}{"field": 1},
|
||||
"metadata": map[string]interface{}{
|
||||
"name": "test-2",
|
||||
"annotations": map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create cr: %v", err)
|
||||
}
|
||||
|
||||
cfg := metadata.ConfigFor(config)
|
||||
client := metadata.NewConfigOrDie(cfg).Resource(crdGVR)
|
||||
|
||||
patched, err := client.Namespace(ns).Patch("test-2", types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`), metav1.PatchOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if patched.GetResourceVersion() == cr.GetResourceVersion() {
|
||||
t.Fatalf("Patch did not modify object: %#v", patched)
|
||||
}
|
||||
|
||||
wrapper := &callWrapper{}
|
||||
cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
||||
wrapper.nested = rt
|
||||
return wrapper
|
||||
})
|
||||
client = metadata.NewConfigOrDie(cfg).Resource(crdGVR)
|
||||
|
||||
w, err := client.Namespace(ns).Watch(metav1.ListOptions{ResourceVersion: cr.GetResourceVersion(), Watch: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Stop()
|
||||
var r watch.Event
|
||||
select {
|
||||
case evt, ok := <-w.ResultChan():
|
||||
if !ok {
|
||||
t.Fatal("watch closed")
|
||||
}
|
||||
r = evt
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("no watch event in 5 seconds, bug")
|
||||
}
|
||||
if r.Type != watch.Modified {
|
||||
t.Fatalf("unexpected watch: %#v", r)
|
||||
}
|
||||
item, ok := r.Object.(*metav1.PartialObjectMetadata)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected object: %T", item)
|
||||
}
|
||||
if item.ResourceVersion == "" || item.Name != "test-2" || item.UID != cr.GetUID() || item.Annotations["test"] != "1" {
|
||||
t.Fatalf("unexpected object: %#v", item)
|
||||
}
|
||||
|
||||
if wrapper.resp == nil || wrapper.resp.Header.Get("Content-Type") != "application/vnd.kubernetes.protobuf;stream=watch" {
|
||||
t.Fatalf("unexpected response: %#v", wrapper.resp)
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i := range testcases {
|
||||
tc := testcases[i]
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
tc.want(t)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAPICRDProtobuf(t *testing.T) {
|
||||
testNamespace := "test-api-crd-protobuf"
|
||||
tearDown, config, _, err := fixtures.StartDefaultServer(t)
|
||||
|
Reference in New Issue
Block a user