285 lines
8.4 KiB
Go
285 lines
8.4 KiB
Go
/*
|
|
Copyright 2024 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package storageversionmigrator
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/discovery"
|
|
"k8s.io/client-go/metadata"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/kubernetes/pkg/controller"
|
|
|
|
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
svminformers "k8s.io/client-go/informers/storagemigration/v1alpha1"
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
svmlisters "k8s.io/client-go/listers/storagemigration/v1alpha1"
|
|
)
|
|
|
|
const (
|
|
// this name is guaranteed to be not present in the cluster as it not a valid namespace name
|
|
fakeSVMNamespaceName string = "@fake:svm_ns!"
|
|
ResourceVersionControllerName string = "resource-version-controller"
|
|
)
|
|
|
|
// ResourceVersionController adds the resource version obtained from a randomly nonexistent namespace
|
|
// to the SVM status before the migration is initiated. This resource version is utilized for checking
|
|
// freshness of GC cache before the migration is initiated.
|
|
type ResourceVersionController struct {
|
|
discoveryClient *discovery.DiscoveryClient
|
|
metadataClient metadata.Interface
|
|
svmListers svmlisters.StorageVersionMigrationLister
|
|
svmSynced cache.InformerSynced
|
|
queue workqueue.RateLimitingInterface
|
|
kubeClient clientset.Interface
|
|
mapper meta.ResettableRESTMapper
|
|
}
|
|
|
|
func NewResourceVersionController(
|
|
ctx context.Context,
|
|
kubeClient clientset.Interface,
|
|
discoveryClient *discovery.DiscoveryClient,
|
|
metadataClient metadata.Interface,
|
|
svmInformer svminformers.StorageVersionMigrationInformer,
|
|
mapper meta.ResettableRESTMapper,
|
|
) *ResourceVersionController {
|
|
logger := klog.FromContext(ctx)
|
|
|
|
rvController := &ResourceVersionController{
|
|
kubeClient: kubeClient,
|
|
discoveryClient: discoveryClient,
|
|
metadataClient: metadataClient,
|
|
svmListers: svmInformer.Lister(),
|
|
svmSynced: svmInformer.Informer().HasSynced,
|
|
mapper: mapper,
|
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ResourceVersionControllerName),
|
|
}
|
|
|
|
_, _ = svmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
rvController.addSVM(logger, obj)
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
rvController.updateSVM(logger, oldObj, newObj)
|
|
},
|
|
})
|
|
|
|
return rvController
|
|
}
|
|
|
|
func (rv *ResourceVersionController) addSVM(logger klog.Logger, obj interface{}) {
|
|
svm := obj.(*svmv1alpha1.StorageVersionMigration)
|
|
logger.V(4).Info("Adding", "svm", klog.KObj(svm))
|
|
rv.enqueue(svm)
|
|
}
|
|
|
|
func (rv *ResourceVersionController) updateSVM(logger klog.Logger, oldObj, newObj interface{}) {
|
|
oldSVM := oldObj.(*svmv1alpha1.StorageVersionMigration)
|
|
newSVM := newObj.(*svmv1alpha1.StorageVersionMigration)
|
|
logger.V(4).Info("Updating", "svm", klog.KObj(oldSVM))
|
|
rv.enqueue(newSVM)
|
|
}
|
|
|
|
func (rv *ResourceVersionController) enqueue(svm *svmv1alpha1.StorageVersionMigration) {
|
|
key, err := controller.KeyFunc(svm)
|
|
if err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %w", svm, err))
|
|
return
|
|
}
|
|
|
|
rv.queue.Add(key)
|
|
}
|
|
|
|
func (rv *ResourceVersionController) Run(ctx context.Context) {
|
|
defer utilruntime.HandleCrash()
|
|
defer rv.queue.ShutDown()
|
|
|
|
logger := klog.FromContext(ctx)
|
|
logger.Info("Starting", "controller", ResourceVersionControllerName)
|
|
defer logger.Info("Shutting down", "controller", ResourceVersionControllerName)
|
|
|
|
if !cache.WaitForNamedCacheSync(ResourceVersionControllerName, ctx.Done(), rv.svmSynced) {
|
|
return
|
|
}
|
|
|
|
go wait.UntilWithContext(ctx, rv.worker, time.Second)
|
|
|
|
<-ctx.Done()
|
|
}
|
|
|
|
func (rv *ResourceVersionController) worker(ctx context.Context) {
|
|
for rv.processNext(ctx) {
|
|
}
|
|
}
|
|
|
|
func (rv *ResourceVersionController) processNext(ctx context.Context) bool {
|
|
eKey, quit := rv.queue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
defer rv.queue.Done(eKey)
|
|
|
|
key := eKey.(string)
|
|
err := rv.sync(ctx, key)
|
|
if err == nil {
|
|
rv.queue.Forget(key)
|
|
return true
|
|
}
|
|
|
|
klog.FromContext(ctx).V(2).Info("Error syncing SVM resource, retrying", "svm", key, "err", err)
|
|
rv.queue.AddRateLimited(key)
|
|
|
|
return true
|
|
}
|
|
|
|
func (rv *ResourceVersionController) sync(ctx context.Context, key string) error {
|
|
logger := klog.FromContext(ctx)
|
|
startTime := time.Now()
|
|
|
|
// SVM is a cluster scoped resource so we don't care about the namespace
|
|
_, name, err := cache.SplitMetaNamespaceKey(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
svm, err := rv.svmListers.Get(name)
|
|
if apierrors.IsNotFound(err) {
|
|
// no work to do, don't fail and requeue
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// working with copy to avoid race condition between this and migration controller
|
|
toBeProcessedSVM := svm.DeepCopy()
|
|
gvr := getGVRFromResource(toBeProcessedSVM)
|
|
|
|
if IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded) || IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationFailed) {
|
|
logger.V(4).Info("Migration has already succeeded or failed previously, skipping", "svm", name)
|
|
return nil
|
|
}
|
|
|
|
if len(toBeProcessedSVM.Status.ResourceVersion) != 0 {
|
|
logger.V(4).Info("Resource version is already set", "svm", name)
|
|
return nil
|
|
}
|
|
|
|
exists, err := rv.resourceExists(gvr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !exists {
|
|
_, err = rv.kubeClient.StoragemigrationV1alpha1().
|
|
StorageVersionMigrations().
|
|
UpdateStatus(
|
|
ctx,
|
|
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
|
|
metav1.UpdateOptions{},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
toBeProcessedSVM.Status.ResourceVersion, err = rv.getLatestResourceVersion(gvr, ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = rv.kubeClient.StoragemigrationV1alpha1().
|
|
StorageVersionMigrations().
|
|
UpdateStatus(ctx, toBeProcessedSVM, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("error updating status for %s: %w", toBeProcessedSVM.Name, err)
|
|
}
|
|
|
|
logger.V(4).Info("Resource version has been successfully added", "svm", key, "elapsed", time.Since(startTime))
|
|
return nil
|
|
}
|
|
|
|
func (rv *ResourceVersionController) getLatestResourceVersion(gvr schema.GroupVersionResource, ctx context.Context) (string, error) {
|
|
isResourceNamespaceScoped, err := rv.isResourceNamespaceScoped(gvr)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var randomList *metav1.PartialObjectMetadataList
|
|
if isResourceNamespaceScoped {
|
|
// get list resourceVersion from random non-existent namesapce for the given GVR
|
|
randomList, err = rv.metadataClient.Resource(gvr).
|
|
Namespace(fakeSVMNamespaceName).
|
|
List(ctx, metav1.ListOptions{
|
|
Limit: 1,
|
|
})
|
|
} else {
|
|
randomList, err = rv.metadataClient.Resource(gvr).
|
|
List(ctx, metav1.ListOptions{
|
|
Limit: 1,
|
|
})
|
|
}
|
|
if err != nil {
|
|
// error here is very abstract. adding additional context for better debugging
|
|
return "", fmt.Errorf("error getting latest resourceVersion for %s: %w", gvr.String(), err)
|
|
}
|
|
|
|
return randomList.GetResourceVersion(), err
|
|
}
|
|
|
|
func (rv *ResourceVersionController) resourceExists(gvr schema.GroupVersionResource) (bool, error) {
|
|
mapperGVRs, err := rv.mapper.ResourcesFor(gvr)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, mapperGVR := range mapperGVRs {
|
|
if mapperGVR.Group == gvr.Group &&
|
|
mapperGVR.Version == gvr.Version &&
|
|
mapperGVR.Resource == gvr.Resource {
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func (rv *ResourceVersionController) isResourceNamespaceScoped(gvr schema.GroupVersionResource) (bool, error) {
|
|
resourceList, err := rv.discoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String())
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, resource := range resourceList.APIResources {
|
|
if resource.Name == gvr.Resource {
|
|
return resource.Namespaced, nil
|
|
}
|
|
}
|
|
|
|
return false, fmt.Errorf("resource %q not found", gvr.String())
|
|
}
|