Add TPR to CRD migration helper.

This commit is contained in:
Anthony Yeh
2017-05-30 20:24:12 -07:00
parent e0a6cde6f4
commit ba59e14d44
13 changed files with 313 additions and 26 deletions

View File

@@ -28,6 +28,7 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
serverstorage "k8s.io/apiserver/pkg/server/storage"
@@ -210,7 +211,7 @@ func (c *Config) SkipComplete() completedConfig {
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget, crdRESTOptionsGetter genericregistry.RESTOptionsGetter) (*Master, error) {
if reflect.DeepEqual(c.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
@@ -254,7 +255,8 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, s.DiscoveryGroupManager, c.StorageFactory)},
// TODO(enisoc): Remove crdRESTOptionsGetter input argument when TPR code is removed.
extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, s.DiscoveryGroupManager, c.StorageFactory, crdRESTOptionsGetter)},
networkingrest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},

View File

@@ -54,7 +54,7 @@ func TestValidOpenAPISpec(t *testing.T) {
}
config.GenericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
master, err := config.Complete().New(genericapiserver.EmptyDelegate)
master, err := config.Complete().New(genericapiserver.EmptyDelegate, nil)
if err != nil {
t.Fatalf("Error in bringing up the master: %v", err)
}

View File

@@ -115,7 +115,7 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
func newMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdserver, config, assert := setUp(t)
master, err := config.Complete().New(genericapiserver.EmptyDelegate)
master, err := config.Complete().New(genericapiserver.EmptyDelegate, nil)
if err != nil {
t.Fatalf("Error in bringing up the master: %v", err)
}
@@ -141,7 +141,7 @@ func limitedAPIResourceConfigSource() *serverstorage.ResourceConfig {
func newLimitedMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdserver, config, assert := setUp(t)
config.APIResourceConfigSource = limitedAPIResourceConfigSource()
master, err := config.Complete().New(genericapiserver.EmptyDelegate)
master, err := config.Complete().New(genericapiserver.EmptyDelegate, nil)
if err != nil {
t.Fatalf("Error in bringing up the master: %v", err)
}

View File

@@ -27,9 +27,12 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints:go_default_library",
@@ -40,12 +43,16 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/apiserver:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/registry/customresource:go_default_library",
],
)

View File

@@ -25,16 +25,26 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/json"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
genericapi "k8s.io/apiserver/pkg/endpoints"
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/endpoints/request"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorgage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
discoveryclient "k8s.io/client-go/discovery"
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions"
apiextensionsserver "k8s.io/kube-apiextensions-server/pkg/apiserver"
apiextensionsclient "k8s.io/kube-apiextensions-server/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion"
"k8s.io/kube-apiextensions-server/pkg/registry/customresource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest"
@@ -71,13 +81,16 @@ type ThirdPartyResourceServer struct {
// Useful for reliable testing. Shouldn't be used otherwise.
disableThirdPartyControllerForTesting bool
crdRESTOptionsGetter generic.RESTOptionsGetter
}
func NewThirdPartyResourceServer(genericAPIServer *genericapiserver.GenericAPIServer, availableGroupManager discovery.GroupManager, storageFactory serverstorgage.StorageFactory) *ThirdPartyResourceServer {
func NewThirdPartyResourceServer(genericAPIServer *genericapiserver.GenericAPIServer, availableGroupManager discovery.GroupManager, storageFactory serverstorgage.StorageFactory, crdRESTOptionsGetter generic.RESTOptionsGetter) *ThirdPartyResourceServer {
ret := &ThirdPartyResourceServer{
genericAPIServer: genericAPIServer,
thirdPartyResources: map[string]*thirdPartyEntry{},
availableGroupManager: availableGroupManager,
crdRESTOptionsGetter: crdRESTOptionsGetter,
}
var err error
@@ -130,7 +143,7 @@ func (m *ThirdPartyResourceServer) removeThirdPartyStorage(path, resource string
if !found {
return nil
}
if err := m.removeAllThirdPartyResources(storage); err != nil {
if err := m.removeThirdPartyResourceData(&entry.group, resource, storage); err != nil {
return err
}
delete(entry.storage, resource)
@@ -166,25 +179,132 @@ func (m *ThirdPartyResourceServer) RemoveThirdPartyResource(path string) error {
return nil
}
func (m *ThirdPartyResourceServer) removeAllThirdPartyResources(registry *thirdpartyresourcedatastore.REST) error {
ctx := genericapirequest.NewDefaultContext()
func (m *ThirdPartyResourceServer) removeThirdPartyResourceData(group *metav1.APIGroup, resource string, registry *thirdpartyresourcedatastore.REST) error {
// Freeze TPR data to prevent new writes via this apiserver process.
// Other apiservers can still write. This is best-effort because there
// are worse problems with TPR data than the possibility of going back
// in time when migrating to CRD [citation needed].
registry.Freeze()
ctx := genericapirequest.NewContext()
existingData, err := registry.List(ctx, nil)
if err != nil {
return err
}
list, ok := existingData.(*extensions.ThirdPartyResourceDataList)
if !ok {
return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %#v", list)
return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %T", existingData)
}
for ix := range list.Items {
item := &list.Items[ix]
if _, _, err := registry.Delete(ctx, item.Name, nil); err != nil {
// Migrate TPR data to CRD if requested.
gvk := schema.GroupVersionKind{Group: group.Name, Version: group.PreferredVersion.Version, Kind: registry.Kind()}
migrationRequested, err := m.migrateThirdPartyResourceData(gvk, resource, list)
if err != nil {
// Migration is best-effort. Log and continue.
utilruntime.HandleError(fmt.Errorf("failed to migrate TPR data: %v", err))
}
// Skip deletion of TPR data if migration was requested (whether or not it succeeded).
// This leaves the etcd data around for rollback, and to avoid sending DELETE watch events.
if migrationRequested {
return nil
}
for i := range list.Items {
item := &list.Items[i]
// Use registry.Store.Delete() to bypass the frozen registry.Delete().
if _, _, err := registry.Store.Delete(genericapirequest.WithNamespace(ctx, item.Namespace), item.Name, nil); err != nil {
return err
}
}
return nil
}
func (m *ThirdPartyResourceServer) findMatchingCRD(gvk schema.GroupVersionKind, resource string) (*apiextensions.CustomResourceDefinition, error) {
// CustomResourceDefinitionList does not implement the protobuf marshalling interface.
config := *m.genericAPIServer.LoopbackClientConfig
config.ContentType = "application/json"
crdClient, err := apiextensionsclient.NewForConfig(&config)
if err != nil {
return nil, fmt.Errorf("can't create apiextensions client: %v", err)
}
crdList, err := crdClient.CustomResourceDefinitions().List(metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("can't list CustomResourceDefinitions: %v", err)
}
for i := range crdList.Items {
item := &crdList.Items[i]
if item.Spec.Scope == apiextensions.NamespaceScoped &&
item.Spec.Group == gvk.Group && item.Spec.Version == gvk.Version &&
item.Status.AcceptedNames.Kind == gvk.Kind && item.Status.AcceptedNames.Plural == resource {
return item, nil
}
}
return nil, nil
}
func (m *ThirdPartyResourceServer) migrateThirdPartyResourceData(gvk schema.GroupVersionKind, resource string, dataList *extensions.ThirdPartyResourceDataList) (bool, error) {
// A matching CustomResourceDefinition implies migration is requested.
crd, err := m.findMatchingCRD(gvk, resource)
if err != nil {
return false, fmt.Errorf("can't determine if TPR should migrate: %v", err)
}
if crd == nil {
// No migration requested.
return false, nil
}
// Talk directly to CustomResource storage.
// We have to bypass the API server because TPR is shadowing CRD at this point.
storage := customresource.NewREST(
schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural},
schema.GroupVersionKind{Group: crd.Spec.Group, Version: crd.Spec.Version, Kind: crd.Spec.Names.ListKind},
apiextensionsserver.UnstructuredCopier{},
customresource.NewStrategy(discoveryclient.NewUnstructuredObjectTyper(nil), true),
m.crdRESTOptionsGetter,
)
// Copy TPR data to CustomResource.
var errs []error
ctx := request.NewContext()
for i := range dataList.Items {
item := &dataList.Items[i]
// Convert TPR data to Unstructured.
objMap := make(map[string]interface{})
if err := json.Unmarshal(item.Data, &objMap); err != nil {
errs = append(errs, fmt.Errorf("can't unmarshal TPR data %q: %v", item.Name, err))
continue
}
// Convert metadata to Unstructured and merge with data.
// cf. thirdpartyresourcedata.encodeToJSON()
metaMap := make(map[string]interface{})
buf, err := json.Marshal(&item.ObjectMeta)
if err != nil {
errs = append(errs, fmt.Errorf("can't marshal metadata for TPR data %q: %v", item.Name, err))
continue
}
if err := json.Unmarshal(buf, &metaMap); err != nil {
errs = append(errs, fmt.Errorf("can't unmarshal TPR data %q: %v", item.Name, err))
continue
}
// resourceVersion cannot be set when creating objects.
delete(metaMap, "resourceVersion")
objMap["metadata"] = metaMap
// Store CustomResource.
obj := &unstructured.Unstructured{Object: objMap}
createCtx := request.WithNamespace(ctx, obj.GetNamespace())
if _, err := storage.Create(createCtx, obj); err != nil {
errs = append(errs, fmt.Errorf("can't create CustomResource for TPR data %q: %v", item.Name, err))
continue
}
}
return true, utilerrors.NewAggregate(errs)
}
// ListThirdPartyResources lists all currently installed third party resources
// The format is <path>/<resource-plural-name>
func (m *ThirdPartyResourceServer) ListThirdPartyResources() []string {

View File

@@ -35,9 +35,14 @@ go_library(
"//pkg/apis/extensions:go_default_library",
"//pkg/registry/cachesize:go_default_library",
"//pkg/registry/extensions/thirdpartyresourcedata:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
],
)

View File

@@ -18,20 +18,72 @@ package storage
import (
"strings"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
)
// REST implements a RESTStorage for ThirdPartyResourceData
// errFrozen is a transient error to indicate that clients should retry with backoff.
var errFrozen = errors.NewServiceUnavailable("TPR data is temporarily frozen")
// REST implements a RESTStorage for ThirdPartyResourceData.
type REST struct {
*genericregistry.Store
kind string
kind string
frozen atomic.Value
}
// Freeze causes all future calls to Create/Update/Delete/DeleteCollection to return a transient error.
// This is irreversible and meant for use when the TPR data is being deleted or migrated/abandoned.
func (r *REST) Freeze() {
r.frozen.Store(true)
}
func (r *REST) isFrozen() bool {
return r.frozen.Load() != nil
}
// Create is a wrapper to support Freeze.
func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) {
if r.isFrozen() {
return nil, errFrozen
}
return r.Store.Create(ctx, obj)
}
// Update is a wrapper to support Freeze.
func (r *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
if r.isFrozen() {
return nil, false, errFrozen
}
return r.Store.Update(ctx, name, objInfo)
}
// Delete is a wrapper to support Freeze.
func (r *REST) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
if r.isFrozen() {
return nil, false, errFrozen
}
return r.Store.Delete(ctx, name, options)
}
// DeleteCollection is a wrapper to support Freeze.
func (r *REST) DeleteCollection(ctx genericapirequest.Context, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
if r.isFrozen() {
return nil, errFrozen
}
return r.Store.DeleteCollection(ctx, options, listOptions)
}
// NewREST returns a registry which will store ThirdPartyResourceData in the given helper