Merge pull request #115912 from moshe010/dra-checkpoint

kubelet DRA: Add checkpointing mechanism in the DRA Manager
This commit is contained in:
Kubernetes Prow Robot 2023-03-12 12:20:40 -07:00 committed by GitHub
commit a4a0fd44d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 499 additions and 103 deletions

View File

@ -309,7 +309,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
// initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
cm.draManager, err = dra.NewManagerImpl(kubeClient)
cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir)
if err != nil {
return nil, err
}

View File

@ -29,6 +29,9 @@ import (
"errors"
"fmt"
"strings"
"k8s.io/apimachinery/pkg/types"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
const (
@ -36,6 +39,25 @@ const (
annotationPrefix = "cdi.k8s.io/"
)
// generate container annotations using CDI UpdateAnnotations API.
func generateCDIAnnotations(
claimUID types.UID,
driverName string,
cdiDevices []string,
) ([]kubecontainer.Annotation, error) {
annotations, err := updateAnnotations(map[string]string{}, driverName, string(claimUID), cdiDevices)
if err != nil {
return nil, fmt.Errorf("can't generate CDI annotations: %+v", err)
}
kubeAnnotations := []kubecontainer.Annotation{}
for key, value := range annotations {
kubeAnnotations = append(kubeAnnotations, kubecontainer.Annotation{Name: key, Value: value})
}
return kubeAnnotations, nil
}
// updateAnnotations updates annotations with a plugin-specific CDI device
// injection request for the given devices. Upon any error a non-nil error
// is returned and annotations are left intact. By convention plugin should

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@ -29,26 +30,7 @@ import (
// to prepare and unprepare a resource claim.
type claimInfo struct {
sync.RWMutex
// name of the DRA driver
driverName string
// claimUID is an UID of the resource claim
claimUID types.UID
// claimName is a name of the resource claim
claimName string
// namespace is a claim namespace
namespace string
// podUIDs is a set of pod UIDs that reference a resource
podUIDs sets.Set[string]
// cdiDevices is a list of CDI devices returned by the
// GRPC API call NodePrepareResource
cdiDevices []string
state.ClaimInfoState
// annotations is a list of container annotations associated with
// a prepared resource
annotations []kubecontainer.Annotation
@ -58,41 +40,86 @@ func (res *claimInfo) addPodReference(podUID types.UID) {
res.Lock()
defer res.Unlock()
res.podUIDs.Insert(string(podUID))
res.PodUIDs.Insert(string(podUID))
}
func (res *claimInfo) deletePodReference(podUID types.UID) {
res.Lock()
defer res.Unlock()
res.podUIDs.Delete(string(podUID))
res.PodUIDs.Delete(string(podUID))
}
// claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
type claimInfoCache struct {
sync.RWMutex
state state.CheckpointState
claimInfo map[string]*claimInfo
}
// newClaimInfoCache is a function that returns an instance of the claimInfoCache.
func newClaimInfoCache() *claimInfoCache {
return &claimInfoCache{
claimInfo: make(map[string]*claimInfo),
func newClaimInfo(driverName string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], cdiDevice []string) (*claimInfo, error) {
claimInfoState := state.ClaimInfoState{
DriverName: driverName,
ClaimUID: claimUID,
ClaimName: claimName,
Namespace: namespace,
PodUIDs: podUIDs,
CdiDevices: cdiDevice,
}
// NOTE: Passing CDI device names as annotations is a temporary solution
// It will be removed after all runtimes are updated
// to get CDI device names from the ContainerConfig.CDIDevices field
annotations, err := generateCDIAnnotations(claimUID, driverName, cdiDevice)
if err != nil {
return nil, fmt.Errorf("failed to generate container annotations, err: %+v", err)
}
claimInfo := claimInfo{
ClaimInfoState: claimInfoState,
annotations: annotations,
}
return &claimInfo, nil
}
func (cache *claimInfoCache) add(claim, namespace string, res *claimInfo) error {
// newClaimInfoCache is a function that returns an instance of the claimInfoCache.
func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
if err != nil {
return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove dra state file, err: %+v", err)
}
curState, err := stateImpl.GetOrCreate()
if err != nil {
return nil, fmt.Errorf("error calling GetOrCreate() on checkpoint state: %v", err)
}
cache := &claimInfoCache{
state: stateImpl,
claimInfo: make(map[string]*claimInfo),
}
for _, entry := range curState {
info, err := newClaimInfo(
entry.DriverName,
entry.ClaimUID,
entry.ClaimName,
entry.Namespace,
entry.PodUIDs,
entry.CdiDevices,
)
if err != nil {
return nil, fmt.Errorf("failed to create claimInfo %+v: %+v", info, err)
}
cache.add(info)
}
return cache, nil
}
func (cache *claimInfoCache) add(res *claimInfo) {
cache.Lock()
defer cache.Unlock()
key := claim + namespace
if _, ok := cache.claimInfo[key]; ok {
return fmt.Errorf("claim %s, namespace %s already cached", claim, namespace)
}
cache.claimInfo[claim+namespace] = res
return nil
cache.claimInfo[res.ClaimName+res.Namespace] = res
}
func (cache *claimInfoCache) get(claimName, namespace string) *claimInfo {
@ -118,10 +145,22 @@ func (cache *claimInfoCache) hasPodReference(UID types.UID) bool {
defer cache.RUnlock()
for _, claimInfo := range cache.claimInfo {
if claimInfo.podUIDs.Has(string(UID)) {
if claimInfo.PodUIDs.Has(string(UID)) {
return true
}
}
return false
}
func (cache *claimInfoCache) syncToCheckpoint() error {
cache.RLock()
defer cache.RUnlock()
claimInfoStateList := make(state.ClaimInfoStateList, 0, len(cache.claimInfo))
for _, infoClaim := range cache.claimInfo {
claimInfoStateList = append(claimInfoStateList, infoClaim.ClaimInfoState)
}
return cache.state.Store(claimInfoStateList)
}

View File

@ -31,6 +31,9 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// draManagerStateFileName is the file name where dra manager stores its state
const draManagerStateFileName = "dra_manager_state"
// ManagerImpl is the structure in charge of managing DRA resource Plugins.
type ManagerImpl struct {
// cache contains cached claim info
@ -41,36 +44,22 @@ type ManagerImpl struct {
}
// NewManagerImpl creates a new manager.
func NewManagerImpl(kubeClient clientset.Interface) (*ManagerImpl, error) {
func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (*ManagerImpl, error) {
klog.V(2).InfoS("Creating DRA manager")
claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
if err != nil {
return nil, fmt.Errorf("failed to create claimInfo cache: %+v", err)
}
manager := &ManagerImpl{
cache: newClaimInfoCache(),
cache: claimInfoCache,
kubeClient: kubeClient,
}
return manager, nil
}
// Generate container annotations using CDI UpdateAnnotations API.
func generateCDIAnnotations(
claimUID types.UID,
driverName string,
cdiDevices []string,
) ([]kubecontainer.Annotation, error) {
annotations, err := updateAnnotations(map[string]string{}, driverName, string(claimUID), cdiDevices)
if err != nil {
return nil, fmt.Errorf("can't generate CDI annotations: %+v", err)
}
kubeAnnotations := []kubecontainer.Annotation{}
for key, value := range annotations {
kubeAnnotations = append(kubeAnnotations, kubecontainer.Annotation{Name: key, Value: value})
}
return kubeAnnotations, nil
}
// PrepareResources attempts to prepare all of the required resource
// plugin resources for the input container, issue an NodePrepareResource rpc request
// for each new resource requirement, process their responses and update the cached
@ -83,10 +72,14 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)
// Resource is already prepared, add pod UID to it
if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
// resource is already prepared, add pod UID to it
// We delay checkpointing of this change until this call returns successfully.
// It is OK to do this because we will only return successfully from this call if
// the checkpoint has succeeded. That means if the kubelet is ever restarted
// before this checkpoint succeeds, the pod whose resources are being prepared
// would never have started, so it's OK (actually correct) to not include it in the cache.
claimInfo.addPodReference(pod.UID)
continue
}
@ -126,39 +119,36 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
klog.V(3).InfoS("NodePrepareResource succeeded", "response", response)
// NOTE: Passing CDI device names as annotations is a temporary solution
// It will be removed after all runtimes are updated
// to get CDI device names from the ContainerConfig.CDIDevices field
annotations, err := generateCDIAnnotations(resourceClaim.UID, driverName, response.CdiDevices)
if err != nil {
return fmt.Errorf("failed to generate container annotations, err: %+v", err)
}
// Cache prepared resource
err = m.cache.add(
// TODO: We are adding the claimInfo struct to the cache and syncing it to the checkpoint *after* the NodePrepareResource
// call has completed. This will cause issues if the kubelet gets restarted between NodePrepareResource and syncToCheckpoint.
// It will result in not calling NodeUnprepareResource for this claim because no claimInfo will be synced back to the cache
// for it after the restart. We need to resolve this issue before moving to beta.
claimInfo, err := newClaimInfo(
driverName,
resourceClaim.UID,
resourceClaim.Name,
resourceClaim.Namespace,
&claimInfo{
driverName: driverName,
claimUID: resourceClaim.UID,
claimName: resourceClaim.Name,
namespace: resourceClaim.Namespace,
podUIDs: sets.New(string(pod.UID)),
cdiDevices: response.CdiDevices,
annotations: annotations,
})
sets.New(string(pod.UID)),
response.CdiDevices)
if err != nil {
return fmt.Errorf(
"failed to cache prepared resource, claim: %s(%s), err: %+v",
resourceClaim.Name,
resourceClaim.UID,
err,
)
return fmt.Errorf("newClaimInfo failed, claim UID: %s, claim name: %s, claim namespace: %s, err: %+v",
resourceClaim.UID, resourceClaim.Name, resourceClaim.Namespace, err)
}
m.cache.add(claimInfo)
// Checkpoint to reduce redundant calls to NodePrepareResource() after a kubelet restart.
err = m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}
}
}
}
// Checkpoint to capture all of the previous addPodReference() calls.
err := m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}
return nil
}
@ -181,9 +171,9 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, claimName)
}
klog.V(3).InfoS("add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
klog.V(3).InfoS("Add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
annotations = append(annotations, claimInfo.annotations...)
for _, cdiDevice := range claimInfo.cdiDevices {
for _, cdiDevice := range claimInfo.CdiDevices {
cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: cdiDevice})
}
}
@ -208,43 +198,54 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
}
// Skip calling NodeUnprepareResource if other pods are still referencing it
if len(claimInfo.podUIDs) > 1 {
if len(claimInfo.PodUIDs) > 1 {
// We delay checkpointing of this change until this call returns successfully.
// It is OK to do this because we will only return successfully from this call if
// the checkpoint has succeeded. That means if the kubelet is ever restarted
// before this checkpoint succeeds, we will simply call into this (idempotent)
// function again.
claimInfo.deletePodReference(pod.UID)
continue
}
// Call NodeUnprepareResource only for the last pod that references the claim
client, err := dra.NewDRAPluginClient(claimInfo.driverName)
client, err := dra.NewDRAPluginClient(claimInfo.DriverName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.driverName, err)
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.DriverName, err)
}
response, err := client.NodeUnprepareResource(
context.Background(),
claimInfo.namespace,
claimInfo.claimUID,
claimInfo.claimName,
claimInfo.cdiDevices)
claimInfo.Namespace,
claimInfo.ClaimUID,
claimInfo.ClaimName,
claimInfo.CdiDevices)
if err != nil {
return fmt.Errorf(
"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
pod.Name,
claimInfo.claimUID,
claimInfo.claimName,
claimInfo.cdiDevices, err)
pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, claimInfo.CdiDevices, err)
}
klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
// Delete last pod UID only if NodeUnprepareResource call succeeds.
// This ensures that status manager doesn't enter termination status
// for the pod. This logic is implemented in the m.PodMightNeedToUnprepareResources
// and in the claimInfo.hasPodReference.
claimInfo.deletePodReference(pod.UID)
klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
// delete resource from the cache
m.cache.delete(claimInfo.claimName, pod.Namespace)
m.cache.delete(claimInfo.ClaimName, pod.Namespace)
// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
err = m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}
}
// Checkpoint to capture all of the previous deletePodReference() calls.
err := m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}
return nil
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"encoding/json"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
var _ checkpointmanager.Checkpoint = &DRAManagerCheckpoint{}
const checkpointVersion = "v1"
// DRAManagerCheckpoint struct is used to store pod dynamic resources assignments in a checkpoint
type DRAManagerCheckpoint struct {
Version string `json:"version"`
Entries ClaimInfoStateList `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}
// List of claim info to store in checkpoint
type ClaimInfoStateList []ClaimInfoState
// NewDRAManagerCheckpoint returns an instance of Checkpoint
func NewDRAManagerCheckpoint() *DRAManagerCheckpoint {
return &DRAManagerCheckpoint{
Version: checkpointVersion,
Entries: ClaimInfoStateList{},
}
}
// MarshalCheckpoint returns marshalled checkpoint
func (dc *DRAManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
// make sure checksum wasn't set before so it doesn't affect output checksum
dc.Checksum = 0
dc.Checksum = checksum.New(dc)
return json.Marshal(*dc)
}
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
func (dc *DRAManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, dc)
}
// VerifyChecksum verifies that current checksum of checkpoint is valid
func (dc *DRAManagerCheckpoint) VerifyChecksum() error {
ck := dc.Checksum
dc.Checksum = 0
err := ck.Verify(dc)
dc.Checksum = ck
return err
}

View File

@ -0,0 +1,115 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)
var _ CheckpointState = &stateCheckpoint{}
// CheckpointState interface provides to get and store state
type CheckpointState interface {
GetOrCreate() (ClaimInfoStateList, error)
Store(ClaimInfoStateList) error
}
// ClaimInfoState is used to store claim info state in a checkpoint
type ClaimInfoState struct {
// Name of the DRA driver
DriverName string
// ClaimUID is an UID of the resource claim
ClaimUID types.UID
// ClaimName is a name of the resource claim
ClaimName string
// Namespace is a claim namespace
Namespace string
// PodUIDs is a set of pod UIDs that reference a resource
PodUIDs sets.Set[string]
// CdiDevices is a list of CDI devices returned by the
// GRPC API call NodePrepareResource
CdiDevices []string
}
type stateCheckpoint struct {
sync.RWMutex
checkpointManager checkpointmanager.CheckpointManager
checkpointName string
}
// NewCheckpointState creates new State for keeping track of claim info with checkpoint backend
func NewCheckpointState(stateDir, checkpointName string) (*stateCheckpoint, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
stateCheckpoint := &stateCheckpoint{
checkpointManager: checkpointManager,
checkpointName: checkpointName,
}
return stateCheckpoint, nil
}
// get state from a checkpoint and creates it if it doesn't exist
func (sc *stateCheckpoint) GetOrCreate() (ClaimInfoStateList, error) {
sc.Lock()
defer sc.Unlock()
checkpoint := NewDRAManagerCheckpoint()
err := sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
if err == errors.ErrCheckpointNotFound {
sc.store(ClaimInfoStateList{})
return ClaimInfoStateList{}, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get checkpoint %v: %v", sc.checkpointName, err)
}
return checkpoint.Entries, nil
}
// saves state to a checkpoint
func (sc *stateCheckpoint) Store(claimInfoStateList ClaimInfoStateList) error {
sc.Lock()
defer sc.Unlock()
return sc.store(claimInfoStateList)
}
// saves state to a checkpoint, caller is responsible for locking
func (sc *stateCheckpoint) store(claimInfoStateList ClaimInfoStateList) error {
checkpoint := NewDRAManagerCheckpoint()
checkpoint.Entries = claimInfoStateList
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil {
return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err)
}
return nil
}

View File

@ -0,0 +1,151 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"os"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing"
)
const testingCheckpoint = "dramanager_checkpoint_test"
// assertStateEqual marks provided test as failed if provided states differ
func assertStateEqual(t *testing.T, restoredState, expectedState ClaimInfoStateList) {
assert.Equal(t, expectedState, restoredState, "expected ClaimInfoState does not equal to restored one")
}
func TestCheckpointGetOrCreate(t *testing.T) {
testCases := []struct {
description string
checkpointContent string
expectedError string
expectedState ClaimInfoStateList
}{
{
"Create non-existing checkpoint",
"",
"",
[]ClaimInfoState{},
},
{
"Restore valid checkpoint",
`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CdiDevices":["example.com/example=cdi-example"]}],"checksum":2939981547}`,
"",
[]ClaimInfoState{{
DriverName: "test-driver.cdi.k8s.io",
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
CdiDevices: []string{"example.com/example=cdi-example"},
},
},
},
{
"Restore checkpoint with invalid checksum",
`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CdiDevices":["example.com/example=cdi-example"]}],"checksum":2939981548}`,
"checkpoint is corrupted",
[]ClaimInfoState{},
},
{
"Restore checkpoint with invalid JSON",
`{`,
"unexpected end of JSON input",
[]ClaimInfoState{},
},
}
// create temp dir
testingDir, err := os.MkdirTemp("", "dramanager_state_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testingDir)
// create checkpoint manager for testing
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
assert.NoError(t, err, "could not create testing checkpoint manager")
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
// ensure there is no previous checkpoint
assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
// prepare checkpoint for testing
if strings.TrimSpace(tc.checkpointContent) != "" {
checkpoint := &testutil.MockCheckpoint{Content: tc.checkpointContent}
assert.NoError(t, cpm.CreateCheckpoint(testingCheckpoint, checkpoint), "could not create testing checkpoint")
}
var state ClaimInfoStateList
checkpointState, err := NewCheckpointState(testingDir, testingCheckpoint)
if err == nil {
state, err = checkpointState.GetOrCreate()
}
if strings.TrimSpace(tc.expectedError) != "" {
assert.Error(t, err)
assert.Contains(t, err.Error(), tc.expectedError)
} else {
assert.NoError(t, err, "unexpected error while creating checkpointState")
// compare state after restoration with the one expected
assertStateEqual(t, state, tc.expectedState)
}
})
}
}
func TestCheckpointStateStore(t *testing.T) {
claimInfoState := ClaimInfoState{
DriverName: "test-driver.cdi.k8s.io",
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
CdiDevices: []string{"example.com/example=cdi-example"},
}
expectedCheckpoint := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CdiDevices":["example.com/example=cdi-example"]}],"checksum":2939981547}`
// create temp dir
testingDir, err := os.MkdirTemp("", "dramanager_state_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testingDir)
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
assert.NoError(t, err, "could not create testing checkpoint manager")
assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
cs, err := NewCheckpointState(testingDir, testingCheckpoint)
assert.NoError(t, err, "could not create testing checkpointState instance")
err = cs.Store(ClaimInfoStateList{claimInfoState})
assert.NoError(t, err, "could not store ClaimInfoState")
checkpoint := NewDRAManagerCheckpoint()
cpm.GetCheckpoint(testingCheckpoint, checkpoint)
checkpointData, err := checkpoint.MarshalCheckpoint()
assert.NoError(t, err, "could not Marshal Checkpoint")
assert.Equal(t, expectedCheckpoint, string(checkpointData), "expected ClaimInfoState does not equal to restored one")
}