controller change for statefulset auto-delete (implementation)
This commit is contained in:
@@ -27,68 +27,115 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
errorutils "k8s.io/apimachinery/pkg/util/errors"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
appslisters "k8s.io/client-go/listers/apps/v1"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
// StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods,
|
||||
// StatefulPodControlObjectManager abstracts the manipulation of Pods and PVCs. The real controller implements this
|
||||
// with a clientset for writes and listers for reads; for tests we provide stubs.
|
||||
type StatefulPodControlObjectManager interface {
|
||||
CreatePod(ctx context.Context, pod *v1.Pod) error
|
||||
GetPod(namespace, podName string) (*v1.Pod, error)
|
||||
UpdatePod(pod *v1.Pod) error
|
||||
DeletePod(pod *v1.Pod) error
|
||||
CreateClaim(claim *v1.PersistentVolumeClaim) error
|
||||
GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error)
|
||||
UpdateClaim(claim *v1.PersistentVolumeClaim) error
|
||||
}
|
||||
|
||||
// StatefulPodControl defines the interface that StatefulSetController uses to create, update, and delete Pods,
|
||||
// and to update the Status of a StatefulSet. It follows the design paradigms used for PodControl, but its
|
||||
// implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement.
|
||||
// Like controller.PodControlInterface, it is implemented as an interface to provide for testing fakes.
|
||||
type StatefulPodControlInterface interface {
|
||||
// CreateStatefulPod create a Pod in a StatefulSet. Any PVCs necessary for the Pod are created prior to creating
|
||||
// the Pod. If the returned error is nil the Pod and its PVCs have been created.
|
||||
CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error
|
||||
// UpdateStatefulPod Updates a Pod in a StatefulSet. If the Pod already has the correct identity and stable
|
||||
// storage this method is a no-op. If the Pod must be mutated to conform to the Set, it is mutated and updated.
|
||||
// pod is an in-out parameter, and any updates made to the pod are reflected as mutations to this parameter. If
|
||||
// the create is successful, the returned error is nil.
|
||||
UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
|
||||
// DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
|
||||
// the returned error is nil.
|
||||
DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
|
||||
}
|
||||
|
||||
func NewRealStatefulPodControl(
|
||||
client clientset.Interface,
|
||||
setLister appslisters.StatefulSetLister,
|
||||
podLister corelisters.PodLister,
|
||||
pvcLister corelisters.PersistentVolumeClaimLister,
|
||||
recorder record.EventRecorder,
|
||||
) StatefulPodControlInterface {
|
||||
return &realStatefulPodControl{client, setLister, podLister, pvcLister, recorder}
|
||||
}
|
||||
|
||||
// realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the
|
||||
// API server. The struct is package private as the internal details are irrelevant to importing packages.
|
||||
type realStatefulPodControl struct {
|
||||
client clientset.Interface
|
||||
setLister appslisters.StatefulSetLister
|
||||
podLister corelisters.PodLister
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
// Manipulation of objects is provided through objectMgr, which allows the k8s API to be mocked out for testing.
|
||||
type StatefulPodControl struct {
|
||||
objectMgr StatefulPodControlObjectManager
|
||||
recorder record.EventRecorder
|
||||
}
|
||||
|
||||
func (spc *realStatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
|
||||
// NewStatefulPodControl constructs a StatefulPodControl using a realStatefulPodControlObjectManager with the given
|
||||
// clientset, listers and EventRecorder.
|
||||
func NewStatefulPodControl(
|
||||
client clientset.Interface,
|
||||
podLister corelisters.PodLister,
|
||||
claimLister corelisters.PersistentVolumeClaimLister,
|
||||
recorder record.EventRecorder,
|
||||
) *StatefulPodControl {
|
||||
return &StatefulPodControl{&realStatefulPodControlObjectManager{client, podLister, claimLister}, recorder}
|
||||
}
|
||||
|
||||
// NewStatefulPodControlFromManager creates a StatefulPodControl using the given StatefulPodControlObjectManager and recorder.
|
||||
func NewStatefulPodControlFromManager(om StatefulPodControlObjectManager, recorder record.EventRecorder) *StatefulPodControl {
|
||||
return &StatefulPodControl{om, recorder}
|
||||
}
|
||||
|
||||
// realStatefulPodControlObjectManager uses a clientset.Interface and listers.
|
||||
type realStatefulPodControlObjectManager struct {
|
||||
client clientset.Interface
|
||||
podLister corelisters.PodLister
|
||||
claimLister corelisters.PersistentVolumeClaimLister
|
||||
}
|
||||
|
||||
func (om *realStatefulPodControlObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
|
||||
_, err := om.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (om *realStatefulPodControlObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) {
|
||||
return om.podLister.Pods(namespace).Get(podName)
|
||||
}
|
||||
|
||||
func (om *realStatefulPodControlObjectManager) UpdatePod(pod *v1.Pod) error {
|
||||
_, err := om.client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (om *realStatefulPodControlObjectManager) DeletePod(pod *v1.Pod) error {
|
||||
return om.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
func (om *realStatefulPodControlObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error {
|
||||
_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), claim, metav1.CreateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (om *realStatefulPodControlObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) {
|
||||
return om.claimLister.PersistentVolumeClaims(namespace).Get(claimName)
|
||||
}
|
||||
|
||||
func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error {
|
||||
_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
|
||||
// Create the Pod's PVCs prior to creating the Pod
|
||||
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
|
||||
spc.recordPodEvent("create", set, pod, err)
|
||||
return err
|
||||
}
|
||||
// If we created the PVCs attempt to create the Pod
|
||||
_, err := spc.client.CoreV1().Pods(set.Namespace).Create(ctx, pod, metav1.CreateOptions{})
|
||||
err := spc.objectMgr.CreatePod(ctx, pod)
|
||||
// sink already exists errors
|
||||
if apierrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
|
||||
// Set PVC policy as much as is possible at this point.
|
||||
if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
|
||||
spc.recordPodEvent("update", set, pod, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
spc.recordPodEvent("create", set, pod, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
|
||||
func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
|
||||
attemptedUpdate := false
|
||||
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
// assume the Pod is consistent
|
||||
@@ -108,6 +155,21 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod
|
||||
return err
|
||||
}
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
|
||||
// if the Pod's PVCs are not consistent with the StatefulSet's PVC deletion policy, update the PVC
|
||||
// and dirty the pod.
|
||||
if match, err := spc.ClaimsMatchRetentionPolicy(set, pod); err != nil {
|
||||
spc.recordPodEvent("update", set, pod, err)
|
||||
return err
|
||||
} else if !match {
|
||||
if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
|
||||
spc.recordPodEvent("update", set, pod, err)
|
||||
return err
|
||||
}
|
||||
consistent = false
|
||||
}
|
||||
}
|
||||
|
||||
// if the Pod is not dirty, do nothing
|
||||
if consistent {
|
||||
return nil
|
||||
@@ -115,16 +177,17 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod
|
||||
|
||||
attemptedUpdate = true
|
||||
// commit the update, retrying on conflicts
|
||||
_, updateErr := spc.client.CoreV1().Pods(set.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
|
||||
|
||||
updateErr := spc.objectMgr.UpdatePod(pod)
|
||||
if updateErr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if updated, err := spc.podLister.Pods(set.Namespace).Get(pod.Name); err == nil {
|
||||
if updated, err := spc.objectMgr.GetPod(set.Namespace, pod.Name); err == nil {
|
||||
// make a copy so we don't mutate the shared cache
|
||||
pod = updated.DeepCopy()
|
||||
} else {
|
||||
utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, err))
|
||||
utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s: %w", set.Namespace, pod.Name, err))
|
||||
}
|
||||
|
||||
return updateErr
|
||||
@@ -135,15 +198,92 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod
|
||||
return err
|
||||
}
|
||||
|
||||
func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
|
||||
err := spc.client.CoreV1().Pods(set.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
|
||||
func (spc *StatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
|
||||
err := spc.objectMgr.DeletePod(pod)
|
||||
spc.recordPodEvent("delete", set, pod, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// ClaimsMatchRetentionPolicy returns false if the PVCs for pod are not consistent with set's PVC deletion policy.
|
||||
// An error is returned if something is not consistent. This is expected if the pod is being otherwise updated,
|
||||
// but a problem otherwise (see usage of this method in UpdateStatefulPod).
|
||||
func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
|
||||
ordinal := getOrdinal(pod)
|
||||
templates := set.Spec.VolumeClaimTemplates
|
||||
for i := range templates {
|
||||
claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal)
|
||||
claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
|
||||
switch {
|
||||
case apierrors.IsNotFound(err):
|
||||
klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration", claimName)
|
||||
case err != nil:
|
||||
return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name)
|
||||
default:
|
||||
if !claimOwnerMatchesSetAndPod(claim, set, pod) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// UpdatePodClaimForRetentionPolicy updates the PVCs used by pod to match the PVC deletion policy of set.
|
||||
func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) error {
|
||||
ordinal := getOrdinal(pod)
|
||||
templates := set.Spec.VolumeClaimTemplates
|
||||
for i := range templates {
|
||||
claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal)
|
||||
claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
|
||||
switch {
|
||||
case apierrors.IsNotFound(err):
|
||||
klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration.")
|
||||
case err != nil:
|
||||
return fmt.Errorf("Could not retrieve claim %s not found for %s when checking PVC deletion policy: %w", claimName, pod.Name, err)
|
||||
default:
|
||||
if !claimOwnerMatchesSetAndPod(claim, set, pod) {
|
||||
needsUpdate := updateClaimOwnerRefForSetAndPod(claim, set, pod)
|
||||
if needsUpdate {
|
||||
err := spc.objectMgr.UpdateClaim(claim)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not update claim %s for delete policy ownerRefs: %w", claimName, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PodClaimIsStale returns true for a stale PVC that should block pod creation. If the scaling
|
||||
// policy is deletion, and a PVC has an ownerRef that does not match the pod, the PVC is stale. This
|
||||
// includes pods whose UID has not been created.
|
||||
func (spc *StatefulPodControl) PodClaimIsStale(set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
|
||||
policy := getPersistentVolumeClaimRetentionPolicy(set)
|
||||
if policy.WhenScaled == apps.RetainPersistentVolumeClaimRetentionPolicyType {
|
||||
// PVCs are meant to be reused and so can't be stale.
|
||||
return false, nil
|
||||
}
|
||||
for _, claim := range getPersistentVolumeClaims(set, pod) {
|
||||
pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name)
|
||||
switch {
|
||||
case apierrors.IsNotFound(err):
|
||||
// If the claim doesn't exist yet, it can't be stale.
|
||||
continue
|
||||
case err != nil:
|
||||
return false, err
|
||||
case err == nil:
|
||||
// A claim is stale if it doesn't match the pod's UID, including if the pod has no UID.
|
||||
if hasStaleOwnerRef(pvc, pod) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
|
||||
// have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning.
|
||||
func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
|
||||
func (spc *StatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
|
||||
if err == nil {
|
||||
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
|
||||
message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful",
|
||||
@@ -160,7 +300,7 @@ func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.Statefu
|
||||
// recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a StatefulSet. If err is
|
||||
// nil the generated event will have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a
|
||||
// reason of v1.EventTypeWarning.
|
||||
func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
|
||||
func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
|
||||
if err == nil {
|
||||
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
|
||||
message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success",
|
||||
@@ -178,13 +318,13 @@ func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.State
|
||||
// set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
|
||||
// may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
|
||||
// set's Spec.
|
||||
func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
|
||||
func (spc *StatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
|
||||
var errs []error
|
||||
for _, claim := range getPersistentVolumeClaims(set, pod) {
|
||||
pvc, err := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
|
||||
pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name)
|
||||
switch {
|
||||
case apierrors.IsNotFound(err):
|
||||
_, err := spc.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), &claim, metav1.CreateOptions{})
|
||||
err := spc.objectMgr.CreateClaim(&claim)
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err))
|
||||
}
|
||||
@@ -203,5 +343,3 @@ func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.Statef
|
||||
}
|
||||
return errorutils.NewAggregate(errs)
|
||||
}
|
||||
|
||||
// Compile-time check that *realStatefulPodControl satisfies StatefulPodControlInterface.
var _ StatefulPodControlInterface = (*realStatefulPodControl)(nil)
|
||||
|
Reference in New Issue
Block a user