kubernetes/test/integration/ipamperf/ipam_test.go
Kubernetes Submit Queue 5a54555f59
Merge pull request #63049 from andrewsykim/kcm-nodeipam
Automatic merge from submit-queue (batch tested with PRs 63049, 59731). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

re-enable nodeipam in kube-controller-manager

**What this PR does / why we need it**:
Re-enables the nodeipam controller for external clouds. Also does a small refactor so that we no longer need to pass `allocateNodeCidr` into the controller (sketched below).
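
A minimal sketch of the shape of that refactor, under stated assumptions: the `ControllerContext` struct and its field names below are illustrative stand-ins, not the real kube-controller-manager types. The gate on CIDR allocation moves into the controller-startup wrapper, so `NewNodeIpamController` itself no longer takes the flag; the constructor call matches the signature used by the test file in this PR.

```go
package sketch

import (
	"net"

	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	cloudprovider "k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/controller/nodeipam"
	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
)

// ControllerContext is an illustrative stand-in for the KCM's controller
// context; the field names here are assumptions made for this sketch.
type ControllerContext struct {
	AllocateNodeCIDRs bool
	InformerFactory   informers.SharedInformerFactory
	Cloud             cloudprovider.Interface
	Client            clientset.Interface
	ClusterCIDR       *net.IPNet
	ServiceCIDR       *net.IPNet
	NodeCIDRMaskSize  int
	AllocatorType     ipam.CIDRAllocatorType
	Stop              <-chan struct{}
}

func startNodeIpamController(ctx ControllerContext) (bool, error) {
	// The gate now lives in the caller: if CIDR allocation is off, the
	// controller is simply never constructed, instead of being constructed
	// with an allocateNodeCidr=false argument.
	if !ctx.AllocateNodeCIDRs {
		return false, nil
	}
	nc, err := nodeipam.NewNodeIpamController(
		ctx.InformerFactory.Core().V1().Nodes(), ctx.Cloud, ctx.Client,
		ctx.ClusterCIDR, ctx.ServiceCIDR, ctx.NodeCIDRMaskSize, ctx.AllocatorType,
	)
	if err != nil {
		return false, err
	}
	go nc.Run(ctx.Stop)
	return true, nil
}
```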

In v1.10 we made a change (commit 9187b343e1, part of https://github.com/kubernetes/kubernetes/pull/57492) where nodeipam would be disabled for any cluster that sets `--cloud-provider=external`. The original intention was that the nodeipam controller is cloud-specific for some clouds (only GCE at the moment), so it should be moved to the CCM (cloud controller manager). After some discussions with wg-cloud-provider, it makes sense to re-enable the nodeipam controller in the KCM and have the GCE CCM enable its own cloud-specific IPAM controller as part of [Initialize()](https://github.com/kubernetes/kubernetes/blob/master/pkg/cloudprovider/cloud.go#L33-L35). This allows GCE to run nodeipam in both the KCM (by setting `--cloud-provider=gce` and `--allocate-node-cidrs`) and in the CCM (once implemented in `Initialize()`), without disabling nodeipam in the KCM for all external clouds, and avoids having to implement nodeipam in the CCM.
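
For the CCM side, a hedged sketch of what "enable its own cloud-specific IPAM controller as part of `Initialize()`" could look like. The `Initialize(clientBuilder controller.ControllerClientBuilder)` signature reflects the v1.10-era `cloudprovider.Interface` linked above (it has since changed); `gceCloudProvider` and `runCloudIPAM` are hypothetical stand-ins, not real code from this PR.

```go
package sketch

import (
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/controller"
)

// gceCloudProvider is a hypothetical provider implementation; only the
// Initialize hook is shown.
type gceCloudProvider struct{}

// Initialize receives a client builder and may spawn goroutines for
// provider housekeeping, making it a natural place to start
// cloud-specific IPAM.
func (g *gceCloudProvider) Initialize(clientBuilder controller.ControllerClientBuilder) {
	client := clientBuilder.ClientOrDie("cloud-ipam-controller")
	// The provider runs its own IPAM loop inside the CCM, while the
	// generic nodeipam controller stays enabled in the KCM.
	go runCloudIPAM(client)
}

// runCloudIPAM is a hypothetical cloud-specific allocation loop.
func runCloudIPAM(client clientset.Interface) {
	// ... watch Nodes and program CIDRs via the cloud's API ...
}
```

With that split, `--cloud-provider=external` clusters keep the generic KCM nodeipam controller, and a provider that wants its own allocator starts it from this hook.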

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes # 

**Special notes for your reviewer**:


**Release note**:
```release-note
Re-enable nodeipam controller for external clouds. 
```
2018-05-11 11:07:12 -07:00


/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ipamperf

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"testing"
	"time"

	"github.com/golang/glog"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	"k8s.io/kubernetes/pkg/controller/nodeipam"
	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
	"k8s.io/kubernetes/test/integration/util"
)

// setupAllocator starts a nodeipam controller of the configured allocator
// type against the given API server, and returns a client plus a shutdown
// function that stops the controller.
func setupAllocator(apiURL string, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*clientset.Clientset, util.ShutdownFunc, error) {
	controllerStopChan := make(chan struct{})
	shutdownFunc := func() {
		close(controllerStopChan)
	}

	clientSet := clientset.NewForConfigOrDie(&restclient.Config{
		Host:          apiURL,
		ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}},
		QPS:           float32(config.KubeQPS),
		Burst:         config.KubeQPS,
	})

	sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
	ipamController, err := nodeipam.NewNodeIpamController(
		sharedInformer.Core().V1().Nodes(), config.Cloud, clientSet,
		clusterCIDR, serviceCIDR, subnetMaskSize, config.AllocatorType,
	)
	if err != nil {
		return nil, shutdownFunc, err
	}
	go ipamController.Run(controllerStopChan)
	sharedInformer.Start(controllerStopChan)

	return clientSet, shutdownFunc, nil
}

// runTest drives a single benchmark configuration: it starts the allocator,
// observes CIDR assignment progress, and creates config.NumNodes nodes.
func runTest(t *testing.T, apiURL string, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*Results, error) {
	t.Helper()
	glog.Infof("Running test %s", t.Name())

	defer deleteNodes(apiURL, config) // clean up nodes after the controller has shut down

	clientSet, shutdownFunc, err := setupAllocator(apiURL, config, clusterCIDR, serviceCIDR, subnetMaskSize)
	if err != nil {
		t.Fatalf("Error starting IPAM allocator: %v", err)
	}
	defer shutdownFunc()

	o := NewObserver(clientSet, config.NumNodes)
	if err := o.StartObserving(); err != nil {
		t.Fatalf("Could not start test observer: %v", err)
	}

	if err := createNodes(apiURL, config); err != nil {
		t.Fatalf("Could not create nodes: %v", err)
	}

	results := o.Results(t.Name(), config)
	glog.Infof("Results: %s", results)
	if !results.Succeeded {
		t.Errorf("%s: Not all allocations succeeded", t.Name())
	}
	return results, nil
}

// logResults pretty-prints all collected results and, if a results log file
// was configured, writes them there as JSON.
func logResults(allResults []*Results) {
	jStr, err := json.MarshalIndent(allResults, "", " ")
	if err != nil {
		glog.Errorf("Error formatting results: %v", err)
		return
	}
	if resultsLogFile != "" {
		glog.Infof("Logging results to %s", resultsLogFile)
		if err := ioutil.WriteFile(resultsLogFile, jStr, os.FileMode(0644)); err != nil {
			glog.Errorf("Error logging results to %s: %v", resultsLogFile, err)
		}
	}
	glog.Infof("AllResults:\n%s", string(jStr))
}

func TestPerformance(t *testing.T) {
	if testing.Short() {
		// TODO (#61854) find why flakiness is caused by etcd connectivity before enabling always
		t.Skip("Skipping because we want to run short tests")
	}

	apiURL, masterShutdown := util.StartApiserver()
	defer masterShutdown()

	_, clusterCIDR, _ := net.ParseCIDR("10.96.0.0/11") // allows up to 8K nodes
	_, serviceCIDR, _ := net.ParseCIDR("10.94.0.0/24") // does not matter for the test - can hold up to 250 services
	subnetMaskSize := 24

	var (
		allResults []*Results
		tests      []*Config
	)

	// isCustom and customConfig are package-level settings defined elsewhere
	// in this package; by default a matrix of node counts and allocator
	// types is benchmarked.
	if isCustom {
		tests = append(tests, customConfig)
	} else {
		for _, numNodes := range []int{10, 100} {
			for _, alloc := range []ipam.CIDRAllocatorType{ipam.RangeAllocatorType, ipam.CloudAllocatorType, ipam.IPAMFromClusterAllocatorType, ipam.IPAMFromCloudAllocatorType} {
				tests = append(tests, &Config{AllocatorType: alloc, NumNodes: numNodes, CreateQPS: numNodes, KubeQPS: 10, CloudQPS: 10})
			}
		}
	}

	for _, test := range tests {
		testName := fmt.Sprintf("%s-KubeQPS%d-Nodes%d", test.AllocatorType, test.KubeQPS, test.NumNodes)
		t.Run(testName, func(t *testing.T) {
			// The cloud-backed allocator types expect the (mock) cloud to
			// hand out the CIDRs itself.
			allocateCIDR := false
			if test.AllocatorType == ipam.IPAMFromCloudAllocatorType || test.AllocatorType == ipam.CloudAllocatorType {
				allocateCIDR = true
			}
			bil := newBaseInstanceList(allocateCIDR, clusterCIDR, subnetMaskSize)
			cloud, err := util.NewMockGCECloud(bil.newMockCloud())
			if err != nil {
				t.Fatalf("Unable to create mock cloud: %v", err)
			}
			test.Cloud = cloud

			if results, err := runTest(t, apiURL, test, clusterCIDR, serviceCIDR, subnetMaskSize); err == nil {
				allResults = append(allResults, results)
			}
		})
	}
	logResults(allResults)
}