Update kubeletplugin API for DynamicResourceAllocation to v1alpha2

This PR makes the NodePrepareResource() and NodeUnprepareResource()
calls of the kubeletplugin API for DynamicResourceAllocation
symmetrical. It wasn't clear how one would use the set of CDIDevices
passed back in the NodeUnprepareResource() call of the v1alpha1 API,
so the new API now passes back the full ResourceHandle that was
originally passed to the Prepare() call. Passing the ResourceHandle is
strictly more informative, and a plugin can always (re)derive the set
of CDIDevices from it.

This is a breaking change, but this release is scheduled to break
multiple APIs for DynamicResourceAllocation, so it makes sense to do
this now instead of later.
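
For illustration, here is a minimal, hypothetical sketch of what a plugin-side NodeUnprepareResource handler can look like under v1alpha2. The request fields (Namespace, ClaimUid, ClaimName, ResourceHandle) match this commit; the handleData type and its JSON encoding are assumptions, since each driver defines its own ResourceHandle payload at allocation time.

package example

import (
	"context"
	"encoding/json"
	"fmt"

	drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
)

// handleData is a hypothetical payload a driver might have stored in the
// ResourceHandle during allocation; the real format is driver-defined.
type handleData struct {
	CDIDevices []string `json:"cdiDevices"`
}

type examplePlugin struct{}

// NodeUnprepareResource now receives the same opaque ResourceHandle string
// that was passed to NodePrepareResource, so the plugin can (re)derive its
// CDI devices from it instead of having the kubelet echo them back.
func (p *examplePlugin) NodeUnprepareResource(
	ctx context.Context,
	req *drapbv1.NodeUnprepareResourceRequest,
) (*drapbv1.NodeUnprepareResourceResponse, error) {
	var data handleData
	if err := json.Unmarshal([]byte(req.ResourceHandle), &data); err != nil {
		return nil, fmt.Errorf("failed to decode resource handle for claim %s: %w", req.ClaimUid, err)
	}
	// Tear down data.CDIDevices here (driver-specific cleanup elided).
	return &drapbv1.NodeUnprepareResourceResponse{}, nil
}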

Signed-off-by: Kevin Klues <kklues@nvidia.com>
Kevin Klues
2023-03-13 21:38:56 +00:00
parent 452f345c47
commit 579295e727
8 changed files with 106 additions and 84 deletions

pkg/kubelet/cm/dra/manager.go

@@ -69,7 +69,7 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 	// Process resources for each resource claim referenced by container
 	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
 		for range container.Resources.Claims {
-			for i, podResourceClaim := range pod.Spec.ResourceClaims {
+			for i := range pod.Spec.ResourceClaims {
 				claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
 				klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)
@@ -99,7 +99,7 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 				// Check if pod is in the ReservedFor for the claim
 				if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
 					return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
-						pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID)
+						pod.Name, pod.UID, claimName, resourceClaim.UID)
 				}
 				// Grab the allocation.resourceHandles. If there are no
@@ -241,19 +241,45 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 			continue
 		}
+		// Query claim object from the API server
+		resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
+			context.TODO(),
+			claimName,
+			metav1.GetOptions{})
+		if err != nil {
+			return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
+		}
+		// Grab the allocation.resourceHandles. If there are no
+		// allocation.resourceHandles, create a single resourceHandle with no
+		// content. This will trigger processing of this claim by a single
+		// kubelet plugin whose name matches resourceClaim.Status.DriverName.
+		resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
+		if len(resourceHandles) == 0 {
+			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
+		}
 		// Loop through all plugins and call NodeUnprepareResource only for the
 		// last pod that references the claim
-		for pluginName, cdiDevices := range claimInfo.CDIDevices {
+		for _, resourceHandle := range resourceHandles {
+			// If no DriverName is provided in the resourceHandle, we
+			// use the DriverName from the status
+			pluginName := resourceHandle.DriverName
+			if pluginName == "" {
+				pluginName = claimInfo.DriverName
+			}
 			// Call NodeUnprepareResource RPC for each resourceHandle
 			client, err := dra.NewDRAPluginClient(pluginName)
 			if err != nil {
-				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.DriverName, err)
+				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
 			}
 			response, err := client.NodeUnprepareResource(
 				context.Background(),
 				claimInfo.Namespace,
 				claimInfo.ClaimUID,
 				claimInfo.ClaimName,
-				cdiDevices)
+				resourceHandle.Data)
 			if err != nil {
 				return fmt.Errorf(
 					"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
@@ -270,7 +296,7 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 		m.cache.delete(claimInfo.ClaimName, pod.Namespace)
 		// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
-		err := m.cache.syncToCheckpoint()
+		err = m.cache.syncToCheckpoint()
 		if err != nil {
 			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
 		}
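
As a side note on the fan-out added to UnprepareResources() above: each resource handle is routed to the plugin named in the handle, falling back to the DriverName recorded in the claim status, and a claim whose allocation recorded no handles still produces exactly one call with empty data. The standalone sketch below shows that routing in isolation; it is not kubelet code, the driver name is made up, and k8s.io/api/resource/v1alpha2 is the resource API package aliased as resourcev1alpha2 in the hunk above.

package main

import (
	"fmt"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
)

// unprepareCalls returns (pluginName, handleData) pairs mirroring the routing
// in ManagerImpl.UnprepareResources: one call per resource handle, with a
// single empty handle synthesized when the allocation recorded none.
func unprepareCalls(statusDriverName string, handles []resourcev1alpha2.ResourceHandle) [][2]string {
	if len(handles) == 0 {
		handles = make([]resourcev1alpha2.ResourceHandle, 1)
	}
	var calls [][2]string
	for _, h := range handles {
		pluginName := h.DriverName
		if pluginName == "" {
			pluginName = statusDriverName
		}
		calls = append(calls, [2]string{pluginName, h.Data})
	}
	return calls
}

func main() {
	// A claim allocated without explicit handles still triggers one call,
	// routed to the status driver ("gpu.example.com" is illustrative).
	fmt.Println(unprepareCalls("gpu.example.com", nil))
}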

pkg/kubelet/cm/dra/plugin/client.go

@@ -29,7 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha1"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
)
const PluginClientTimeout = 10 * time.Second
@@ -48,7 +48,7 @@ type Client interface {
 		namespace string,
 		claimUID types.UID,
 		claimName string,
-		cdiDevice []string,
+		resourceHandle string,
 	) (*drapbv1.NodeUnprepareResourceResponse, error)
 }
@@ -145,14 +145,14 @@ func (r *draPluginClient) NodeUnprepareResource(
 	namespace string,
 	claimUID types.UID,
 	claimName string,
-	cdiDevices []string,
+	resourceHandle string,
 ) (*drapbv1.NodeUnprepareResourceResponse, error) {
 	klog.V(4).InfoS(
 		log("calling NodeUnprepareResource rpc"),
 		"namespace", namespace,
 		"claimUID", claimUID,
 		"claimname", claimName,
-		"cdiDevices", cdiDevices)
+		"resourceHandle", resourceHandle)
 	if r.nodeV1ClientCreator == nil {
 		return nil, errors.New("nodeV1ClientCreate is nil")
@@ -165,10 +165,10 @@ func (r *draPluginClient) NodeUnprepareResource(
 	defer closer.Close()
 	req := &drapbv1.NodeUnprepareResourceRequest{
-		Namespace:  namespace,
-		ClaimUid:   string(claimUID),
-		ClaimName:  claimName,
-		CdiDevices: cdiDevices,
+		Namespace:      namespace,
+		ClaimUid:       string(claimUID),
+		ClaimName:      claimName,
+		ResourceHandle: resourceHandle,
 	}
 	ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
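
For callers and test doubles of this client, the change means the unprepare path now carries the opaque resource-handle string end to end instead of a CDI device slice. A hedged sketch of a fake that implements just this method shape (illustrative only, not part of the commit):

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
)

// fakeUnprepareClient records the handle it was asked to unprepare; tests
// that previously asserted on a CDI device slice would assert on this
// string instead.
type fakeUnprepareClient struct {
	gotResourceHandle string
}

func (f *fakeUnprepareClient) NodeUnprepareResource(
	ctx context.Context,
	namespace string,
	claimUID types.UID,
	claimName string,
	resourceHandle string, // was cdiDevices []string under v1alpha1
) (*drapbv1.NodeUnprepareResourceResponse, error) {
	f.gotResourceHandle = resourceHandle
	return &drapbv1.NodeUnprepareResourceResponse{}, nil
}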