Files
kubernetes/pkg/scheduler/framework/plugins/volumezone/volume_zone.go
2024-05-18 23:34:39 +09:00

518 lines
21 KiB
Go

/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumezone
import (
"context"
"errors"
"fmt"
"reflect"
"sort"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
storagehelpers "k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
)
// VolumeZone is a plugin that checks volume zone.
// It carries the listers the plugin uses to resolve a pod's
// PersistentVolumeClaims, their bound PersistentVolumes and StorageClasses.
type VolumeZone struct {
// pvLister looks up PersistentVolumes by name.
pvLister corelisters.PersistentVolumeLister
// pvcLister looks up PersistentVolumeClaims by namespace and name.
pvcLister corelisters.PersistentVolumeClaimLister
// scLister looks up StorageClasses by name.
scLister storagelisters.StorageClassLister
}
// Compile-time assertions that *VolumeZone satisfies the framework
// interfaces this plugin implements.
var (
	_ framework.FilterPlugin      = &VolumeZone{}
	_ framework.PreFilterPlugin   = &VolumeZone{}
	_ framework.EnqueueExtensions = &VolumeZone{}
)
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = names.VolumeZone
// preFilterStateKey is the CycleState key under which PreFilter writes this
// plugin's stateData and under which getStateData reads it back.
preFilterStateKey framework.StateKey = "PreFilter" + Name
// ErrReasonConflict is used for NoVolumeZoneConflict predicate error.
ErrReasonConflict = "node(s) had no available volume zone"
)
// pvTopology holds the value of a pv's topologyLabel.
// One entry is produced per topology label found on a PersistentVolume.
type pvTopology struct {
// pvName is the name of the PersistentVolume the label was read from.
pvName string
// key is the topology label key (one of topologyLabels).
key string
// values is the set of zone/region values parsed from the label's value.
values sets.Set[string]
}
// stateData is the per-scheduling-cycle state of this plugin.
// The state is initialized in the PreFilter phase. Because we save the pointer
// in framework.CycleState, in the later phases we don't need to call the Write
// method to update the value.
type stateData struct {
// podPVTopologies holds the pv information we need
// it's initialized in the PreFilter phase
podPVTopologies []pvTopology
}
// Clone implements framework.StateData. It returns the receiver itself rather
// than a copy, so the "clone" shares podPVTopologies with the original.
func (d *stateData) Clone() framework.StateData {
return d
}
// topologyLabels lists the label keys this plugin inspects on nodes and
// PersistentVolumes: the beta zone/region keys followed by the topology
// zone/region keys that translateToGALabel maps them to.
var topologyLabels = []string{
v1.LabelFailureDomainBetaZone,
v1.LabelFailureDomainBetaRegion,
v1.LabelTopologyZone,
v1.LabelTopologyRegion,
}
// translateToGALabel maps a beta failure-domain label key to the matching
// topology label key. Any other key is returned unchanged.
func translateToGALabel(label string) string {
	switch label {
	case v1.LabelFailureDomainBetaRegion:
		return v1.LabelTopologyRegion
	case v1.LabelFailureDomainBetaZone:
		return v1.LabelTopologyZone
	default:
		return label
	}
}
// Name returns the name of the plugin, i.e. the package-level Name constant.
// It is used in logs, etc.
func (pl *VolumeZone) Name() string {
return Name
}
// PreFilter is invoked at the prefilter extension point.
//
// It resolves the topology of the PersistentVolumes bound to the pod's
// PersistentVolumeClaims and stores the result in the cycle state under
// preFilterStateKey so that Filter can reuse it.
//
// Currently, this is only supported with PersistentVolumeClaims,
// and only looks for the bound PersistentVolume.
//
// A non-success status from the lookup is returned as is. When no PV topology
// is found for the pod, a Skip status is returned.
func (pl *VolumeZone) PreFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	topologies, status := pl.getPVbyPod(klog.FromContext(ctx), pod)
	switch {
	case !status.IsSuccess():
		return nil, status
	case len(topologies) == 0:
		return nil, framework.NewStatus(framework.Skip)
	}

	cs.Write(preFilterStateKey, &stateData{podPVTopologies: topologies})
	return nil, nil
}
// isWaitForFirstConsumer reports whether the named StorageClass uses the
// WaitForFirstConsumer volume binding mode.
//
// It returns (true, nil) only in that case. Otherwise it returns false with a
// status explaining why: the StorageClass lookup failed, its VolumeBindingMode
// is unset, or the mode is something other than WaitForFirstConsumer.
func (pl *VolumeZone) isWaitForFirstConsumer(scName string) (bool, *framework.Status) {
	sc, err := pl.scLister.Get(scName)
	if lookupStatus := getErrorAsStatus(err); !lookupStatus.IsSuccess() {
		return false, lookupStatus
	}

	mode := sc.VolumeBindingMode
	switch {
	case mode == nil:
		return false, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("VolumeBindingMode not set for StorageClass %q", scName))
	case *mode == storage.VolumeBindingWaitForFirstConsumer:
		// The class defers binding until a consumer pod is scheduled.
		return true, nil
	default:
		// Callers reach this function only for a claim without a volume name,
		// which is what the reason below refers to.
		return false, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolume had no name")
	}
}
// getPVbyPod collects the pvTopology entries of every PersistentVolume bound
// to one of the pod's PersistentVolumeClaims.
//
// Claims without a volume name are skipped when their StorageClass uses
// WaitForFirstConsumer; any other unbound claim, an empty claim name, or a
// failed PVC/PV lookup yields a nil slice and a non-success status.
func (pl *VolumeZone) getPVbyPod(logger klog.Logger, pod *v1.Pod) ([]pvTopology, *framework.Status) {
	topologies := []pvTopology{}

	for _, claimName := range pl.getPersistentVolumeClaimNameFromPod(pod) {
		if len(claimName) == 0 {
			return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no name")
		}

		claim, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(claimName)
		if st := getErrorAsStatus(err); !st.IsSuccess() {
			return nil, st
		}

		if volumeName := claim.Spec.VolumeName; volumeName != "" {
			// Bound claim: read the topology labels off its volume.
			volume, err := pl.pvLister.Get(volumeName)
			if st := getErrorAsStatus(err); !st.IsSuccess() {
				return nil, st
			}
			topologies = append(topologies, pl.getPVTopologiesFromPV(logger, volume)...)
			continue
		}

		// Unbound claim: acceptable only if its class waits for the first consumer.
		className := storagehelpers.GetPersistentVolumeClaimClass(claim)
		if className == "" {
			return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no pv name and storageClass name")
		}
		if waits, st := pl.isWaitForFirstConsumer(className); !waits {
			return nil, st
		}
	}

	return topologies, nil
}
// PreFilterExtensions returns the plugin's prefilter extensions (pod add and
// remove). This plugin has none, so it always returns nil.
func (pl *VolumeZone) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
// Filter is invoked at the filter extension point.
//
// It decides whether the pod fits on the node given the volumes it requests,
// some of which may carry zone scheduling constraints. Every zone label on a
// volume must match the equivalent zone label on the node. The node is allowed
// to carry additional zone-label constraints (for example, a hypothetical
// replicated volume might allow region-wide access).
//
// Currently this is only supported with PersistentVolumeClaims, and looks to
// the labels only on the bound PersistentVolume.
//
// Working with volumes declared inline in the pod specification (i.e. not
// using a PersistentVolume) is likely to be harder, as it would require
// determining the zone of a volume during scheduling, and that is likely to
// require calling out to the cloud provider. It seems that we are moving away
// from inline volume declarations anyway.
func (pl *VolumeZone) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	logger := klog.FromContext(ctx)

	// Fast path: a pod without volumes can never conflict, so skip all lookups.
	if len(pod.Spec.Volumes) == 0 {
		return nil
	}

	var topologies []pvTopology
	if state, err := getStateData(cs); err == nil {
		topologies = state.podPVTopologies
	} else {
		// No usable PreFilter state: recompute the pv list here.
		computed, status := pl.getPVbyPod(logger, pod)
		if !status.IsSuccess() {
			return status
		}
		topologies = computed
	}

	status, _ := pl.isSchedulableNode(logger, topologies, nodeInfo.Node(), pod)
	return status
}
// getStateData reads this plugin's state from the cycle state.
// It returns the read error unchanged, or a conversion error when the stored
// value is not a *stateData.
func getStateData(cs *framework.CycleState) (*stateData, error) {
	raw, err := cs.Read(preFilterStateKey)
	if err != nil {
		return nil, err
	}
	if typed, ok := raw.(*stateData); ok {
		return typed, nil
	}
	return nil, errors.New("unable to convert state into stateData")
}
// getErrorAsStatus converts a lister error into a scheduling status.
// A nil error maps to a nil status, a NotFound error maps to
// UnschedulableAndUnresolvable carrying the error text, and any other error
// is wrapped with framework.AsStatus.
func getErrorAsStatus(err error) *framework.Status {
	switch {
	case err == nil:
		return nil
	case apierrors.IsNotFound(err):
		return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
	default:
		return framework.AsStatus(err)
	}
}
// EventsToRegister returns the cluster events that may make a Pod rejected by
// this plugin schedulable, each paired with its queueing hint function.
func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
	// New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable.
	// Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored.
	storageClassAdded := framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add}

	// A new node or updating a node's volume zone labels may make a pod schedulable.
	//
	// A note about UpdateNodeTaint event:
	// NodeAdd QueueingHint isn't always called because of the internal feature called preCheck.
	// As a common problematic scenario,
	// when a node is added but not ready, NodeAdd event is filtered out by preCheck and doesn't arrive.
	// In such cases, this plugin may miss some events that actually make pods schedulable.
	// As a workaround, we add UpdateNodeTaint event to catch the case.
	// We can remove UpdateNodeTaint when we remove the preCheck feature.
	// See: https://github.com/kubernetes/kubernetes/issues/110175
	nodeChanged := framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}

	// A new pvc may make a pod schedulable.
	// PVC's VolumeName is mutable if old PVC's volumeName is empty.
	pvcChanged := framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}

	// A new pv or updating a pv's volume zone labels may make a pod schedulable.
	pvChanged := framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}

	return []framework.ClusterEventWithHint{
		{Event: storageClassAdded, QueueingHintFn: pl.isSchedulableAfterStorageClassAdded},
		{Event: nodeChanged, QueueingHintFn: pl.isSchedulableAfterNodeChange},
		{Event: pvcChanged, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
		{Event: pvChanged, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeChange},
	}
}
// getPersistentVolumeClaimNameFromPod returns the claim names of all volumes
// in the pod spec that reference a PersistentVolumeClaim, in spec order.
// The result is nil when the pod has no such volume.
func (pl *VolumeZone) getPersistentVolumeClaimNameFromPod(pod *v1.Pod) []string {
	var names []string
	for _, vol := range pod.Spec.Volumes {
		if source := vol.PersistentVolumeClaim; source != nil {
			names = append(names, source.ClaimName)
		}
	}
	return names
}
// isSchedulableAfterStorageClassAdded is invoked whenever a StorageClass is added.
// It checks whether the new StorageClass could make a previously unschedulable
// pod schedulable: that is the case only when the class uses
// WaitForFirstConsumer and one of the pod's unbound PVCs refers to it.
// PVCs that cannot be fetched are logged and skipped.
func (pl *VolumeZone) isSchedulableAfterStorageClassAdded(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	_, newClass, err := util.As[*storage.StorageClass](nil, newObj)
	if err != nil {
		return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterStorageClassAdded: %w", err)
	}

	waitsForConsumer := newClass.VolumeBindingMode != nil &&
		*newClass.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer

	for _, claimName := range pl.getPersistentVolumeClaimNameFromPod(pod) {
		claim, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(claimName)
		if err != nil {
			logger.Error(err, "unexpected error during getting PVC", "Namespace", pod.Namespace, "pvc", claimName)
			continue
		}
		if claim.Spec.VolumeName != "" {
			// Already bound to a PersistentVolume; the storageClass is irrelevant here.
			continue
		}
		if storagehelpers.GetPersistentVolumeClaimClass(claim) != newClass.Name {
			continue
		}
		if !waitsForConsumer {
			continue
		}
		logger.V(5).Info("a new WaitForFirstConsumer storageClass for the pod's PVC is created, which might make the pod schedulable", "storageClass", klog.KObj(newClass), "pod", klog.KObj(pod))
		return framework.Queue, nil
	}

	logger.V(5).Info("StorageClass was created but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "StorageClass", klog.KObj(newClass))
	return framework.QueueSkip, nil
}
// isSchedulableAfterNodeChange is invoked whenever a node is added or updated.
// It checks whether the change of the Node could make a previously
// unschedulable pod schedulable: the pod is queued only when the new node
// matches the pod's PV topologies and the old node (if any) did not.
func (pl *VolumeZone) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	oldNode, newNode, err := util.As[*v1.Node](oldObj, newObj)
	if err != nil {
		return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterNodeChange: %w", err)
	}

	topologies, status := pl.getPVbyPod(logger, pod)
	if !status.IsSuccess() {
		return framework.Queue, status.AsError()
	}

	if _, fitsNow := pl.isSchedulableNode(logger, topologies, newNode, pod); !fitsNow {
		logger.V(5).Info("node was created or updated but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(newNode))
		return framework.QueueSkip, nil
	}

	// A nil old node means the node was just added, so it cannot have matched before.
	fitBefore := false
	if oldNode != nil {
		_, fitBefore = pl.isSchedulableNode(logger, topologies, oldNode, pod)
	}
	if fitBefore {
		logger.V(5).Info("node was created or updated but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(newNode))
		return framework.QueueSkip, nil
	}

	logger.V(5).Info("node was created or updated, which might make the pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(newNode))
	return framework.Queue, nil
}
// isSchedulableNode checks whether the node can schedule the pod.
// It returns (nil, true) when the node carries none of the topologyLabels or
// when every PV topology matches a node label; otherwise it returns an
// UnschedulableAndUnresolvable status with ErrReasonConflict and false.
func (pl *VolumeZone) isSchedulableNode(logger klog.Logger, podPVTopologies []pvTopology, node *v1.Node, pod *v1.Pod) (*framework.Status, bool) {
	constrained := false
	for _, label := range topologyLabels {
		if _, constrained = node.Labels[label]; constrained {
			break
		}
	}
	if !constrained {
		// The node has no zone constraints, so we're OK to schedule.
		// This is to handle a single-zone cluster scenario where the node may not have any topology labels.
		return nil, true
	}

	for _, topo := range podPVTopologies {
		nodeValue, found := node.Labels[topo.key]
		if !found {
			// The node lacks the PV's label key; retry with the GA equivalent
			// of that key in case the PV uses a beta label and the node a GA one.
			nodeValue, found = node.Labels[translateToGALabel(topo.key)]
		}
		if found && topo.values.Has(nodeValue) {
			continue
		}
		logger.V(10).Info("Cannot schedule pod onto node due to volume (mismatch on label key)", "pod", klog.KObj(pod), "node", klog.KObj(node), "PV", klog.KRef("", topo.pvName), "PVLabelKey", topo.key)
		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict), false
	}

	return nil, true
}
// isSchedulableAfterPersistentVolumeClaimChange is invoked whenever a
// PersistentVolumeClaim is added or updated.
// It checks whether the change of the PVC could make a previously
// unschedulable pod schedulable. A PVC becoming bound or using a
// WaitForFirstConsumer storageclass can cause the pod to become schedulable.
func (pl *VolumeZone) isSchedulableAfterPersistentVolumeClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	oldPVC, newPVC, err := util.As[*v1.PersistentVolumeClaim](oldObj, newObj)
	if err != nil {
		return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeClaimChange: %w", err)
	}

	// The updated PVC does not help when it doesn't match the pod's PVC.
	if !pl.checkPVCBindingToPodPV(logger, newPVC, pod) {
		logger.V(5).Info("PVC was created or updated but it doesn't make this pod schedulable.", "pod", klog.KObj(pod), "PVC", klog.KObj(newPVC))
		return framework.QueueSkip, nil
	}

	// It already matched before the change, so nothing relevant is new.
	if pl.checkPVCBindingToPodPV(logger, oldPVC, pod) {
		logger.V(5).Info("PVC was created or updated but it doesn't make this pod schedulable. Nothing has changed about PV bound to PVC.", "pod", klog.KObj(pod), "PVC", klog.KObj(newPVC))
		return framework.QueueSkip, nil
	}

	// The PVC that didn't match the pod now matches the pod.
	logger.V(5).Info("PVC was created or updated, which might make the pod schedulable. The given PVC matches the pod's PVC", "pod", klog.KObj(pod), "PVC", klog.KObj(newPVC))
	return framework.Queue, nil
}
// checkPVCBindingToPodPV reports whether the given PVC is one of the pod's
// PVCs and is either bound to a PV or uses a WaitForFirstConsumer
// StorageClass. A nil PVC never matches.
func (pl *VolumeZone) checkPVCBindingToPodPV(logger klog.Logger, pvc *v1.PersistentVolumeClaim, pod *v1.Pod) bool {
	if pvc == nil {
		return false
	}

	for _, claimName := range pl.getPersistentVolumeClaimNameFromPod(pod) {
		if pvc.Name != claimName || pod.Namespace != pvc.Namespace {
			// pod's PVC doesn't match with the given PVC
			continue
		}
		logger.V(5).Info("PVC matches the pod's PVC", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc))

		if pvc.Spec.VolumeName != "" {
			return true
		}

		// Unbound claim: it only counts when its class waits for the first consumer.
		className := storagehelpers.GetPersistentVolumeClaimClass(pvc)
		if className == "" {
			return false
		}
		if waits, _ := pl.isWaitForFirstConsumer(className); !waits {
			logger.V(5).Info("PVC is bound to storageClass but the volumeBindingMode is not WaitForFirstConsumer", "storageClass", className)
			return false
		}
		return true
	}

	logger.V(5).Info("PVC doesn't match the pod's PVC", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc))
	return false
}
// isSchedulableAfterPersistentVolumeChange is invoked whenever a
// PersistentVolume is added or updated.
// It checks whether the change of the PV could make a previously unschedulable
// pod schedulable: the pod is queued only when the PV is bound to one of the
// pod's PVCs and its topology labels differ from those of the old PV.
func (pl *VolumeZone) isSchedulableAfterPersistentVolumeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	oldPV, newPV, err := util.As[*v1.PersistentVolume](oldObj, newObj)
	if err != nil {
		return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeChange: %w", err)
	}

	// Kept as an empty, non-nil slice: reflect.DeepEqual treats nil and empty
	// slices as different, and getPVTopologiesAndSort returns a non-nil slice.
	oldTopologies := []pvTopology{}
	if oldPV != nil {
		oldTopologies, _ = pl.getPVTopologiesAndSort(logger, pod, oldPV)
	}
	newTopologies, bound := pl.getPVTopologiesAndSort(logger, pod, newPV)

	if !bound || reflect.DeepEqual(oldTopologies, newTopologies) {
		logger.V(5).Info("PV was created or updated, but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "PV", klog.KObj(newPV))
		return framework.QueueSkip, nil
	}

	logger.V(5).Info("PV was created or updated, which might make the pod schedulable", "pod", klog.KObj(pod), "PV", klog.KObj(newPV))
	return framework.Queue, nil
}
// getPVTopologiesFromPV builds one pvTopology per topology label present on
// the given PV, in topologyLabels order. Labels whose value cannot be parsed
// into a zone set are logged and ignored. The result is never nil.
func (pl *VolumeZone) getPVTopologiesFromPV(logger klog.Logger, pv *v1.PersistentVolume) []pvTopology {
	topologies := []pvTopology{}
	for _, labelKey := range topologyLabels {
		raw, present := pv.ObjectMeta.Labels[labelKey]
		if !present {
			continue
		}
		zones, err := volumehelpers.LabelZonesToSet(raw)
		if err != nil {
			logger.V(5).Info("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", labelKey, raw), "err", err)
			continue
		}
		topologies = append(topologies, pvTopology{
			pvName: pv.Name,
			key:    labelKey,
			values: sets.Set[string](zones),
		})
	}
	return topologies
}
// getPVTopologiesAndSort checks whether the target PV is bound to one of the
// pod's PVCs. If it is, the PV's topologies are collected and sorted by key in
// descending order. The returned slice is never nil; the boolean reports
// whether a bound PVC was found. PVCs that cannot be fetched are logged and
// skipped.
func (pl *VolumeZone) getPVTopologiesAndSort(logger klog.Logger, pod *v1.Pod, pv *v1.PersistentVolume) ([]pvTopology, bool) {
	bound := false
	topologies := []pvTopology{}

	for _, claimName := range pl.getPersistentVolumeClaimNameFromPod(pod) {
		claim, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(claimName)
		if err != nil {
			logger.Error(err, "unexpected error during getting PVC", "Namespace", pod.Namespace, "pvc", claimName)
			continue
		}
		if claim.Spec.VolumeName != pv.Name {
			continue
		}
		// The target PV is bound to this PVC of the pod.
		bound = true
		topologies = append(topologies, pl.getPVTopologiesFromPV(logger, pv)...)
		// One PV is bound to one PVC. One PV can't be bound to more than one PVC.
		break
	}

	// All entries come from a single PV, so pvName is not part of the ordering;
	// the key alone orders the topologies.
	sort.Slice(topologies, func(a, b int) bool {
		return topologies[a].key > topologies[b].key
	})
	return topologies, bound
}
// New initializes a new plugin and returns it.
// The PV, PVC and StorageClass listers are taken from the handle's shared
// informer factory. The context and runtime.Object arguments are unused, and
// the returned error is always nil.
func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
	factory := handle.SharedInformerFactory()
	plugin := &VolumeZone{
		pvLister:  factory.Core().V1().PersistentVolumes().Lister(),
		pvcLister: factory.Core().V1().PersistentVolumeClaims().Lister(),
		scLister:  factory.Storage().V1().StorageClasses().Lister(),
	}
	return plugin, nil
}