Merge pull request #92222 from cofyc/fix92186

Share pod volume binding cache via framework.CycleState
Authored by Kubernetes Prow Robot on 2020-06-24 13:31:21 -07:00; committed by GitHub.
16 changed files with 260 additions and 649 deletions

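Before this change, the pod volume binding cache lived inside SchedulerVolumeBinder itself, so the VolumeBinding plugin had to clear it at the Unreserve and PostBind extension points and register a pod-deletion event handler to catch pods deleted before Reserve ran. As the removed TODO(#90962) comment below anticipated, the cache is only needed within a single scheduling cycle, so FindPodVolumes now returns the per-node PodVolumes and the plugin stores them in framework.CycleState in a map keyed by node name; the cached data then disappears with the cycle and no cleanup hooks are needed. A minimal, self-contained sketch of that pattern follows; cycleState and podVolumes are illustrative stand-ins, not the real framework.CycleState or scheduling.PodVolumes:

package main

import "fmt"

type podVolumes struct{} // stand-in for scheduling.PodVolumes

// stateData mirrors the plugin's per-cycle state: written once in PreFilter,
// then mutated in place by the later phases.
type stateData struct {
	allBound         bool
	podVolumesByNode map[string]*podVolumes
}

// cycleState is a toy stand-in for framework.CycleState.
type cycleState map[string]interface{}

func main() {
	cs := cycleState{}

	// PreFilter: store a pointer to freshly initialized state, exactly once.
	cs["VolumeBinding"] = &stateData{podVolumesByNode: map[string]*podVolumes{}}

	// Filter: record the volumes found for a candidate node.
	state := cs["VolumeBinding"].(*stateData)
	state.podVolumesByNode["node-a"] = &podVolumes{}

	// Reserve: because the cycle state holds a pointer, mutating stateData is
	// visible to every later phase without a second write into the store.
	state.allBound = true

	fmt.Println(len(state.podVolumesByNode), state.allBound) // prints: 1 true
}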
pkg/scheduler/framework/plugins/legacy_registry.go

@@ -274,7 +274,6 @@ func NewLegacyRegistry() *LegacyRegistry {
 			plugins.Reserve = appendToPluginSet(plugins.Reserve, volumebinding.Name, nil)
 			plugins.PreBind = appendToPluginSet(plugins.PreBind, volumebinding.Name, nil)
 			plugins.Unreserve = appendToPluginSet(plugins.Unreserve, volumebinding.Name, nil)
-			plugins.PostBind = appendToPluginSet(plugins.PostBind, volumebinding.Name, nil)
 			return
 		})
 	registry.registerPredicateConfigProducer(NoDiskConflictPred,

pkg/scheduler/framework/plugins/volumebinding/BUILD

@@ -11,8 +11,6 @@ go_library(
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
-        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
     ],
 )

pkg/scheduler/framework/plugins/volumebinding/volume_binding.go

@@ -24,8 +24,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -39,11 +37,18 @@ const (
 	stateKey framework.StateKey = Name
 )
 
+// the state is initialized in the PreFilter phase. Because we save the pointer
+// in framework.CycleState, later phases don't need to call the Write method to
+// update the value.
 type stateData struct {
 	skip         bool // set true if pod does not have PVCs
 	boundClaims  []*v1.PersistentVolumeClaim
 	claimsToBind []*v1.PersistentVolumeClaim
 	allBound     bool
+	// podVolumesByNode holds the pod's volume information found in the Filter
+	// phase for each node; it's initialized in the PreFilter phase.
+	podVolumesByNode map[string]*scheduling.PodVolumes
 }
 
 func (d *stateData) Clone() framework.StateData {
@@ -52,12 +57,7 @@ func (d *stateData) Clone() framework.StateData {
 // VolumeBinding is a plugin that binds pod volumes in scheduling.
 // In the Filter phase, pod binding cache is created for the pod and used in
-// Reserve and PreBind phases. Pod binding cache will be cleared at
-// Unreserve and PostBind extension points. However, if pod fails before
-// the Reserve phase and is deleted from the apiserver later, its pod binding
-// cache cannot be cleared at plugin extension points. We register an
-// event handler to clear pod binding cache when the pod is deleted to
-// prevent memory leaking.
+// Reserve and PreBind phases.
 type VolumeBinding struct {
 	Binder scheduling.SchedulerVolumeBinder
 }
@@ -67,7 +67,6 @@ var _ framework.FilterPlugin = &VolumeBinding{}
 var _ framework.ReservePlugin = &VolumeBinding{}
 var _ framework.PreBindPlugin = &VolumeBinding{}
 var _ framework.UnreservePlugin = &VolumeBinding{}
-var _ framework.PostBindPlugin = &VolumeBinding{}
 
 // Name is the name of the plugin used in Registry and configurations.
 const Name = "VolumeBinding"
@@ -90,7 +89,7 @@ func podHasPVCs(pod *v1.Pod) bool {
 // immediate PVCs bound. If not all immediate PVCs are bound, an
 // UnschedulableAndUnresolvable is returned.
 func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
-	// If pod does not request any PVC, we don't need to do anything.
+	// If pod does not reference any PVC, we don't need to do anything.
 	if !podHasPVCs(pod) {
 		state.Write(stateKey, &stateData{skip: true})
 		return nil
@@ -107,7 +106,7 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt
 		status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
 		return status
 	}
-	state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind})
+	state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind, podVolumesByNode: make(map[string]*scheduling.PodVolumes)})
 	return nil
 }
@@ -155,7 +154,7 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
 		return nil
 	}
 
-	reasons, err := pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node)
+	podVolumes, reasons, err := pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node)
 	if err != nil {
 		return framework.NewStatus(framework.Error, err.Error())
@@ -168,16 +167,31 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
 		}
 		return status
 	}
 
+	cs.Lock()
+	state.podVolumesByNode[node.Name] = podVolumes
+	cs.Unlock()
 	return nil
 }
 
 // Reserve reserves volumes of pod and saves binding status in cycle state.
 func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
-	allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName)
+	state, err := getStateData(cs)
 	if err != nil {
 		return framework.NewStatus(framework.Error, err.Error())
 	}
-	cs.Write(stateKey, &stateData{allBound: allBound})
+	// we don't need to hold the lock as only one node will be reserved for the given pod
+	podVolumes, ok := state.podVolumesByNode[nodeName]
+	if ok {
+		allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName, podVolumes)
+		if err != nil {
+			return framework.NewStatus(framework.Error, err.Error())
+		}
+		state.allBound = allBound
+	} else {
+		// may not exist if the pod does not reference any PVC
+		state.allBound = true
+	}
 	return nil
 }
@@ -195,8 +209,13 @@ func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState,
 		// no need to bind volumes
 		return nil
 	}
+	// we don't need to hold the lock as only one node will be pre-bound for the given pod
+	podVolumes, ok := s.podVolumesByNode[nodeName]
+	if !ok {
+		return framework.NewStatus(framework.Error, fmt.Sprintf("no pod volumes found for node %q", nodeName))
+	}
 	klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", pod.Namespace, pod.Name)
-	err = pl.Binder.BindPodVolumes(pod)
+	err = pl.Binder.BindPodVolumes(pod, podVolumes)
 	if err != nil {
 		klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", pod.Namespace, pod.Name, err)
 		return framework.NewStatus(framework.Error, err.Error())
@@ -205,17 +224,19 @@ func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState,
 	return nil
 }
 
-// Unreserve clears assumed PV and PVC cache and pod binding state.
-// It's idempotent, and does nothing if no cache or binding state found for the given pod.
+// Unreserve clears assumed PV and PVC cache.
+// It's idempotent, and does nothing if no cache found for the given pod.
 func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
-	pl.Binder.RevertAssumedPodVolumes(pod, nodeName)
-	pl.Binder.DeletePodBindings(pod)
-	return
-}
-
-// PostBind is called after a pod is successfully bound.
-func (pl *VolumeBinding) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
-	pl.Binder.DeletePodBindings(pod)
+	s, err := getStateData(cs)
+	if err != nil {
+		return
+	}
+	// we don't need to hold the lock as only one node may be unreserved
+	podVolumes, ok := s.podVolumesByNode[nodeName]
+	if !ok {
+		return
+	}
+	pl.Binder.RevertAssumedPodVolumes(podVolumes)
+	return
 }
@@ -228,37 +249,13 @@ func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin,
 	if err := validateArgs(args); err != nil {
 		return nil, err
 	}
+	podInformer := fh.SharedInformerFactory().Core().V1().Pods()
 	nodeInformer := fh.SharedInformerFactory().Core().V1().Nodes()
 	pvcInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumeClaims()
 	pvInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumes()
 	storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses()
 	csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes()
-	binder := scheduling.NewVolumeBinder(fh.ClientSet(), nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
-	// TODO(#90962) Because pod volume binding cache in SchedulerVolumeBinder is
-	// used only in current scheduling cycle, we can share it via
-	// framework.CycleState, then we don't need to register this event handler
-	// and Unreserve/PostBind extension points to clear pod volume binding
-	// cache.
-	fh.SharedInformerFactory().Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		DeleteFunc: func(obj interface{}) {
-			var pod *v1.Pod
-			switch t := obj.(type) {
-			case *v1.Pod:
-				pod = obj.(*v1.Pod)
-			case cache.DeletedFinalStateUnknown:
-				var ok bool
-				pod, ok = t.Obj.(*v1.Pod)
-				if !ok {
-					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj))
-					return
-				}
-			default:
-				utilruntime.HandleError(fmt.Errorf("unable to handle object %T", obj))
-				return
-			}
-			binder.DeletePodBindings(pod)
-		},
-	})
+	binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
 	return &VolumeBinding{
 		Binder: binder,
 	}, nil

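One subtlety in the plugin changes above: Filter writes to the shared podVolumesByNode map under cs.Lock()/cs.Unlock() because the scheduler evaluates many candidate nodes in parallel during a single cycle, whereas Reserve, PreBind, and Unreserve skip the lock, since each runs at most once per pod for the single chosen node. A rough, self-contained sketch of that contract, with a plain sync.Mutex standing in for the CycleState lock and illustrative names throughout:

package main

import (
	"fmt"
	"sync"
)

type podVolumes struct{ node string } // stand-in for scheduling.PodVolumes

type stateData struct {
	mu               sync.Mutex // plays the role of framework.CycleState's lock
	podVolumesByNode map[string]*podVolumes
}

func main() {
	state := &stateData{podVolumesByNode: map[string]*podVolumes{}}

	// "Filter": one goroutine per candidate node, mimicking the scheduler's
	// parallel node evaluation; concurrent map writes must take the lock.
	var wg sync.WaitGroup
	for _, node := range []string{"node-a", "node-b", "node-c"} {
		wg.Add(1)
		go func(node string) {
			defer wg.Done()
			state.mu.Lock()
			state.podVolumesByNode[node] = &podVolumes{node: node}
			state.mu.Unlock()
		}(node)
	}
	wg.Wait()

	// "Reserve": runs once, for the single selected node, after all Filter
	// calls have finished, so reading the map needs no lock.
	if pv, ok := state.podVolumesByNode["node-b"]; ok {
		fmt.Println("reserving volumes on", pv.node)
	}
}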
pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go

@@ -134,7 +134,8 @@ func TestVolumeBinding(t *testing.T) {
 				boundClaims: []*v1.PersistentVolumeClaim{
 					makePVC("pvc-a", "pv-a", waitSC.Name),
 				},
-				claimsToBind: []*v1.PersistentVolumeClaim{},
+				claimsToBind:     []*v1.PersistentVolumeClaim{},
+				podVolumesByNode: map[string]*scheduling.PodVolumes{},
 			},
 		},
 		{
@@ -158,6 +159,7 @@ func TestVolumeBinding(t *testing.T) {
 				claimsToBind: []*v1.PersistentVolumeClaim{
 					makePVC("pvc-a", "", waitSC.Name),
 				},
+				podVolumesByNode: map[string]*scheduling.PodVolumes{},
 			},
 			wantFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict)),
 		},
@@ -199,6 +201,7 @@ func TestVolumeBinding(t *testing.T) {
 				claimsToBind: []*v1.PersistentVolumeClaim{
 					makePVC("pvc-b", "", waitSC.Name),
 				},
+				podVolumesByNode: map[string]*scheduling.PodVolumes{},
 			},
 			wantFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonNodeConflict), string(scheduling.ErrReasonBindConflict)),
 		},
@@ -221,7 +224,8 @@ func TestVolumeBinding(t *testing.T) {
 				boundClaims: []*v1.PersistentVolumeClaim{
 					makePVC("pvc-a", "pv-a", waitSC.Name),
 				},
-				claimsToBind: []*v1.PersistentVolumeClaim{},
+				claimsToBind:     []*v1.PersistentVolumeClaim{},
+				podVolumesByNode: map[string]*scheduling.PodVolumes{},
 			},
 			wantFilterStatus: framework.NewStatus(framework.Error, `could not find v1.PersistentVolume "pv-a"`),
 		},
@@ -229,7 +233,8 @@
 	for _, item := range table {
 		t.Run(item.name, func(t *testing.T) {
-			ctx := context.Background()
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
 			client := fake.NewSimpleClientset()
 			informerFactory := informers.NewSharedInformerFactory(client, 0)
 			opts := []runtime.Option{
@@ -256,15 +261,11 @@ func TestVolumeBinding(t *testing.T) {
 			if item.node != nil {
 				client.CoreV1().Nodes().Create(ctx, item.node, metav1.CreateOptions{})
 			}
-			if len(item.pvcs) > 0 {
-				for _, pvc := range item.pvcs {
-					client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
-				}
+			for _, pvc := range item.pvcs {
+				client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
 			}
-			if len(item.pvs) > 0 {
-				for _, pv := range item.pvs {
-					client.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
-				}
+			for _, pv := range item.pvs {
+				client.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
 			}
 			caches := informerFactory.WaitForCacheSync(ctx.Done())
 			for _, synced := range caches {