ad controller: optimize populateActualStateOfWorld
Reduce overall complexity from O(n^2) to O(n) Run time of the new benchmark is reduced from hours to 13s
This commit is contained in:
		| @@ -34,6 +34,7 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	coreinformers "k8s.io/client-go/informers/core/v1" | ||||
| @@ -384,6 +385,8 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger | ||||
|  | ||||
| 	for _, node := range nodes { | ||||
| 		nodeName := types.NodeName(node.Name) | ||||
| 		volumesInUse := sets.New(node.Status.VolumesInUse...) | ||||
|  | ||||
| 		for _, attachedVolume := range node.Status.VolumesAttached { | ||||
| 			uniqueName := attachedVolume.Name | ||||
| 			// The nil VolumeSpec is safe only in the case the volume is not in use by any pod. | ||||
| @@ -396,9 +399,13 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger | ||||
| 				logger.Error(err, "Failed to mark the volume as attached") | ||||
| 				continue | ||||
| 			} | ||||
| 			adc.processVolumesInUse(logger, nodeName, node.Status.VolumesInUse) | ||||
| 			adc.addNodeToDswp(node, types.NodeName(node.Name)) | ||||
| 			inUse := volumesInUse.Has(uniqueName) | ||||
| 			err = adc.actualStateOfWorld.SetVolumeMountedByNode(logger, uniqueName, nodeName, inUse) | ||||
| 			if err != nil { | ||||
| 				logger.Error(err, "Failed to set volume mounted by node") | ||||
| 			} | ||||
| 		} | ||||
| 		adc.addNodeToDswp(node, types.NodeName(node.Name)) | ||||
| 	} | ||||
| 	err = adc.processVolumeAttachments(logger) | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -28,6 +28,7 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/client-go/informers" | ||||
| 	"k8s.io/client-go/kubernetes/fake" | ||||
| 	kcache "k8s.io/client-go/tools/cache" | ||||
| 	"k8s.io/klog/v2/ktesting" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| @@ -162,6 +163,84 @@ func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkPopulateActualStateOfWorld(b *testing.B) { | ||||
| 	// Arrange | ||||
| 	fakeKubeClient := fake.NewSimpleClientset() | ||||
|  | ||||
| 	// populate 10000 nodes, each with 100 volumes | ||||
| 	for i := 0; i < 10000; i++ { | ||||
| 		nodeName := fmt.Sprintf("node-%d", i) | ||||
| 		node := &v1.Node{ | ||||
| 			ObjectMeta: metav1.ObjectMeta{ | ||||
| 				Name: nodeName, | ||||
| 				Labels: map[string]string{ | ||||
| 					"name": nodeName, | ||||
| 				}, | ||||
| 				Annotations: map[string]string{ | ||||
| 					util.ControllerManagedAttachAnnotation: "true", | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		for j := 0; j < 100; j++ { | ||||
| 			volumeName := v1.UniqueVolumeName(fmt.Sprintf("test-volume/vol-%d-%d", i, j)) | ||||
| 			node.Status.VolumesAttached = append(node.Status.VolumesAttached, v1.AttachedVolume{ | ||||
| 				Name:       volumeName, | ||||
| 				DevicePath: fmt.Sprintf("/dev/disk/by-id/vol-%d-%d", i, j), | ||||
| 			}) | ||||
| 			node.Status.VolumesInUse = append(node.Status.VolumesInUse, volumeName) | ||||
| 			_, err := fakeKubeClient.CoreV1().PersistentVolumes().Create(context.Background(), &v1.PersistentVolume{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name: fmt.Sprintf("vol-%d-%d", i, j), | ||||
| 				}, | ||||
| 			}, metav1.CreateOptions{}) | ||||
| 			if err != nil { | ||||
| 				b.Fatalf("failed to create PV: %v", err) | ||||
| 			} | ||||
| 		} | ||||
| 		_, err := fakeKubeClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}) | ||||
| 		if err != nil { | ||||
| 			b.Fatalf("failed to create node: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| 	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) | ||||
|  | ||||
| 	logger, ctx := ktesting.NewTestContext(b) | ||||
| 	ctx, cancel := context.WithCancel(ctx) | ||||
| 	defer cancel() | ||||
| 	adcObj, err := NewAttachDetachController( | ||||
| 		logger, | ||||
| 		fakeKubeClient, | ||||
| 		informerFactory.Core().V1().Pods(), | ||||
| 		informerFactory.Core().V1().Nodes(), | ||||
| 		informerFactory.Core().V1().PersistentVolumeClaims(), | ||||
| 		informerFactory.Core().V1().PersistentVolumes(), | ||||
| 		informerFactory.Storage().V1().CSINodes(), | ||||
| 		informerFactory.Storage().V1().CSIDrivers(), | ||||
| 		informerFactory.Storage().V1().VolumeAttachments(), | ||||
| 		nil, /* cloud */ | ||||
| 		nil, /* plugins */ | ||||
| 		nil, /* prober */ | ||||
| 		false, | ||||
| 		5*time.Second, | ||||
| 		DefaultTimerConfig, | ||||
| 	) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err) | ||||
| 	} | ||||
| 	adc := adcObj.(*attachDetachController) | ||||
|  | ||||
| 	// Act | ||||
| 	informerFactory.Start(ctx.Done()) | ||||
| 	informerFactory.WaitForCacheSync(ctx.Done()) | ||||
|  | ||||
| 	b.ResetTimer() | ||||
| 	err = adc.populateActualStateOfWorld(logger) | ||||
| 	if err != nil { | ||||
| 		b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func Test_AttachDetachControllerRecovery(t *testing.T) { | ||||
| 	attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{}) | ||||
| 	newPod1 := controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1") | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 huweiwen
					huweiwen