Implement support for updating volume limits
Create a new predicate to count CSI volumes
This commit is contained in:
154
pkg/scheduler/algorithm/predicates/csi_volume_predicate.go
Normal file
154
pkg/scheduler/algorithm/predicates/csi_volume_predicate.go
Normal file
@@ -0,0 +1,154 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package predicates
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
// CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes
|
||||
type CSIMaxVolumeLimitChecker struct {
|
||||
pvInfo PersistentVolumeInfo
|
||||
pvcInfo PersistentVolumeClaimInfo
|
||||
}
|
||||
|
||||
// NewCSIMaxVolumeLimitPredicate returns a predicate for counting CSI volumes
|
||||
func NewCSIMaxVolumeLimitPredicate(
|
||||
pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
|
||||
c := &CSIMaxVolumeLimitChecker{
|
||||
pvInfo: pvInfo,
|
||||
pvcInfo: pvcInfo,
|
||||
}
|
||||
return c.attachableLimitPredicate
|
||||
}
|
||||
|
||||
func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
|
||||
pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
|
||||
// if feature gate is disable we return
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
|
||||
return true, nil, nil
|
||||
}
|
||||
// If a pod doesn't have any volume attached to it, the predicate will always be true.
|
||||
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
|
||||
if len(pod.Spec.Volumes) == 0 {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
nodeVolumeLimits := nodeInfo.VolumeLimits()
|
||||
|
||||
// if node does not have volume limits this predicate should exit
|
||||
if len(nodeVolumeLimits) == 0 {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
// a map of unique volume name/csi volume handle and volume limit key
|
||||
newVolumes := make(map[string]string)
|
||||
if err := c.filterAttachableVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
if len(newVolumes) == 0 {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
// a map of unique volume name/csi volume handle and volume limit key
|
||||
attachedVolumes := make(map[string]string)
|
||||
for _, existingPod := range nodeInfo.Pods() {
|
||||
if err := c.filterAttachableVolumes(existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
newVolumeCount := map[string]int{}
|
||||
attachedVolumeCount := map[string]int{}
|
||||
|
||||
for volumeName, volumeLimitKey := range attachedVolumes {
|
||||
if _, ok := newVolumes[volumeName]; ok {
|
||||
delete(newVolumes, volumeName)
|
||||
}
|
||||
attachedVolumeCount[volumeLimitKey]++
|
||||
}
|
||||
|
||||
for _, volumeLimitKey := range newVolumes {
|
||||
newVolumeCount[volumeLimitKey]++
|
||||
}
|
||||
|
||||
for volumeLimitKey, count := range newVolumeCount {
|
||||
maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
|
||||
if ok {
|
||||
currentVolumeCount := attachedVolumeCount[volumeLimitKey]
|
||||
if currentVolumeCount+count > int(maxVolumeLimit) {
|
||||
return false, []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes(
|
||||
volumes []v1.Volume, namespace string, result map[string]string) error {
|
||||
|
||||
for _, vol := range volumes {
|
||||
// CSI volumes can only be used as persistent volumes
|
||||
if vol.PersistentVolumeClaim != nil {
|
||||
pvcName := vol.PersistentVolumeClaim.ClaimName
|
||||
|
||||
if pvcName == "" {
|
||||
return fmt.Errorf("PersistentVolumeClaim had no name")
|
||||
}
|
||||
|
||||
pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to look up PVC info for %s/%s", namespace, pvcName)
|
||||
continue
|
||||
}
|
||||
|
||||
pvName := pvc.Spec.VolumeName
|
||||
if pvName == "" {
|
||||
glog.Errorf("Persistent volume had no name for claim %s/%s", namespace, pvcName)
|
||||
continue
|
||||
}
|
||||
pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName)
|
||||
continue
|
||||
}
|
||||
|
||||
csiSource := pv.Spec.PersistentVolumeSource.CSI
|
||||
if csiSource == nil {
|
||||
glog.V(4).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName)
|
||||
continue
|
||||
}
|
||||
driverName := csiSource.Driver
|
||||
volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
|
||||
result[csiSource.VolumeHandle] = volumeLimitKey
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -85,6 +85,8 @@ const (
|
||||
MaxGCEPDVolumeCountPred = "MaxGCEPDVolumeCount"
|
||||
// MaxAzureDiskVolumeCountPred defines the name of predicate MaxAzureDiskVolumeCount.
|
||||
MaxAzureDiskVolumeCountPred = "MaxAzureDiskVolumeCount"
|
||||
// MaxCSIVolumeCountPred defines the predicate that decides how many CSI volumes should be attached
|
||||
MaxCSIVolumeCountPred = "MaxCSIVolumeCountPred"
|
||||
// NoVolumeZoneConflictPred defines the name of predicate NoVolumeZoneConflict.
|
||||
NoVolumeZoneConflictPred = "NoVolumeZoneConflict"
|
||||
// CheckNodeMemoryPressurePred defines the name of predicate CheckNodeMemoryPressure.
|
||||
@@ -137,7 +139,7 @@ var (
|
||||
GeneralPred, HostNamePred, PodFitsHostPortsPred,
|
||||
MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
|
||||
PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
|
||||
CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
|
||||
CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
|
||||
MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
|
||||
CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
|
||||
)
|
||||
|
@@ -137,6 +137,12 @@ func defaultPredicates() sets.String {
|
||||
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.PVInfo, args.PVCInfo)
|
||||
},
|
||||
),
|
||||
factory.RegisterFitPredicateFactory(
|
||||
predicates.MaxCSIVolumeCountPred,
|
||||
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
|
||||
return predicates.NewCSIMaxVolumeLimitPredicate(args.PVInfo, args.PVCInfo)
|
||||
},
|
||||
),
|
||||
// Fit is determined by inter-pod affinity.
|
||||
factory.RegisterFitPredicateFactory(
|
||||
predicates.MatchInterPodAffinityPred,
|
||||
|
@@ -71,6 +71,7 @@ func TestDefaultPredicates(t *testing.T) {
|
||||
predicates.MaxEBSVolumeCountPred,
|
||||
predicates.MaxGCEPDVolumeCountPred,
|
||||
predicates.MaxAzureDiskVolumeCountPred,
|
||||
predicates.MaxCSIVolumeCountPred,
|
||||
predicates.MatchInterPodAffinityPred,
|
||||
predicates.NoDiskConflictPred,
|
||||
predicates.GeneralPred,
|
||||
|
@@ -23,16 +23,16 @@ import (
|
||||
"hash/fnv"
|
||||
"sync"
|
||||
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
hashutil "k8s.io/kubernetes/pkg/util/hash"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// Cache is a thread safe map saves and reuses the output of predicate functions,
|
||||
@@ -136,8 +136,16 @@ func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName str
|
||||
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decisioc.
|
||||
for _, vol := range pod.Spec.Volumes {
|
||||
if vol.PersistentVolumeClaim != nil {
|
||||
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred)
|
||||
invalidPredicates.Insert(
|
||||
predicates.MaxEBSVolumeCountPred,
|
||||
predicates.MaxGCEPDVolumeCountPred,
|
||||
predicates.MaxAzureDiskVolumeCountPred)
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
|
||||
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
|
||||
}
|
||||
} else {
|
||||
// We do not consider CSI volumes here because CSI
|
||||
// volumes can not be used inline.
|
||||
if vol.AWSElasticBlockStore != nil {
|
||||
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
|
||||
}
|
||||
|
@@ -488,6 +488,10 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
|
||||
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
|
||||
}
|
||||
|
||||
if pv.Spec.CSI != nil && utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
|
||||
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
|
||||
}
|
||||
|
||||
// If PV contains zone related label, it may impact cached NoVolumeZoneConflict
|
||||
for k := range pv.Labels {
|
||||
if isZoneRegionLabel(k) {
|
||||
@@ -564,6 +568,10 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim
|
||||
// The bound volume type may change
|
||||
invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
|
||||
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
|
||||
}
|
||||
|
||||
// The bound volume's label may change
|
||||
invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
|
||||
|
||||
@@ -584,6 +592,10 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent
|
||||
}
|
||||
// The bound volume type may change
|
||||
invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
|
||||
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
|
||||
}
|
||||
}
|
||||
|
||||
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
||||
|
@@ -16,7 +16,7 @@ go_library(
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/util/strings:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/csi/labelmanager:go_default_library",
|
||||
"//pkg/volume/csi/nodeupdater:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
|
||||
@@ -75,7 +75,7 @@ filegroup(
|
||||
srcs = [
|
||||
":package-srcs",
|
||||
"//pkg/volume/csi/fake:all-srcs",
|
||||
"//pkg/volume/csi/labelmanager:all-srcs",
|
||||
"//pkg/volume/csi/nodeupdater:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
|
@@ -26,6 +26,7 @@ import (
|
||||
"time"
|
||||
|
||||
"context"
|
||||
|
||||
"github.com/golang/glog"
|
||||
api "k8s.io/api/core/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -33,7 +34,7 @@ import (
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csi/labelmanager"
|
||||
"k8s.io/kubernetes/pkg/volume/csi/nodeupdater"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -82,7 +83,7 @@ type csiDriversStore struct {
|
||||
// corresponding sockets
|
||||
var csiDrivers csiDriversStore
|
||||
|
||||
var lm labelmanager.Interface
|
||||
var nodeUpdater nodeupdater.Interface
|
||||
|
||||
// RegistrationCallback is called by kubelet's plugin watcher upon detection
|
||||
// of a new registration socket opened by CSI Driver registrar side car.
|
||||
@@ -106,13 +107,13 @@ func RegistrationCallback(pluginName string, endpoint string, versions []string,
|
||||
// TODO (verult) retry with exponential backoff, possibly added in csi client library.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||
defer cancel()
|
||||
driverNodeID, _, _, err := csi.NodeGetInfo(ctx)
|
||||
driverNodeID, maxVolumePerNode, _, err := csi.NodeGetInfo(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error during CSI NodeGetInfo() call: %v", err)
|
||||
}
|
||||
|
||||
// Calling nodeLabelManager to update annotations and labels for newly registered CSI driver
|
||||
err = lm.AddLabels(pluginName, driverNodeID)
|
||||
err = nodeUpdater.AddLabelsAndLimits(pluginName, driverNodeID, maxVolumePerNode)
|
||||
if err != nil {
|
||||
// Unregister the driver and return error
|
||||
csiDrivers.Lock()
|
||||
@@ -130,7 +131,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
||||
|
||||
// Initializing csiDrivers map and label management channels
|
||||
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
|
||||
lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient())
|
||||
nodeUpdater = nodeupdater.NewNodeUpdater(host.GetNodeName(), host.GetKubeClient())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@@ -1,250 +0,0 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package labelmanager includes internal functions used to add/delete labels to
|
||||
// kubernetes nodes for corresponding CSI drivers
|
||||
package labelmanager // import "k8s.io/kubernetes/pkg/volume/csi/labelmanager"
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/util/retry"
|
||||
)
|
||||
|
||||
const (
|
||||
// Name of node annotation that contains JSON map of driver names to node
|
||||
// names
|
||||
annotationKey = "csi.volume.kubernetes.io/nodeid"
|
||||
)
|
||||
|
||||
// labelManagementStruct is struct of channels used for communication between the driver registration
|
||||
// code and the go routine responsible for managing the node's labels
|
||||
type labelManagerStruct struct {
|
||||
nodeName types.NodeName
|
||||
k8s kubernetes.Interface
|
||||
}
|
||||
|
||||
// Interface implements an interface for managing labels of a node
|
||||
type Interface interface {
|
||||
AddLabels(driverName string, driverNodeId string) error
|
||||
}
|
||||
|
||||
// NewLabelManager initializes labelManagerStruct and returns available interfaces
|
||||
func NewLabelManager(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface {
|
||||
return labelManagerStruct{
|
||||
nodeName: nodeName,
|
||||
k8s: kubeClient,
|
||||
}
|
||||
}
|
||||
|
||||
// nodeLabelManager waits for labeling requests initiated by the driver's registration
|
||||
// process.
|
||||
func (lm labelManagerStruct) AddLabels(driverName string, driverNodeId string) error {
|
||||
err := verifyAndAddNodeId(string(lm.nodeName), lm.k8s.CoreV1().Nodes(), driverName, driverNodeId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update node %s's annotation with error: %+v", lm.nodeName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clones the given map and returns a new map with the given key and value added.
|
||||
// Returns the given map, if annotationKey is empty.
|
||||
func cloneAndAddAnnotation(
|
||||
annotations map[string]string,
|
||||
annotationKey,
|
||||
annotationValue string) map[string]string {
|
||||
if annotationKey == "" {
|
||||
// Don't need to add an annotation.
|
||||
return annotations
|
||||
}
|
||||
// Clone.
|
||||
newAnnotations := map[string]string{}
|
||||
for key, value := range annotations {
|
||||
newAnnotations[key] = value
|
||||
}
|
||||
newAnnotations[annotationKey] = annotationValue
|
||||
return newAnnotations
|
||||
}
|
||||
|
||||
func verifyAndAddNodeId(
|
||||
k8sNodeName string,
|
||||
k8sNodesClient corev1.NodeInterface,
|
||||
csiDriverName string,
|
||||
csiDriverNodeId string) error {
|
||||
// Add or update annotation on Node object
|
||||
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
// Retrieve the latest version of Node before attempting update, so that
|
||||
// existing changes are not overwritten. RetryOnConflict uses
|
||||
// exponential backoff to avoid exhausting the apiserver.
|
||||
result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{})
|
||||
if getErr != nil {
|
||||
glog.Errorf("Failed to get latest version of Node: %v", getErr)
|
||||
return getErr // do not wrap error
|
||||
}
|
||||
|
||||
var previousAnnotationValue string
|
||||
if result.ObjectMeta.Annotations != nil {
|
||||
previousAnnotationValue =
|
||||
result.ObjectMeta.Annotations[annotationKey]
|
||||
glog.V(3).Infof(
|
||||
"previousAnnotationValue=%q", previousAnnotationValue)
|
||||
}
|
||||
|
||||
existingDriverMap := map[string]string{}
|
||||
if previousAnnotationValue != "" {
|
||||
// Parse previousAnnotationValue as JSON
|
||||
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to parse node's %q annotation value (%q) err=%v",
|
||||
annotationKey,
|
||||
previousAnnotationValue,
|
||||
err)
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := existingDriverMap[csiDriverName]; ok {
|
||||
if val == csiDriverNodeId {
|
||||
// Value already exists in node annotation, nothing more to do
|
||||
glog.V(2).Infof(
|
||||
"The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v",
|
||||
csiDriverName,
|
||||
csiDriverNodeId,
|
||||
annotationKey,
|
||||
previousAnnotationValue)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Add/update annotation value
|
||||
existingDriverMap[csiDriverName] = csiDriverNodeId
|
||||
jsonObj, err := json.Marshal(existingDriverMap)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v",
|
||||
csiDriverName,
|
||||
csiDriverNodeId,
|
||||
annotationKey,
|
||||
previousAnnotationValue)
|
||||
}
|
||||
|
||||
result.ObjectMeta.Annotations = cloneAndAddAnnotation(
|
||||
result.ObjectMeta.Annotations,
|
||||
annotationKey,
|
||||
string(jsonObj))
|
||||
_, updateErr := k8sNodesClient.Update(result)
|
||||
if updateErr == nil {
|
||||
glog.V(2).Infof(
|
||||
"Updated node %q successfully for CSI driver %q and CSI node name %q",
|
||||
k8sNodeName,
|
||||
csiDriverName,
|
||||
csiDriverNodeId)
|
||||
}
|
||||
return updateErr // do not wrap error
|
||||
})
|
||||
if retryErr != nil {
|
||||
return fmt.Errorf("node update failed: %v", retryErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fetches Kubernetes node API object corresponding to k8sNodeName.
|
||||
// If the csiDriverName is present in the node annotation, it is removed.
|
||||
func verifyAndDeleteNodeId(
|
||||
k8sNodeName string,
|
||||
k8sNodesClient corev1.NodeInterface,
|
||||
csiDriverName string) error {
|
||||
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
// Retrieve the latest version of Node before attempting update, so that
|
||||
// existing changes are not overwritten. RetryOnConflict uses
|
||||
// exponential backoff to avoid exhausting the apiserver.
|
||||
result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{})
|
||||
if getErr != nil {
|
||||
glog.Errorf("failed to get latest version of Node: %v", getErr)
|
||||
return getErr // do not wrap error
|
||||
}
|
||||
|
||||
var previousAnnotationValue string
|
||||
if result.ObjectMeta.Annotations != nil {
|
||||
previousAnnotationValue =
|
||||
result.ObjectMeta.Annotations[annotationKey]
|
||||
glog.V(3).Infof(
|
||||
"previousAnnotationValue=%q", previousAnnotationValue)
|
||||
}
|
||||
|
||||
existingDriverMap := map[string]string{}
|
||||
if previousAnnotationValue == "" {
|
||||
// Value already exists in node annotation, nothing more to do
|
||||
glog.V(2).Infof(
|
||||
"The key %q does not exist in node %q annotation, no need to cleanup.",
|
||||
csiDriverName,
|
||||
annotationKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Parse previousAnnotationValue as JSON
|
||||
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to parse node's %q annotation value (%q) err=%v",
|
||||
annotationKey,
|
||||
previousAnnotationValue,
|
||||
err)
|
||||
}
|
||||
|
||||
if _, ok := existingDriverMap[csiDriverName]; !ok {
|
||||
// Value already exists in node annotation, nothing more to do
|
||||
glog.V(2).Infof(
|
||||
"The key %q does not eixst in node %q annotation, no need to cleanup: %v",
|
||||
csiDriverName,
|
||||
annotationKey,
|
||||
previousAnnotationValue)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add/update annotation value
|
||||
delete(existingDriverMap, csiDriverName)
|
||||
jsonObj, err := json.Marshal(existingDriverMap)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed while trying to remove key %q from node %q annotation. Existing data: %v",
|
||||
csiDriverName,
|
||||
annotationKey,
|
||||
previousAnnotationValue)
|
||||
}
|
||||
|
||||
result.ObjectMeta.Annotations = cloneAndAddAnnotation(
|
||||
result.ObjectMeta.Annotations,
|
||||
annotationKey,
|
||||
string(jsonObj))
|
||||
_, updateErr := k8sNodesClient.Update(result)
|
||||
if updateErr == nil {
|
||||
fmt.Printf(
|
||||
"Updated node %q annotation to remove CSI driver %q.",
|
||||
k8sNodeName,
|
||||
csiDriverName)
|
||||
}
|
||||
return updateErr // do not wrap error
|
||||
})
|
||||
if retryErr != nil {
|
||||
return fmt.Errorf("node update failed: %v", retryErr)
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -2,10 +2,13 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["labelmanager.go"],
|
||||
importpath = "k8s.io/kubernetes/pkg/volume/csi/labelmanager",
|
||||
srcs = ["nodeupdater.go"],
|
||||
importpath = "k8s.io/kubernetes/pkg/volume/csi/nodeupdater",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
193
pkg/volume/csi/nodeupdater/nodeupdater.go
Normal file
193
pkg/volume/csi/nodeupdater/nodeupdater.go
Normal file
@@ -0,0 +1,193 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package nodeupdater includes internal functions used to add/delete labels to
|
||||
// kubernetes nodes for corresponding CSI drivers
|
||||
package nodeupdater // import "k8s.io/kubernetes/pkg/volume/csi/nodeupdater"
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
const (
|
||||
// Name of node annotation that contains JSON map of driver names to node
|
||||
// names
|
||||
annotationKey = "csi.volume.kubernetes.io/nodeid"
|
||||
)
|
||||
|
||||
// labelManagementStruct is struct of channels used for communication between the driver registration
|
||||
// code and the go routine responsible for managing the node's labels
|
||||
type nodeUpdateStruct struct {
|
||||
nodeName types.NodeName
|
||||
k8s kubernetes.Interface
|
||||
}
|
||||
|
||||
// Interface implements an interface for managing labels of a node
|
||||
type Interface interface {
|
||||
AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error
|
||||
}
|
||||
|
||||
// NewNodeupdater initializes nodeUpdateStruct and returns available interfaces
|
||||
func NewNodeUpdater(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface {
|
||||
return nodeUpdateStruct{
|
||||
nodeName: nodeName,
|
||||
k8s: kubeClient,
|
||||
}
|
||||
}
|
||||
|
||||
// AddLabelsAndLimits nodeUpdater waits for labeling requests initiated by the driver's registration
|
||||
// process and updates labels and attach limits
|
||||
func (nodeUpdater nodeUpdateStruct) AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error {
|
||||
err := addLabelsAndLimits(string(nodeUpdater.nodeName), nodeUpdater.k8s.CoreV1().Nodes(), driverName, driverNodeId, maxLimit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func addMaxAttachLimitToNode(node *v1.Node, driverName string, maxLimit int64) *v1.Node {
|
||||
if maxLimit <= 0 {
|
||||
glog.V(4).Infof("skipping adding attach limit for %s", driverName)
|
||||
return node
|
||||
}
|
||||
|
||||
if node.Status.Capacity == nil {
|
||||
node.Status.Capacity = v1.ResourceList{}
|
||||
}
|
||||
if node.Status.Allocatable == nil {
|
||||
node.Status.Allocatable = v1.ResourceList{}
|
||||
}
|
||||
limitKeyName := util.GetCSIAttachLimitKey(driverName)
|
||||
node.Status.Capacity[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
|
||||
node.Status.Allocatable[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
|
||||
return node
|
||||
}
|
||||
|
||||
// Clones the given map and returns a new map with the given key and value added.
|
||||
// Returns the given map, if annotationKey is empty.
|
||||
func cloneAndAddAnnotation(
|
||||
annotations map[string]string,
|
||||
annotationKey,
|
||||
annotationValue string) map[string]string {
|
||||
if annotationKey == "" {
|
||||
// Don't need to add an annotation.
|
||||
return annotations
|
||||
}
|
||||
// Clone.
|
||||
newAnnotations := map[string]string{}
|
||||
for key, value := range annotations {
|
||||
newAnnotations[key] = value
|
||||
}
|
||||
newAnnotations[annotationKey] = annotationValue
|
||||
return newAnnotations
|
||||
}
|
||||
|
||||
func addNodeIdToNode(node *v1.Node, driverName string, csiDriverNodeId string) (*v1.Node, error) {
|
||||
var previousAnnotationValue string
|
||||
if node.ObjectMeta.Annotations != nil {
|
||||
previousAnnotationValue =
|
||||
node.ObjectMeta.Annotations[annotationKey]
|
||||
glog.V(3).Infof(
|
||||
"previousAnnotationValue=%q", previousAnnotationValue)
|
||||
}
|
||||
|
||||
existingDriverMap := map[string]string{}
|
||||
if previousAnnotationValue != "" {
|
||||
// Parse previousAnnotationValue as JSON
|
||||
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
|
||||
return node, fmt.Errorf(
|
||||
"failed to parse node's %q annotation value (%q) err=%v",
|
||||
annotationKey,
|
||||
previousAnnotationValue,
|
||||
err)
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := existingDriverMap[driverName]; ok {
|
||||
if val == csiDriverNodeId {
|
||||
// Value already exists in node annotation, nothing more to do
|
||||
glog.V(2).Infof(
|
||||
"The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v",
|
||||
driverName,
|
||||
csiDriverNodeId,
|
||||
annotationKey,
|
||||
previousAnnotationValue)
|
||||
return node, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Add/update annotation value
|
||||
existingDriverMap[driverName] = csiDriverNodeId
|
||||
jsonObj, err := json.Marshal(existingDriverMap)
|
||||
if err != nil {
|
||||
return node, fmt.Errorf(
|
||||
"failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v",
|
||||
driverName,
|
||||
csiDriverNodeId,
|
||||
annotationKey,
|
||||
previousAnnotationValue)
|
||||
}
|
||||
|
||||
node.ObjectMeta.Annotations = cloneAndAddAnnotation(
|
||||
node.ObjectMeta.Annotations,
|
||||
annotationKey,
|
||||
string(jsonObj))
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func addLabelsAndLimits(nodeName string, nodeClient corev1.NodeInterface, driverName string, csiDriverNodeId string, maxLimit int64) error {
|
||||
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
// Retrieve the latest version of Node before attempting update, so that
|
||||
// existing changes are not overwritten. RetryOnConflict uses
|
||||
// exponential backoff to avoid exhausting the apiserver.
|
||||
node, getErr := nodeClient.Get(nodeName, metav1.GetOptions{})
|
||||
if getErr != nil {
|
||||
glog.Errorf("Failed to get latest version of Node: %v", getErr)
|
||||
return getErr // do not wrap error
|
||||
}
|
||||
var labelErr error
|
||||
node, labelErr = addNodeIdToNode(node, driverName, csiDriverNodeId)
|
||||
if labelErr != nil {
|
||||
return labelErr
|
||||
}
|
||||
node = addMaxAttachLimitToNode(node, driverName, maxLimit)
|
||||
|
||||
_, updateErr := nodeClient.Update(node)
|
||||
if updateErr == nil {
|
||||
glog.V(2).Infof(
|
||||
"Updated node %q successfully for CSI driver %q and CSI node name %q",
|
||||
nodeName,
|
||||
driverName,
|
||||
csiDriverNodeId)
|
||||
}
|
||||
return updateErr // do not wrap error
|
||||
})
|
||||
if retryErr != nil {
|
||||
return fmt.Errorf("error setting attach limit and labels for %s with : %v", driverName, retryErr)
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -49,6 +49,7 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"atomic_writer_test.go",
|
||||
"attach_limit_test.go",
|
||||
"device_util_linux_test.go",
|
||||
"nested_volumes_test.go",
|
||||
"resize_util_test.go",
|
||||
@@ -57,6 +58,7 @@ go_test(
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/apis/core/install:go_default_library",
|
||||
"//pkg/apis/core/v1/helper:go_default_library",
|
||||
"//pkg/kubelet/apis:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/util/slice:go_default_library",
|
||||
|
@@ -16,6 +16,11 @@ limitations under the License.
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
)
|
||||
|
||||
// This file is a common place holder for volume limit utility constants
|
||||
// shared between volume package and scheduler
|
||||
|
||||
@@ -26,4 +31,25 @@ const (
|
||||
AzureVolumeLimitKey = "attachable-volumes-azure-disk"
|
||||
// GCEVolumeLimitKey stores resource name that will store volume limits for GCE node
|
||||
GCEVolumeLimitKey = "attachable-volumes-gce-pd"
|
||||
|
||||
// CSIAttachLimitPrefix defines prefix used for CSI volumes
|
||||
CSIAttachLimitPrefix = "attachable-volumes-csi-"
|
||||
|
||||
// ResourceNameLengthLimit stores maximum allowed Length for a ResourceName
|
||||
ResourceNameLengthLimit = 63
|
||||
)
|
||||
|
||||
// GetCSIAttachLimitKey returns limit key used for CSI volumes
|
||||
func GetCSIAttachLimitKey(driverName string) string {
|
||||
csiPrefixLength := len(CSIAttachLimitPrefix)
|
||||
totalkeyLength := csiPrefixLength + len(driverName)
|
||||
if totalkeyLength >= ResourceNameLengthLimit {
|
||||
charsFromDriverName := driverName[:23]
|
||||
hash := sha1.New()
|
||||
hash.Write([]byte(driverName))
|
||||
hashed := hex.EncodeToString(hash.Sum(nil))
|
||||
hashed = hashed[:16]
|
||||
return CSIAttachLimitPrefix + charsFromDriverName + hashed
|
||||
}
|
||||
return CSIAttachLimitPrefix + driverName
|
||||
}
|
||||
|
40
pkg/volume/util/attach_limit_test.go
Normal file
40
pkg/volume/util/attach_limit_test.go
Normal file
@@ -0,0 +1,40 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||
)
|
||||
|
||||
func TestGetCSIAttachLimitKey(t *testing.T) {
|
||||
// When driverName is less than 39 characters
|
||||
csiLimitKey := GetCSIAttachLimitKey("com.amazon.ebs")
|
||||
if csiLimitKey != "attachable-volumes-csi-com.amazon.ebs" {
|
||||
t.Errorf("Expected com.amazon.ebs got %s", csiLimitKey)
|
||||
}
|
||||
|
||||
// When driver is longer than 39 chars
|
||||
csiLimitKeyLonger := GetCSIAttachLimitKey("com.amazon.kubernetes.eks.ec2.ebs/csi-driver")
|
||||
fmt.Println(csiLimitKeyLonger)
|
||||
if !v1helper.IsAttachableVolumeResourceName(v1.ResourceName(csiLimitKeyLonger)) {
|
||||
t.Errorf("Expected %s to have attachable prefix", csiLimitKeyLonger)
|
||||
}
|
||||
}
|
@@ -140,6 +140,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
|
||||
"GeneralPredicates",
|
||||
"MatchInterPodAffinity",
|
||||
"MaxAzureDiskVolumeCount",
|
||||
"MaxCSIVolumeCountPred",
|
||||
"MaxEBSVolumeCount",
|
||||
"MaxGCEPDVolumeCount",
|
||||
"NoDiskConflict",
|
||||
|
Reference in New Issue
Block a user