DRA: move ResourceSlice publishing into DRA drivers

This is a first step towards making kubelet independent of the resource.k8s.io
API versioning, because kubelet no longer needs to copy structs defined by that
API from the driver to the API server. The next step is removing the other
direction (reading ResourceClaim status and passing the resource handle to
drivers).

The drivers must be deployed so that they have their own connection to the API
server. Securing at least the writes via a validating admission policy should
be possible.
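
For illustration only (not part of this commit), a minimal sketch of how a driver
deployment might obtain its own API server connection and hand it to the
kubeletplugin helper changed below; the package name, driver name, and the omitted
socket-related options are assumptions, and the nodeServer implementation is
driver-specific:

package driverexample // hypothetical

import (
	"context"

	resourceapi "k8s.io/api/resource/v1alpha2"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
	drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)

// runDriver wires up the plugin helper with the driver's own client. Socket-related
// options (registrar and DRA service endpoints) are required in practice but omitted
// here because their option names are not part of this excerpt.
func runDriver(ctx context.Context, nodeServer drapbv1alpha3.NodeServer, nodeName string) error {
	// The driver pod uses its own service account credentials, not kubelet's.
	config, err := rest.InClusterConfig()
	if err != nil {
		return err
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return err
	}
	d, err := kubeletplugin.Start(ctx, nodeServer,
		kubeletplugin.DriverName("example.com/dra-driver"), // hypothetical driver name
		kubeletplugin.KubeClient(clientset),
		kubeletplugin.NodeName(nodeName),
	)
	if err != nil {
		return err
	}
	defer d.Stop()
	// Publish node-local resources; the model content is driver-specific.
	d.PublishResources(ctx, []*resourceapi.ResourceModel{{}})
	<-ctx.Done()
	return nil
}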

As before, the kubelet removes all ResourceSlices for its node at startup; DRA
drivers then recreate them if (and only if) they start up again. This ensures
that no orphaned ResourceSlices are left behind when a driver was removed while
the kubelet was down.

While at it, logging gets cleaned up and updated to use structured, contextual
logging as much as possible. gRPC requests and streams now use a shared,
per-process request ID, and streams also get logged.
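
For example (a usage assumption, not from this commit), a driver can store a named
logger in the context that it passes to the helper so that all plugin log output,
including the gRPC request and stream logging, carries that name:

// Hypothetical driver name; klog.LoggerWithName and klog.NewContext are the same
// helpers used throughout the diff below.
logger := klog.LoggerWithName(klog.Background(), "dra-example-driver")
ctx := klog.NewContext(context.Background(), logger)
// ctx is then passed to kubeletplugin.Start(ctx, nodeServer, ...).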
Author: Patrick Ohly
Date: 2024-04-11 16:20:34 +02:00
Parent: 8d814298bb
Commit: 616a014347
17 changed files with 1014 additions and 1309 deletions

View File

@@ -18,18 +18,28 @@ package plugin
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/klog/v2"
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)
const PluginClientTimeout = 45 * time.Second
func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
// NewDRAPluginClient returns a wrapper around those gRPC methods of a DRA
// driver kubelet plugin which need to be called by kubelet. The wrapper
// handles gRPC connection management and logging. Connections are reused
// across different NewDRAPluginClient calls.
func NewDRAPluginClient(pluginName string) (*Plugin, error) {
if pluginName == "" {
return nil, fmt.Errorf("plugin name is empty")
}
@@ -42,13 +52,64 @@ func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
return existingPlugin, nil
}
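// Hedged usage sketch (not part of this diff): kubelet code that needs to talk to a
// driver obtains the wrapper and calls the gRPC methods on it directly, e.g.
//
//	client, err := NewDRAPluginClient(pluginName)
//	if err != nil {
//		return err
//	}
//	resp, err := client.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{})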
func (p *plugin) NodePrepareResources(
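// Plugin is the kubelet-side wrapper for one registered DRA driver plugin. It lazily
// establishes a gRPC connection to the plugin's Unix domain socket endpoint and
// reuses that connection for all calls.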
type Plugin struct {
backgroundCtx context.Context
cancel func(cause error)
mutex sync.Mutex
conn *grpc.ClientConn
endpoint string
highestSupportedVersion *utilversion.Version
clientTimeout time.Duration
}
func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.conn != nil {
return p.conn, nil
}
ctx := p.backgroundCtx
logger := klog.FromContext(ctx)
network := "unix"
logger.V(4).Info("Creating new gRPC connection", "protocol", network, "endpoint", p.endpoint)
// grpc.Dial is deprecated. grpc.NewClient should be used instead.
// For now the deprecation warning gets ignored because this function is meant
// to establish the connection, with the one second timeout below. Perhaps that
// approach should be reconsidered?
//nolint:staticcheck
conn, err := grpc.Dial(
p.endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok {
return nil, errors.New("timed out waiting for gRPC connection to be ready")
}
p.conn = conn
return p.conn, nil
}
func (p *Plugin) NodePrepareResources(
ctx context.Context,
req *drapb.NodePrepareResourcesRequest,
opts ...grpc.CallOption,
) (*drapb.NodePrepareResourcesResponse, error) {
logger := klog.FromContext(ctx)
logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req)
logger.V(4).Info("Calling NodePrepareResources rpc", "request", req)
conn, err := p.getOrCreateGRPCConn()
if err != nil {
@@ -60,17 +121,17 @@ func (p *plugin) NodePrepareResources(
nodeClient := drapb.NewNodeClient(conn)
response, err := nodeClient.NodePrepareResources(ctx, req)
logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", response, "err", err)
logger.V(4).Info("Done calling NodePrepareResources rpc", "response", response, "err", err)
return response, err
}
func (p *plugin) NodeUnprepareResources(
func (p *Plugin) NodeUnprepareResources(
ctx context.Context,
req *drapb.NodeUnprepareResourcesRequest,
opts ...grpc.CallOption,
) (*drapb.NodeUnprepareResourcesResponse, error) {
logger := klog.FromContext(ctx)
logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req)
logger.V(4).Info("Calling NodeUnprepareResource rpc", "request", req)
conn, err := p.getOrCreateGRPCConn()
if err != nil {
@@ -82,23 +143,6 @@ func (p *plugin) NodeUnprepareResources(
nodeClient := drapb.NewNodeClient(conn)
response, err := nodeClient.NodeUnprepareResources(ctx, req)
logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", response, "err", err)
logger.V(4).Info("Done calling NodeUnprepareResources rpc", "response", response, "err", err)
return response, err
}
func (p *plugin) NodeListAndWatchResources(
ctx context.Context,
req *drapb.NodeListAndWatchResourcesRequest,
opts ...grpc.CallOption,
) (drapb.Node_NodeListAndWatchResourcesClient, error) {
logger := klog.FromContext(ctx)
logger.V(4).Info(log("calling NodeListAndWatchResources rpc"), "request", req)
conn, err := p.getOrCreateGRPCConn()
if err != nil {
return nil, err
}
nodeClient := drapb.NewNodeClient(conn)
return nodeClient.NodeListAndWatchResources(ctx, req, opts...)
}

View File

@@ -28,6 +28,11 @@ import (
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
"k8s.io/kubernetes/test/utils/ktesting"
)
const (
v1alpha3Version = "v1alpha3"
)
type fakeV1alpha3GRPCServer struct {
@@ -45,16 +50,6 @@ func (f *fakeV1alpha3GRPCServer) NodeUnprepareResources(ctx context.Context, in
return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil
}
func (f *fakeV1alpha3GRPCServer) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, srv drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil {
return err
}
if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil {
return err
}
return nil
}
type tearDown func()
func setupFakeGRPCServer(version string) (string, tearDown, error) {
@@ -95,6 +90,7 @@ func setupFakeGRPCServer(version string) (string, tearDown, error) {
}
func TestGRPCConnIsReused(t *testing.T) {
ctx := ktesting.Init(t)
addr, teardown, err := setupFakeGRPCServer(v1alpha3Version)
if err != nil {
t.Fatal(err)
@@ -105,8 +101,9 @@ func TestGRPCConnIsReused(t *testing.T) {
wg := sync.WaitGroup{}
m := sync.Mutex{}
p := &plugin{
endpoint: addr,
p := &Plugin{
backgroundCtx: ctx,
endpoint: addr,
}
conn, err := p.getOrCreateGRPCConn()
@@ -147,9 +144,9 @@ func TestGRPCConnIsReused(t *testing.T) {
}
client.NodePrepareResources(context.TODO(), req)
client.(*plugin).Lock()
conn := client.(*plugin).conn
client.(*plugin).Unlock()
client.mutex.Lock()
conn := client.conn
client.mutex.Unlock()
m.Lock()
defer m.Unlock()
@@ -193,7 +190,7 @@ func TestNewDRAPluginClient(t *testing.T) {
{
description: "plugin exists",
setup: func(name string) tearDown {
draPlugins.add(name, &plugin{})
draPlugins.add(name, &Plugin{})
return func() {
draPlugins.delete(name)
}
@@ -232,13 +229,15 @@ func TestNodeUnprepareResources(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
ctx := ktesting.Init(t)
addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
if err != nil {
t.Fatal(err)
}
defer teardown()
p := &plugin{
p := &Plugin{
backgroundCtx: ctx,
endpoint: addr,
clientTimeout: PluginClientTimeout,
}
@@ -269,74 +268,3 @@ func TestNodeUnprepareResources(t *testing.T) {
})
}
}
func TestListAndWatchResources(t *testing.T) {
for _, test := range []struct {
description string
serverSetup func(string) (string, tearDown, error)
serverVersion string
request *drapbv1alpha3.NodeListAndWatchResourcesRequest
responses []*drapbv1alpha3.NodeListAndWatchResourcesResponse
expectError string
}{
{
description: "server supports NodeResources API",
serverSetup: setupFakeGRPCServer,
serverVersion: v1alpha3Version,
request: &drapbv1alpha3.NodeListAndWatchResourcesRequest{},
responses: []*drapbv1alpha3.NodeListAndWatchResourcesResponse{
{},
{},
},
expectError: "EOF",
},
} {
t.Run(test.description, func(t *testing.T) {
addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
if err != nil {
t.Fatal(err)
}
defer teardown()
p := &plugin{
endpoint: addr,
}
conn, err := p.getOrCreateGRPCConn()
defer func() {
err := conn.Close()
if err != nil {
t.Error(err)
}
}()
if err != nil {
t.Fatal(err)
}
draPlugins.add("dummy-plugin", p)
defer draPlugins.delete("dummy-plugin")
client, err := NewDRAPluginClient("dummy-plugin")
if err != nil {
t.Fatal(err)
}
stream, err := client.NodeListAndWatchResources(context.Background(), test.request)
if err != nil {
t.Fatal(err)
}
var actualResponses []*drapbv1alpha3.NodeListAndWatchResourcesResponse
var actualErr error
for {
resp, err := stream.Recv()
if err != nil {
actualErr = err
break
}
actualResponses = append(actualResponses, resp)
}
assert.Equal(t, test.responses, actualResponses)
assert.Contains(t, actualErr.Error(), test.expectError)
})
}
}

View File

@@ -1,520 +0,0 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha2"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
resourceinformers "k8s.io/client-go/informers/resource/v1alpha2"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
"k8s.io/utils/ptr"
)
const (
// resyncPeriod for informer
// TODO (https://github.com/kubernetes/kubernetes/issues/123688): disable?
resyncPeriod = time.Duration(10 * time.Minute)
retryPeriod = 5 * time.Second
maxRetryPeriod = 180 * time.Second
backoffFactor = 2.0 // Introduce a backoff multiplier as jitter factor
)
// nodeResourcesController collects resource information from all registered
// plugins and synchronizes that information with ResourceSlice objects.
type nodeResourcesController struct {
ctx context.Context
kubeClient kubernetes.Interface
getNode func() (*v1.Node, error)
wg sync.WaitGroup
queue workqueue.TypedRateLimitingInterface[string]
sliceStore cache.Store
mutex sync.RWMutex
activePlugins map[string]*activePlugin
}
// activePlugin holds the resource information about one plugin
// and the gRPC stream that is used to retrieve that. The context
// used by that stream can be canceled separately to stop
// the monitoring.
type activePlugin struct {
// cancel is the function which cancels the monitorPlugin goroutine
// for this plugin.
cancel func(reason error)
// resources is protected by the nodeResourcesController read/write lock.
// When receiving updates from the driver, the entire slice gets replaced,
// so it is okay to not do a deep copy of it. Only retrieving the slice
// must be protected by a read lock.
resources []*resourceapi.ResourceModel
}
// startNodeResourcesController constructs a new controller and starts it.
//
// If a kubeClient is provided, then it synchronizes ResourceSlices
// with the resource information provided by plugins. Without it,
// the controller is inactive. This can happen when kubelet is run stand-alone
// without an apiserver. In that case we can't and don't need to publish
// ResourceSlices.
func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *nodeResourcesController {
if kubeClient == nil {
return nil
}
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "node resources controller")
ctx = klog.NewContext(ctx, logger)
c := &nodeResourcesController{
ctx: ctx,
kubeClient: kubeClient,
getNode: getNode,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
),
activePlugins: make(map[string]*activePlugin),
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.run(ctx)
}()
return c
}
// waitForStop blocks until all background activity spawned by
// the controller has stopped. The context passed to start must
// be canceled for that to happen.
//
// Not needed at the moment, but if it was, this is what it would
// look like...
// func (c *nodeResourcesController) waitForStop() {
// if c == nil {
// return
// }
//
// c.wg.Wait()
// }
// addPlugin is called whenever a plugin has been (re-)registered.
func (c *nodeResourcesController) addPlugin(driverName string, pluginInstance *plugin) {
if c == nil {
return
}
klog.FromContext(c.ctx).V(2).Info("Adding plugin", "driverName", driverName)
c.mutex.Lock()
defer c.mutex.Unlock()
if active := c.activePlugins[driverName]; active != nil {
active.cancel(errors.New("plugin has re-registered"))
}
active := &activePlugin{}
cancelCtx, cancel := context.WithCancelCause(c.ctx)
active.cancel = cancel
c.activePlugins[driverName] = active
c.queue.Add(driverName)
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.monitorPlugin(cancelCtx, active, driverName, pluginInstance)
}()
}
// removePlugin is called whenever a plugin has been unregistered.
func (c *nodeResourcesController) removePlugin(driverName string) {
if c == nil {
return
}
klog.FromContext(c.ctx).V(2).Info("Removing plugin", "driverName", driverName)
c.mutex.Lock()
defer c.mutex.Unlock()
if active, ok := c.activePlugins[driverName]; ok {
active.cancel(errors.New("plugin has unregistered"))
delete(c.activePlugins, driverName)
c.queue.Add(driverName)
}
}
// monitorPlugin calls the plugin to retrieve resource information and caches
// all responses that it gets for processing in the sync method. It keeps
// retrying until an error or EOF response indicates that no further data is
// going to be sent, then watch resources of the plugin stops until it
// re-registers.
func (c *nodeResourcesController) monitorPlugin(ctx context.Context, active *activePlugin, driverName string, pluginInstance *plugin) {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "driverName", driverName)
logger.Info("Starting to monitor node resources of the plugin")
defer func() {
r := recover()
logger.Info("Stopping to monitor node resources of the plugin", "reason", context.Cause(ctx), "err", ctx.Err(), "recover", r)
}()
backOff := flowcontrol.NewBackOffWithJitter(retryPeriod, maxRetryPeriod, backoffFactor)
backOffID := "retry"
// Keep trying until canceled.
for ctx.Err() == nil {
logger.V(5).Info("Calling NodeListAndWatchResources")
stream, err := pluginInstance.NodeListAndWatchResources(ctx, new(drapb.NodeListAndWatchResourcesRequest))
if err != nil {
switch {
case status.Convert(err).Code() == codes.Unimplemented:
// The plugin simply doesn't provide node resources.
active.cancel(errors.New("plugin does not support node resource reporting"))
default:
// This is a problem, report it and retry.
logger.Error(err, "Creating gRPC stream for node resources failed")
select {
case <-time.After(backOff.Get(backOffID)):
backOff.Next(backOffID, time.Now())
case <-ctx.Done():
}
}
continue
}
for {
response, err := stream.Recv()
if err != nil {
switch {
case errors.Is(err, io.EOF):
// This is okay. Some plugins might never change their
// resources after reporting them once.
active.cancel(errors.New("plugin has closed the stream"))
case status.Convert(err).Code() == codes.Unimplemented:
// The plugin has the method, does not really implement it.
active.cancel(errors.New("plugin does not support node resource reporting"))
case ctx.Err() == nil:
// This is a problem, report it and retry.
logger.Error(err, "Reading node resources from gRPC stream failed")
select {
case <-time.After(backOff.Get(backOffID)):
backOff.Next(backOffID, time.Now())
case <-ctx.Done():
}
}
break
}
if loggerV := logger.V(6); loggerV.Enabled() {
loggerV.Info("Driver resources updated", "resources", response.Resources)
} else {
logger.V(5).Info("Driver resources updated", "numResources", len(response.Resources))
}
c.mutex.Lock()
active.resources = response.Resources
c.mutex.Unlock()
c.queue.Add(driverName)
}
}
}
// run is running in the background. It handles blocking initialization (like
// syncing the informer) and then syncs the actual with the desired state.
func (c *nodeResourcesController) run(ctx context.Context) {
logger := klog.FromContext(ctx)
// When kubelet starts, we have two choices:
// - Sync immediately, which in practice will delete all ResourceSlices
// because no plugin has registered yet. We could do a DeleteCollection
// to speed this up.
// - Wait a bit, then sync. If all plugins have re-registered in the meantime,
// we might not need to change any ResourceSlice.
//
// For now syncing starts immediately, with no DeleteCollection. This
// can be reconsidered later.
// Wait until we're able to get a Node object.
// This means that the object is created on the API server,
// the kubeclient is functional and the node informer cache is populated with the node object.
// Without this it doesn't make sense to proceed further as we need a node name and
// a node UID for this controller to work.
var node *v1.Node
var err error
for {
node, err = c.getNode()
if err == nil {
break
}
logger.V(5).Info("Getting Node object failed, waiting", "err", err)
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
}
}
// We could use an indexer on driver name, but that seems overkill.
informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
options.FieldSelector = "nodeName=" + node.Name
})
c.sliceStore = informer.GetStore()
handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
return
}
logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice))
c.queue.Add(slice.DriverName)
},
UpdateFunc: func(old, new any) {
oldSlice, ok := old.(*resourceapi.ResourceSlice)
if !ok {
return
}
newSlice, ok := new.(*resourceapi.ResourceSlice)
if !ok {
return
}
if loggerV := logger.V(6); loggerV.Enabled() {
loggerV.Info("ResourceSlice update", "slice", klog.KObj(newSlice), "diff", cmp.Diff(oldSlice, newSlice))
} else {
logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice))
}
c.queue.Add(newSlice.DriverName)
},
DeleteFunc: func(obj any) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
return
}
logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice))
c.queue.Add(slice.DriverName)
},
})
if err != nil {
logger.Error(err, "Registering event handler on the ResourceSlice informer failed, disabling resource monitoring")
return
}
// Start informer and wait for our cache to be populated.
c.wg.Add(1)
go func() {
defer c.wg.Done()
informer.Run(ctx.Done())
}()
for !handler.HasSynced() {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
}
logger.Info("ResourceSlice informer has synced")
for c.processNextWorkItem(ctx) {
}
}
func (c *nodeResourcesController) processNextWorkItem(ctx context.Context) bool {
key, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(key)
driverName := key
// Panics are caught and treated like errors.
var err error
func() {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("internal error: %v", r)
}
}()
err = c.sync(ctx, driverName)
}()
if err != nil {
// TODO (https://github.com/kubernetes/enhancements/issues/3077): contextual logging in utilruntime
utilruntime.HandleError(fmt.Errorf("processing driver %v: %v", driverName, err))
c.queue.AddRateLimited(key)
// Return without removing the work item from the queue.
// It will be retried.
return true
}
c.queue.Forget(key)
return true
}
func (c *nodeResourcesController) sync(ctx context.Context, driverName string) error {
logger := klog.FromContext(ctx)
// Gather information about the actual and desired state.
slices := c.sliceStore.List()
var driverResources []*resourceapi.ResourceModel
c.mutex.RLock()
if active, ok := c.activePlugins[driverName]; ok {
// No need for a deep copy, the entire slice gets replaced on writes.
driverResources = active.resources
}
c.mutex.RUnlock()
// Resources that are not yet stored in any slice need to be published.
// Here we track the indices of any resources that are already stored.
storedResourceIndices := sets.New[int]()
// Slices that don't match any driver resource can either be updated (if there
// are new driver resources that need to be stored) or they need to be deleted.
obsoleteSlices := make([]*resourceapi.ResourceSlice, 0, len(slices))
// Match slices with resource information.
for _, obj := range slices {
slice := obj.(*resourceapi.ResourceSlice)
if slice.DriverName != driverName {
continue
}
index := indexOfModel(driverResources, &slice.ResourceModel)
if index >= 0 {
storedResourceIndices.Insert(index)
continue
}
obsoleteSlices = append(obsoleteSlices, slice)
}
if loggerV := logger.V(6); loggerV.Enabled() {
// Dump entire resource information.
loggerV.Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", driverResources)
} else {
logger.V(5).Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(driverResources))
}
// Update stale slices before removing what's left.
//
// We don't really know which of these slices might have
// been used for "the" driver resource because they don't
// have a unique ID. In practice, a driver is most likely
// to just give us one ResourceModel, in which case
// this isn't a problem at all. If we have more than one,
// then at least conceptually it currently doesn't matter
// where we publish it.
//
// The long-term goal is to move the handling of
// ResourceSlice objects into the driver, with kubelet
// just acting as a REST proxy. The advantage of that will
// be that kubelet won't need to support the same
// resource API version as the driver and the control plane.
// With that approach, the driver will be able to match
// up objects more intelligently.
numObsoleteSlices := len(obsoleteSlices)
for index, resource := range driverResources {
if storedResourceIndices.Has(index) {
// No need to do anything, it is already stored exactly
// like this in an existing slice.
continue
}
if numObsoleteSlices > 0 {
// Update one existing slice.
slice := obsoleteSlices[numObsoleteSlices-1]
numObsoleteSlices--
slice = slice.DeepCopy()
slice.ResourceModel = *resource
logger.V(5).Info("Reusing existing node resource slice", "slice", klog.KObj(slice))
if _, err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("update node resource slice: %w", err)
}
continue
}
// Although node name and UID are unlikely to change
// we're getting updated node object just to be on the safe side.
// It's a cheap operation as it gets an object from the node informer cache.
node, err := c.getNode()
if err != nil {
return fmt.Errorf("retrieve node object: %w", err)
}
// Create a new slice.
slice := &resourceapi.ResourceSlice{
ObjectMeta: metav1.ObjectMeta{
GenerateName: node.Name + "-" + driverName + "-",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: v1.SchemeGroupVersion.WithKind("Node").Version,
Kind: v1.SchemeGroupVersion.WithKind("Node").Kind,
Name: node.Name,
UID: node.UID,
Controller: ptr.To(true),
},
},
},
NodeName: node.Name,
DriverName: driverName,
ResourceModel: *resource,
}
logger.V(5).Info("Creating new node resource slice", "slice", klog.KObj(slice))
if _, err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("create node resource slice: %w", err)
}
}
// All remaining slices are truly orphaned.
for i := 0; i < numObsoleteSlices; i++ {
slice := obsoleteSlices[i]
logger.V(5).Info("Deleting obsolete node resource slice", "slice", klog.KObj(slice))
if err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("delete node resource slice: %w", err)
}
}
return nil
}
func indexOfModel(models []*resourceapi.ResourceModel, model *resourceapi.ResourceModel) int {
for index, m := range models {
if apiequality.Semantic.DeepEqual(m, model) {
return index
}
}
return -1
}

View File

@@ -20,94 +20,125 @@ import (
"context"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
const (
// DRAPluginName is the name of the in-tree DRA Plugin.
DRAPluginName = "kubernetes.io/dra"
v1alpha3Version = "v1alpha3"
)
// Plugin is a description of a DRA Plugin, defined by an endpoint.
type plugin struct {
sync.Mutex
conn *grpc.ClientConn
endpoint string
highestSupportedVersion *utilversion.Version
clientTimeout time.Duration
}
func (p *plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
p.Lock()
defer p.Unlock()
if p.conn != nil {
return p.conn, nil
}
network := "unix"
klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", p.endpoint)
conn, err := grpc.Dial(
p.endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok {
return nil, errors.New("timed out waiting for gRPC connection to be ready")
}
p.conn = conn
return p.conn, nil
}
// RegistrationHandler is the handler which is fed to the pluginwatcher API.
type RegistrationHandler struct {
controller *nodeResourcesController
// backgroundCtx is used for all future activities of the handler.
// This is necessary because it implements APIs which don't
// provide a context.
backgroundCtx context.Context
kubeClient kubernetes.Interface
getNode func() (*v1.Node, error)
}
var _ cache.PluginHandler = &RegistrationHandler{}
// NewPluginHandler returns new registration handler.
//
// Must only be called once per process because it manages global state.
// If a kubeClient is provided, then it synchronizes ResourceSlices
// with the resource information provided by plugins.
func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
handler := &RegistrationHandler{}
handler := &RegistrationHandler{
// The context and thus logger should come from the caller.
backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")),
kubeClient: kubeClient,
getNode: getNode,
}
// If kubelet ever gets an API for stopping registration handlers, then
// that would need to be hooked up with stopping the controller.
handler.controller = startNodeResourcesController(context.TODO(), kubeClient, getNode)
// When kubelet starts up, no DRA driver has registered yet. None of
// the drivers are usable until they come back, which might not happen
// at all. Therefore it is better to not advertise any local resources
// because pods could get stuck on the node waiting for the driver
// to start up.
//
// This has to run in the background.
go handler.wipeResourceSlices("")
return handler
}
// wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver.
func (h *RegistrationHandler) wipeResourceSlices(pluginName string) {
if h.kubeClient == nil {
return
}
ctx := h.backgroundCtx
logger := klog.FromContext(ctx)
backoff := wait.Backoff{
Duration: time.Second,
Factor: 2,
Jitter: 0.2,
Cap: 5 * time.Minute,
Steps: 100,
}
// Error logging is done inside the loop. Context cancellation doesn't get logged.
_ = wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
node, err := h.getNode()
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
logger.Error(err, "Unexpected error checking for node")
return false, nil
}
fieldSelector := fields.Set{"nodeName": node.Name}
if pluginName != "" {
fieldSelector["driverName"] = pluginName
}
err = h.kubeClient.ResourceV1alpha2().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{FieldSelector: fieldSelector.String()})
switch {
case err == nil:
logger.V(3).Info("Deleted ResourceSlices", "fieldSelector", fieldSelector)
return true, nil
case apierrors.IsUnauthorized(err):
// This can happen while kubelet is still figuring out
// its credentials.
logger.V(5).Info("Deleting ResourceSlice failed, retrying", "fieldSelector", fieldSelector, "err", err)
return false, nil
default:
// Log and retry for other errors.
logger.V(3).Info("Deleting ResourceSlice failed, retrying", "fieldSelector", fieldSelector, "err", err)
return false, nil
}
})
}
// RegisterPlugin is called when a plugin can be registered.
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
klog.InfoS("Register new DRA plugin", "name", pluginName, "endpoint", endpoint)
// Prepare a context with its own logger for the plugin.
//
// The lifecycle of the plugin's background activities is tied to our
// root context, so canceling that will also cancel the plugin.
//
// The logger injects the plugin name as additional value
// into all log output related to the plugin.
ctx := h.backgroundCtx
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "pluginName", pluginName)
ctx = klog.NewContext(ctx, logger)
highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, versions)
logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint)
highestSupportedVersion, err := h.validateVersions(pluginName, versions)
if err != nil {
return err
return fmt.Errorf("version check of plugin %s failed: %w", pluginName, err)
}
var timeout time.Duration
@@ -117,7 +148,11 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
timeout = *pluginClientTimeout
}
pluginInstance := &plugin{
ctx, cancel := context.WithCancelCause(ctx)
pluginInstance := &Plugin{
backgroundCtx: ctx,
cancel: cancel,
conn: nil,
endpoint: endpoint,
highestSupportedVersion: highestSupportedVersion,
@@ -126,40 +161,27 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
// Store the endpoint of the newly registered DRA plugin in the map, keyed by the
// plugin name, so that other DRA components can look up the socket of a DRA plugin
// by its name.
// By default we assume that the supported plugin version is v1alpha3.
draPlugins.add(pluginName, pluginInstance)
h.controller.addPlugin(pluginName, pluginInstance)
if draPlugins.add(pluginName, pluginInstance) {
logger.V(1).Info("Already registered, previous plugin was replaced")
}
return nil
}
func (h *RegistrationHandler) validateVersions(
callerName string,
pluginName string,
versions []string,
) (*utilversion.Version, error) {
if len(versions) == 0 {
return nil, errors.New(
log(
"%s for DRA plugin %q failed. Plugin returned an empty list for supported versions",
callerName,
pluginName,
),
)
return nil, errors.New("empty list for supported versions")
}
// Validate version
newPluginHighestVersion, err := utilversion.HighestSupportedVersion(versions)
if err != nil {
return nil, errors.New(
log(
"%s for DRA plugin %q failed. None of the versions specified %q are supported. err=%v",
callerName,
pluginName,
versions,
err,
),
)
// HighestSupportedVersion includes the list of versions in its error
// if relevant, no need to repeat it here.
return nil, fmt.Errorf("none of the versions are supported: %w", err)
}
existingPlugin := draPlugins.get(pluginName)
@@ -169,26 +191,26 @@ func (h *RegistrationHandler) validateVersions(
if existingPlugin.highestSupportedVersion.LessThan(newPluginHighestVersion) {
return newPluginHighestVersion, nil
}
return nil, errors.New(
log(
"%s for DRA plugin %q failed. Another plugin with the same name is already registered with a higher supported version: %q",
callerName,
pluginName,
existingPlugin.highestSupportedVersion,
),
)
}
func deregisterPlugin(pluginName string) {
draPlugins.delete(pluginName)
return nil, fmt.Errorf("another plugin instance is already registered with a higher supported version: %q < %q", newPluginHighestVersion, existingPlugin.highestSupportedVersion)
}
// DeRegisterPlugin is called when a plugin has removed its socket,
// signaling it is no longer available.
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
klog.InfoS("DeRegister DRA plugin", "name", pluginName)
deregisterPlugin(pluginName)
h.controller.removePlugin(pluginName)
if p := draPlugins.delete(pluginName); p != nil {
logger := klog.FromContext(p.backgroundCtx)
logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint)
// Clean up the ResourceSlices for the deleted Plugin since it
// may have died without doing so itself and might never come
// back.
go h.wipeResourceSlices(pluginName)
return
}
logger := klog.FromContext(h.backgroundCtx)
logger.V(3).Info("Deregister DRA plugin not necessary, was already removed")
}
// ValidatePlugin is called by kubelet's plugin watcher upon detection
@@ -196,15 +218,10 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
klog.InfoS("Validate DRA plugin", "name", pluginName, "endpoint", endpoint, "versions", strings.Join(versions, ","))
_, err := h.validateVersions("ValidatePlugin", pluginName, versions)
_, err := h.validateVersions(pluginName, versions)
if err != nil {
return fmt.Errorf("validation failed for DRA plugin %s at endpoint %s: %+v", pluginName, endpoint, err)
return fmt.Errorf("invalid versions of plugin %s: %w", pluginName, err)
}
return err
}
// log prepends log string with `kubernetes.io/dra`.
func log(msg string, parts ...interface{}) string {
return fmt.Sprintf(fmt.Sprintf("%s: %s", DRAPluginName, msg), parts...)
}

View File

@@ -17,15 +17,14 @@ limitations under the License.
package plugin
import (
"errors"
"sync"
"k8s.io/klog/v2"
)
// PluginsStore holds a list of DRA Plugins.
type pluginsStore struct {
sync.RWMutex
store map[string]*plugin
store map[string]*Plugin
}
// draPlugins map keeps track of all registered DRA plugins on the node
@@ -34,7 +33,7 @@ var draPlugins = &pluginsStore{}
// Get lets you retrieve a DRA Plugin by name.
// This method is protected by a mutex.
func (s *pluginsStore) get(pluginName string) *plugin {
func (s *pluginsStore) get(pluginName string) *Plugin {
s.RLock()
defer s.RUnlock()
@@ -43,26 +42,33 @@ func (s *pluginsStore) get(pluginName string) *plugin {
// add saves a DRA plugin under the given name and reports whether an existing
// entry was replaced. This method is protected by a mutex.
func (s *pluginsStore) add(pluginName string, p *plugin) {
func (s *pluginsStore) add(pluginName string, p *Plugin) (replaced bool) {
s.Lock()
defer s.Unlock()
if s.store == nil {
s.store = make(map[string]*plugin)
s.store = make(map[string]*Plugin)
}
_, exists := s.store[pluginName]
if exists {
klog.V(1).InfoS(log("plugin: %s already registered, previous plugin will be overridden", pluginName))
}
s.store[pluginName] = p
return exists
}
// delete removes the DRA plugin with the given name, cancels its background
// activities, and returns it (nil if it was not registered).
// This method is protected by a mutex.
func (s *pluginsStore) delete(pluginName string) {
func (s *pluginsStore) delete(pluginName string) *Plugin {
s.Lock()
defer s.Unlock()
p, exists := s.store[pluginName]
if !exists {
return nil
}
if p.cancel != nil {
p.cancel(errors.New("plugin got removed"))
}
delete(s.store, pluginName)
return p
}

View File

@@ -17,13 +17,19 @@ limitations under the License.
package kubeletplugin
import (
"context"
"errors"
"fmt"
"net"
"sync"
"google.golang.org/grpc"
"k8s.io/klog/v2"
resourceapi "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceslice"
drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)
@@ -39,6 +45,18 @@ type DRAPlugin interface {
// received yet.
RegistrationStatus() *registerapi.RegistrationStatus
// PublishResources may be called one or more times to publish
// resource information in ResourceSlice objects. If it never gets
// called, then the kubelet plugin does not manage any ResourceSlice
// objects.
//
// PublishResources does not block, so it might still take a while
// after it returns before all information is actually written
// to the API server.
//
// The caller must not modify the content of the slice.
PublishResources(ctx context.Context, nodeResources []*resourceapi.ResourceModel)
// This unexported method ensures that we can modify the interface
// without causing an API break of the package
// (https://pkg.go.dev/golang.org/x/exp/apidiff#section-readme).
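// Hedged example (the variable name is an assumption): publishing again after the
// locally available resources change. In this commit, the first call starts the
// ResourceSlice controller and later calls merely update it:
//
//	d.PublishResources(ctx, []*resourceapi.ResourceModel{ /* updated, driver-specific models */ })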
@@ -57,14 +75,6 @@ func DriverName(driverName string) Option {
}
}
// Logger overrides the default klog.Background logger.
func Logger(logger klog.Logger) Option {
return func(o *options) error {
o.logger = logger
return nil
}
}
// GRPCVerbosity sets the verbosity for logging gRPC calls. Default is 6. A negative
// value disables logging.
func GRPCVerbosity(level int) Option {
@@ -162,15 +172,50 @@ func NodeV1alpha3(enabled bool) Option {
}
}
// KubeClient grants the plugin access to the API server. This is needed
// for syncing ResourceSlice objects. It's the responsibility of the DRA driver
// developer to ensure that this client has permission to read, write,
// patch and list such objects. It also needs permission to read node objects.
// Ideally, a validating admission policy should be used to limit write
// access to ResourceSlices which belong to the node.
func KubeClient(kubeClient kubernetes.Interface) Option {
return func(o *options) error {
o.kubeClient = kubeClient
return nil
}
}
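// Hedged sketch (not part of this commit) of the RBAC rules such a client typically
// needs, expressed with k8s.io/api/rbac/v1 types; group and resource names follow the
// resource.k8s.io/v1alpha2 API used here:
//
//	rules := []rbacv1.PolicyRule{
//		{
//			APIGroups: []string{"resource.k8s.io"},
//			Resources: []string{"resourceslices"},
//			Verbs:     []string{"get", "list", "watch", "create", "update", "patch", "delete"},
//		},
//		{
//			APIGroups: []string{""},
//			Resources: []string{"nodes"},
//			Verbs:     []string{"get", "list", "watch"},
//		},
//	}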
// NodeName tells the plugin on which node it is running. This is needed for
// syncing ResourceSlice objects.
func NodeName(nodeName string) Option {
return func(o *options) error {
o.nodeName = nodeName
return nil
}
}
// NodeUID tells the plugin the UID of the v1.Node object. This is used
// when syncing ResourceSlice objects, but doesn't have to be used. If
// not supplied, the controller will look up the object once.
func NodeUID(nodeUID types.UID) Option {
return func(o *options) error {
o.nodeUID = nodeUID
return nil
}
}
type options struct {
logger klog.Logger
grpcVerbosity int
driverName string
nodeName string
nodeUID types.UID
draEndpoint endpoint
draAddress string
pluginRegistrationEndpoint endpoint
unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor
kubeClient kubernetes.Interface
nodeV1alpha3 bool
}
@@ -178,18 +223,36 @@ type options struct {
// draPlugin combines the kubelet registration service and the DRA node plugin
// service.
type draPlugin struct {
registrar *nodeRegistrar
plugin *grpcServer
// backgroundCtx is for activities that are started later.
backgroundCtx context.Context
// cancel cancels the backgroundCtx.
cancel func(cause error)
wg sync.WaitGroup
registrar *nodeRegistrar
plugin *grpcServer
driverName string
nodeName string
nodeUID types.UID
kubeClient kubernetes.Interface
// Information about resource publishing changes concurrently and thus
// must be protected by the mutex. The controller gets started only
// if needed.
mutex sync.Mutex
resourceSliceController *resourceslice.Controller
}
// Start sets up two gRPC servers (one for registration, one for the DRA node
// client). By default, all APIs implemented by the nodeServer get registered.
func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr error) {
d := &draPlugin{}
//
// The context and/or DRAPlugin.Stop can be used to stop all background activity.
// Stop also blocks. A logger can be stored in the context to add values or
// a name to all log entries.
func Start(ctx context.Context, nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr error) {
logger := klog.FromContext(ctx)
o := options{
logger: klog.Background(),
grpcVerbosity: 4,
grpcVerbosity: 6, // Logs requests and responses, which can be large.
nodeV1alpha3: true,
}
for _, option := range opts {
@@ -212,11 +275,41 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e
return nil, errors.New("a Unix domain socket path and/or listener must be set for the registrar")
}
d := &draPlugin{
driverName: o.driverName,
nodeName: o.nodeName,
nodeUID: o.nodeUID,
kubeClient: o.kubeClient,
}
// Stop calls cancel and therefore both cancellation
// and Stop cause goroutines to stop.
ctx, cancel := context.WithCancelCause(ctx)
d.backgroundCtx, d.cancel = ctx, cancel
logger.V(3).Info("Starting")
d.wg.Add(1)
go func() {
defer d.wg.Done()
defer logger.V(3).Info("Stopping")
<-ctx.Done()
}()
// Clean up if we don't finish successfully.
defer func() {
if r := recover(); r != nil {
d.Stop()
panic(r)
}
if finalErr != nil {
d.Stop()
}
}()
// Run the node plugin gRPC server first to ensure that it is ready.
implemented := false
plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
plugin, err := startGRPCServer(klog.NewContext(ctx, klog.LoggerWithName(logger, "dra")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
if nodeServer, ok := nodeServer.(drapbv1alpha3.NodeServer); ok && o.nodeV1alpha3 {
o.logger.V(5).Info("registering drapbv1alpha3.NodeServer")
logger.V(5).Info("registering drapbv1alpha3.NodeServer")
drapbv1alpha3.RegisterNodeServer(grpcServer, nodeServer)
implemented = true
}
@@ -225,38 +318,77 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e
return nil, fmt.Errorf("start node client: %v", err)
}
d.plugin = plugin
defer func() {
// Clean up if we didn't finish successfully.
if r := recover(); r != nil {
plugin.stop()
panic(r)
}
if finalErr != nil {
plugin.stop()
}
}()
if !implemented {
return nil, errors.New("no supported DRA gRPC API is implemented and enabled")
}
// Now make it available to kubelet.
registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint)
registrar, err := startRegistrar(klog.NewContext(ctx, klog.LoggerWithName(logger, "registrar")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint)
if err != nil {
return nil, fmt.Errorf("start registrar: %v", err)
}
d.registrar = registrar
// startGRPCServer and startRegistrar don't implement cancellation
// themselves, we add that for both here.
d.wg.Add(1)
go func() {
defer d.wg.Done()
<-ctx.Done()
// Time to stop.
d.plugin.stop()
d.registrar.stop()
// d.resourceSliceController is set concurrently.
d.mutex.Lock()
d.resourceSliceController.Stop()
d.mutex.Unlock()
}()
return d, nil
}
// Stop implements [DRAPlugin.Stop].
func (d *draPlugin) Stop() {
if d == nil {
return
}
d.registrar.stop()
d.plugin.stop()
d.cancel(errors.New("DRA plugin was stopped"))
// Wait for goroutines in Start to clean up and exit.
d.wg.Wait()
}
// PublishResources implements [DRAPlugin.PublishResources].
func (d *draPlugin) PublishResources(ctx context.Context, nodeResources []*resourceapi.ResourceModel) {
d.mutex.Lock()
defer d.mutex.Unlock()
owner := resourceslice.Owner{
APIVersion: "v1",
Kind: "Node",
Name: d.nodeName,
UID: d.nodeUID, // Optional, will be determined by controller if empty.
}
resources := &resourceslice.Resources{NodeResources: nodeResources}
if d.resourceSliceController == nil {
// Start publishing the information. The controller is using
// our background context, not the one passed into this
// function, and thus is connected to the lifecycle of the
// plugin.
controllerCtx := d.backgroundCtx
controllerLogger := klog.FromContext(controllerCtx)
controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
controllerCtx = klog.NewContext(controllerCtx, controllerLogger)
d.resourceSliceController = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, resources)
return
}
// Inform running controller about new information.
d.resourceSliceController.Update(resources)
}
// RegistrationStatus implements [DRAPlugin.RegistrationStatus].
func (d *draPlugin) RegistrationStatus() *registerapi.RegistrationStatus {
if d.registrar == nil {
return nil

View File

@@ -17,30 +17,30 @@ limitations under the License.
package kubeletplugin
import (
"context"
"fmt"
"google.golang.org/grpc"
"k8s.io/klog/v2"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)
type nodeRegistrar struct {
logger klog.Logger
registrationServer
server *grpcServer
}
// startRegistrar returns a running instance.
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
//
// The context is only used for additional values, cancellation is ignored.
func startRegistrar(valueCtx context.Context, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
n := &nodeRegistrar{
logger: logger,
registrationServer: registrationServer{
driverName: driverName,
endpoint: endpoint,
supportedVersions: []string{"1.0.0"}, // TODO: is this correct?
},
}
s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
s, err := startGRPCServer(valueCtx, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
registerapi.RegisterRegistrationServer(grpcServer, n)
})
if err != nil {

View File

@@ -26,15 +26,17 @@ import (
"google.golang.org/grpc"
"k8s.io/klog/v2"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)
var requestID int64
type grpcServer struct {
logger klog.Logger
grpcVerbosity int
wg sync.WaitGroup
endpoint endpoint
server *grpc.Server
requestID int64
}
type registerService func(s *grpc.Server)
@@ -54,9 +56,11 @@ type endpoint struct {
// startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine
// which handles requests for arbitrary services.
func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
//
// The context is only used for additional values, cancellation is ignored.
func startGRPCServer(valueCtx context.Context, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
logger := klog.FromContext(valueCtx)
s := &grpcServer{
logger: logger,
endpoint: endpoint,
grpcVerbosity: grpcVerbosity,
}
@@ -79,10 +83,11 @@ func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []
// Run a gRPC server. It will close the listening socket when
// shutting down, so we don't need to do that.
var opts []grpc.ServerOption
var finalUnaryInterceptors []grpc.UnaryServerInterceptor
var finalStreamInterceptors []grpc.StreamServerInterceptor
finalUnaryInterceptors := []grpc.UnaryServerInterceptor{unaryContextInterceptor(valueCtx)}
finalStreamInterceptors := []grpc.StreamServerInterceptor{streamContextInterceptor(valueCtx)}
if grpcVerbosity >= 0 {
finalUnaryInterceptors = append(finalUnaryInterceptors, s.interceptor)
finalStreamInterceptors = append(finalStreamInterceptors, s.streamInterceptor)
}
finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...)
finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...)
@@ -103,16 +108,66 @@ func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []
}
}()
logger.Info("GRPC server started")
logger.V(3).Info("GRPC server started")
return s, nil
}
// unaryContextInterceptor injects the values stored in valueCtx into the
// per-request context used by the call chain.
func unaryContextInterceptor(valueCtx context.Context) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
ctx = mergeContexts(ctx, valueCtx)
return handler(ctx, req)
}
}
// streamContextInterceptor does the same as unaryContextInterceptor for streams.
func streamContextInterceptor(valueCtx context.Context) grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := mergeContexts(ss.Context(), valueCtx)
return handler(srv, mergeServerStream{ServerStream: ss, ctx: ctx})
}
}
type mergeServerStream struct {
grpc.ServerStream
ctx context.Context
}
func (m mergeServerStream) Context() context.Context {
return m.ctx
}
// mergeContexts creates a new context where cancellation is handled by the
// root context. The values stored by the value context are used as fallback if
// the root context doesn't have a certain value.
func mergeContexts(rootCtx, valueCtx context.Context) context.Context {
return mergeCtx{
Context: rootCtx,
valueCtx: valueCtx,
}
}
type mergeCtx struct {
context.Context
valueCtx context.Context
}
func (m mergeCtx) Value(i interface{}) interface{} {
if v := m.Context.Value(i); v != nil {
return v
}
return m.valueCtx.Value(i)
}
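// For example, a logger stored in valueCtx via klog.NewContext becomes visible through
// the merged context when the per-request context does not carry one, while cancellation
// and deadlines still follow the per-request context:
//
//	merged := mergeContexts(requestCtx, valueCtx)
//	logger := klog.FromContext(merged) // falls back to the logger from valueCtx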
// interceptor is called for each request. It creates a logger with a unique,
// sequentially increasing request ID and adds that logger to the context. It
// also logs request and response.
func (s *grpcServer) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
requestID := atomic.AddInt64(&s.requestID, 1)
logger := klog.LoggerWithValues(s.logger, "requestID", requestID)
requestID := atomic.AddInt64(&requestID, 1)
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "requestID", requestID, "method", info.FullMethod)
ctx = klog.NewContext(ctx, logger)
logger.V(s.grpcVerbosity).Info("handling request", "request", req)
defer func() {
@@ -123,13 +178,57 @@ func (s *grpcServer) interceptor(ctx context.Context, req interface{}, info *grp
}()
resp, err = handler(ctx, req)
if err != nil {
logger.Error(err, "handling request failed", "request", req)
logger.Error(err, "handling request failed")
} else {
logger.V(s.grpcVerbosity).Info("handling request succeeded", "response", resp)
}
return
}
func (s *grpcServer) streamInterceptor(server interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
requestID := atomic.AddInt64(&requestID, 1)
ctx := stream.Context()
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "requestID", requestID, "method", info.FullMethod)
ctx = klog.NewContext(ctx, logger)
stream = logStream{
ServerStream: stream,
ctx: ctx,
grpcVerbosity: s.grpcVerbosity,
}
logger.V(s.grpcVerbosity).Info("handling stream")
err := handler(server, stream)
if err != nil {
logger.Error(err, "handling stream failed")
} else {
logger.V(s.grpcVerbosity).Info("handling stream succeeded")
}
return err
}
type logStream struct {
grpc.ServerStream
ctx context.Context
grpcVerbosity int
}
func (l logStream) Context() context.Context {
return l.ctx
}
func (l logStream) SendMsg(msg interface{}) error {
logger := klog.FromContext(l.ctx)
logger.V(l.grpcVerbosity).Info("sending stream message", "message", msg)
err := l.ServerStream.SendMsg(msg)
if err != nil {
logger.Error(err, "sending stream message failed")
} else {
logger.V(l.grpcVerbosity).Info("sending stream message succeeded")
}
return err
}
// stop ensures that the server is not running anymore and cleans up all resources.
// It is idempotent and may be called with a nil pointer.
func (s *grpcServer) stop() {
@@ -143,8 +242,7 @@ func (s *grpcServer) stop() {
s.server = nil
if s.endpoint.path != "" {
if err := os.Remove(s.endpoint.path); err != nil && !os.IsNotExist(err) {
s.logger.Error(err, "remove Unix socket")
utilruntime.HandleError(fmt.Errorf("remove Unix socket: %w", err))
}
}
s.logger.V(3).Info("GRPC server stopped")
}

View File

@@ -0,0 +1,391 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourceslice
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/google/go-cmp/cmp"
resourceapi "k8s.io/api/resource/v1alpha2"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
resourceinformers "k8s.io/client-go/informers/resource/v1alpha2"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
const (
// resyncPeriod for informer
// TODO (https://github.com/kubernetes/kubernetes/issues/123688): disable?
resyncPeriod = time.Duration(10 * time.Minute)
)
// Controller synchronizes information about resources of one
// driver with ResourceSlice objects. It currently supports node-local
// resources. A DRA driver for node-local resources typically runs this
// controller as part of its kubelet plugin.
//
// Support for network-attached resources will be added later.
type Controller struct {
cancel func(cause error)
driverName string
owner Owner
kubeClient kubernetes.Interface
wg sync.WaitGroup
queue workqueue.TypedRateLimitingInterface[string]
sliceStore cache.Store
mutex sync.RWMutex
// When receiving updates from the driver, the entire pointer is replaced,
// so it is okay to not do a deep copy of it when reading it. Only reading
// the pointer itself must be protected by a read lock.
resources *Resources
}
// Resources is a complete description of all resources synchronized by the controller.
type Resources struct {
// NodeResources are resources that are local to one node.
NodeResources []*resourceapi.ResourceModel
}
type Owner struct {
APIVersion string
Kind string
Name string
UID types.UID
}
// StartController constructs a new controller and starts it.
// If the owner is a v1.Node, then the NodeName field in the
// ResourceSlice objects is set and used to identify objects
// managed by the controller. The UID is not needed in that
// case, the controller will determine it automatically.
//
// If a kubeClient is provided, then it synchronizes ResourceSlices
// with the resource information provided by plugins. Without it,
// the controller is inactive. This can happen when kubelet is run stand-alone
// without an apiserver. In that case we can't and don't need to publish
// ResourceSlices.
func StartController(ctx context.Context, kubeClient kubernetes.Interface, driverName string, owner Owner, resources *Resources) *Controller {
if kubeClient == nil {
return nil
}
logger := klog.FromContext(ctx)
ctx, cancel := context.WithCancelCause(ctx)
c := &Controller{
cancel: cancel,
kubeClient: kubeClient,
driverName: driverName,
owner: owner,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
),
resources: resources,
}
logger.V(3).Info("Starting")
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer logger.V(3).Info("Stopping")
c.run(ctx)
}()
// Sync once.
c.queue.Add("")
return c
}
// Stop cancels all background activity and blocks until the controller has stopped.
func (c *Controller) Stop() {
if c == nil {
return
}
c.cancel(errors.New("ResourceSlice controller was asked to stop"))
c.wg.Wait()
}
// Update sets the new desired state of the resource information.
func (c *Controller) Update(resources *Resources) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.resources = resources
c.queue.Add("")
}
// run runs in the background. It handles blocking initialization (like
// syncing the informer) and then reconciles the actual state with the desired state.
func (c *Controller) run(ctx context.Context) {
logger := klog.FromContext(ctx)
// We always filter by driver name, and by node name only for node-local resources.
selector := fields.Set{"driverName": c.driverName}
if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
selector["nodeName"] = c.owner.Name
}
informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
options.FieldSelector = selector.String()
})
c.sliceStore = informer.GetStore()
handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
return
}
logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice))
c.queue.Add("")
},
UpdateFunc: func(old, new any) {
oldSlice, ok := old.(*resourceapi.ResourceSlice)
if !ok {
return
}
newSlice, ok := new.(*resourceapi.ResourceSlice)
if !ok {
return
}
if loggerV := logger.V(6); loggerV.Enabled() {
loggerV.Info("ResourceSlice update", "slice", klog.KObj(newSlice), "diff", cmp.Diff(oldSlice, newSlice))
} else {
logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice))
}
c.queue.Add("")
},
DeleteFunc: func(obj any) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
return
}
logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice))
c.queue.Add("")
},
})
if err != nil {
logger.Error(err, "Registering event handler on the ResourceSlice informer failed, disabling resource monitoring")
return
}
// Start informer and wait for our cache to be populated.
logger.V(3).Info("Starting ResourceSlice informer and waiting for it to sync")
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer logger.V(3).Info("ResourceSlice informer has stopped")
defer c.queue.ShutDown() // Once we get here, we must have been asked to stop.
informer.Run(ctx.Done())
}()
for !handler.HasSynced() {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
}
logger.V(3).Info("ResourceSlice informer has synced")
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
key, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(key)
// Panics are caught and treated like errors.
var err error
func() {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("internal error: %v", r)
}
}()
err = c.sync(ctx)
}()
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "processing ResourceSlice objects")
c.queue.AddRateLimited(key)
// Return without removing the work item from the queue.
// It will be retried.
return true
}
c.queue.Forget(key)
return true
}
func (c *Controller) sync(ctx context.Context) error {
logger := klog.FromContext(ctx)
// Gather information about the actual and desired state.
slices := c.sliceStore.List()
var resources *Resources
c.mutex.RLock()
resources = c.resources
c.mutex.RUnlock()
// Resources that are not yet stored in any slice need to be published.
// Here we track the indices of any resources that are already stored.
storedResourceIndices := sets.New[int]()
// Slices that don't match any driver resource can either be updated (if there
// are new driver resources that need to be stored) or deleted.
obsoleteSlices := make([]*resourceapi.ResourceSlice, 0, len(slices))
// Match slices with resource information.
for _, obj := range slices {
slice := obj.(*resourceapi.ResourceSlice)
// TODO: network-attached resources.
index := indexOfModel(resources.NodeResources, &slice.ResourceModel)
if index >= 0 {
storedResourceIndices.Insert(index)
continue
}
obsoleteSlices = append(obsoleteSlices, slice)
}
if loggerV := logger.V(6); loggerV.Enabled() {
// Dump entire resource information.
loggerV.Info("Syncing existing driver resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", resources)
} else {
logger.V(5).Info("Syncing existing driver resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(resources.NodeResources))
}
// Retrieve node object to get UID?
// The result gets cached and is expected to not change while
// the controller runs.
if c.owner.UID == "" && c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
node, err := c.kubeClient.CoreV1().Nodes().Get(ctx, c.owner.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("retrieve node %q: %w", c.owner.Name, err)
}
// There is only one worker, so no locking needed.
c.owner.UID = node.UID
}
// Update stale slices before removing what's left.
//
// We don't really know which of these slices might have
// been used for "the" driver resource because they don't
// have a unique ID. In practice, a driver is most likely
// to just give us one ResourceModel, in which case
// this isn't a problem at all. If we have more than one,
// then at least conceptually it currently doesn't matter
// where we publish it.
//
// The long-term goal is to move the handling of
// ResourceSlice objects into the driver, with kubelet
// just acting as a REST proxy. The advantage of that will
// be that kubelet won't need to support the same
// resource API version as the driver and the control plane.
// With that approach, the driver will be able to match
// up objects more intelligently.
numObsoleteSlices := len(obsoleteSlices)
for index, resource := range resources.NodeResources {
if storedResourceIndices.Has(index) {
// No need to do anything, it is already stored exactly
// like this in an existing slice.
continue
}
if numObsoleteSlices > 0 {
// Update one existing slice.
slice := obsoleteSlices[numObsoleteSlices-1]
numObsoleteSlices--
slice = slice.DeepCopy()
slice.ResourceModel = *resource
logger.V(5).Info("Reusing existing resource slice", "slice", klog.KObj(slice))
if _, err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("update resource slice: %w", err)
}
continue
}
// Create a new slice.
slice := &resourceapi.ResourceSlice{
ObjectMeta: metav1.ObjectMeta{
GenerateName: c.owner.Name + "-" + c.driverName + "-",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: c.owner.APIVersion,
Kind: c.owner.Kind,
Name: c.owner.Name,
UID: c.owner.UID,
Controller: ptr.To(true),
},
},
},
DriverName: c.driverName,
ResourceModel: *resource,
}
if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
slice.NodeName = c.owner.Name
}
logger.V(5).Info("Creating new resource slice", "slice", klog.KObj(slice))
if _, err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("create resource slice: %w", err)
}
}
// All remaining slices are truly orphaned.
for i := 0; i < numObsoleteSlices; i++ {
slice := obsoleteSlices[i]
logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice))
if err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("delete resource slice: %w", err)
}
}
return nil
}
func indexOfModel(models []*resourceapi.ResourceModel, model *resourceapi.ResourceModel) int {
for index, m := range models {
if apiequality.Semantic.DeepEqual(m, model) {
return index
}
}
return -1
}
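
To illustrate how a DRA driver is expected to use this controller, here is a minimal sketch. It relies only on the API shown above; the import path of the resourceslice package, the sketch's package name, and the driver/instance names are assumptions, not part of this commit.

```go
// Minimal usage sketch, not part of this commit. The resourceslice import
// path below is an assumption; the calls match the controller API above.
package driverexample

import (
	"context"

	resourceapi "k8s.io/api/resource/v1alpha2"
	"k8s.io/client-go/kubernetes"

	"k8s.io/dynamic-resource-allocation/resourceslice" // assumed import path
)

func publishNodeResources(ctx context.Context, kubeClient kubernetes.Interface, driverName, nodeName string) {
	// Owning the slices via the Node object lets them be garbage-collected
	// when the node goes away. The UID may be left empty; the controller
	// looks it up itself.
	owner := resourceslice.Owner{
		APIVersion: "v1",
		Kind:       "Node",
		Name:       nodeName,
	}

	// Initial inventory: a single named-resources instance.
	resources := &resourceslice.Resources{
		NodeResources: []*resourceapi.ResourceModel{{
			NamedResources: &resourceapi.NamedResourcesResources{
				Instances: []resourceapi.NamedResourcesInstance{{Name: "instance-00"}},
			},
		}},
	}

	c := resourceslice.StartController(ctx, kubeClient, driverName, owner, resources)
	defer c.Stop()

	// Whenever the inventory changes, hand over a complete new description;
	// the controller reconciles the ResourceSlice objects accordingly.
	c.Update(&resourceslice.Resources{
		NodeResources: []*resourceapi.ResourceModel{{
			NamedResources: &resourceapi.NamedResourcesResources{
				Instances: []resourceapi.NamedResourcesInstance{
					{Name: "instance-00"},
					{Name: "instance-01"},
				},
			},
		}},
	})

	<-ctx.Done()
}
```

The e2e test driver further down takes the same approach through its kubeletplugin wrapper and PublishResources.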

View File

@@ -342,88 +342,6 @@ func (m *NodeUnprepareResourceResponse) GetError() string {
return ""
}
type NodeListAndWatchResourcesRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *NodeListAndWatchResourcesRequest) Reset() { *m = NodeListAndWatchResourcesRequest{} }
func (*NodeListAndWatchResourcesRequest) ProtoMessage() {}
func (*NodeListAndWatchResourcesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{6}
}
func (m *NodeListAndWatchResourcesRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *NodeListAndWatchResourcesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_NodeListAndWatchResourcesRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *NodeListAndWatchResourcesRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeListAndWatchResourcesRequest.Merge(m, src)
}
func (m *NodeListAndWatchResourcesRequest) XXX_Size() int {
return m.Size()
}
func (m *NodeListAndWatchResourcesRequest) XXX_DiscardUnknown() {
xxx_messageInfo_NodeListAndWatchResourcesRequest.DiscardUnknown(m)
}
var xxx_messageInfo_NodeListAndWatchResourcesRequest proto.InternalMessageInfo
type NodeListAndWatchResourcesResponse struct {
Resources []*v1alpha2.ResourceModel `protobuf:"bytes,1,rep,name=resources,proto3" json:"resources,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *NodeListAndWatchResourcesResponse) Reset() { *m = NodeListAndWatchResourcesResponse{} }
func (*NodeListAndWatchResourcesResponse) ProtoMessage() {}
func (*NodeListAndWatchResourcesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{7}
}
func (m *NodeListAndWatchResourcesResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *NodeListAndWatchResourcesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_NodeListAndWatchResourcesResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *NodeListAndWatchResourcesResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeListAndWatchResourcesResponse.Merge(m, src)
}
func (m *NodeListAndWatchResourcesResponse) XXX_Size() int {
return m.Size()
}
func (m *NodeListAndWatchResourcesResponse) XXX_DiscardUnknown() {
xxx_messageInfo_NodeListAndWatchResourcesResponse.DiscardUnknown(m)
}
var xxx_messageInfo_NodeListAndWatchResourcesResponse proto.InternalMessageInfo
func (m *NodeListAndWatchResourcesResponse) GetResources() []*v1alpha2.ResourceModel {
if m != nil {
return m.Resources
}
return nil
}
type Claim struct {
// The ResourceClaim namespace (ResourceClaim.meta.Namespace).
// This field is REQUIRED.
@@ -450,7 +368,7 @@ type Claim struct {
func (m *Claim) Reset() { *m = Claim{} }
func (*Claim) ProtoMessage() {}
func (*Claim) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{8}
return fileDescriptor_00212fb1f9d3bf1c, []int{6}
}
func (m *Claim) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -523,55 +441,49 @@ func init() {
proto.RegisterType((*NodeUnprepareResourcesResponse)(nil), "v1alpha3.NodeUnprepareResourcesResponse")
proto.RegisterMapType((map[string]*NodeUnprepareResourceResponse)(nil), "v1alpha3.NodeUnprepareResourcesResponse.ClaimsEntry")
proto.RegisterType((*NodeUnprepareResourceResponse)(nil), "v1alpha3.NodeUnprepareResourceResponse")
proto.RegisterType((*NodeListAndWatchResourcesRequest)(nil), "v1alpha3.NodeListAndWatchResourcesRequest")
proto.RegisterType((*NodeListAndWatchResourcesResponse)(nil), "v1alpha3.NodeListAndWatchResourcesResponse")
proto.RegisterType((*Claim)(nil), "v1alpha3.Claim")
}
func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) }
var fileDescriptor_00212fb1f9d3bf1c = []byte{
// 631 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcf, 0x6e, 0xd3, 0x4c,
0x10, 0xcf, 0xb6, 0x4d, 0xf5, 0x65, 0x22, 0xb5, 0x9f, 0x56, 0x15, 0x0a, 0xa6, 0x98, 0x60, 0x51,
0x12, 0x51, 0xb0, 0xc1, 0x05, 0x54, 0x81, 0x38, 0xd0, 0x16, 0xd4, 0xa2, 0x82, 0x90, 0x11, 0x42,
0xe2, 0x52, 0x36, 0xde, 0xc5, 0xb1, 0xe2, 0xd8, 0x66, 0xd7, 0xae, 0xe8, 0x8d, 0x47, 0xe0, 0xb1,
0x7a, 0xe0, 0x80, 0x38, 0xf5, 0x84, 0x68, 0xb8, 0xf1, 0x14, 0xc8, 0x6b, 0x6f, 0xda, 0x44, 0x4e,
0x52, 0x89, 0xdb, 0xec, 0xfc, 0xf9, 0xcd, 0xce, 0x6f, 0x66, 0x76, 0xa1, 0x46, 0x62, 0xdf, 0x8c,
0x79, 0x94, 0x44, 0xf8, 0xbf, 0xc3, 0x7b, 0x24, 0x88, 0xbb, 0x64, 0x43, 0xbb, 0xe3, 0xf9, 0x49,
0x37, 0xed, 0x98, 0x6e, 0xd4, 0xb7, 0xbc, 0xc8, 0x8b, 0x2c, 0xe9, 0xd0, 0x49, 0x3f, 0xca, 0x93,
0x3c, 0x48, 0x29, 0x0f, 0xd4, 0x6e, 0xf7, 0x36, 0x85, 0xe9, 0x47, 0x16, 0x89, 0x7d, 0x8b, 0x33,
0x11, 0xa5, 0xdc, 0x65, 0x56, 0x01, 0x66, 0x5b, 0x1e, 0x0b, 0x19, 0x27, 0x09, 0xa3, 0xb9, 0xb7,
0xf1, 0x1c, 0xae, 0xbc, 0x8a, 0x28, 0x7b, 0xcd, 0x59, 0x4c, 0x38, 0x73, 0x0a, 0x7f, 0xe1, 0xb0,
0x4f, 0x29, 0x13, 0x09, 0x6e, 0xc1, 0xa2, 0x1b, 0x10, 0xbf, 0x2f, 0x1a, 0xa8, 0x39, 0xdf, 0xae,
0xdb, 0xcb, 0xa6, 0xba, 0x96, 0xb9, 0x9d, 0xe9, 0x9d, 0xc2, 0x6c, 0x7c, 0x43, 0xb0, 0x5a, 0x0e,
0x24, 0xe2, 0x28, 0x14, 0x0c, 0xbf, 0x18, 0x43, 0xb2, 0xcf, 0x90, 0xa6, 0xc5, 0xe5, 0x69, 0xc4,
0xb3, 0x30, 0xe1, 0x47, 0x2a, 0x99, 0xf6, 0x01, 0xea, 0xe7, 0xd4, 0xf8, 0x7f, 0x98, 0xef, 0xb1,
0xa3, 0x06, 0x6a, 0xa2, 0x76, 0xcd, 0xc9, 0x44, 0xfc, 0x18, 0xaa, 0x87, 0x24, 0x48, 0x59, 0x63,
0xae, 0x89, 0xda, 0x75, 0x7b, 0x6d, 0x6a, 0x2e, 0x95, 0xca, 0xc9, 0x63, 0x1e, 0xcd, 0x6d, 0x22,
0x83, 0x96, 0xd2, 0x32, 0x2c, 0xc6, 0x82, 0xba, 0x4b, 0xfd, 0x03, 0xca, 0x0e, 0x7d, 0x97, 0xe5,
0x15, 0xd5, 0xb6, 0x96, 0x06, 0x3f, 0xaf, 0xc1, 0xf6, 0xce, 0xde, 0x4e, 0xae, 0x75, 0xc0, 0xa5,
0x7e, 0x21, 0xe3, 0x15, 0xa8, 0x32, 0xce, 0x23, 0x2e, 0x2f, 0x54, 0x73, 0xf2, 0x83, 0xb1, 0x0b,
0x57, 0xb3, 0x2c, 0x6f, 0xc3, 0xf8, 0x5f, 0xe9, 0xff, 0x81, 0x40, 0x9f, 0x04, 0x55, 0xdc, 0x79,
0x7f, 0x0c, 0xeb, 0xfe, 0x28, 0x29, 0x93, 0x23, 0x4b, 0x5b, 0xd0, 0x99, 0xd5, 0x82, 0x27, 0xa3,
0x2d, 0x68, 0xcd, 0xc8, 0x56, 0xd6, 0x84, 0x07, 0x13, 0xe8, 0x19, 0x96, 0x34, 0x64, 0x15, 0x9d,
0x67, 0xd5, 0x80, 0x66, 0x16, 0xb6, 0xef, 0x8b, 0xe4, 0x69, 0x48, 0xdf, 0x91, 0xc4, 0xed, 0x8e,
0x13, 0x6b, 0x84, 0x70, 0x7d, 0x8a, 0x4f, 0x01, 0xbf, 0x07, 0x35, 0xb5, 0x40, 0x8a, 0xb4, 0x75,
0x33, 0xdf, 0x2e, 0x33, 0x5b, 0x54, 0x65, 0x54, 0xa5, 0xd9, 0xa6, 0xc2, 0x78, 0x19, 0x51, 0x16,
0x38, 0x67, 0xd1, 0xc6, 0x1f, 0x04, 0x55, 0xc9, 0x17, 0x5e, 0x85, 0x5a, 0x48, 0xfa, 0x4c, 0xc4,
0xc4, 0x65, 0xc5, 0xbd, 0xcf, 0x14, 0x19, 0x8f, 0xa9, 0x4f, 0x8b, 0x29, 0xc9, 0x44, 0x8c, 0x61,
0x21, 0x33, 0x37, 0xe6, 0xa5, 0x4a, 0xca, 0xb8, 0x05, 0xcb, 0x0a, 0xfa, 0xa0, 0x4b, 0x42, 0x1a,
0xb0, 0xc6, 0x82, 0x34, 0x2f, 0x29, 0xf5, 0xae, 0xd4, 0xe2, 0x04, 0x34, 0x91, 0xf0, 0xd4, 0x4d,
0x52, 0xce, 0xe8, 0xc1, 0x78, 0x4c, 0x55, 0x96, 0xf4, 0x70, 0x7a, 0x49, 0x6f, 0x86, 0xf1, 0xce,
0x08, 0xb6, 0xd3, 0x10, 0x13, 0x2c, 0xf6, 0xc9, 0x1c, 0x2c, 0x64, 0xec, 0x62, 0x0f, 0x56, 0xca,
0x76, 0x1b, 0xaf, 0xcd, 0xda, 0x7d, 0xd9, 0x24, 0xed, 0xe6, 0xc5, 0x9e, 0x08, 0xa3, 0x82, 0xfb,
0x70, 0xa9, 0x7c, 0x86, 0x71, 0x6b, 0xf6, 0x94, 0xe7, 0xc9, 0xda, 0x17, 0x5d, 0x07, 0xa3, 0x82,
0x3f, 0xc3, 0xe5, 0x89, 0xd3, 0x83, 0x6f, 0x8d, 0x02, 0x4d, 0x1b, 0x43, 0x6d, 0xfd, 0x42, 0xbe,
0x2a, 0xef, 0x5d, 0xb4, 0xb5, 0x75, 0x7c, 0xaa, 0xa3, 0x93, 0x53, 0xbd, 0xf2, 0x65, 0xa0, 0xa3,
0xe3, 0x81, 0x8e, 0xbe, 0x0f, 0x74, 0xf4, 0x6b, 0xa0, 0xa3, 0xaf, 0xbf, 0xf5, 0xca, 0xfb, 0x1b,
0xc5, 0xd3, 0xdf, 0x4b, 0x3b, 0x2c, 0x60, 0x89, 0x15, 0xf7, 0xbc, 0xec, 0x1b, 0x10, 0x16, 0xe5,
0x44, 0x7d, 0x01, 0x1b, 0x9d, 0x45, 0xf9, 0xf2, 0x6f, 0xfc, 0x0d, 0x00, 0x00, 0xff, 0xff, 0xb2,
0x0b, 0x57, 0x0c, 0x6d, 0x06, 0x00, 0x00,
// 562 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xce, 0x36, 0x49, 0x45, 0x26, 0x52, 0x8b, 0x56, 0x15, 0xb2, 0x42, 0x31, 0x91, 0x45, 0x49,
0x0e, 0x60, 0x0b, 0x07, 0x50, 0x05, 0xe2, 0x92, 0x16, 0x54, 0x10, 0x42, 0xc8, 0x88, 0x0b, 0x97,
0xb0, 0xb1, 0x07, 0xc7, 0x4a, 0x62, 0x9b, 0x5d, 0x3b, 0x52, 0x6f, 0x3c, 0x02, 0x8f, 0xd5, 0x03,
0x07, 0xc4, 0x89, 0x53, 0x45, 0xcd, 0x8d, 0xa7, 0x40, 0x5e, 0xdb, 0x69, 0x13, 0x39, 0x4d, 0xa5,
0xde, 0x66, 0xe7, 0xef, 0x9b, 0xfd, 0xe6, 0x07, 0x1a, 0x2c, 0xf4, 0xf4, 0x90, 0x07, 0x51, 0x40,
0x6f, 0xcc, 0x1e, 0xb1, 0x49, 0x38, 0x62, 0xbd, 0xd6, 0x43, 0xd7, 0x8b, 0x46, 0xf1, 0x50, 0xb7,
0x83, 0xa9, 0xe1, 0x06, 0x6e, 0x60, 0x48, 0x87, 0x61, 0xfc, 0x45, 0xbe, 0xe4, 0x43, 0x4a, 0x59,
0x60, 0xeb, 0xc1, 0x78, 0x5f, 0xe8, 0x5e, 0x60, 0xb0, 0xd0, 0x33, 0x38, 0x8a, 0x20, 0xe6, 0x36,
0x1a, 0x79, 0x32, 0xd3, 0x70, 0xd1, 0x47, 0xce, 0x22, 0x74, 0x32, 0x6f, 0xed, 0x15, 0xdc, 0x7e,
0x17, 0x38, 0xf8, 0x9e, 0x63, 0xc8, 0x38, 0x5a, 0xb9, 0xbf, 0xb0, 0xf0, 0x6b, 0x8c, 0x22, 0xa2,
0x1d, 0xd8, 0xb4, 0x27, 0xcc, 0x9b, 0x0a, 0x85, 0xb4, 0xab, 0xdd, 0xa6, 0xb9, 0xad, 0x17, 0x65,
0xe9, 0x07, 0xa9, 0xde, 0xca, 0xcd, 0xda, 0x0f, 0x02, 0xbb, 0xe5, 0x89, 0x44, 0x18, 0xf8, 0x02,
0xe9, 0x9b, 0xa5, 0x4c, 0xe6, 0x79, 0xa6, 0xcb, 0xe2, 0x32, 0x18, 0xf1, 0xd2, 0x8f, 0xf8, 0x71,
0x01, 0xd6, 0xfa, 0x0c, 0xcd, 0x0b, 0x6a, 0x7a, 0x13, 0xaa, 0x63, 0x3c, 0x56, 0x48, 0x9b, 0x74,
0x1b, 0x56, 0x2a, 0xd2, 0xe7, 0x50, 0x9f, 0xb1, 0x49, 0x8c, 0xca, 0x46, 0x9b, 0x74, 0x9b, 0xe6,
0xde, 0xa5, 0x58, 0x05, 0x94, 0x95, 0xc5, 0x3c, 0xdb, 0xd8, 0x27, 0x9a, 0x53, 0x4a, 0xcb, 0xfc,
0x33, 0x06, 0x34, 0x6d, 0xc7, 0x1b, 0x38, 0x38, 0xf3, 0x6c, 0xcc, 0x7e, 0xd4, 0xe8, 0x6f, 0x25,
0xa7, 0x77, 0xe1, 0xe0, 0xf0, 0xf5, 0x61, 0xa6, 0xb5, 0xc0, 0x76, 0xbc, 0x5c, 0xa6, 0x3b, 0x50,
0x47, 0xce, 0x03, 0x2e, 0x0b, 0x6a, 0x58, 0xd9, 0x43, 0x3b, 0x82, 0x3b, 0x29, 0xca, 0x47, 0x3f,
0xbc, 0x2e, 0xfd, 0xbf, 0x08, 0xa8, 0xab, 0x52, 0xe5, 0x35, 0xbf, 0x5d, 0xca, 0xf5, 0x78, 0x91,
0x94, 0xd5, 0x91, 0xa5, 0x2d, 0x18, 0xae, 0x6b, 0xc1, 0x8b, 0xc5, 0x16, 0x74, 0xd6, 0xa0, 0x95,
0x35, 0xe1, 0xc9, 0x0a, 0x7a, 0xe6, 0x5f, 0x9a, 0xb3, 0x4a, 0x2e, 0xb2, 0xfa, 0x8f, 0x40, 0x5d,
0xd6, 0x46, 0x77, 0xa1, 0xe1, 0xb3, 0x29, 0x8a, 0x90, 0xd9, 0x98, 0xfb, 0x9c, 0x2b, 0xd2, 0x9a,
0x63, 0xcf, 0xc9, 0x3b, 0x92, 0x8a, 0x94, 0x42, 0x2d, 0x35, 0x2b, 0x55, 0xa9, 0x92, 0x32, 0xed,
0xc0, 0x76, 0xb1, 0x45, 0x83, 0x11, 0xf3, 0x9d, 0x09, 0x2a, 0x35, 0x69, 0xde, 0x2a, 0xd4, 0x47,
0x52, 0x4b, 0x23, 0x68, 0x89, 0x88, 0xc7, 0x76, 0x14, 0x73, 0x74, 0x06, 0xcb, 0x31, 0x75, 0xc9,
0xf9, 0x53, 0x3d, 0x5b, 0x4e, 0x3d, 0xdd, 0xf3, 0xc2, 0xa5, 0x60, 0xc6, 0xd4, 0x3f, 0xcc, 0xe3,
0xad, 0x85, 0xdc, 0x96, 0x22, 0x56, 0x58, 0xcc, 0x53, 0x02, 0xb5, 0x94, 0x24, 0xea, 0xc2, 0x4e,
0xd9, 0x1e, 0xd1, 0xbd, 0x75, 0x7b, 0x26, 0x27, 0xad, 0x75, 0xff, 0x6a, 0xeb, 0xa8, 0x55, 0xe8,
0x14, 0x6e, 0x95, 0xcf, 0x0b, 0xed, 0xac, 0x9f, 0xa8, 0x0c, 0xac, 0x7b, 0xd5, 0xd1, 0xd3, 0x2a,
0xfd, 0xfe, 0xc9, 0x99, 0x4a, 0x7e, 0x9f, 0xa9, 0x95, 0x6f, 0x89, 0x4a, 0x4e, 0x12, 0x95, 0xfc,
0x4c, 0x54, 0xf2, 0x27, 0x51, 0xc9, 0xf7, 0xbf, 0x6a, 0xe5, 0xd3, 0xbd, 0xfc, 0xd8, 0x8d, 0xe3,
0x21, 0x4e, 0x30, 0x32, 0xc2, 0xb1, 0x9b, 0x1e, 0x3e, 0x61, 0x38, 0x9c, 0x15, 0x47, 0xaf, 0x37,
0xdc, 0x94, 0xb7, 0xae, 0xf7, 0x3f, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x14, 0x30, 0xd4, 0x5f, 0x05,
0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -594,10 +506,6 @@ type NodeClient interface {
// NodeUnprepareResources is the opposite of NodePrepareResources.
// The same error handling rules apply,
NodeUnprepareResources(ctx context.Context, in *NodeUnprepareResourcesRequest, opts ...grpc.CallOption) (*NodeUnprepareResourcesResponse, error)
// NodeListAndWatchResources returns a stream of NodeResourcesResponse objects.
// At the start and whenever resource availability changes, the
// plugin must send one such object with all information to the Kubelet.
NodeListAndWatchResources(ctx context.Context, in *NodeListAndWatchResourcesRequest, opts ...grpc.CallOption) (Node_NodeListAndWatchResourcesClient, error)
}
type nodeClient struct {
@@ -626,38 +534,6 @@ func (c *nodeClient) NodeUnprepareResources(ctx context.Context, in *NodeUnprepa
return out, nil
}
func (c *nodeClient) NodeListAndWatchResources(ctx context.Context, in *NodeListAndWatchResourcesRequest, opts ...grpc.CallOption) (Node_NodeListAndWatchResourcesClient, error) {
stream, err := c.cc.NewStream(ctx, &_Node_serviceDesc.Streams[0], "/v1alpha3.Node/NodeListAndWatchResources", opts...)
if err != nil {
return nil, err
}
x := &nodeNodeListAndWatchResourcesClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Node_NodeListAndWatchResourcesClient interface {
Recv() (*NodeListAndWatchResourcesResponse, error)
grpc.ClientStream
}
type nodeNodeListAndWatchResourcesClient struct {
grpc.ClientStream
}
func (x *nodeNodeListAndWatchResourcesClient) Recv() (*NodeListAndWatchResourcesResponse, error) {
m := new(NodeListAndWatchResourcesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// NodeServer is the server API for Node service.
type NodeServer interface {
// NodePrepareResources prepares several ResourceClaims
@@ -668,10 +544,6 @@ type NodeServer interface {
// NodeUnprepareResources is the opposite of NodePrepareResources.
// The same error handling rules apply,
NodeUnprepareResources(context.Context, *NodeUnprepareResourcesRequest) (*NodeUnprepareResourcesResponse, error)
// NodeListAndWatchResources returns a stream of NodeResourcesResponse objects.
// At the start and whenever resource availability changes, the
// plugin must send one such object with all information to the Kubelet.
NodeListAndWatchResources(*NodeListAndWatchResourcesRequest, Node_NodeListAndWatchResourcesServer) error
}
// UnimplementedNodeServer can be embedded to have forward compatible implementations.
@@ -684,9 +556,6 @@ func (*UnimplementedNodeServer) NodePrepareResources(ctx context.Context, req *N
func (*UnimplementedNodeServer) NodeUnprepareResources(ctx context.Context, req *NodeUnprepareResourcesRequest) (*NodeUnprepareResourcesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method NodeUnprepareResources not implemented")
}
func (*UnimplementedNodeServer) NodeListAndWatchResources(req *NodeListAndWatchResourcesRequest, srv Node_NodeListAndWatchResourcesServer) error {
return status.Errorf(codes.Unimplemented, "method NodeListAndWatchResources not implemented")
}
func RegisterNodeServer(s *grpc.Server, srv NodeServer) {
s.RegisterService(&_Node_serviceDesc, srv)
@@ -728,27 +597,6 @@ func _Node_NodeUnprepareResources_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler)
}
func _Node_NodeListAndWatchResources_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(NodeListAndWatchResourcesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(NodeServer).NodeListAndWatchResources(m, &nodeNodeListAndWatchResourcesServer{stream})
}
type Node_NodeListAndWatchResourcesServer interface {
Send(*NodeListAndWatchResourcesResponse) error
grpc.ServerStream
}
type nodeNodeListAndWatchResourcesServer struct {
grpc.ServerStream
}
func (x *nodeNodeListAndWatchResourcesServer) Send(m *NodeListAndWatchResourcesResponse) error {
return x.ServerStream.SendMsg(m)
}
var _Node_serviceDesc = grpc.ServiceDesc{
ServiceName: "v1alpha3.Node",
HandlerType: (*NodeServer)(nil),
@@ -762,13 +610,7 @@ var _Node_serviceDesc = grpc.ServiceDesc{
Handler: _Node_NodeUnprepareResources_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "NodeListAndWatchResources",
Handler: _Node_NodeListAndWatchResources_Handler,
ServerStreams: true,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "api.proto",
}
@@ -1013,66 +855,6 @@ func (m *NodeUnprepareResourceResponse) MarshalToSizedBuffer(dAtA []byte) (int,
return len(dAtA) - i, nil
}
func (m *NodeListAndWatchResourcesRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *NodeListAndWatchResourcesRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *NodeListAndWatchResourcesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
return len(dAtA) - i, nil
}
func (m *NodeListAndWatchResourcesResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *NodeListAndWatchResourcesResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *NodeListAndWatchResourcesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Resources) > 0 {
for iNdEx := len(m.Resources) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Resources[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintApi(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *Claim) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@@ -1255,30 +1037,6 @@ func (m *NodeUnprepareResourceResponse) Size() (n int) {
return n
}
func (m *NodeListAndWatchResourcesRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
return n
}
func (m *NodeListAndWatchResourcesResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Resources) > 0 {
for _, e := range m.Resources {
l = e.Size()
n += 1 + l + sovApi(uint64(l))
}
}
return n
}
func (m *Claim) Size() (n int) {
if m == nil {
return 0
@@ -1407,30 +1165,6 @@ func (this *NodeUnprepareResourceResponse) String() string {
}, "")
return s
}
func (this *NodeListAndWatchResourcesRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&NodeListAndWatchResourcesRequest{`,
`}`,
}, "")
return s
}
func (this *NodeListAndWatchResourcesResponse) String() string {
if this == nil {
return "nil"
}
repeatedStringForResources := "[]*ResourceModel{"
for _, f := range this.Resources {
repeatedStringForResources += strings.Replace(fmt.Sprintf("%v", f), "ResourceModel", "v1alpha2.ResourceModel", 1) + ","
}
repeatedStringForResources += "}"
s := strings.Join([]string{`&NodeListAndWatchResourcesResponse{`,
`Resources:` + repeatedStringForResources + `,`,
`}`,
}, "")
return s
}
func (this *Claim) String() string {
if this == nil {
return "nil"
@@ -2180,140 +1914,6 @@ func (m *NodeUnprepareResourceResponse) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *NodeListAndWatchResourcesRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: NodeListAndWatchResourcesRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: NodeListAndWatchResourcesRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *NodeListAndWatchResourcesResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: NodeListAndWatchResourcesResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: NodeListAndWatchResourcesResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Resources", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthApi
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Resources = append(m.Resources, &v1alpha2.ResourceModel{})
if err := m.Resources[len(m.Resources)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Claim) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0

View File

@@ -45,11 +45,9 @@ service Node {
rpc NodeUnprepareResources (NodeUnprepareResourcesRequest)
returns (NodeUnprepareResourcesResponse) {}
// NodeListAndWatchResources returns a stream of NodeResourcesResponse objects.
// At the start and whenever resource availability changes, the
// plugin must send one such object with all information to the Kubelet.
rpc NodeListAndWatchResources(NodeListAndWatchResourcesRequest)
returns (stream NodeListAndWatchResourcesResponse) {}
// TODO: removing NodeListAndWatchResources and the code for
// publishing ResourceSlice objects by kubelet is an API break.
// If we do this, then v1alpha3 must be renamed to v1alpha4.
}
message NodePrepareResourcesRequest {
@@ -94,13 +92,6 @@ message NodeUnprepareResourceResponse {
string error = 1;
}
message NodeListAndWatchResourcesRequest {
}
message NodeListAndWatchResourcesResponse {
repeated k8s.io.api.resource.v1alpha2.ResourceModel resources = 1;
}
message Claim {
// The ResourceClaim namespace (ResourceClaim.meta.Namespace).
// This field is REQUIRED.

View File

@@ -55,9 +55,8 @@ import (
)
const (
NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources"
NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources"
NodeListAndWatchResourcesMethod = "/v1alpha3.Node/NodeListAndWatchResources"
NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources"
NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources"
)
type Nodes struct {
@@ -314,7 +313,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
nodename := pod.Spec.NodeName
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
loggerCtx := klog.NewContext(ctx, logger)
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, nodename,
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, d.f.ClientSet, nodename,
app.FileOperations{
Create: func(name string, content []byte) error {
klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))

View File

@@ -892,17 +892,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
driver.parameterMode = parameterModeStructured
f.It("must manage ResourceSlices", f.WithSlow(), func(ctx context.Context) {
nodeName := nodes.NodeNames[0]
driverName := driver.Name
// Check for gRPC call on one node. If that already fails, then
// we have a fundamental problem.
m := MethodInstance{nodeName, NodeListAndWatchResourcesMethod}
ginkgo.By("wait for NodeListAndWatchResources call")
gomega.Eventually(ctx, func() int64 {
return driver.CallCount(m)
}).WithTimeout(podStartTimeout).Should(gomega.BeNumerically(">", int64(0)), "NodeListAndWatchResources call count")
// Now check for exactly the right set of objects for all nodes.
ginkgo.By("check if ResourceSlice object(s) exist on the API server")
resourceClient := f.ClientSet.ResourceV1alpha2().ResourceSlices()

View File

@@ -66,7 +66,7 @@ go run ./test/e2e/dra/test-driver --feature-gates ContextualLogging=true -v=5 co
In yet another:
```console
sudo mkdir -p /var/run/cdi && sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry
go run ./test/e2e/dra/test-driver --feature-gates ContextualLogging=true -v=5 kubelet-plugin
go run ./test/e2e/dra/test-driver --feature-gates ContextualLogging=true -v=5 kubelet-plugin --node-name=127.0.0.1
```
And finally:

View File

@@ -28,12 +28,11 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
resourceapi "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
@@ -111,8 +110,9 @@ type FileOperations struct {
}
// StartPlugin sets up the servers that are necessary for a DRA kubelet plugin.
func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kubernetes.Interface, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
logger := klog.FromContext(ctx)
if fileOps.Create == nil {
fileOps.Create = func(name string, content []byte) error {
return os.WriteFile(name, content, os.FileMode(0644))
@@ -143,17 +143,33 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string
}
opts = append(opts,
kubeletplugin.Logger(logger),
kubeletplugin.DriverName(driverName),
kubeletplugin.NodeName(nodeName),
kubeletplugin.KubeClient(kubeClient),
kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
)
d, err := kubeletplugin.Start(ex, opts...)
d, err := kubeletplugin.Start(ctx, ex, opts...)
if err != nil {
return nil, fmt.Errorf("start kubelet plugin: %w", err)
}
ex.d = d
if fileOps.NumResourceInstances >= 0 {
instances := make([]resourceapi.NamedResourcesInstance, ex.fileOps.NumResourceInstances)
for i := 0; i < ex.fileOps.NumResourceInstances; i++ {
instances[i].Name = fmt.Sprintf("instance-%02d", i)
}
nodeResources := []*resourceapi.ResourceModel{
{
NamedResources: &resourceapi.NamedResourcesResources{
Instances: instances,
},
},
}
ex.d.PublishResources(ctx, nodeResources)
}
return ex, nil
}
@@ -453,39 +469,6 @@ func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv
return resp, nil
}
func (ex *ExamplePlugin) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, stream drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
if ex.fileOps.NumResourceInstances < 0 {
ex.logger.Info("Sending no NodeResourcesResponse")
return status.New(codes.Unimplemented, "node resource support disabled").Err()
}
instances := make([]resourceapi.NamedResourcesInstance, len(ex.instances))
for i, name := range sets.List(ex.instances) {
instances[i].Name = name
}
resp := &drapbv1alpha3.NodeListAndWatchResourcesResponse{
Resources: []*resourceapi.ResourceModel{
{
NamedResources: &resourceapi.NamedResourcesResources{
Instances: instances,
},
},
},
}
ex.logger.Info("Sending NodeListAndWatchResourcesResponse", "response", resp)
if err := stream.Send(resp); err != nil {
return err
}
// Keep the stream open until the test is done.
// TODO: test sending more updates later
<-ex.stopCh
ex.logger.Info("Done sending NodeListAndWatchResourcesResponse, closing stream")
return nil
}
func (ex *ExamplePlugin) GetPreparedResources() []ClaimID {
ex.mutex.Lock()
defer ex.mutex.Unlock()

View File

@@ -21,6 +21,7 @@ package app
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
@@ -272,6 +273,7 @@ func NewCommand() *cobra.Command {
draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.")
fs = kubeletPluginFlagSets.FlagSet("CDI")
cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files")
nodeName := fs.String("node-name", "", "name of the node that the kubelet plugin is responsible for")
fs = kubeletPlugin.Flags()
for _, f := range kubeletPluginFlagSets.FlagSets {
fs.AddFlagSet(f)
@@ -287,7 +289,11 @@ func NewCommand() *cobra.Command {
return fmt.Errorf("create socket directory: %w", err)
}
plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, "", FileOperations{},
if *nodeName == "" {
return errors.New("--node-name not set")
}
plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, clientset, *nodeName, FileOperations{},
kubeletplugin.PluginSocketPath(*endpoint),
kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")),
kubeletplugin.KubeletPluginSocketPath(*draAddress),

View File

@@ -69,14 +69,19 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
f := framework.NewDefaultFramework("dra-node")
f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
var kubeletPlugin, kubeletPlugin1, kubeletPlugin2 *testdriver.ExamplePlugin
ginkgo.BeforeEach(func() {
ginkgo.DeferCleanup(func(ctx context.Context) {
// When plugin and kubelet get killed at the end of the tests, they leave ResourceSlices behind.
// Perhaps garbage collection would eventually remove them (not sure how the node instance
// is managed), but this could take time. Let's clean up explicitly.
framework.ExpectNoError(f.ClientSet.ResourceV1alpha2().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{}))
})
})
f.Context("Resource Kubelet Plugin", f.WithSerial(), func() {
ginkgo.BeforeEach(func(ctx context.Context) {
kubeletPlugin = newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
})
ginkgo.It("must register after Kubelet restart", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
oldCalls := kubeletPlugin.GetGRPCCalls()
getNewCalls := func() []testdriver.GRPCCall {
calls := kubeletPlugin.GetGRPCCalls()
@@ -91,6 +96,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must register after plugin restart", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
ginkgo.By("restart Kubelet Plugin")
kubeletPlugin.Stop()
kubeletPlugin = newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
@@ -100,7 +107,10 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must process pod created when kubelet is not running", func(ctx context.Context) {
newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
// Stop Kubelet
ginkgo.By("stop kubelet")
startKubelet := stopKubelet()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
// Pod must be in pending state
@@ -109,6 +119,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
framework.ExpectNoError(err)
// Start Kubelet
ginkgo.By("restart kubelet")
startKubelet()
// Pod should succeed
err = e2epod.WaitForPodSuccessInNamespaceTimeout(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartShortTimeout)
@@ -116,6 +127,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must keep pod in pending state if NodePrepareResources times out", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
unblock := kubeletPlugin.BlockNodePrepareResources()
defer unblock()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
@@ -134,6 +147,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must run pod if NodePrepareResources fails and then succeeds", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
unset := kubeletPlugin.SetNodePrepareResourcesFailureMode()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
@@ -157,6 +172,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must run pod if NodeUnprepareResources fails and then succeeds", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
unset := kubeletPlugin.SetNodeUnprepareResourcesFailureMode()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
@@ -177,6 +194,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must retry NodePrepareResources after Kubelet restart", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
unset := kubeletPlugin.SetNodePrepareResourcesFailureMode()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
@@ -206,6 +225,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must retry NodeUnprepareResources after Kubelet restart", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
unset := kubeletPlugin.SetNodeUnprepareResourcesFailureMode()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
ginkgo.By("wait for NodePrepareResources call to succeed")
@@ -231,6 +252,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must call NodeUnprepareResources for deleted pod", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
unset := kubeletPlugin.SetNodeUnprepareResourcesFailureMode()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", false, []string{driverName})
@@ -253,6 +276,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must call NodeUnprepareResources for deleted pod after Kubelet restart", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
unset := kubeletPlugin.SetNodeUnprepareResourcesFailureMode()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", false, []string{driverName})
@@ -282,6 +307,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must not call NodePrepareResources for deleted pod after Kubelet restart", func(ctx context.Context) {
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
unblock := kubeletPlugin.BlockNodePrepareResources()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", false, []string{driverName})
@@ -309,16 +336,21 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
f.Context("Two resource Kubelet Plugins", f.WithSerial(), func() {
ginkgo.BeforeEach(func(ctx context.Context) {
kubeletPlugin1 = newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), kubeletPlugin1Name)
kubeletPlugin2 = newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), kubeletPlugin2Name)
// start creates plugins which will get stopped when the context gets canceled.
start := func(ctx context.Context) (*testdriver.ExamplePlugin, *testdriver.ExamplePlugin) {
kubeletPlugin1 := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), kubeletPlugin1Name)
kubeletPlugin2 := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), kubeletPlugin2Name)
ginkgo.By("wait for Kubelet plugin registration")
gomega.Eventually(kubeletPlugin1.GetGRPCCalls()).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
gomega.Eventually(kubeletPlugin2.GetGRPCCalls()).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
})
return kubeletPlugin1, kubeletPlugin2
}
ginkgo.It("must prepare and unprepare resources", func(ctx context.Context) {
kubeletPlugin1, kubeletPlugin2 := start(ctx)
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
ginkgo.By("wait for pod to succeed")
@@ -335,6 +367,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must run pod if NodePrepareResources fails for one plugin and then succeeds", func(ctx context.Context) {
_, kubeletPlugin2 := start(ctx)
unset := kubeletPlugin2.SetNodePrepareResourcesFailureMode()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
@@ -358,6 +392,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must run pod if NodeUnprepareResources fails for one plugin and then succeeds", func(ctx context.Context) {
kubeletPlugin1, kubeletPlugin2 := start(ctx)
unset := kubeletPlugin2.SetNodeUnprepareResourcesFailureMode()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
@@ -381,6 +417,9 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must run pod if NodePrepareResources is in progress for one plugin when Kubelet restarts", func(ctx context.Context) {
_, kubeletPlugin2 := start(ctx)
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
unblock := kubeletPlugin.BlockNodePrepareResources()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
@@ -404,6 +443,8 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
ginkgo.It("must call NodeUnprepareResources again if it's in progress for one plugin when Kubelet restarts", func(ctx context.Context) {
kubeletPlugin1, kubeletPlugin2 := start(ctx)
unblock := kubeletPlugin2.BlockNodeUnprepareResources()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
@@ -443,8 +484,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
}
f.It("must be removed on kubelet startup", f.WithDisruptive(), func(ctx context.Context) {
gomega.Expect(listResources(ctx)).To(gomega.BeEmpty(), "ResourceSlices should have been deleted after previous test")
ginkgo.By("stop kubelet")
startKubelet := stopKubelet()
ginkgo.DeferCleanup(func() {
@@ -474,12 +513,11 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
})
f.It("must be removed after plugin unregistration", func(ctx context.Context) {
gomega.Expect(listResources(ctx)).To(gomega.BeEmpty(), "ResourceSlices should have been deleted after previous test")
nodeName := getNodeName(ctx, f)
matchNode := gomega.ConsistOf(matchResourcesByNodeName(nodeName))
ginkgo.By("start plugin and wait for ResourceSlice")
kubeletPlugin = newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
kubeletPlugin := newKubeletPlugin(ctx, f.ClientSet, getNodeName(ctx, f), driverName)
gomega.Eventually(ctx, listResources).Should(matchNode, "ResourceSlice from kubelet plugin")
gomega.Consistently(ctx, listResources).WithTimeout(5*time.Second).Should(matchNode, "ResourceSlice from kubelet plugin")
@@ -510,7 +548,8 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN
ctx,
cdiDir,
pluginName,
"",
clientSet,
nodeName,
testdriver.FileOperations{},
kubeletplugin.PluginSocketPath(endpoint),
kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, pluginName+"-reg.sock")),