kubelet: assign Node as an owner for the ResourceSlice
Co-authored-by: Patrick Ohly <patrick.ohly@intel.com>
@@ -306,6 +306,10 @@ func TestGetResources(t *testing.T) {
 	}
 }
 
+func getFakeNode() (*v1.Node, error) {
+	return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker"}}, nil
+}
+
 func TestPrepareResources(t *testing.T) {
 	fakeKubeClient := fake.NewSimpleClientset()
 
@@ -760,7 +764,7 @@ func TestPrepareResources(t *testing.T) {
 			}
 			defer draServerInfo.teardownFn()
 
-			plg := plugin.NewRegistrationHandler(nil, "worker")
+			plg := plugin.NewRegistrationHandler(nil, getFakeNode)
 			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}
@@ -1060,7 +1064,7 @@ func TestUnprepareResources(t *testing.T) {
 			}
 			defer draServerInfo.teardownFn()
 
-			plg := plugin.NewRegistrationHandler(nil, "worker")
+			plg := plugin.NewRegistrationHandler(nil, getFakeNode)
 			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}

@@ -28,6 +28,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
+	v1 "k8s.io/api/core/v1"
 	resourceapi "k8s.io/api/resource/v1alpha2"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -39,6 +40,7 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
+	"k8s.io/utils/ptr"
 )
 
 const (
@@ -52,7 +54,7 @@ const (
 type nodeResourcesController struct {
 	ctx        context.Context
 	kubeClient kubernetes.Interface
-	nodeName   string
+	getNode    func() (*v1.Node, error)
 	wg         sync.WaitGroup
 	queue      workqueue.RateLimitingInterface
 	sliceStore cache.Store
@@ -84,7 +86,7 @@ type activePlugin struct {
 // the controller is inactive. This can happen when kubelet is run stand-alone
 // without an apiserver. In that case we can't and don't need to publish
 // ResourceSlices.
-func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) *nodeResourcesController {
+func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *nodeResourcesController {
 	if kubeClient == nil {
 		return nil
 	}
@@ -96,7 +98,7 @@ func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Int
 	c := &nodeResourcesController{
 		ctx:           ctx,
 		kubeClient:    kubeClient,
-		nodeName:      nodeName,
+		getNode:       getNode,
 		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_resource_slices"),
 		activePlugins: make(map[string]*activePlugin),
 	}
@@ -252,16 +254,29 @@ func (c *nodeResourcesController) run(ctx context.Context) {
 	// For now syncing starts immediately, with no DeleteCollection. This
 	// can be reconsidered later.
 
-	// While kubelet starts up, there are errors:
-	//      E0226 13:41:19.880621  126334 reflector.go:150] k8s.io/client-go@v0.0.0/tools/cache/reflector.go:232: Failed to watch *v1alpha2.ResourceSlice: failed to list *v1alpha2.ResourceSlice: resourceslices.resource.k8s.io is forbidden: User "system:anonymous" cannot list resource "resourceslices" in API group "resource.k8s.io" at the cluster scope
-	//
-	// The credentials used by kubeClient seem to get swapped out later,
-	// because eventually these list calls succeed.
-	// TODO (https://github.com/kubernetes/kubernetes/issues/123691): can we avoid these error log entries? Perhaps wait here?
+	// Wait until we're able to get a Node object.
+	// This means that the object is created on the API server,
+	// the kubeclient is functional and the node informer cache is populated with the node object.
+	// Without this it doesn't make sense to proceed further as we need a node name and
+	// a node UID for this controller to work.
+	var node *v1.Node
+	var err error
+	for {
+		node, err = c.getNode()
+		if err == nil {
+			break
+		}
+		logger.V(5).Info("Getting Node object failed, waiting", "err", err)
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(time.Second):
+		}
+	}
 
 	// We could use an indexer on driver name, but that seems overkill.
 	informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
-		options.FieldSelector = "nodeName=" + c.nodeName
+		options.FieldSelector = "nodeName=" + node.Name
 	})
 	c.sliceStore = informer.GetStore()
 	handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -441,13 +456,29 @@ func (c *nodeResourcesController) sync(ctx context.Context, driverName string) e
 			continue
 		}
 
+		// Although node name and UID are unlikely to change
+		// we're getting updated node object just to be on the safe side.
+		// It's a cheap operation as it gets an object from the node informer cache.
+		node, err := c.getNode()
+		if err != nil {
+			return fmt.Errorf("retrieve node object: %w", err)
+		}
+
 		// Create a new slice.
 		slice := &resourceapi.ResourceSlice{
 			ObjectMeta: metav1.ObjectMeta{
-				GenerateName: c.nodeName + "-" + driverName + "-",
-				// TODO (https://github.com/kubernetes/kubernetes/issues/123692): node object as owner
+				GenerateName: node.Name + "-" + driverName + "-",
+				OwnerReferences: []metav1.OwnerReference{
+					{
+						APIVersion: v1.SchemeGroupVersion.WithKind("Node").Version,
+						Kind:       v1.SchemeGroupVersion.WithKind("Node").Kind,
+						Name:       node.Name,
+						UID:        node.UID,
+						Controller: ptr.To(true),
+					},
+				},
 			},
-			NodeName:      c.nodeName,
+			NodeName:      node.Name,
 			DriverName:    driverName,
 			ResourceModel: *resource,
 		}

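The core of this commit is the owner reference that sync() now attaches to each published ResourceSlice. As a minimal standalone sketch of the same construction (buildNodeOwnerRef and the fake UID are illustrative only, not part of the patch):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// buildNodeOwnerRef mirrors what the controller now attaches to every
// ResourceSlice it publishes: the Node is the controlling owner, so deleting
// the Node lets the garbage collector remove the slices published for it.
func buildNodeOwnerRef(node *v1.Node) metav1.OwnerReference {
	gvk := v1.SchemeGroupVersion.WithKind("Node")
	return metav1.OwnerReference{
		// The core API group is empty, so the APIVersion string is just "v1".
		APIVersion: gvk.Version,
		Kind:       gvk.Kind, // "Node"
		Name:       node.Name,
		UID:        node.UID,
		Controller: ptr.To(true),
	}
}

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker", UID: "fake-uid"}}
	fmt.Printf("%+v\n", buildNodeOwnerRef(node))
}
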
@@ -28,6 +28,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
+	v1 "k8s.io/api/core/v1"
 	utilversion "k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
@@ -104,12 +105,12 @@ type RegistrationHandler struct {
 // Must only be called once per process because it manages global state.
 // If a kubeClient is provided, then it synchronizes ResourceSlices
 // with the resource information provided by plugins.
-func NewRegistrationHandler(kubeClient kubernetes.Interface, nodeName string) *RegistrationHandler {
+func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
 	handler := &RegistrationHandler{}
 
 	// If kubelet ever gets an API for stopping registration handlers, then
 	// that would need to be hooked up with stopping the controller.
-	handler.controller = startNodeResourcesController(context.TODO(), kubeClient, nodeName)
+	handler.controller = startNodeResourcesController(context.TODO(), kubeClient, getNode)
 
 	return handler
 }

@@ -20,11 +20,17 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/assert"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+func getFakeNode() (*v1.Node, error) {
+	return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker"}}, nil
+}
+
 func TestRegistrationHandler_ValidatePlugin(t *testing.T) {
 	newRegistrationHandler := func() *RegistrationHandler {
-		return NewRegistrationHandler(nil, "worker")
+		return NewRegistrationHandler(nil, getFakeNode)
 	}
 
 	for _, test := range []struct {

@@ -1557,7 +1557,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
 	kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
 	// Adding Registration Callback function for DRA Plugin
 	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-		kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.hostname)))
+		kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.getNodeAnyWay)))
 	}
 	// Adding Registration Callback function for Device Manager
 	kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())

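Kubelet wires the new callback to its getNodeAnyWay accessor above; any func() (*v1.Node, error) satisfies the interface. A rough sketch of an alternative, assuming a standard client-go node lister fed by a shared informer (newGetNode and the package name are hypothetical, not part of the change):

package draexample

import (
	v1 "k8s.io/api/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// newGetNode adapts a node lister into the callback NewRegistrationHandler
// now takes instead of a plain node name. Each call reads from the informer
// cache, so repeated lookups stay cheap -- the property the controller relies
// on when it re-fetches the Node during every sync.
func newGetNode(nodeLister corelisters.NodeLister, nodeName string) func() (*v1.Node, error) {
	return func() (*v1.Node, error) {
		return nodeLister.Get(nodeName)
	}
}
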
@@ -905,10 +905,23 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 				resourceClient := f.ClientSet.ResourceV1alpha2().ResourceSlices()
 				var expectedObjects []any
 				for _, nodeName := range nodes.NodeNames {
+					node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+					framework.ExpectNoError(err, "get node")
 					expectedObjects = append(expectedObjects,
 						gstruct.MatchAllFields(gstruct.Fields{
-							"TypeMeta":   gstruct.Ignore(),
-							"ObjectMeta": gstruct.Ignore(), // TODO (https://github.com/kubernetes/kubernetes/issues/123692): validate ownerref
+							"TypeMeta": gstruct.Ignore(),
+							"ObjectMeta": gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{
+								"OwnerReferences": gomega.ContainElements(
+									gstruct.MatchAllFields(gstruct.Fields{
+										"APIVersion":         gomega.Equal("v1"),
+										"Kind":               gomega.Equal("Node"),
+										"Name":               gomega.Equal(nodeName),
+										"UID":                gomega.Equal(node.UID),
+										"Controller":         gomega.Equal(ptr.To(true)),
+										"BlockOwnerDeletion": gomega.BeNil(),
+									}),
+								),
+							}),
 							"NodeName":   gomega.Equal(nodeName),
 							"DriverName": gomega.Equal(driver.Name),
 							"ResourceModel": gomega.Equal(resourcev1alpha2.ResourceModel{NamedResources: &resourcev1alpha2.NamedResourcesResources{