Merge pull request #122293 from mengjiao-liu/controller-reconsider-log-verbosity

kube-controller-manager: readjust log verbosity
Kubernetes Prow Robot authored on 2024-02-29 11:55:21 -08:00, committed by GitHub
66 changed files with 770 additions and 649 deletions

View File

@@ -30,11 +30,11 @@ import (
clientset "k8s.io/client-go/kubernetes"
clientbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
)
func setup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *cronjob.ControllerV2, *job.Controller, informers.SharedInformerFactory, clientset.Interface) {
@@ -148,16 +148,13 @@ func validateJobAndPod(t *testing.T, clientSet clientset.Interface, namespace st
}
func TestCronJobLaunchesPodAndCleansUp(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tCtx := ktesting.Init(t)
closeFn, cjc, jc, informerSet, clientSet := setup(ctx, t)
closeFn, cjc, jc, informerSet, clientSet := setup(tCtx, t)
defer closeFn()
// When shutting down, cancel must be called before closeFn.
// We simply call it multiple times.
defer cancel()
defer tCtx.Cancel("test has completed")
cronJobName := "foo"
namespaceName := "simple-cronjob-test"
@@ -167,11 +164,11 @@ func TestCronJobLaunchesPodAndCleansUp(t *testing.T) {
cjClient := clientSet.BatchV1().CronJobs(ns.Name)
informerSet.Start(ctx.Done())
go cjc.Run(ctx, 1)
go jc.Run(ctx, 1)
informerSet.Start(tCtx.Done())
go cjc.Run(tCtx, 1)
go jc.Run(tCtx, 1)
_, err := cjClient.Create(context.TODO(), newCronJob(cronJobName, ns.Name, "* * * * ?"), metav1.CreateOptions{})
_, err := cjClient.Create(tCtx, newCronJob(cronJobName, ns.Name, "* * * * ?"), metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create CronJob: %v", err)
}
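
This first hunk is the template for most of the commit: klog's ktesting.NewTestContext plus a hand-rolled context.WithCancel is collapsed into a single ktesting.Init call from k8s.io/kubernetes/test/utils/ktesting, and cancellation becomes tCtx.Cancel with a reason string. A minimal before/after sketch, assuming only the signatures visible in the diff (klogtesting is an alias introduced here for illustration):

package example

import (
	"context"
	"testing"

	klogtesting "k8s.io/klog/v2/ktesting"
	"k8s.io/kubernetes/test/utils/ktesting"
)

// Before: a logging context from klog's ktesting plus hand-rolled cancellation.
func beforeSketch(t *testing.T) {
	_, ctx := klogtesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	_ = ctx // handed to controllers, informers, and client calls
}

// After: a single Init call; the returned tCtx is a context.Context and its
// Cancel method records why the test context ended.
func afterSketch(t *testing.T) {
	tCtx := ktesting.Init(t)
	defer tCtx.Cancel("test has completed")
	var _ context.Context = tCtx // usable anywhere a context is expected
}

In the hunk above, setup(tCtx, t) and the controller Run calls then take tCtx exactly where ctx used to go.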

View File

@@ -30,7 +30,6 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/deployment"
@@ -105,7 +104,6 @@ func newDeployment(name, ns string, replicas int32) *apps.Deployment {
func dcSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.ReplicaSetController, *deployment.DeploymentController, informers.SharedInformerFactory, clientset.Interface) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
logger, _ := ktesting.NewTestContext(t)
config := restclient.CopyConfig(server.ClientConfig)
clientSet, err := clientset.NewForConfig(config)
@@ -126,7 +124,7 @@ func dcSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFu
t.Fatalf("error creating Deployment controller: %v", err)
}
rm := replicaset.NewReplicaSetController(
logger,
ctx,
informers.Apps().V1().ReplicaSets(),
informers.Core().V1().Pods(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "replicaset-controller")),
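
Alongside the context change, this hunk switches NewReplicaSetController from taking a logr.Logger to taking the context itself. A hedged sketch of why a context-first signature can subsume the logger parameter, using klog.FromContext; the controller's real internals are not shown in this diff, so treat this as illustrative only:

package example

import (
	"context"

	"k8s.io/klog/v2"
)

// Hypothetical controller shape: with a ctx-first constructor, the logger no
// longer needs to be a separate parameter because klog can recover it from
// the context (FromContext falls back to the global logger if none is set).
type controllerSketch struct {
	logger klog.Logger
}

func newControllerSketch(ctx context.Context /* , informers, clients ... */) *controllerSketch {
	return &controllerSketch{logger: klog.FromContext(ctx)}
}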

View File

@@ -539,14 +539,15 @@ func createPDBUsingRemovedAPI(ctx context.Context, etcdClient *clientv3.Client,
}
func TestPatchCompatibility(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
s, pdbc, _, clientSet, _, _ := setup(ctx, t)
tCtx := ktesting.Init(t)
s, pdbc, _, clientSet, _, _ := setup(tCtx, t)
defer s.TearDownFn()
// Even though pdbc isn't used in this test, its creation is already
// spawning some goroutines. So we need to run it to ensure they won't leak.
cancel()
pdbc.Run(ctx)
// We can't cancel immediately; cancellation is deferred, because when the context is canceled,
// the event broadcaster will be shut down.
defer tCtx.Cancel("cleaning up")
go pdbc.Run(tCtx)
testcases := []struct {
name string
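
The disruption test previously canceled the context before calling pdbc.Run so its goroutines would exit immediately; now Run is started in a goroutine and cancellation is deferred, because (per the new comment) canceling the context shuts down the event broadcaster. The cleanup order relies on Go's LIFO defer semantics: tCtx.Cancel is registered after s.TearDownFn, so it runs first. A tiny illustration:

package example

import "fmt"

// Deferred calls run last-in-first-out, so registering Cancel after TearDownFn
// (as the hunk above does) cancels the context before the server goes away.
func cleanupOrderSketch() {
	defer fmt.Println("runs second: s.TearDownFn()")
	defer fmt.Println("runs first: tCtx.Cancel(\"cleaning up\")")
	fmt.Println("test body: go pdbc.Run(tCtx)")
}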

View File

@@ -17,7 +17,6 @@ limitations under the License.
package dualstack
import (
"context"
"fmt"
"testing"
"time"
@@ -44,11 +43,9 @@ func TestDualStackEndpoints(t *testing.T) {
return map[string]string{"foo": "bar"}
}
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tCtx := ktesting.Init(t)
client, _, tearDownFn := framework.StartTestServer(ctx, t, framework.TestServerSetup{
client, _, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
@@ -59,7 +56,7 @@ func TestDualStackEndpoints(t *testing.T) {
// Wait until the default "kubernetes" service is created.
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(tCtx, "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
@@ -88,11 +85,12 @@ func TestDualStackEndpoints(t *testing.T) {
},
},
}
if _, err := client.CoreV1().Nodes().Create(ctx, testNode, metav1.CreateOptions{}); err != nil {
if _, err := client.CoreV1().Nodes().Create(tCtx, testNode, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
}
epController := endpoint.NewEndpointController(
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Endpoints(),
@@ -100,7 +98,7 @@ func TestDualStackEndpoints(t *testing.T) {
1*time.Second)
epsController := endpointslice.NewController(
ctx,
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Nodes(),
@@ -110,10 +108,10 @@ func TestDualStackEndpoints(t *testing.T) {
1*time.Second)
// Start informer and controllers
informers.Start(ctx.Done())
informers.Start(tCtx.Done())
// use only one worker to serialize the updates
go epController.Run(ctx, 1)
go epsController.Run(ctx, 1)
go epController.Run(tCtx, 1)
go epsController.Run(tCtx, 1)
var testcases = []struct {
name string
@@ -170,7 +168,7 @@ func TestDualStackEndpoints(t *testing.T) {
},
}
createdPod, err := client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{})
createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
}
@@ -181,7 +179,7 @@ func TestDualStackEndpoints(t *testing.T) {
Phase: v1.PodRunning,
PodIPs: []v1.PodIP{{IP: podIPbyFamily[v1.IPv4Protocol]}, {IP: podIPbyFamily[v1.IPv6Protocol]}},
}
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, createdPod, metav1.UpdateOptions{})
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
}
@@ -209,7 +207,7 @@ func TestDualStackEndpoints(t *testing.T) {
}
// create a service
_, err = client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{})
_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating service: %v", err)
}
@@ -218,7 +216,7 @@ func TestDualStackEndpoints(t *testing.T) {
// legacy endpoints are not dual stack
// and use the address of the first IP family
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
e, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
e, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
if err != nil {
t.Logf("Error fetching endpoints: %v", err)
return false, nil
@@ -240,7 +238,7 @@ func TestDualStackEndpoints(t *testing.T) {
// wait until the endpoint slices are created
err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
lSelector := discovery.LabelServiceName + "=" + svc.Name
esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(ctx, metav1.ListOptions{LabelSelector: lSelector})
esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(tCtx, metav1.ListOptions{LabelSelector: lSelector})
if err != nil {
t.Logf("Error listing EndpointSlices: %v", err)
return false, nil
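
These hunks show that a single test context serves two kinds of consumers: SharedInformerFactory.Start wants a stop channel, while the controller Run methods and client-go calls want a context. A minimal sketch of that wiring, with startInformers and runController standing in for the concrete methods used above:

package example

import (
	"context"
	"testing"

	"k8s.io/kubernetes/test/utils/ktesting"
)

// startInformers and runController stand in for SharedInformerFactory.Start and
// the controllers' Run methods; one test context feeds both shapes.
func startAllSketch(t *testing.T,
	startInformers func(stopCh <-chan struct{}),
	runController func(ctx context.Context, workers int)) {
	tCtx := ktesting.Init(t)
	t.Cleanup(func() { tCtx.Cancel("test finished") })

	startInformers(tCtx.Done()) // stop-channel consumers
	go runController(tCtx, 1)   // context consumers
}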

View File

@@ -17,7 +17,6 @@ limitations under the License.
package endpoints
import (
"context"
"errors"
"fmt"
"testing"
@@ -33,6 +32,7 @@ import (
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestEndpointUpdates(t *testing.T) {
@@ -47,7 +47,9 @@ func TestEndpointUpdates(t *testing.T) {
informers := informers.NewSharedInformerFactory(client, 0)
tCtx := ktesting.Init(t)
epController := endpoint.NewEndpointController(
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Endpoints(),
@@ -55,10 +57,8 @@ func TestEndpointUpdates(t *testing.T) {
0)
// Start informer and controllers
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go epController.Run(ctx, 1)
informers.Start(tCtx.Done())
go epController.Run(tCtx, 1)
// Create namespace
ns := framework.CreateNamespaceOrDie(client, "test-endpoints-updates", t)
@@ -82,7 +82,7 @@ func TestEndpointUpdates(t *testing.T) {
},
}
createdPod, err := client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{})
createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
}
@@ -92,14 +92,14 @@ func TestEndpointUpdates(t *testing.T) {
Phase: v1.PodRunning,
PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}},
}
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, createdPod, metav1.UpdateOptions{})
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
}
// Create a service associated to the pod
svc := newService(ns.Name, "foo1")
svc1, err := client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{})
svc1, err := client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
}
@@ -107,7 +107,7 @@ func TestEndpointUpdates(t *testing.T) {
// Obtain ResourceVersion of the new endpoint created
var resVersion string
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
if err != nil {
t.Logf("error fetching endpoints: %v", err)
return false, nil
@@ -120,7 +120,7 @@ func TestEndpointUpdates(t *testing.T) {
// Force recomputation on the endpoint controller
svc1.SetAnnotations(map[string]string{"foo": "bar"})
_, err = client.CoreV1().Services(ns.Name).Update(ctx, svc1, metav1.UpdateOptions{})
_, err = client.CoreV1().Services(ns.Name).Update(tCtx, svc1, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update service %s: %v", svc1.Name, err)
}
@@ -130,13 +130,13 @@ func TestEndpointUpdates(t *testing.T) {
// was recomputed before asserting, since we only have 1 worker
// in the endpoint controller
svc2 := newService(ns.Name, "foo2")
_, err = client.CoreV1().Services(ns.Name).Create(ctx, svc2, metav1.CreateOptions{})
_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc2, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
}
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
_, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc2.Name, metav1.GetOptions{})
_, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc2.Name, metav1.GetOptions{})
if err != nil {
t.Logf("error fetching endpoints: %v", err)
return false, nil
@@ -148,7 +148,7 @@ func TestEndpointUpdates(t *testing.T) {
// the endpoint controller should not update the endpoint created for the original
// service since nothing has changed, the resource version has to be the same
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("error fetching endpoints: %v", err)
}
@@ -173,7 +173,9 @@ func TestExternalNameToClusterIPTransition(t *testing.T) {
informers := informers.NewSharedInformerFactory(client, 0)
tCtx := ktesting.Init(t)
epController := endpoint.NewEndpointController(
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Endpoints(),
@@ -181,10 +183,8 @@ func TestExternalNameToClusterIPTransition(t *testing.T) {
0)
// Start informer and controllers
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go epController.Run(ctx, 1)
informers.Start(tCtx.Done())
go epController.Run(tCtx, 1)
// Create namespace
ns := framework.CreateNamespaceOrDie(client, "test-endpoints-updates", t)
@@ -208,7 +208,7 @@ func TestExternalNameToClusterIPTransition(t *testing.T) {
},
}
createdPod, err := client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{})
createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
}
@@ -218,20 +218,20 @@ func TestExternalNameToClusterIPTransition(t *testing.T) {
Phase: v1.PodRunning,
PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}},
}
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, createdPod, metav1.UpdateOptions{})
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
}
// Create an ExternalName service associated to the pod
svc := newExternalNameService(ns.Name, "foo1")
svc1, err := client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{})
svc1, err := client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
}
err = wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
if err == nil {
t.Errorf("expected no endpoints for externalName service, got: %v", endpoints)
return true, nil
@@ -244,13 +244,13 @@ func TestExternalNameToClusterIPTransition(t *testing.T) {
// update service to ClusterIP type and verify endpoint was created
svc1.Spec.Type = v1.ServiceTypeClusterIP
_, err = client.CoreV1().Services(ns.Name).Update(ctx, svc1, metav1.UpdateOptions{})
_, err = client.CoreV1().Services(ns.Name).Update(tCtx, svc1, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update service %s: %v", svc1.Name, err)
}
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
ep, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc1.Name, metav1.GetOptions{})
ep, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc1.Name, metav1.GetOptions{})
if err != nil {
t.Logf("no endpoints found, error: %v", err)
return false, nil
@@ -282,7 +282,9 @@ func TestEndpointWithTerminatingPod(t *testing.T) {
informers := informers.NewSharedInformerFactory(client, 0)
tCtx := ktesting.Init(t)
epController := endpoint.NewEndpointController(
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Endpoints(),
@@ -290,10 +292,8 @@ func TestEndpointWithTerminatingPod(t *testing.T) {
0)
// Start informer and controllers
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go epController.Run(ctx, 1)
informers.Start(tCtx.Done())
go epController.Run(tCtx, 1)
// Create namespace
ns := framework.CreateNamespaceOrDie(client, "test-endpoints-terminating", t)
@@ -337,13 +337,13 @@ func TestEndpointWithTerminatingPod(t *testing.T) {
},
}
createdPod, err := client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{})
createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
}
createdPod.Status = pod.Status
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, createdPod, metav1.UpdateOptions{})
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
}
@@ -366,14 +366,14 @@ func TestEndpointWithTerminatingPod(t *testing.T) {
},
},
}
_, err = client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{})
_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
}
// poll until associated Endpoints to the previously created Service exists
if err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
if err != nil {
return false, nil
}
@@ -392,7 +392,7 @@ func TestEndpointWithTerminatingPod(t *testing.T) {
t.Fatalf("endpoints not found: %v", err)
}
err = client.CoreV1().Pods(ns.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
err = client.CoreV1().Pods(ns.Name).Delete(tCtx, pod.Name, metav1.DeleteOptions{})
if err != nil {
t.Fatalf("error deleting test pod: %v", err)
}
@@ -401,7 +401,7 @@ func TestEndpointWithTerminatingPod(t *testing.T) {
if err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
// Ensure that the recently deleted Pod exists but with a deletion timestamp. If the Pod does not exist,
// we should fail the test since it is no longer validating against a terminating pod.
pod, err := client.CoreV1().Pods(ns.Name).Get(ctx, pod.Name, metav1.GetOptions{})
pod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, pod.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, fmt.Errorf("expected Pod %q to exist with deletion timestamp but was not found: %v", pod.Name, err)
}
@@ -413,7 +413,7 @@ func TestEndpointWithTerminatingPod(t *testing.T) {
return false, errors.New("pod did not have deletion timestamp set")
}
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
if err != nil {
return false, nil
}
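
Within the test bodies the change is mechanical: every client-go call that took context.TODO() or the locally derived ctx now takes tCtx. A compact sketch of that usage (client, namespace, and name are placeholders):

package example

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/utils/ktesting"
)

// Every request inherits the test's cancellation instead of relying on the
// uncancellable context.TODO().
func getEndpointsSketch(t *testing.T, client clientset.Interface, namespace, name string) {
	tCtx := ktesting.Init(t)
	eps, err := client.CoreV1().Endpoints(namespace).Get(tCtx, name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("fetching endpoints %s/%s: %v", namespace, name, err)
	}
	t.Logf("endpoints %s has %d subsets", eps.Name, len(eps.Subsets))
}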

View File

@@ -30,12 +30,12 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/endpointslice"
"k8s.io/kubernetes/pkg/controller/endpointslicemirroring"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestEndpointSliceMirroring(t *testing.T) {
@@ -48,11 +48,12 @@ func TestEndpointSliceMirroring(t *testing.T) {
t.Fatalf("Error creating clientset: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
tCtx := ktesting.Init(t)
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(client, resyncPeriod)
epController := endpoint.NewEndpointController(
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Endpoints(),
@@ -60,7 +61,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
1*time.Second)
epsController := endpointslice.NewController(
ctx,
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Nodes(),
@@ -70,7 +71,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
1*time.Second)
epsmController := endpointslicemirroring.NewController(
ctx,
tCtx,
informers.Core().V1().Endpoints(),
informers.Discovery().V1().EndpointSlices(),
informers.Core().V1().Services(),
@@ -79,11 +80,10 @@ func TestEndpointSliceMirroring(t *testing.T) {
1*time.Second)
// Start informer and controllers
defer cancel()
informers.Start(ctx.Done())
go epController.Run(ctx, 5)
go epsController.Run(ctx, 5)
go epsmController.Run(ctx, 5)
informers.Start(tCtx.Done())
go epController.Run(tCtx, 5)
go epsController.Run(tCtx, 5)
go epsmController.Run(tCtx, 5)
testCases := []struct {
testName string
@@ -259,7 +259,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
if tc.service != nil {
resourceName = tc.service.Name
tc.service.Namespace = ns.Name
_, err = client.CoreV1().Services(ns.Name).Create(ctx, tc.service, metav1.CreateOptions{})
_, err = client.CoreV1().Services(ns.Name).Create(tCtx, tc.service, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating service: %v", err)
}
@@ -268,7 +268,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
if tc.customEndpoints != nil {
resourceName = tc.customEndpoints.Name
tc.customEndpoints.Namespace = ns.Name
_, err = client.CoreV1().Endpoints(ns.Name).Create(ctx, tc.customEndpoints, metav1.CreateOptions{})
_, err = client.CoreV1().Endpoints(ns.Name).Create(tCtx, tc.customEndpoints, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating endpoints: %v", err)
}
@@ -276,7 +276,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
lSelector := discovery.LabelServiceName + "=" + resourceName
esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(ctx, metav1.ListOptions{LabelSelector: lSelector})
esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(tCtx, metav1.ListOptions{LabelSelector: lSelector})
if err != nil {
t.Logf("Error listing EndpointSlices: %v", err)
return false, err
@@ -312,7 +312,6 @@ func TestEndpointSliceMirroring(t *testing.T) {
}
func TestEndpointSliceMirroringUpdates(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
defer server.TearDownFn()
@@ -325,8 +324,9 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(client, resyncPeriod)
tCtx := ktesting.Init(t)
epsmController := endpointslicemirroring.NewController(
ctx,
tCtx,
informers.Core().V1().Endpoints(),
informers.Discovery().V1().EndpointSlices(),
informers.Core().V1().Services(),
@@ -335,10 +335,8 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
1*time.Second)
// Start informer and controllers
ctx, cancel := context.WithCancel(ctx)
defer cancel()
informers.Start(ctx.Done())
go epsmController.Run(ctx, 1)
informers.Start(tCtx.Done())
go epsmController.Run(tCtx, 1)
testCases := []struct {
testName string
@@ -405,19 +403,19 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
}},
}
_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{})
_, err = client.CoreV1().Services(ns.Name).Create(tCtx, service, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating service: %v", err)
}
_, err = client.CoreV1().Endpoints(ns.Name).Create(ctx, customEndpoints, metav1.CreateOptions{})
_, err = client.CoreV1().Endpoints(ns.Name).Create(tCtx, customEndpoints, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating endpoints: %v", err)
}
// update endpoint
tc.tweakEndpoint(customEndpoints)
_, err = client.CoreV1().Endpoints(ns.Name).Update(ctx, customEndpoints, metav1.UpdateOptions{})
_, err = client.CoreV1().Endpoints(ns.Name).Update(tCtx, customEndpoints, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error updating endpoints: %v", err)
}
@@ -425,7 +423,7 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
// verify the endpoint updates were mirrored
err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
lSelector := discovery.LabelServiceName + "=" + service.Name
esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(ctx, metav1.ListOptions{LabelSelector: lSelector})
esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(tCtx, metav1.ListOptions{LabelSelector: lSelector})
if err != nil {
t.Logf("Error listing EndpointSlices: %v", err)
return false, err
@@ -489,7 +487,6 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
}
func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
defer server.TearDownFn()
@@ -502,8 +499,9 @@ func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(client, resyncPeriod)
tCtx := ktesting.Init(t)
epsmController := endpointslicemirroring.NewController(
ctx,
tCtx,
informers.Core().V1().Endpoints(),
informers.Discovery().V1().EndpointSlices(),
informers.Core().V1().Services(),
@@ -512,10 +510,8 @@ func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
1*time.Second)
// Start informer and controllers
ctx, cancel := context.WithCancel(ctx)
defer cancel()
informers.Start(ctx.Done())
go epsmController.Run(ctx, 1)
informers.Start(tCtx.Done())
go epsmController.Run(tCtx, 1)
testCases := []struct {
testName string
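
Both the removed and the added import are packages named ktesting, so a file can only import one of them without an alias; the hunks above therefore swap the import entirely. If a file ever needed both helpers, an alias would be required, as in this sketch (klogtesting is an assumed alias, not something this commit adds):

package example

import (
	"testing"

	klogtesting "k8s.io/klog/v2/ktesting"
	"k8s.io/kubernetes/test/utils/ktesting"
)

// A fully migrated file (like the one above) simply drops the klog import.
func bothHelpersSketch(t *testing.T) {
	logger, _ := klogtesting.NewTestContext(t) // when a bare logr.Logger is still needed
	tCtx := ktesting.Init(t)                   // the per-test context this commit standardizes on
	defer tCtx.Cancel("done")
	logger.Info("both helpers can coexist", "ctxErr", tCtx.Err())
}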

View File

@@ -29,10 +29,10 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/endpointslice"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
utilpointer "k8s.io/utils/pointer"
)
@@ -116,9 +116,9 @@ func TestEndpointSliceTerminating(t *testing.T) {
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(client, resyncPeriod)
_, ctx := ktesting.NewTestContext(t)
tCtx := ktesting.Init(t)
epsController := endpointslice.NewController(
ctx,
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Nodes(),
@@ -128,10 +128,8 @@ func TestEndpointSliceTerminating(t *testing.T) {
1*time.Second)
// Start informer and controllers
ctx, cancel := context.WithCancel(ctx)
defer cancel()
informers.Start(ctx.Done())
go epsController.Run(ctx, 1)
informers.Start(tCtx.Done())
go epsController.Run(tCtx, 1)
// Create namespace
ns := framework.CreateNamespaceOrDie(client, "test-endpoints-terminating", t)
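
This file keeps the deprecated wait.PollImmediate; with a per-test context in hand, the context-aware wait.PollUntilContextTimeout (used elsewhere in this commit, for example in the ServiceCIDR migration test) is the natural companion. A hedged sketch of that variant, not something this PR changes:

package example

import (
	"context"
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/utils/ktesting"
)

// A canceled test context aborts the poll immediately instead of waiting out
// the timeout; client, namespace, and name are placeholders.
func waitForEndpointsSketch(t *testing.T, client clientset.Interface, namespace, name string) {
	tCtx := ktesting.Init(t)
	err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, wait.ForeverTestTimeout, true,
		func(ctx context.Context) (bool, error) {
			_, err := client.CoreV1().Endpoints(namespace).Get(ctx, name, metav1.GetOptions{})
			if err != nil {
				return false, nil // keep polling until the object exists
			}
			return true, nil
		})
	if err != nil {
		t.Fatalf("endpoints %s/%s never appeared: %v", namespace, name, err)
	}
}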

View File

@@ -47,11 +47,11 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -247,9 +247,13 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
}
sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := garbagecollector.NewGarbageCollector(
ctx,
clientSet,
metadataClient,
restMapper,
@@ -261,8 +265,6 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
t.Fatalf("failed to create garbage collector: %v", err)
}
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
tearDown := func() {
cancel()
result.TearDownFn()
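
In the garbage-collector setup the context is now created before NewGarbageCollector (which takes it as its first argument), and the teardown closure cancels that context before tearing the API server down. A sketch of that teardown shape:

package example

import "context"

// Cancel the controller context first so garbage-collector goroutines and their
// watches stop, then tear the test API server down.
func newTearDownSketch(cancel context.CancelFunc, stopServer func()) func() {
	return func() {
		cancel()
		stopServer()
	}
}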

View File

@@ -37,13 +37,13 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/controller"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
quotainstall "k8s.io/kubernetes/pkg/quota/v1/install"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
)
const (
@@ -60,7 +60,7 @@ const (
// quota_test.go:100: Took 4.196205966s to scale up without quota
// quota_test.go:115: Took 12.021640372s to scale up with quota
func TestQuota(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -82,7 +82,7 @@ func TestQuota(t *testing.T) {
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
rm := replicationcontroller.NewReplicationManager(
logger,
ctx,
informers.Core().V1().Pods(),
informers.Core().V1().ReplicationControllers(),
clientset,
@@ -291,12 +291,10 @@ plugins:
t.Fatal(err)
}
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tCtx := ktesting.Init(t)
// Set up an API server
_, kubeConfig, tearDownFn := framework.StartTestServer(ctx, t, framework.TestServerSetup{
_, kubeConfig, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
@@ -313,13 +311,13 @@ plugins:
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
rm := replicationcontroller.NewReplicationManager(
logger,
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().ReplicationControllers(),
clientset,
replicationcontroller.BurstReplicas,
)
go rm.Run(ctx, 3)
go rm.Run(tCtx, 3)
discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource)
@@ -336,16 +334,16 @@ plugins:
InformersStarted: informersStarted,
Registry: generic.NewRegistry(qc.Evaluators()),
}
resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions)
resourceQuotaController, err := resourcequotacontroller.NewController(tCtx, resourceQuotaControllerOptions)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
go resourceQuotaController.Run(ctx, 2)
go resourceQuotaController.Run(tCtx, 2)
// Periodically resync the quota controller to detect new resource types
go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second)
go resourceQuotaController.Sync(tCtx, discoveryFunc, 30*time.Second)
informers.Start(ctx.Done())
informers.Start(tCtx.Done())
close(informersStarted)
// try to create a pod
@@ -363,7 +361,7 @@ plugins:
},
},
}
if _, err := clientset.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}); err == nil {
if _, err := clientset.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}); err == nil {
t.Fatalf("expected error for insufficient quota")
}
@@ -386,7 +384,7 @@ plugins:
// attempt to create a new pod once the quota is propagated
err = wait.PollImmediate(5*time.Second, time.Minute, func() (bool, error) {
// retry until we succeed (to allow time for all changes to propagate)
if _, err := clientset.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}); err == nil {
if _, err := clientset.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}); err == nil {
return true, nil
}
return false, nil
@@ -419,12 +417,10 @@ plugins:
t.Fatal(err)
}
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tCtx := ktesting.Init(t)
// Set up an API server
_, kubeConfig, tearDownFn := framework.StartTestServer(ctx, t, framework.TestServerSetup{
_, kubeConfig, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
@@ -441,13 +437,13 @@ plugins:
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
rm := replicationcontroller.NewReplicationManager(
logger,
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().ReplicationControllers(),
clientset,
replicationcontroller.BurstReplicas,
)
go rm.Run(ctx, 3)
go rm.Run(tCtx, 3)
discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource)
@@ -464,16 +460,16 @@ plugins:
InformersStarted: informersStarted,
Registry: generic.NewRegistry(qc.Evaluators()),
}
resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions)
resourceQuotaController, err := resourcequotacontroller.NewController(tCtx, resourceQuotaControllerOptions)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
go resourceQuotaController.Run(ctx, 2)
go resourceQuotaController.Run(tCtx, 2)
// Periodically resync the quota controller to detect new resource types
go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second)
go resourceQuotaController.Sync(tCtx, discoveryFunc, 30*time.Second)
informers.Start(ctx.Done())
informers.Start(tCtx.Done())
close(informersStarted)
// now create a covering quota
@@ -496,14 +492,14 @@ plugins:
// Creating the first node port service should succeed
nodePortService := newService("np-svc", v1.ServiceTypeNodePort, true)
_, err = clientset.CoreV1().Services(ns.Name).Create(ctx, nodePortService, metav1.CreateOptions{})
_, err = clientset.CoreV1().Services(ns.Name).Create(tCtx, nodePortService, metav1.CreateOptions{})
if err != nil {
t.Errorf("creating first node port Service should not have returned error: %v", err)
}
// Creating the first loadbalancer service should succeed
lbServiceWithNodePort1 := newService("lb-svc-withnp1", v1.ServiceTypeLoadBalancer, true)
_, err = clientset.CoreV1().Services(ns.Name).Create(ctx, lbServiceWithNodePort1, metav1.CreateOptions{})
_, err = clientset.CoreV1().Services(ns.Name).Create(tCtx, lbServiceWithNodePort1, metav1.CreateOptions{})
if err != nil {
t.Errorf("creating first loadbalancer Service should not have returned error: %v", err)
}
@@ -522,7 +518,7 @@ plugins:
// Creating a loadbalancer Service without node ports should succeed
lbServiceWithoutNodePort1 := newService("lb-svc-wonp1", v1.ServiceTypeLoadBalancer, false)
_, err = clientset.CoreV1().Services(ns.Name).Create(ctx, lbServiceWithoutNodePort1, metav1.CreateOptions{})
_, err = clientset.CoreV1().Services(ns.Name).Create(tCtx, lbServiceWithoutNodePort1, metav1.CreateOptions{})
if err != nil {
t.Errorf("creating another loadbalancer Service without node ports should not have returned error: %v", err)
}
@@ -541,7 +537,7 @@ plugins:
// Creating a ClusterIP Service should succeed
clusterIPService1 := newService("clusterip-svc1", v1.ServiceTypeClusterIP, false)
_, err = clientset.CoreV1().Services(ns.Name).Create(ctx, clusterIPService1, metav1.CreateOptions{})
_, err = clientset.CoreV1().Services(ns.Name).Create(tCtx, clusterIPService1, metav1.CreateOptions{})
if err != nil {
t.Errorf("creating a cluster IP Service should not have returned error: %v", err)
}
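
The quota tests hand the same tCtx to the replication manager, the quota controller, and its periodic Sync goroutine, so one deferred Cancel stops all of them. The 30-second Sync resembles a context-bound resync loop; the sketch below is a generic illustration of that shape, not the resourcequota controller's actual implementation:

package example

import (
	"context"
	"time"
)

// Generic context-bound resync loop; it exits as soon as the test context is
// canceled, which is why a single deferred Cancel is enough cleanup.
func syncLoopSketch(ctx context.Context, period time.Duration, resync func(context.Context)) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			resync(ctx)
		}
	}
}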

View File

@@ -40,7 +40,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core"
@@ -48,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
testutil "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -118,7 +118,8 @@ func newMatchingPod(podName, namespace string) *v1.Pod {
}
}
func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.ReplicaSetController, informers.SharedInformerFactory, clientset.Interface) {
func rmSetup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *replicaset.ReplicaSetController, informers.SharedInformerFactory, clientset.Interface) {
tCtx := ktesting.Init(t)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
@@ -129,17 +130,21 @@ func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.Repli
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "rs-informers")), resyncPeriod)
logger, _ := ktesting.NewTestContext(t)
rm := replicaset.NewReplicaSetController(
logger,
tCtx,
informers.Apps().V1().ReplicaSets(),
informers.Core().V1().Pods(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "replicaset-controller")),
replicaset.BurstReplicas,
)
return server.TearDownFn, rm, informers, clientSet
newTeardown := func() {
tCtx.Cancel("tearing down controller")
server.TearDownFn()
}
return tCtx, newTeardown, rm, informers, clientSet
}
func rmSimpleSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, clientset.Interface) {
@@ -426,22 +431,23 @@ func TestAdoption(t *testing.T) {
}
for i, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
closeFn, rm, informers, clientSet := rmSetup(t)
tCtx, closeFn, rm, informers, clientSet := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientSet, fmt.Sprintf("rs-adoption-%d", i), t)
defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
rsClient := clientSet.AppsV1().ReplicaSets(ns.Name)
podClient := clientSet.CoreV1().Pods(ns.Name)
const rsName = "rs"
rs, err := rsClient.Create(context.TODO(), newRS(rsName, ns.Name, 1), metav1.CreateOptions{})
rs, err := rsClient.Create(tCtx, newRS(rsName, ns.Name, 1), metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create replica set: %v", err)
}
podName := fmt.Sprintf("pod%d", i)
pod := newMatchingPod(podName, ns.Name)
pod.OwnerReferences = tc.existingOwnerReferences(rs)
_, err = podClient.Create(context.TODO(), pod, metav1.CreateOptions{})
_, err = podClient.Create(tCtx, pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create Pod: %v", err)
}
@@ -449,7 +455,7 @@ func TestAdoption(t *testing.T) {
stopControllers := runControllerAndInformers(t, rm, informers, 1)
defer stopControllers()
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
updatedPod, err := podClient.Get(context.TODO(), pod.Name, metav1.GetOptions{})
updatedPod, err := podClient.Get(tCtx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -497,7 +503,7 @@ func TestRSSelectorImmutability(t *testing.T) {
}
func TestSpecReplicasChange(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -528,7 +534,7 @@ func TestSpecReplicasChange(t *testing.T) {
}
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRS, err := rsClient.Get(context.TODO(), rs.Name, metav1.GetOptions{})
newRS, err := rsClient.Get(tCtx, rs.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -539,8 +545,9 @@ func TestSpecReplicasChange(t *testing.T) {
}
func TestDeletingAndFailedPods(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-deleting-and-failed-pods", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
stopControllers := runControllerAndInformers(t, rm, informers, 0)
@@ -564,7 +571,7 @@ func TestDeletingAndFailedPods(t *testing.T) {
updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) {
pod.Finalizers = []string{"fake.example.com/blockDeletion"}
})
if err := c.CoreV1().Pods(ns.Name).Delete(context.TODO(), deletingPod.Name, metav1.DeleteOptions{}); err != nil {
if err := c.CoreV1().Pods(ns.Name).Delete(tCtx, deletingPod.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error deleting pod %s: %v", deletingPod.Name, err)
}
@@ -642,7 +649,7 @@ func TestPodDeletionCost(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDeletionCost, tc.enabled)()
closeFn, rm, informers, c := rmSetup(t)
_, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, tc.name, t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -701,7 +708,7 @@ func TestPodDeletionCost(t *testing.T) {
}
func TestOverlappingRSs(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-overlapping-rss", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -725,7 +732,7 @@ func TestOverlappingRSs(t *testing.T) {
// Expect both RSs have .status.replicas = .spec.replicas
for i := 0; i < 2; i++ {
newRS, err := c.AppsV1().ReplicaSets(ns.Name).Get(context.TODO(), fmt.Sprintf("rs-%d", i+1), metav1.GetOptions{})
newRS, err := c.AppsV1().ReplicaSets(ns.Name).Get(tCtx, fmt.Sprintf("rs-%d", i+1), metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to obtain rs rs-%d: %v", i+1, err)
}
@@ -736,7 +743,7 @@ func TestOverlappingRSs(t *testing.T) {
}
func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-pod-orphaning-and-adoption-when-labels-change", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -765,7 +772,7 @@ func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
pod.Labels = newLabelMap
})
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newPod, err := podClient.Get(context.TODO(), pod.Name, metav1.GetOptions{})
newPod, err := podClient.Get(tCtx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -780,7 +787,7 @@ func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
pod.Labels = labelMap()
})
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newPod, err := podClient.Get(context.TODO(), pod.Name, metav1.GetOptions{})
newPod, err := podClient.Get(tCtx, pod.Name, metav1.GetOptions{})
if err != nil {
// If the pod is not found, it means the RS picks the pod for deletion (it is extra)
// Verify there is only one pod in namespace and it has ControllerRef to the RS
@@ -814,7 +821,7 @@ func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
}
func TestGeneralPodAdoption(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
_, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-general-pod-adoption", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -846,7 +853,7 @@ func TestGeneralPodAdoption(t *testing.T) {
}
func TestReadyAndAvailableReplicas(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-ready-and-available-replicas", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -886,7 +893,7 @@ func TestReadyAndAvailableReplicas(t *testing.T) {
rsClient := c.AppsV1().ReplicaSets(ns.Name)
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRS, err := rsClient.Get(context.TODO(), rs.Name, metav1.GetOptions{})
newRS, err := rsClient.Get(tCtx, rs.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -898,7 +905,7 @@ func TestReadyAndAvailableReplicas(t *testing.T) {
}
func TestRSScaleSubresource(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
_, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-rs-scale-subresource", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -917,7 +924,7 @@ func TestRSScaleSubresource(t *testing.T) {
}
func TestExtraPodsAdoptionAndDeletion(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
_, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-extra-pods-adoption-and-deletion", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -949,7 +956,7 @@ func TestExtraPodsAdoptionAndDeletion(t *testing.T) {
}
func TestFullyLabeledReplicas(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-fully-labeled-replicas", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -981,7 +988,7 @@ func TestFullyLabeledReplicas(t *testing.T) {
// Verify only one pod is fully labeled
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRS, err := rsClient.Get(context.TODO(), rs.Name, metav1.GetOptions{})
newRS, err := rsClient.Get(tCtx, rs.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -992,7 +999,7 @@ func TestFullyLabeledReplicas(t *testing.T) {
}
func TestReplicaSetsAppsV1DefaultGCPolicy(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-default-gc-v1", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -1014,14 +1021,14 @@ func TestReplicaSetsAppsV1DefaultGCPolicy(t *testing.T) {
}
rsClient := c.AppsV1().ReplicaSets(ns.Name)
err := rsClient.Delete(context.TODO(), rs.Name, metav1.DeleteOptions{})
err := rsClient.Delete(tCtx, rs.Name, metav1.DeleteOptions{})
if err != nil {
t.Fatalf("Failed to delete rs: %v", err)
}
// Verify no new finalizer has been added
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRS, err := rsClient.Get(context.TODO(), rs.Name, metav1.GetOptions{})
newRS, err := rsClient.Get(tCtx, rs.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -1047,5 +1054,5 @@ func TestReplicaSetsAppsV1DefaultGCPolicy(t *testing.T) {
rs.Finalizers = finalizers
})
rsClient.Delete(context.TODO(), rs.Name, metav1.DeleteOptions{})
_ = rsClient.Delete(tCtx, rs.Name, metav1.DeleteOptions{})
}
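
rmSetup now returns the test context alongside a teardown closure that cancels it before stopping the test API server, so each test defers exactly one call and can still pass the context to client-go. A condensed sketch of that shape (setupSketch and stopServer are stand-ins for rmSetup and server.TearDownFn):

package example

import (
	"context"
	"testing"

	"k8s.io/kubernetes/test/utils/ktesting"
)

// Hand back the test context for API calls plus a single teardown that cancels
// it before the server is stopped.
func setupSketch(t *testing.T, stopServer func()) (context.Context, func()) {
	tCtx := ktesting.Init(t)
	tearDown := func() {
		tCtx.Cancel("tearing down controller")
		stopServer()
	}
	return tCtx, tearDown
}

A test then reads "tCtx, closeFn, rm, informers, c := rmSetup(t); defer closeFn()", as TestSpecReplicasChange does above; tests that never touch the context discard it with the blank identifier.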

View File

@@ -38,12 +38,12 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
)
const (
@@ -111,7 +111,8 @@ func newMatchingPod(podName, namespace string) *v1.Pod {
}
}
func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replication.ReplicationManager, informers.SharedInformerFactory, clientset.Interface) {
func rmSetup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *replication.ReplicationManager, informers.SharedInformerFactory, clientset.Interface) {
tCtx := ktesting.Init(t)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
@@ -123,16 +124,19 @@ func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replication.Repl
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "rc-informers")), resyncPeriod)
logger, _ := ktesting.NewTestContext(t)
rm := replication.NewReplicationManager(
logger,
tCtx,
informers.Core().V1().Pods(),
informers.Core().V1().ReplicationControllers(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "replication-controller")),
replication.BurstReplicas,
)
newTeardown := func() {
tCtx.Cancel("tearing down controller")
server.TearDownFn()
}
return server.TearDownFn, rm, informers, clientSet
return tCtx, newTeardown, rm, informers, clientSet
}
// Run RC controller and informers
@@ -414,7 +418,7 @@ func TestAdoption(t *testing.T) {
}
for i, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
closeFn, rm, informers, clientSet := rmSetup(t)
tCtx, closeFn, rm, informers, clientSet := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientSet, fmt.Sprintf("rc-adoption-%d", i), t)
defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
@@ -422,14 +426,14 @@ func TestAdoption(t *testing.T) {
rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name)
podClient := clientSet.CoreV1().Pods(ns.Name)
const rcName = "rc"
rc, err := rcClient.Create(context.TODO(), newRC(rcName, ns.Name, 1), metav1.CreateOptions{})
rc, err := rcClient.Create(tCtx, newRC(rcName, ns.Name, 1), metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create replication controllers: %v", err)
}
podName := fmt.Sprintf("pod%d", i)
pod := newMatchingPod(podName, ns.Name)
pod.OwnerReferences = tc.existingOwnerReferences(rc)
_, err = podClient.Create(context.TODO(), pod, metav1.CreateOptions{})
_, err = podClient.Create(tCtx, pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create Pod: %v", err)
}
@@ -437,7 +441,7 @@ func TestAdoption(t *testing.T) {
stopControllers := runControllerAndInformers(t, rm, informers, 1)
defer stopControllers()
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
updatedPod, err := podClient.Get(context.TODO(), pod.Name, metav1.GetOptions{})
updatedPod, err := podClient.Get(tCtx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -457,7 +461,7 @@ func TestAdoption(t *testing.T) {
}
func TestSpecReplicasChange(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -488,7 +492,7 @@ func TestSpecReplicasChange(t *testing.T) {
}
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRC, err := rcClient.Get(context.TODO(), rc.Name, metav1.GetOptions{})
newRC, err := rcClient.Get(tCtx, rc.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -500,7 +504,7 @@ func TestSpecReplicasChange(t *testing.T) {
func TestLogarithmicScaleDown(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LogarithmicScaleDown, true)()
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -513,7 +517,7 @@ func TestLogarithmicScaleDown(t *testing.T) {
waitRCStable(t, c, rc)
// get list of pods in the cluster
pods, err := c.CoreV1().Pods(ns.Name).List(context.TODO(), metav1.ListOptions{})
pods, err := c.CoreV1().Pods(ns.Name).List(tCtx, metav1.ListOptions{})
if err != nil {
t.Fatalf("failed to get pods in namespace %s: %+v", ns.Name, err)
}
@@ -526,7 +530,7 @@ func TestLogarithmicScaleDown(t *testing.T) {
// (meaning the 3rd one was deleted)
scaleRC(t, c, rc, 2)
newPods, err := c.CoreV1().Pods(ns.Name).List(context.TODO(), metav1.ListOptions{})
newPods, err := c.CoreV1().Pods(ns.Name).List(tCtx, metav1.ListOptions{})
if err != nil {
t.Fatalf("failed to get pods in namespace %s: %+v", ns.Name, err)
}
@@ -537,7 +541,7 @@ func TestLogarithmicScaleDown(t *testing.T) {
}
func TestDeletingAndFailedPods(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-deleting-and-failed-pods", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -562,7 +566,7 @@ func TestDeletingAndFailedPods(t *testing.T) {
updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) {
pod.Finalizers = []string{"fake.example.com/blockDeletion"}
})
if err := c.CoreV1().Pods(ns.Name).Delete(context.TODO(), deletingPod.Name, metav1.DeleteOptions{}); err != nil {
if err := c.CoreV1().Pods(ns.Name).Delete(tCtx, deletingPod.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error deleting pod %s: %v", deletingPod.Name, err)
}
@@ -602,7 +606,7 @@ func TestDeletingAndFailedPods(t *testing.T) {
}
func TestOverlappingRCs(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-overlapping-rcs", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -626,7 +630,7 @@ func TestOverlappingRCs(t *testing.T) {
// Expect both RCs have .status.replicas = .spec.replicas
for i := 0; i < 2; i++ {
newRC, err := c.CoreV1().ReplicationControllers(ns.Name).Get(context.TODO(), fmt.Sprintf("rc-%d", i+1), metav1.GetOptions{})
newRC, err := c.CoreV1().ReplicationControllers(ns.Name).Get(tCtx, fmt.Sprintf("rc-%d", i+1), metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to obtain rc rc-%d: %v", i+1, err)
}
@@ -637,7 +641,7 @@ func TestOverlappingRCs(t *testing.T) {
}
func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-pod-orphaning-and-adoption-when-labels-change", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -666,7 +670,7 @@ func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
pod.Labels = newLabelMap
})
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newPod, err := podClient.Get(context.TODO(), pod.Name, metav1.GetOptions{})
newPod, err := podClient.Get(tCtx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -681,7 +685,7 @@ func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
pod.Labels = labelMap()
})
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newPod, err := podClient.Get(context.TODO(), pod.Name, metav1.GetOptions{})
newPod, err := podClient.Get(tCtx, pod.Name, metav1.GetOptions{})
if err != nil {
// If the pod is not found, it means the RC picks the pod for deletion (it is extra)
// Verify there is only one pod in namespace and it has ControllerRef to the RC
@@ -715,7 +719,7 @@ func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
}
func TestGeneralPodAdoption(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
_, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-general-pod-adoption", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -747,7 +751,7 @@ func TestGeneralPodAdoption(t *testing.T) {
}
func TestReadyAndAvailableReplicas(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-ready-and-available-replicas", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -787,7 +791,7 @@ func TestReadyAndAvailableReplicas(t *testing.T) {
rcClient := c.CoreV1().ReplicationControllers(ns.Name)
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRC, err := rcClient.Get(context.TODO(), rc.Name, metav1.GetOptions{})
newRC, err := rcClient.Get(tCtx, rc.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -799,7 +803,7 @@ func TestReadyAndAvailableReplicas(t *testing.T) {
}
func TestRCScaleSubresource(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
_, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-rc-scale-subresource", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -818,7 +822,7 @@ func TestRCScaleSubresource(t *testing.T) {
}
func TestExtraPodsAdoptionAndDeletion(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
_, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-extra-pods-adoption-and-deletion", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -850,7 +854,7 @@ func TestExtraPodsAdoptionAndDeletion(t *testing.T) {
}
func TestFullyLabeledReplicas(t *testing.T) {
closeFn, rm, informers, c := rmSetup(t)
tCtx, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-fully-labeled-replicas", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@@ -882,7 +886,7 @@ func TestFullyLabeledReplicas(t *testing.T) {
// Verify only one pod is fully labeled
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRC, err := rcClient.Get(context.TODO(), rc.Name, metav1.GetOptions{})
newRC, err := rcClient.Get(tCtx, rc.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
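
These replication controller tests follow the pattern this PR applies throughout: rmSetup now hands back a per-test context from k8s.io/kubernetes/test/utils/ktesting in addition to the usual teardown, controller, informers, and clientset. rmSetup itself is modified elsewhere in the PR; assuming it mirrors the scSetup change further down in this diff, its shape is roughly the following sketch (names and comments are illustrative, not the exact helper):

import (
	"context"
	"testing"

	"k8s.io/kubernetes/test/utils/ktesting"
)

// rmSetupSketch only shows the context plumbing; the real helper also starts
// the test API server and constructs the replication controller, informers,
// and clientset that it returns.
func rmSetupSketch(t *testing.T) (context.Context, func()) {
	tCtx := ktesting.Init(t) // cancellable per-test context with a test logger attached
	teardown := func() {
		// Cancel with a reason before the API server goes away so the
		// controller shuts down cleanly instead of erroring against a dead server.
		tCtx.Cancel("tearing down controller")
	}
	return tCtx, teardown
}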

View File

@@ -19,7 +19,6 @@ package taint
// This file tests the Taint feature.
import (
"context"
"fmt"
"testing"
"time"
@@ -85,7 +84,7 @@ func TestTaintNodeByCondition(t *testing.T) {
// Start NodeLifecycleController for taint.
nc, err := nodelifecycle.NewNodeLifecycleController(
context.TODO(),
testCtx.Ctx,
externalInformers.Coordination().V1().Leases(),
externalInformers.Core().V1().Pods(),
externalInformers.Core().V1().Nodes(),

View File

@@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/controlplane/controller/defaultservicecidr"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
)
// TestMigrateServiceCIDR validates the steps necessary to migrate a cluster default ServiceCIDR
@@ -49,8 +50,7 @@ import (
// 7. delete the kubernetes.default service, the new apiserver will recreate it within the new ServiceCIDR
func TestMigrateServiceCIDR(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRServiceAllocator, true)()
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
tCtx := ktesting.Init(t)
cidr1 := "192.168.0.0/29"
cidr2 := "10.168.0.0/24"
@@ -78,11 +78,12 @@ func TestMigrateServiceCIDR(t *testing.T) {
informers1 := informers.NewSharedInformerFactory(client1, resyncPeriod)
// ServiceCIDR controller
go servicecidrs.NewController(
tCtx,
informers1.Networking().V1alpha1().ServiceCIDRs(),
informers1.Networking().V1alpha1().IPAddresses(),
client1,
).Run(ctx, 5)
informers1.Start(ctx.Done())
).Run(tCtx, 5)
informers1.Start(tCtx.Done())
// the default serviceCIDR should have a finalizer and ready condition set to true
if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, time.Minute, false, func(ctx context.Context) (bool, error) {
@@ -203,19 +204,20 @@ func TestMigrateServiceCIDR(t *testing.T) {
defer framework.DeleteNamespaceOrDie(client2, ns, t)
// switch the controller to the new apiserver
cancelFn()
tCtx.Cancel("tearing down ServiceCIDR controller 1")
s1.TearDownFn()
// ServiceCIDR controller
ctx2, cancelFn2 := context.WithCancel(context.Background())
defer cancelFn2()
tCtx2 := ktesting.Init(t)
defer tCtx.Cancel("tearing down ServiceCIDR controller 2")
informers2 := informers.NewSharedInformerFactory(client2, resyncPeriod)
go servicecidrs.NewController(
tCtx2,
informers2.Networking().V1alpha1().ServiceCIDRs(),
informers2.Networking().V1alpha1().IPAddresses(),
client2,
).Run(ctx2, 5)
informers2.Start(ctx2.Done())
).Run(tCtx2, 5)
informers2.Start(tCtx2.Done())
// delete the kubernetes.default service so the old DefaultServiceCIDR can be deleted
// and the new apiserver can take over
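
The migration test above hands the ServiceCIDR controller over from the first API server to the second. With the utils ktesting package that handover is just two test contexts: cancel the first with a reason, then Init a second one for the replacement controller. A self-contained sketch of only that control flow, with a goroutine standing in for the controller's Run loop:

import (
	"testing"

	"k8s.io/kubernetes/test/utils/ktesting"
)

func TestHandoverSketch(t *testing.T) {
	tCtx := ktesting.Init(t)
	stopped := make(chan struct{})
	go func() {
		<-tCtx.Done() // a real controller's Run(tCtx, workers) returns once this fires
		close(stopped)
	}()

	tCtx.Cancel("tearing down ServiceCIDR controller 1")
	<-stopped // the first "controller" observed the cancellation

	// Fresh context for the controller that talks to the second apiserver;
	// in the test above it is constructed with client2 and informers2.
	tCtx2 := ktesting.Init(t)
	defer tCtx2.Cancel("tearing down ServiceCIDR controller 2")
}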

View File

@@ -65,6 +65,7 @@ func TestServiceAllocNewServiceCIDR(t *testing.T) {
resyncPeriod := 12 * time.Hour
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
go servicecidrs.NewController(
ctx,
informerFactory.Networking().V1alpha1().ServiceCIDRs(),
informerFactory.Networking().V1alpha1().IPAddresses(),
client,
@@ -165,6 +166,7 @@ func TestServiceCIDRDeletion(t *testing.T) {
resyncPeriod := 12 * time.Hour
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
go servicecidrs.NewController(
ctx,
informerFactory.Networking().V1alpha1().ServiceCIDRs(),
informerFactory.Networking().V1alpha1().IPAddresses(),
client,

View File

@@ -37,13 +37,13 @@ import (
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/statefulset"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -126,12 +126,11 @@ func TestVolumeTemplateNoopUpdate(t *testing.T) {
}
func TestSpecReplicasChange(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
tCtx, closeFn, rm, informers, c := scSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
cancel := runControllerAndInformers(rm, informers)
cancel := runControllerAndInformers(tCtx, rm, informers)
defer cancel()
createHeadlessService(t, c, newHeadlessService(ns.Name))
@@ -170,12 +169,11 @@ func TestSpecReplicasChange(t *testing.T) {
}
func TestDeletingAndTerminatingPods(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
tCtx, closeFn, rm, informers, c := scSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-deleting-and-failed-pods", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
cancel := runControllerAndInformers(rm, informers)
cancel := runControllerAndInformers(tCtx, rm, informers)
defer cancel()
podCount := 3
@@ -289,12 +287,11 @@ func TestStatefulSetAvailable(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
tCtx, closeFn, rm, informers, c := scSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-available-pods", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
cancel := runControllerAndInformers(rm, informers)
cancel := runControllerAndInformers(tCtx, rm, informers)
defer cancel()
labelMap := labelMap()
@@ -380,12 +377,9 @@ func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1
// Added for issue: https://github.com/kubernetes/kubernetes/issues/108837
func TestStatefulSetStatusWithPodFail(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tCtx := ktesting.Init(t)
limitedPodNumber := 2
c, config, closeFn := framework.StartTestServer(ctx, t, framework.TestServerSetup{
c, config, closeFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
ModifyServerConfig: func(config *controlplane.Config) {
config.GenericConfig.AdmissionControl = &fakePodFailAdmission{
limitedPodNumber: limitedPodNumber,
@@ -393,11 +387,11 @@ func TestStatefulSetStatusWithPodFail(t *testing.T) {
},
})
defer closeFn()
defer tCtx.Cancel("test has completed")
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod)
ssc := statefulset.NewStatefulSetController(
ctx,
tCtx,
informers.Core().V1().Pods(),
informers.Apps().V1().StatefulSets(),
informers.Core().V1().PersistentVolumeClaims(),
@@ -408,11 +402,11 @@ func TestStatefulSetStatusWithPodFail(t *testing.T) {
ns := framework.CreateNamespaceOrDie(c, "test-pod-fail", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
informers.Start(ctx.Done())
go ssc.Run(ctx, 5)
informers.Start(tCtx.Done())
go ssc.Run(tCtx, 5)
sts := newSTS("sts", ns.Name, 4)
_, err := c.AppsV1().StatefulSets(sts.Namespace).Create(ctx, sts, metav1.CreateOptions{})
_, err := c.AppsV1().StatefulSets(sts.Namespace).Create(tCtx, sts, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Could not create statefulSet %s: %v", sts.Name, err)
}
@@ -420,7 +414,7 @@ func TestStatefulSetStatusWithPodFail(t *testing.T) {
wantReplicas := limitedPodNumber
var gotReplicas int32
if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
newSTS, err := c.AppsV1().StatefulSets(sts.Namespace).Get(ctx, sts.Name, metav1.GetOptions{})
newSTS, err := c.AppsV1().StatefulSets(sts.Namespace).Get(tCtx, sts.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
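
Several of these checks still go through wait.PollImmediate, but wherever the diff switches to wait.PollUntilContextTimeout it is the test context that gets passed in, so a stuck poll ends as soon as the test is canceled rather than running out its own timeout. A sketch of that context-aware form, assuming the surrounding test's tCtx, c, ns, pollInterval, and pollTimeout:

if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
	sts, err := c.AppsV1().StatefulSets(ns.Name).Get(ctx, "sts", metav1.GetOptions{})
	if err != nil {
		return false, err
	}
	// Done once the controller reports every replica ready.
	return sts.Status.ReadyReplicas == *sts.Spec.Replicas, nil
}); err != nil {
	t.Fatalf("waiting for StatefulSet to become ready: %v", err)
}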
@@ -477,10 +471,10 @@ func TestAutodeleteOwnerRefs(t *testing.T) {
}
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
tCtx, closeFn, rm, informers, c := scSetup(t)
defer closeFn()
cancel := runControllerAndInformers(rm, informers)
cancel := runControllerAndInformers(tCtx, rm, informers)
defer cancel()
for _, test := range tests {
@@ -521,12 +515,11 @@ func TestAutodeleteOwnerRefs(t *testing.T) {
}
func TestDeletingPodForRollingUpdatePartition(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
tCtx, closeFn, rm, informers, c := scSetup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-deleting-pod-for-rolling-update-partition", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
cancel := runControllerAndInformers(rm, informers)
cancel := runControllerAndInformers(tCtx, rm, informers)
defer cancel()
labelMap := labelMap()
@@ -570,7 +563,7 @@ func TestDeletingPodForRollingUpdatePartition(t *testing.T) {
})
// Wait for pod-1 to be recreated while pod-0 remains running
if err := wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
ss, err := stsClient.Get(ctx, sts.Name, metav1.GetOptions{})
if err != nil {
return false, err
@@ -599,7 +592,7 @@ func TestDeletingPodForRollingUpdatePartition(t *testing.T) {
}
// Wait for pod-0 to become not ready
if err := wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
ss, err := stsClient.Get(ctx, sts.Name, metav1.GetOptions{})
if err != nil {
return false, err
@@ -615,7 +608,7 @@ func TestDeletingPodForRollingUpdatePartition(t *testing.T) {
})
// Wait for pod-0 to be recreated, then mark it running
if err := wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
pods := getPods(t, podClient, labelMap)
recreatedPods := v1.PodList{}
for _, pod := range pods.Items {
@@ -630,7 +623,7 @@ func TestDeletingPodForRollingUpdatePartition(t *testing.T) {
}
// Wait for the StatefulSet status to record all replicas as ready
if err := wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
ss, err := stsClient.Get(ctx, sts.Name, metav1.GetOptions{})
if err != nil {
return false, err
@@ -702,10 +695,9 @@ func TestStatefulSetStartOrdinal(t *testing.T) {
}
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)()
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
tCtx, closeFn, rm, informers, c := scSetup(t)
defer closeFn()
cancel := runControllerAndInformers(rm, informers)
cancel := runControllerAndInformers(tCtx, rm, informers)
defer cancel()
for _, test := range tests {

View File

@@ -19,6 +19,7 @@ package statefulset
import (
"context"
"fmt"
"k8s.io/kubernetes/test/utils/ktesting"
"sync"
"testing"
"time"
@@ -161,7 +162,8 @@ func newStatefulSetPVC(name string) v1.PersistentVolumeClaim {
}
// scSetup sets up the necessities for a StatefulSet integration test, including the control plane, apiserver, informers, and clientset
func scSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
func scSetup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
tCtx := ktesting.Init(t)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
@@ -174,7 +176,7 @@ func scSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFu
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod)
sc := statefulset.NewStatefulSetController(
ctx,
tCtx,
informers.Core().V1().Pods(),
informers.Apps().V1().StatefulSets(),
informers.Core().V1().PersistentVolumeClaims(),
@@ -182,12 +184,16 @@ func scSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFu
clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-controller")),
)
return server.TearDownFn, sc, informers, clientSet
teardown := func() {
tCtx.Cancel("tearing down controller")
server.TearDownFn()
}
return tCtx, teardown, sc, informers, clientSet
}
// Run STS controller and informers
func runControllerAndInformers(sc *statefulset.StatefulSetController, informers informers.SharedInformerFactory) context.CancelFunc {
ctx, cancel := context.WithCancel(context.Background())
func runControllerAndInformers(ctx context.Context, sc *statefulset.StatefulSetController, informers informers.SharedInformerFactory) context.CancelFunc {
ctx, cancel := context.WithCancel(ctx)
informers.Start(ctx.Done())
go sc.Run(ctx, 5)
return cancel
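
With those signatures, the callers shown earlier in this diff no longer create a context of their own; they take whatever scSetup returns and thread it through. Mirroring TestSpecReplicasChange above, the calling pattern is roughly:

func TestUsageSketch(t *testing.T) {
	tCtx, closeFn, rm, informers, c := scSetup(t)
	defer closeFn() // cancels tCtx, then tears down the test API server

	cancel := runControllerAndInformers(tCtx, rm, informers)
	defer cancel()

	// c is the clientset the real tests use to create namespaces, headless
	// services, StatefulSets, and pods, always passing tCtx as the context.
	_ = c
}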

View File

@@ -203,6 +203,7 @@ func CreateGCController(ctx context.Context, tb ktesting.TB, restConfig restclie
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := garbagecollector.NewGarbageCollector(
ctx,
clientSet,
metadataClient,
restMapper,
@@ -660,7 +661,6 @@ func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond
// InitDisruptionController initializes and runs a Disruption Controller to properly
// update PodDisruptionBudget objects.
func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController {
_, ctx := ktesting.NewTestContext(t)
informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour)
discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery())
@@ -674,7 +674,7 @@ func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.Di
}
dc := disruption.NewDisruptionController(
ctx,
testCtx.Ctx,
informers.Core().V1().Pods(),
informers.Policy().V1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(),

View File

@@ -31,7 +31,6 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
fakecloud "k8s.io/cloud-provider/fake"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
@@ -41,6 +40,7 @@ import (
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
)
func fakePodWithVol(namespace string) *v1.Pod {
@@ -156,36 +156,35 @@ func TestPodDeletionWithDswp(t *testing.T) {
},
}
testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
defer framework.DeleteNamespaceOrDie(testClient, ns, t)
pod := fakePodWithVol(namespaceName)
if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
if _, err := testClient.CoreV1().Nodes().Create(tCtx, node, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create node: %v", err)
}
// start controller loop
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go informers.Core().V1().Nodes().Informer().Run(ctx.Done())
if _, err := testClient.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
go informers.Core().V1().Nodes().Informer().Run(tCtx.Done())
if _, err := testClient.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}); err != nil {
t.Errorf("Failed to create pod : %v", err)
}
podInformer := informers.Core().V1().Pods().Informer()
go podInformer.Run(ctx.Done())
go podInformer.Run(tCtx.Done())
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done())
go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done())
go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done())
initCSIObjects(ctx.Done(), informers)
go ctrl.Run(ctx)
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(tCtx.Done())
go informers.Core().V1().PersistentVolumes().Informer().Run(tCtx.Done())
go informers.Storage().V1().VolumeAttachments().Informer().Run(tCtx.Done())
initCSIObjects(tCtx.Done(), informers)
go ctrl.Run(tCtx)
// Run pvCtrl to avoid leaking goroutines started during its creation.
go pvCtrl.Run(ctx)
go pvCtrl.Run(tCtx)
waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -231,7 +230,9 @@ func TestPodUpdateWithWithADC(t *testing.T) {
},
}
testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -254,16 +255,13 @@ func TestPodUpdateWithWithADC(t *testing.T) {
go podInformer.Run(podStopCh)
// start controller loop
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done())
go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done())
go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done())
initCSIObjects(ctx.Done(), informers)
go ctrl.Run(ctx)
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(tCtx.Done())
go informers.Core().V1().PersistentVolumes().Informer().Run(tCtx.Done())
go informers.Storage().V1().VolumeAttachments().Informer().Run(tCtx.Done())
initCSIObjects(tCtx.Done(), informers)
go ctrl.Run(tCtx)
// Run pvCtrl to avoid leaking goroutines started during its creation.
go pvCtrl.Run(ctx)
go pvCtrl.Run(tCtx)
waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -304,7 +302,9 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
},
}
testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -327,16 +327,13 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
go podInformer.Run(podStopCh)
// start controller loop
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done())
go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done())
go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done())
initCSIObjects(ctx.Done(), informers)
go ctrl.Run(ctx)
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(tCtx.Done())
go informers.Core().V1().PersistentVolumes().Informer().Run(tCtx.Done())
go informers.Storage().V1().VolumeAttachments().Informer().Run(tCtx.Done())
initCSIObjects(tCtx.Done(), informers)
go ctrl.Run(tCtx)
// Run pvCtrl to avoid leaking goroutines started during its creation.
go pvCtrl.Run(ctx)
go pvCtrl.Run(tCtx)
waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -402,7 +399,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch
}
}
func createAdClients(t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
config := restclient.CopyConfig(server.ClientConfig)
config.QPS = 1000000
config.Burst = 1000000
@@ -425,9 +422,8 @@ func createAdClients(t *testing.T, server *kubeapiservertesting.TestServer, sync
plugins := []volume.VolumePlugin{plugin}
cloud := &fakecloud.Cloud{}
informers := clientgoinformers.NewSharedInformerFactory(testClient, resyncPeriod)
logger, ctx := ktesting.NewTestContext(t)
ctrl, err := attachdetach.NewAttachDetachController(
logger,
ctx,
testClient,
informers.Core().V1().Pods(),
informers.Core().V1().Nodes(),
@@ -488,7 +484,10 @@ func TestPodAddedByDswp(t *testing.T) {
},
},
}
testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -496,13 +495,13 @@ func TestPodAddedByDswp(t *testing.T) {
pod := fakePodWithVol(namespaceName)
podStopCh := make(chan struct{})
if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
if _, err := testClient.CoreV1().Nodes().Create(tCtx, node, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create node: %v", err)
}
go informers.Core().V1().Nodes().Informer().Run(podStopCh)
if _, err := testClient.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
if _, err := testClient.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}); err != nil {
t.Errorf("Failed to create pod : %v", err)
}
@@ -510,17 +509,13 @@ func TestPodAddedByDswp(t *testing.T) {
go podInformer.Run(podStopCh)
// start controller loop
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done())
go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done())
go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done())
initCSIObjects(ctx.Done(), informers)
go ctrl.Run(ctx)
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(tCtx.Done())
go informers.Core().V1().PersistentVolumes().Informer().Run(tCtx.Done())
go informers.Storage().V1().VolumeAttachments().Informer().Run(tCtx.Done())
initCSIObjects(tCtx.Done(), informers)
go ctrl.Run(tCtx)
// Run pvCtrl to avoid leaking goroutines started during its creation.
go pvCtrl.Run(ctx)
go pvCtrl.Run(tCtx)
waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -556,9 +551,13 @@ func TestPVCBoundWithADC(t *testing.T) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
defer server.TearDownFn()
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
namespaceName := "test-pod-deletion"
testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, attachdetach.TimerConfig{
testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
ReconcilerLoopPeriod: 100 * time.Millisecond,
ReconcilerMaxWaitForUnmountDuration: 6 * time.Second,
DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
@@ -601,14 +600,11 @@ func TestPVCBoundWithADC(t *testing.T) {
}
// start controller loop
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
informers.WaitForCacheSync(ctx.Done())
initCSIObjects(ctx.Done(), informers)
go ctrl.Run(ctx)
go pvCtrl.Run(ctx)
informers.Start(tCtx.Done())
informers.WaitForCacheSync(tCtx.Done())
initCSIObjects(tCtx.Done(), informers)
go ctrl.Run(tCtx)
go pvCtrl.Run(tCtx)
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4)
// Give attachdetach controller enough time to populate pods into DSWP.
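
createAdClients now takes the context as its first parameter, so every attach/detach test above reduces to the same wiring once the server is up. Trimmed to just that flow, with fixtures and assertions omitted:

func TestADCWiringSketch(t *testing.T) {
	// Disable ServiceAccount admission plugin as we don't have the serviceaccount controller running.
	server := kubeapiservertesting.StartTestServerOrDie(t, nil,
		[]string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
	defer server.TearDownFn()

	tCtx := ktesting.Init(t)
	defer tCtx.Cancel("test has completed")

	_, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)

	informers.Start(tCtx.Done())
	informers.WaitForCacheSync(tCtx.Done())
	initCSIObjects(tCtx.Done(), informers)
	go ctrl.Run(tCtx)
	// Run pvCtrl as well so the goroutines started during its creation are not leaked.
	go pvCtrl.Run(tCtx)
}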

View File

@@ -42,9 +42,9 @@ import (
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
)
// Several tests in this file are configurable by environment variables:
@@ -114,7 +114,10 @@ func TestPersistentVolumeRecycler(t *testing.T) {
defer s.TearDownFn()
namespaceName := "pv-recycler"
testClient, ctrl, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, ctrl, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -125,10 +128,8 @@ func TestPersistentVolumeRecycler(t *testing.T) {
// non-namespaced objects (PersistentVolumes).
defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go ctrl.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go ctrl.Run(tCtx)
// This PV will be claimed, released, and recycled.
pv := createPV("fake-pv-recycler", "/tmp/foo", "10G", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, v1.PersistentVolumeReclaimRecycle)
@@ -170,7 +171,9 @@ func TestPersistentVolumeDeleter(t *testing.T) {
defer s.TearDownFn()
namespaceName := "pv-deleter"
testClient, ctrl, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, ctrl, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -181,10 +184,8 @@ func TestPersistentVolumeDeleter(t *testing.T) {
// non-namespaced objects (PersistentVolumes).
defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go ctrl.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go ctrl.Run(tCtx)
// This PV will be claimed, released, and deleted.
pv := createPV("fake-pv-deleter", "/tmp/foo", "10G", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, v1.PersistentVolumeReclaimDelete)
@@ -231,7 +232,9 @@ func TestPersistentVolumeBindRace(t *testing.T) {
defer s.TearDownFn()
namespaceName := "pv-bind-race"
testClient, ctrl, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, ctrl, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -242,10 +245,8 @@ func TestPersistentVolumeBindRace(t *testing.T) {
// non-namespaced objects (PersistentVolumes).
defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go ctrl.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go ctrl.Run(tCtx)
pv := createPV("fake-pv-race", "/tmp/foo", "10G", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, v1.PersistentVolumeReclaimRetain)
pvc := createPVC("fake-pvc-race", ns.Name, "5G", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, "")
@@ -302,7 +303,9 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) {
defer s.TearDownFn()
namespaceName := "pvc-label-selector"
testClient, controller, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, controller, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -313,10 +316,8 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) {
// non-namespaced objects (PersistentVolumes).
defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go controller.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go controller.Run(tCtx)
var (
err error
@@ -384,7 +385,9 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) {
defer s.TearDownFn()
namespaceName := "pvc-match-expressions"
testClient, controller, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, controller, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -395,10 +398,8 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) {
// non-namespaced objects (PersistentVolumes).
defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go controller.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go controller.Run(tCtx)
var (
err error
@@ -485,7 +486,9 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
defer s.TearDownFn()
namespaceName := "multi-pvs"
testClient, controller, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, controller, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -496,10 +499,8 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
// non-namespaced objects (PersistentVolumes).
defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go controller.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go controller.Run(tCtx)
maxPVs := getObjectCount()
pvs := make([]*v1.PersistentVolume, maxPVs)
@@ -576,7 +577,9 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
defer s.TearDownFn()
namespaceName := "multi-pvs-pvcs"
testClient, binder, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, binder, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -587,10 +590,8 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
// non-namespaced objects (PersistentVolumes).
defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go binder.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go binder.Run(tCtx)
objCount := getObjectCount()
pvs := make([]*v1.PersistentVolume, objCount)
@@ -742,7 +743,9 @@ func TestPersistentVolumeControllerStartup(t *testing.T) {
const shortSyncPeriod = 2 * time.Second
syncPeriod := getSyncPeriod(shortSyncPeriod)
testClient, binder, informers, watchPV, watchPVC := createClients(namespaceName, t, s, shortSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, binder, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, shortSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -801,10 +804,8 @@ func TestPersistentVolumeControllerStartup(t *testing.T) {
}
// Start the controller when all PVs and PVCs are already saved in etcd
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go binder.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go binder.Run(tCtx)
// wait for at least two sync periods for changes. No volume should be
// Released and no claim should be Lost during this time.
@@ -867,7 +868,9 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) {
defer s.TearDownFn()
namespaceName := "provision-multi-pvs"
testClient, binder, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, binder, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -890,10 +893,8 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) {
}
testClient.StorageV1().StorageClasses().Create(context.TODO(), &storageClass, metav1.CreateOptions{})
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go binder.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go binder.Run(tCtx)
objCount := getObjectCount()
pvcs := make([]*v1.PersistentVolumeClaim, objCount)
@@ -963,7 +964,9 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
defer s.TearDownFn()
namespaceName := "multi-pvs-diff-access"
testClient, controller, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, controller, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -974,10 +977,8 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
// non-namespaced objects (PersistentVolumes).
defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go controller.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go controller.Run(tCtx)
// This PV will be claimed, released, and deleted
pvRwo := createPV("pv-rwo", "/tmp/foo", "10G",
@@ -1048,7 +1049,9 @@ func TestRetroactiveStorageClassAssignment(t *testing.T) {
defaultStorageClassName := "gold"
storageClassName := "silver"
testClient, binder, informers, watchPV, watchPVC := createClients(namespaceName, t, s, defaultSyncPeriod)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
testClient, binder, informers, watchPV, watchPVC := createClients(tCtx, namespaceName, t, s, defaultSyncPeriod)
defer watchPV.Stop()
defer watchPVC.Stop()
@@ -1078,10 +1081,8 @@ func TestRetroactiveStorageClassAssignment(t *testing.T) {
t.Errorf("Failed to create a storage class: %v", err)
}
ctx, cancel := context.WithCancel(context.TODO())
informers.Start(ctx.Done())
go binder.Run(ctx)
defer cancel()
informers.Start(tCtx.Done())
go binder.Run(tCtx)
klog.V(2).Infof("TestRetroactiveStorageClassAssignment: start")
@@ -1326,7 +1327,7 @@ func waitForPersistentVolumeClaimStorageClass(t *testing.T, claimName, scName st
}
}
func createClients(namespaceName string, t *testing.T, s *kubeapiservertesting.TestServer, syncPeriod time.Duration) (*clientset.Clientset, *persistentvolumecontroller.PersistentVolumeController, informers.SharedInformerFactory, watch.Interface, watch.Interface) {
func createClients(ctx context.Context, namespaceName string, t *testing.T, s *kubeapiservertesting.TestServer, syncPeriod time.Duration) (*clientset.Clientset, *persistentvolumecontroller.PersistentVolumeController, informers.SharedInformerFactory, watch.Interface, watch.Interface) {
// Use higher QPS and Burst, there is a test for race conditions which
// creates many objects and default values were too low.
binderConfig := restclient.CopyConfig(s.ClientConfig)
@@ -1354,7 +1355,6 @@ func createClients(namespaceName string, t *testing.T, s *kubeapiservertesting.T
plugins := []volume.VolumePlugin{plugin}
cloud := &fakecloud.Cloud{}
informers := informers.NewSharedInformerFactory(testClient, getSyncPeriod(syncPeriod))
_, ctx := ktesting.NewTestContext(t)
ctrl, err := persistentvolumecontroller.NewController(
ctx,
persistentvolumecontroller.ControllerParameters{

View File

@@ -27,7 +27,6 @@ import (
"time"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
@@ -1121,8 +1120,7 @@ func initPVController(t *testing.T, testCtx *testutil.TestContext, provisionDela
NodeInformer: informerFactory.Core().V1().Nodes(),
EnableDynamicProvisioning: true,
}
_, ctx := ktesting.NewTestContext(t)
ctrl, err := persistentvolume.NewController(ctx, params)
ctrl, err := persistentvolume.NewController(testCtx.Ctx, params)
if err != nil {
return nil, nil, err
}
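
initPVController here gets the same treatment as createClients above: rather than minting a context inside the helper via klog's ktesting, it reuses the context the test fixture already owns (testCtx.Ctx here, a ktesting.Init context in the other files of this PR). Compressed into one illustrative sketch, with params standing for the ControllerParameters built in the hunk above:

// Before: the helper created its own context.
//   _, ctx := ktesting.NewTestContext(t)              // k8s.io/klog/v2/ktesting
//   ctrl, err := persistentvolume.NewController(ctx, params)

// After: the fixture's per-test context is threaded through, so the controller
// stops together with everything else the fixture started.
func initPVControllerSketch(testCtx *testutil.TestContext, params persistentvolume.ControllerParameters) (*persistentvolume.PersistentVolumeController, error) {
	return persistentvolume.NewController(testCtx.Ctx, params)
}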