Use workqueue model in expand controller
Add a test for patch creation

committed by Hemant Kumar

parent 851afa0bea
commit c16a555654
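
The diff below covers only the PV/PVC patch helpers; the "workqueue model" named in the title is the standard client-go controller pattern of enqueueing object keys and draining them from rate-limited workers. A minimal sketch of that pattern, assuming the stock k8s.io/client-go/util/workqueue package (the queue name and syncPVC are illustrative, not the expand controller's actual code):

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

// syncPVC is a stand-in for the controller's per-key sync logic.
func syncPVC(key string) error {
    fmt.Println("syncing", key)
    return nil
}

func main() {
    queue := workqueue.NewNamedRateLimitingQueue(
        workqueue.DefaultControllerRateLimiter(), "volume_expand")

    // Informer event handlers enqueue PVC keys ("namespace/name")...
    queue.Add("default/my-pvc")

    // ...and worker goroutines drain the queue one key at a time.
    key, quit := queue.Get()
    if quit {
        return
    }
    defer queue.Done(key)

    if err := syncPVC(key.(string)); err != nil {
        // Requeue with backoff so transient failures are retried.
        queue.AddRateLimited(key)
        return
    }
    // Success: clear any rate-limit history for this key.
    queue.Forget(key)
}

AddRateLimited is what lets the controller retry failed resize operations with backoff instead of blocking informer callbacks.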
@@ -21,6 +21,10 @@ import (
    "fmt"

    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/strategicpatch"
    clientset "k8s.io/client-go/kubernetes"
@@ -46,45 +50,154 @@ func ClaimToClaimKey(claim *v1.PersistentVolumeClaim) string {
    return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}

// UpdatePVSize updates just pv size after cloudprovider resizing is successful
func UpdatePVSize(
    pv *v1.PersistentVolume,
    newSize resource.Quantity,
    kubeClient clientset.Interface) error {
    pvClone := pv.DeepCopy()

    oldData, err := json.Marshal(pvClone)
    if err != nil {
        return fmt.Errorf("Unexpected error marshaling old PV %q with error : %v", pvClone.Name, err)
    }

    pvClone.Spec.Capacity[v1.ResourceStorage] = newSize

    newData, err := json.Marshal(pvClone)
    if err != nil {
        return fmt.Errorf("Unexpected error marshaling new PV %q with error : %v", pvClone.Name, err)
    }

    patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvClone)
    if err != nil {
        return fmt.Errorf("Error Creating two way merge patch for PV %q with error : %v", pvClone.Name, err)
    }

    _, err = kubeClient.CoreV1().PersistentVolumes().Patch(pvClone.Name, types.StrategicMergePatchType, patchBytes)
    if err != nil {
        return fmt.Errorf("Error Patching PV %q with error : %v", pvClone.Name, err)
    }
    return nil
}

// MarkResizeInProgress marks cloudprovider resizing as in progress
func MarkResizeInProgress(
    pvc *v1.PersistentVolumeClaim,
    kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
    // Mark PVC as Resize Started
    progressCondition := v1.PersistentVolumeClaimCondition{
        Type:               v1.PersistentVolumeClaimResizing,
        Status:             v1.ConditionTrue,
        LastTransitionTime: metav1.Now(),
    }
    conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
    newPVC := pvc.DeepCopy()
    newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
    return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
}

// MarkForFSResize marks file system resizing as pending
func MarkForFSResize(
    pvc *v1.PersistentVolumeClaim,
    kubeClient clientset.Interface) error {
    pvcCondition := v1.PersistentVolumeClaimCondition{
        Type:               v1.PersistentVolumeClaimFileSystemResizePending,
        Status:             v1.ConditionTrue,
        LastTransitionTime: metav1.Now(),
        Message:            "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
    }
    conditions := []v1.PersistentVolumeClaimCondition{pvcCondition}
    newPVC := pvc.DeepCopy()
    newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
    _, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
    return err
}

// MarkResizeFinished marks all resizing as done
func MarkResizeFinished(
    pvc *v1.PersistentVolumeClaim,
    newSize resource.Quantity,
    kubeClient clientset.Interface) error {
    return MarkFSResizeFinished(pvc, newSize, kubeClient)
}

// MarkFSResizeFinished marks file system resizing as done
func MarkFSResizeFinished(
    pvc *v1.PersistentVolumeClaim,
    newSize resource.Quantity,
    kubeClient clientset.Interface) error {
    newPVC := pvc.DeepCopy()
    newPVC.Status.Capacity[v1.ResourceStorage] = newSize
    newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{})
    _, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
    return err
}

// PatchPVCStatus updates PVC status using PATCH verb
// Don't use Update because this can be called from kubelet and if kubelet has an older client its
// Updates will overwrite new fields. And to avoid writing to a stale object, add ResourceVersion
// to the patch so that Patch will fail if the patch's RV != actual up-to-date RV like Update would
func PatchPVCStatus(
    oldPVC *v1.PersistentVolumeClaim,
    newPVC *v1.PersistentVolumeClaim,
    kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
    patchBytes, err := createPVCPatch(oldPVC, newPVC)
    if err != nil {
        return nil, fmt.Errorf("PatchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err)
    }

    updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
        Patch(oldPVC.Name, types.StrategicMergePatchType, patchBytes, "status")
    if updateErr != nil {
        return nil, fmt.Errorf("PatchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr)
    }
    return updatedClaim, nil
}

func createPVCPatch(
    oldPVC *v1.PersistentVolumeClaim,
    newPVC *v1.PersistentVolumeClaim) ([]byte, error) {
    oldData, err := json.Marshal(oldPVC)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal old data: %v", err)
    }

    newData, err := json.Marshal(newPVC)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal new data: %v", err)
    }

    patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPVC)
    if err != nil {
        return nil, fmt.Errorf("failed to create 2 way merge patch: %v", err)
    }

    // Embed the claim's current ResourceVersion in the patch so the
    // apiserver rejects it if the object changed since oldPVC was read.
    patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
    if err != nil {
        return nil, fmt.Errorf("failed to add resource version: %v", err)
    }

    return patchBytes, nil
}
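
// Illustration (assumed values): for a 10Gi -> 15Gi status-capacity change on a
// claim at ResourceVersion "10", the patch built above comes out roughly as
//   {"metadata":{"resourceVersion":"10"},"status":{"capacity":{"storage":"15Gi"}}}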

func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
    var patchMap map[string]interface{}
    err := json.Unmarshal(patchBytes, &patchMap)
    if err != nil {
        return nil, fmt.Errorf("error unmarshalling patch: %v", err)
    }
    // Wrap the raw patch map in unstructured so the standard accessor
    // can set metadata.resourceVersion on it.
    u := unstructured.Unstructured{Object: patchMap}
    a, err := meta.Accessor(&u)
    if err != nil {
        return nil, fmt.Errorf("error creating accessor: %v", err)
    }
    a.SetResourceVersion(resourceVersion)
    versionBytes, err := json.Marshal(patchMap)
    if err != nil {
        return nil, fmt.Errorf("error marshalling json patch: %v", err)
    }
    return versionBytes, nil
}

// MergeResizeConditionOnPVC updates pvc with requested resize conditions
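
The commit message also mentions a test for patch creation. A minimal sketch of what such a test can look like, assuming it sits in the same package as createPVCPatch above (the fixture values and assertions are illustrative, not the commit's actual test):

import (
    "encoding/json"
    "testing"

    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestCreatePVCPatch(t *testing.T) {
    oldPVC := &v1.PersistentVolumeClaim{
        ObjectMeta: metav1.ObjectMeta{Name: "pvc", ResourceVersion: "10"},
        Status: v1.PersistentVolumeClaimStatus{
            Capacity: v1.ResourceList{
                v1.ResourceStorage: resource.MustParse("10Gi"),
            },
        },
    }
    newPVC := oldPVC.DeepCopy()
    newPVC.Status.Capacity[v1.ResourceStorage] = resource.MustParse("15Gi")

    patchBytes, err := createPVCPatch(oldPVC, newPVC)
    if err != nil {
        t.Fatalf("creating patch: %v", err)
    }

    // The patch must carry the old ResourceVersion so stale writes fail.
    var patchMap map[string]interface{}
    if err := json.Unmarshal(patchBytes, &patchMap); err != nil {
        t.Fatalf("unmarshalling patch: %v", err)
    }
    metadata, ok := patchMap["metadata"].(map[string]interface{})
    if !ok {
        t.Fatalf("patch %s has no metadata", patchBytes)
    }
    if rv := metadata["resourceVersion"]; rv != "10" {
        t.Errorf("expected resourceVersion %q in patch, got %v", "10", rv)
    }
}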