From 4b17a48def1c995f6033643d4045f7ecb53b09ba Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 13 Aug 2018 19:38:30 -0400 Subject: [PATCH] Implement support for updating volume limits Create a new predicate to count CSI volumes --- .../predicates/csi_volume_predicate.go | 154 +++++++++++ .../algorithm/predicates/predicates.go | 4 +- .../algorithmprovider/defaults/defaults.go | 6 + .../defaults/defaults_test.go | 1 + pkg/scheduler/core/equivalence/eqivalence.go | 18 +- pkg/scheduler/factory/factory.go | 12 + pkg/volume/csi/BUILD | 4 +- pkg/volume/csi/csi_plugin.go | 11 +- pkg/volume/csi/labelmanager/labelmanager.go | 250 ------------------ .../csi/{labelmanager => nodeupdater}/BUILD | 7 +- pkg/volume/csi/nodeupdater/nodeupdater.go | 193 ++++++++++++++ pkg/volume/util/BUILD | 2 + pkg/volume/util/attach_limit.go | 26 ++ pkg/volume/util/attach_limit_test.go | 40 +++ test/integration/scheduler/scheduler_test.go | 1 + 15 files changed, 464 insertions(+), 265 deletions(-) create mode 100644 pkg/scheduler/algorithm/predicates/csi_volume_predicate.go delete mode 100644 pkg/volume/csi/labelmanager/labelmanager.go rename pkg/volume/csi/{labelmanager => nodeupdater}/BUILD (75%) create mode 100644 pkg/volume/csi/nodeupdater/nodeupdater.go create mode 100644 pkg/volume/util/attach_limit_test.go diff --git a/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go new file mode 100644 index 00000000000..db6fdd39170 --- /dev/null +++ b/pkg/scheduler/algorithm/predicates/csi_volume_predicate.go @@ -0,0 +1,154 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package predicates + +import ( + "fmt" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/scheduler/algorithm" + schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + volumeutil "k8s.io/kubernetes/pkg/volume/util" +) + +// CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes +type CSIMaxVolumeLimitChecker struct { + pvInfo PersistentVolumeInfo + pvcInfo PersistentVolumeClaimInfo +} + +// NewCSIMaxVolumeLimitPredicate returns a predicate for counting CSI volumes +func NewCSIMaxVolumeLimitPredicate( + pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate { + c := &CSIMaxVolumeLimitChecker{ + pvInfo: pvInfo, + pvcInfo: pvcInfo, + } + return c.attachableLimitPredicate +} + +func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate( + pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + + // if feature gate is disable we return + if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + return true, nil, nil + } + // If a pod doesn't have any volume attached to it, the predicate will always be true. + // Thus we make a fast path for it, to avoid unnecessary computations in this case. + if len(pod.Spec.Volumes) == 0 { + return true, nil, nil + } + + nodeVolumeLimits := nodeInfo.VolumeLimits() + + // if node does not have volume limits this predicate should exit + if len(nodeVolumeLimits) == 0 { + return true, nil, nil + } + + // a map of unique volume name/csi volume handle and volume limit key + newVolumes := make(map[string]string) + if err := c.filterAttachableVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { + return false, nil, err + } + + if len(newVolumes) == 0 { + return true, nil, nil + } + + // a map of unique volume name/csi volume handle and volume limit key + attachedVolumes := make(map[string]string) + for _, existingPod := range nodeInfo.Pods() { + if err := c.filterAttachableVolumes(existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil { + return false, nil, err + } + } + + newVolumeCount := map[string]int{} + attachedVolumeCount := map[string]int{} + + for volumeName, volumeLimitKey := range attachedVolumes { + if _, ok := newVolumes[volumeName]; ok { + delete(newVolumes, volumeName) + } + attachedVolumeCount[volumeLimitKey]++ + } + + for _, volumeLimitKey := range newVolumes { + newVolumeCount[volumeLimitKey]++ + } + + for volumeLimitKey, count := range newVolumeCount { + maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)] + if ok { + currentVolumeCount := attachedVolumeCount[volumeLimitKey] + if currentVolumeCount+count > int(maxVolumeLimit) { + return false, []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil + } + } + } + + return true, nil, nil +} + +func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes( + volumes []v1.Volume, namespace string, result map[string]string) error { + + for _, vol := range volumes { + // CSI volumes can only be used as persistent volumes + if vol.PersistentVolumeClaim != nil { + pvcName := vol.PersistentVolumeClaim.ClaimName + + if pvcName == "" { + return fmt.Errorf("PersistentVolumeClaim had no name") + } + + pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName) + + if err != nil { + glog.Errorf("Unable to look up PVC info for %s/%s", namespace, pvcName) + continue + } + + pvName := pvc.Spec.VolumeName + if pvName == "" { + glog.Errorf("Persistent volume had no name for claim %s/%s", namespace, pvcName) + continue + } + pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName) + + if err != nil { + glog.Errorf("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName) + continue + } + + csiSource := pv.Spec.PersistentVolumeSource.CSI + if csiSource == nil { + glog.V(4).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName) + continue + } + driverName := csiSource.Driver + volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName) + result[csiSource.VolumeHandle] = volumeLimitKey + } + } + return nil +} diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index c919282c837..953fcef25e4 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -85,6 +85,8 @@ const ( MaxGCEPDVolumeCountPred = "MaxGCEPDVolumeCount" // MaxAzureDiskVolumeCountPred defines the name of predicate MaxAzureDiskVolumeCount. MaxAzureDiskVolumeCountPred = "MaxAzureDiskVolumeCount" + // MaxCSIVolumeCountPred defines the predicate that decides how many CSI volumes should be attached + MaxCSIVolumeCountPred = "MaxCSIVolumeCountPred" // NoVolumeZoneConflictPred defines the name of predicate NoVolumeZoneConflict. NoVolumeZoneConflictPred = "NoVolumeZoneConflict" // CheckNodeMemoryPressurePred defines the name of predicate CheckNodeMemoryPressure. @@ -137,7 +139,7 @@ var ( GeneralPred, HostNamePred, PodFitsHostPortsPred, MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred, PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred, - CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, + CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred, MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred, CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred} ) diff --git a/pkg/scheduler/algorithmprovider/defaults/defaults.go b/pkg/scheduler/algorithmprovider/defaults/defaults.go index 94d73b7e7fa..625d8b6742b 100644 --- a/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -137,6 +137,12 @@ func defaultPredicates() sets.String { return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.PVInfo, args.PVCInfo) }, ), + factory.RegisterFitPredicateFactory( + predicates.MaxCSIVolumeCountPred, + func(args factory.PluginFactoryArgs) algorithm.FitPredicate { + return predicates.NewCSIMaxVolumeLimitPredicate(args.PVInfo, args.PVCInfo) + }, + ), // Fit is determined by inter-pod affinity. factory.RegisterFitPredicateFactory( predicates.MatchInterPodAffinityPred, diff --git a/pkg/scheduler/algorithmprovider/defaults/defaults_test.go b/pkg/scheduler/algorithmprovider/defaults/defaults_test.go index 8dbc2f1536f..eab84151a51 100644 --- a/pkg/scheduler/algorithmprovider/defaults/defaults_test.go +++ b/pkg/scheduler/algorithmprovider/defaults/defaults_test.go @@ -71,6 +71,7 @@ func TestDefaultPredicates(t *testing.T) { predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, + predicates.MaxCSIVolumeCountPred, predicates.MatchInterPodAffinityPred, predicates.NoDiskConflictPred, predicates.GeneralPred, diff --git a/pkg/scheduler/core/equivalence/eqivalence.go b/pkg/scheduler/core/equivalence/eqivalence.go index 115acc4d181..8101809ad21 100644 --- a/pkg/scheduler/core/equivalence/eqivalence.go +++ b/pkg/scheduler/core/equivalence/eqivalence.go @@ -23,16 +23,16 @@ import ( "hash/fnv" "sync" - "k8s.io/kubernetes/pkg/scheduler/metrics" - + "github.com/golang/glog" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + "k8s.io/kubernetes/pkg/scheduler/metrics" hashutil "k8s.io/kubernetes/pkg/util/hash" - - "github.com/golang/glog" ) // Cache is a thread safe map saves and reuses the output of predicate functions, @@ -136,8 +136,16 @@ func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName str // MaxPDVolumeCountPredicate: we check the volumes of pod to make decisioc. for _, vol := range pod.Spec.Volumes { if vol.PersistentVolumeClaim != nil { - invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred) + invalidPredicates.Insert( + predicates.MaxEBSVolumeCountPred, + predicates.MaxGCEPDVolumeCountPred, + predicates.MaxAzureDiskVolumeCountPred) + if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) + } } else { + // We do not consider CSI volumes here because CSI + // volumes can not be used inline. if vol.AWSElasticBlockStore != nil { invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred) } diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index a61ae267af0..bf00c74193d 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -488,6 +488,10 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred) } + if pv.Spec.CSI != nil && utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) + } + // If PV contains zone related label, it may impact cached NoVolumeZoneConflict for k := range pv.Labels { if isZoneRegionLabel(k) { @@ -564,6 +568,10 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim // The bound volume type may change invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...) + if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) + } + // The bound volume's label may change invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) @@ -584,6 +592,10 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent } // The bound volume type may change invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...) + + if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) + } } c.equivalencePodCache.InvalidatePredicates(invalidPredicates) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index e5f55a15d8a..1e9253d4f95 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -16,7 +16,7 @@ go_library( "//pkg/features:go_default_library", "//pkg/util/strings:go_default_library", "//pkg/volume:go_default_library", - "//pkg/volume/csi/labelmanager:go_default_library", + "//pkg/volume/csi/nodeupdater:go_default_library", "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", @@ -75,7 +75,7 @@ filegroup( srcs = [ ":package-srcs", "//pkg/volume/csi/fake:all-srcs", - "//pkg/volume/csi/labelmanager:all-srcs", + "//pkg/volume/csi/nodeupdater:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index cc9e3c97901..4328c6e3e7c 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -26,6 +26,7 @@ import ( "time" "context" + "github.com/golang/glog" api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,7 +34,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" - "k8s.io/kubernetes/pkg/volume/csi/labelmanager" + "k8s.io/kubernetes/pkg/volume/csi/nodeupdater" ) const ( @@ -82,7 +83,7 @@ type csiDriversStore struct { // corresponding sockets var csiDrivers csiDriversStore -var lm labelmanager.Interface +var nodeUpdater nodeupdater.Interface // RegistrationCallback is called by kubelet's plugin watcher upon detection // of a new registration socket opened by CSI Driver registrar side car. @@ -106,13 +107,13 @@ func RegistrationCallback(pluginName string, endpoint string, versions []string, // TODO (verult) retry with exponential backoff, possibly added in csi client library. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() - driverNodeID, _, _, err := csi.NodeGetInfo(ctx) + driverNodeID, maxVolumePerNode, _, err := csi.NodeGetInfo(ctx) if err != nil { return nil, fmt.Errorf("error during CSI NodeGetInfo() call: %v", err) } // Calling nodeLabelManager to update annotations and labels for newly registered CSI driver - err = lm.AddLabels(pluginName, driverNodeID) + err = nodeUpdater.AddLabelsAndLimits(pluginName, driverNodeID, maxVolumePerNode) if err != nil { // Unregister the driver and return error csiDrivers.Lock() @@ -130,7 +131,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { // Initializing csiDrivers map and label management channels csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} - lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient()) + nodeUpdater = nodeupdater.NewNodeUpdater(host.GetNodeName(), host.GetKubeClient()) return nil } diff --git a/pkg/volume/csi/labelmanager/labelmanager.go b/pkg/volume/csi/labelmanager/labelmanager.go deleted file mode 100644 index 79fd5311453..00000000000 --- a/pkg/volume/csi/labelmanager/labelmanager.go +++ /dev/null @@ -1,250 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package labelmanager includes internal functions used to add/delete labels to -// kubernetes nodes for corresponding CSI drivers -package labelmanager // import "k8s.io/kubernetes/pkg/volume/csi/labelmanager" - -import ( - "encoding/json" - "fmt" - - "github.com/golang/glog" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/util/retry" -) - -const ( - // Name of node annotation that contains JSON map of driver names to node - // names - annotationKey = "csi.volume.kubernetes.io/nodeid" -) - -// labelManagementStruct is struct of channels used for communication between the driver registration -// code and the go routine responsible for managing the node's labels -type labelManagerStruct struct { - nodeName types.NodeName - k8s kubernetes.Interface -} - -// Interface implements an interface for managing labels of a node -type Interface interface { - AddLabels(driverName string, driverNodeId string) error -} - -// NewLabelManager initializes labelManagerStruct and returns available interfaces -func NewLabelManager(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface { - return labelManagerStruct{ - nodeName: nodeName, - k8s: kubeClient, - } -} - -// nodeLabelManager waits for labeling requests initiated by the driver's registration -// process. -func (lm labelManagerStruct) AddLabels(driverName string, driverNodeId string) error { - err := verifyAndAddNodeId(string(lm.nodeName), lm.k8s.CoreV1().Nodes(), driverName, driverNodeId) - if err != nil { - return fmt.Errorf("failed to update node %s's annotation with error: %+v", lm.nodeName, err) - } - return nil -} - -// Clones the given map and returns a new map with the given key and value added. -// Returns the given map, if annotationKey is empty. -func cloneAndAddAnnotation( - annotations map[string]string, - annotationKey, - annotationValue string) map[string]string { - if annotationKey == "" { - // Don't need to add an annotation. - return annotations - } - // Clone. - newAnnotations := map[string]string{} - for key, value := range annotations { - newAnnotations[key] = value - } - newAnnotations[annotationKey] = annotationValue - return newAnnotations -} - -func verifyAndAddNodeId( - k8sNodeName string, - k8sNodesClient corev1.NodeInterface, - csiDriverName string, - csiDriverNodeId string) error { - // Add or update annotation on Node object - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - // Retrieve the latest version of Node before attempting update, so that - // existing changes are not overwritten. RetryOnConflict uses - // exponential backoff to avoid exhausting the apiserver. - result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) - if getErr != nil { - glog.Errorf("Failed to get latest version of Node: %v", getErr) - return getErr // do not wrap error - } - - var previousAnnotationValue string - if result.ObjectMeta.Annotations != nil { - previousAnnotationValue = - result.ObjectMeta.Annotations[annotationKey] - glog.V(3).Infof( - "previousAnnotationValue=%q", previousAnnotationValue) - } - - existingDriverMap := map[string]string{} - if previousAnnotationValue != "" { - // Parse previousAnnotationValue as JSON - if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { - return fmt.Errorf( - "failed to parse node's %q annotation value (%q) err=%v", - annotationKey, - previousAnnotationValue, - err) - } - } - - if val, ok := existingDriverMap[csiDriverName]; ok { - if val == csiDriverNodeId { - // Value already exists in node annotation, nothing more to do - glog.V(2).Infof( - "The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v", - csiDriverName, - csiDriverNodeId, - annotationKey, - previousAnnotationValue) - return nil - } - } - - // Add/update annotation value - existingDriverMap[csiDriverName] = csiDriverNodeId - jsonObj, err := json.Marshal(existingDriverMap) - if err != nil { - return fmt.Errorf( - "failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v", - csiDriverName, - csiDriverNodeId, - annotationKey, - previousAnnotationValue) - } - - result.ObjectMeta.Annotations = cloneAndAddAnnotation( - result.ObjectMeta.Annotations, - annotationKey, - string(jsonObj)) - _, updateErr := k8sNodesClient.Update(result) - if updateErr == nil { - glog.V(2).Infof( - "Updated node %q successfully for CSI driver %q and CSI node name %q", - k8sNodeName, - csiDriverName, - csiDriverNodeId) - } - return updateErr // do not wrap error - }) - if retryErr != nil { - return fmt.Errorf("node update failed: %v", retryErr) - } - return nil -} - -// Fetches Kubernetes node API object corresponding to k8sNodeName. -// If the csiDriverName is present in the node annotation, it is removed. -func verifyAndDeleteNodeId( - k8sNodeName string, - k8sNodesClient corev1.NodeInterface, - csiDriverName string) error { - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - // Retrieve the latest version of Node before attempting update, so that - // existing changes are not overwritten. RetryOnConflict uses - // exponential backoff to avoid exhausting the apiserver. - result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) - if getErr != nil { - glog.Errorf("failed to get latest version of Node: %v", getErr) - return getErr // do not wrap error - } - - var previousAnnotationValue string - if result.ObjectMeta.Annotations != nil { - previousAnnotationValue = - result.ObjectMeta.Annotations[annotationKey] - glog.V(3).Infof( - "previousAnnotationValue=%q", previousAnnotationValue) - } - - existingDriverMap := map[string]string{} - if previousAnnotationValue == "" { - // Value already exists in node annotation, nothing more to do - glog.V(2).Infof( - "The key %q does not exist in node %q annotation, no need to cleanup.", - csiDriverName, - annotationKey) - return nil - } - - // Parse previousAnnotationValue as JSON - if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { - return fmt.Errorf( - "failed to parse node's %q annotation value (%q) err=%v", - annotationKey, - previousAnnotationValue, - err) - } - - if _, ok := existingDriverMap[csiDriverName]; !ok { - // Value already exists in node annotation, nothing more to do - glog.V(2).Infof( - "The key %q does not eixst in node %q annotation, no need to cleanup: %v", - csiDriverName, - annotationKey, - previousAnnotationValue) - return nil - } - - // Add/update annotation value - delete(existingDriverMap, csiDriverName) - jsonObj, err := json.Marshal(existingDriverMap) - if err != nil { - return fmt.Errorf( - "failed while trying to remove key %q from node %q annotation. Existing data: %v", - csiDriverName, - annotationKey, - previousAnnotationValue) - } - - result.ObjectMeta.Annotations = cloneAndAddAnnotation( - result.ObjectMeta.Annotations, - annotationKey, - string(jsonObj)) - _, updateErr := k8sNodesClient.Update(result) - if updateErr == nil { - fmt.Printf( - "Updated node %q annotation to remove CSI driver %q.", - k8sNodeName, - csiDriverName) - } - return updateErr // do not wrap error - }) - if retryErr != nil { - return fmt.Errorf("node update failed: %v", retryErr) - } - return nil -} diff --git a/pkg/volume/csi/labelmanager/BUILD b/pkg/volume/csi/nodeupdater/BUILD similarity index 75% rename from pkg/volume/csi/labelmanager/BUILD rename to pkg/volume/csi/nodeupdater/BUILD index 0f952539ea4..2b8e18a753b 100644 --- a/pkg/volume/csi/labelmanager/BUILD +++ b/pkg/volume/csi/nodeupdater/BUILD @@ -2,10 +2,13 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["labelmanager.go"], - importpath = "k8s.io/kubernetes/pkg/volume/csi/labelmanager", + srcs = ["nodeupdater.go"], + importpath = "k8s.io/kubernetes/pkg/volume/csi/nodeupdater", visibility = ["//visibility:public"], deps = [ + "//pkg/volume/util:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", diff --git a/pkg/volume/csi/nodeupdater/nodeupdater.go b/pkg/volume/csi/nodeupdater/nodeupdater.go new file mode 100644 index 00000000000..5571fe4c725 --- /dev/null +++ b/pkg/volume/csi/nodeupdater/nodeupdater.go @@ -0,0 +1,193 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package nodeupdater includes internal functions used to add/delete labels to +// kubernetes nodes for corresponding CSI drivers +package nodeupdater // import "k8s.io/kubernetes/pkg/volume/csi/nodeupdater" + +import ( + "encoding/json" + "fmt" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/util/retry" + "k8s.io/kubernetes/pkg/volume/util" +) + +const ( + // Name of node annotation that contains JSON map of driver names to node + // names + annotationKey = "csi.volume.kubernetes.io/nodeid" +) + +// labelManagementStruct is struct of channels used for communication between the driver registration +// code and the go routine responsible for managing the node's labels +type nodeUpdateStruct struct { + nodeName types.NodeName + k8s kubernetes.Interface +} + +// Interface implements an interface for managing labels of a node +type Interface interface { + AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error +} + +// NewNodeupdater initializes nodeUpdateStruct and returns available interfaces +func NewNodeUpdater(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface { + return nodeUpdateStruct{ + nodeName: nodeName, + k8s: kubeClient, + } +} + +// AddLabelsAndLimits nodeUpdater waits for labeling requests initiated by the driver's registration +// process and updates labels and attach limits +func (nodeUpdater nodeUpdateStruct) AddLabelsAndLimits(driverName string, driverNodeId string, maxLimit int64) error { + err := addLabelsAndLimits(string(nodeUpdater.nodeName), nodeUpdater.k8s.CoreV1().Nodes(), driverName, driverNodeId, maxLimit) + if err != nil { + return err + } + return nil +} + +func addMaxAttachLimitToNode(node *v1.Node, driverName string, maxLimit int64) *v1.Node { + if maxLimit <= 0 { + glog.V(4).Infof("skipping adding attach limit for %s", driverName) + return node + } + + if node.Status.Capacity == nil { + node.Status.Capacity = v1.ResourceList{} + } + if node.Status.Allocatable == nil { + node.Status.Allocatable = v1.ResourceList{} + } + limitKeyName := util.GetCSIAttachLimitKey(driverName) + node.Status.Capacity[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI) + node.Status.Allocatable[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI) + return node +} + +// Clones the given map and returns a new map with the given key and value added. +// Returns the given map, if annotationKey is empty. +func cloneAndAddAnnotation( + annotations map[string]string, + annotationKey, + annotationValue string) map[string]string { + if annotationKey == "" { + // Don't need to add an annotation. + return annotations + } + // Clone. + newAnnotations := map[string]string{} + for key, value := range annotations { + newAnnotations[key] = value + } + newAnnotations[annotationKey] = annotationValue + return newAnnotations +} + +func addNodeIdToNode(node *v1.Node, driverName string, csiDriverNodeId string) (*v1.Node, error) { + var previousAnnotationValue string + if node.ObjectMeta.Annotations != nil { + previousAnnotationValue = + node.ObjectMeta.Annotations[annotationKey] + glog.V(3).Infof( + "previousAnnotationValue=%q", previousAnnotationValue) + } + + existingDriverMap := map[string]string{} + if previousAnnotationValue != "" { + // Parse previousAnnotationValue as JSON + if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { + return node, fmt.Errorf( + "failed to parse node's %q annotation value (%q) err=%v", + annotationKey, + previousAnnotationValue, + err) + } + } + + if val, ok := existingDriverMap[driverName]; ok { + if val == csiDriverNodeId { + // Value already exists in node annotation, nothing more to do + glog.V(2).Infof( + "The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v", + driverName, + csiDriverNodeId, + annotationKey, + previousAnnotationValue) + return node, nil + } + } + + // Add/update annotation value + existingDriverMap[driverName] = csiDriverNodeId + jsonObj, err := json.Marshal(existingDriverMap) + if err != nil { + return node, fmt.Errorf( + "failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v", + driverName, + csiDriverNodeId, + annotationKey, + previousAnnotationValue) + } + + node.ObjectMeta.Annotations = cloneAndAddAnnotation( + node.ObjectMeta.Annotations, + annotationKey, + string(jsonObj)) + return node, nil +} + +func addLabelsAndLimits(nodeName string, nodeClient corev1.NodeInterface, driverName string, csiDriverNodeId string, maxLimit int64) error { + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Retrieve the latest version of Node before attempting update, so that + // existing changes are not overwritten. RetryOnConflict uses + // exponential backoff to avoid exhausting the apiserver. + node, getErr := nodeClient.Get(nodeName, metav1.GetOptions{}) + if getErr != nil { + glog.Errorf("Failed to get latest version of Node: %v", getErr) + return getErr // do not wrap error + } + var labelErr error + node, labelErr = addNodeIdToNode(node, driverName, csiDriverNodeId) + if labelErr != nil { + return labelErr + } + node = addMaxAttachLimitToNode(node, driverName, maxLimit) + + _, updateErr := nodeClient.Update(node) + if updateErr == nil { + glog.V(2).Infof( + "Updated node %q successfully for CSI driver %q and CSI node name %q", + nodeName, + driverName, + csiDriverNodeId) + } + return updateErr // do not wrap error + }) + if retryErr != nil { + return fmt.Errorf("error setting attach limit and labels for %s with : %v", driverName, retryErr) + } + return nil +} diff --git a/pkg/volume/util/BUILD b/pkg/volume/util/BUILD index 049558ce989..a6b371ede0f 100644 --- a/pkg/volume/util/BUILD +++ b/pkg/volume/util/BUILD @@ -49,6 +49,7 @@ go_test( name = "go_default_test", srcs = [ "atomic_writer_test.go", + "attach_limit_test.go", "device_util_linux_test.go", "nested_volumes_test.go", "resize_util_test.go", @@ -57,6 +58,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/apis/core/install:go_default_library", + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/util/slice:go_default_library", diff --git a/pkg/volume/util/attach_limit.go b/pkg/volume/util/attach_limit.go index 610f5f5b2cc..04bda52533e 100644 --- a/pkg/volume/util/attach_limit.go +++ b/pkg/volume/util/attach_limit.go @@ -16,6 +16,11 @@ limitations under the License. package util +import ( + "crypto/sha1" + "encoding/hex" +) + // This file is a common place holder for volume limit utility constants // shared between volume package and scheduler @@ -26,4 +31,25 @@ const ( AzureVolumeLimitKey = "attachable-volumes-azure-disk" // GCEVolumeLimitKey stores resource name that will store volume limits for GCE node GCEVolumeLimitKey = "attachable-volumes-gce-pd" + + // CSIAttachLimitPrefix defines prefix used for CSI volumes + CSIAttachLimitPrefix = "attachable-volumes-csi-" + + // ResourceNameLengthLimit stores maximum allowed Length for a ResourceName + ResourceNameLengthLimit = 63 ) + +// GetCSIAttachLimitKey returns limit key used for CSI volumes +func GetCSIAttachLimitKey(driverName string) string { + csiPrefixLength := len(CSIAttachLimitPrefix) + totalkeyLength := csiPrefixLength + len(driverName) + if totalkeyLength >= ResourceNameLengthLimit { + charsFromDriverName := driverName[:23] + hash := sha1.New() + hash.Write([]byte(driverName)) + hashed := hex.EncodeToString(hash.Sum(nil)) + hashed = hashed[:16] + return CSIAttachLimitPrefix + charsFromDriverName + hashed + } + return CSIAttachLimitPrefix + driverName +} diff --git a/pkg/volume/util/attach_limit_test.go b/pkg/volume/util/attach_limit_test.go new file mode 100644 index 00000000000..98e8e8d0331 --- /dev/null +++ b/pkg/volume/util/attach_limit_test.go @@ -0,0 +1,40 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "testing" + + "k8s.io/api/core/v1" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" +) + +func TestGetCSIAttachLimitKey(t *testing.T) { + // When driverName is less than 39 characters + csiLimitKey := GetCSIAttachLimitKey("com.amazon.ebs") + if csiLimitKey != "attachable-volumes-csi-com.amazon.ebs" { + t.Errorf("Expected com.amazon.ebs got %s", csiLimitKey) + } + + // When driver is longer than 39 chars + csiLimitKeyLonger := GetCSIAttachLimitKey("com.amazon.kubernetes.eks.ec2.ebs/csi-driver") + fmt.Println(csiLimitKeyLonger) + if !v1helper.IsAttachableVolumeResourceName(v1.ResourceName(csiLimitKeyLonger)) { + t.Errorf("Expected %s to have attachable prefix", csiLimitKeyLonger) + } +} diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 14b065bdf64..19678cf7212 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -140,6 +140,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { "GeneralPredicates", "MatchInterPodAffinity", "MaxAzureDiskVolumeCount", + "MaxCSIVolumeCountPred", "MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "NoDiskConflict",