diff --git a/test/e2e/dra/OWNERS b/test/e2e/dra/OWNERS new file mode 100644 index 00000000000..03cc9aff5b4 --- /dev/null +++ b/test/e2e/dra/OWNERS @@ -0,0 +1,11 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - klueska + - pohly +reviewers: + - klueska + - pohly + - bart0sh +labels: + - sig/node diff --git a/test/e2e/dra/README.md b/test/e2e/dra/README.md new file mode 100644 index 00000000000..94fd70d1808 --- /dev/null +++ b/test/e2e/dra/README.md @@ -0,0 +1,12 @@ +The tests in this directory cover dynamic resource allocation support in +Kubernetes. They do not test the correct behavior of arbitrary dynamic resource +allocation drivers. + +If such a driver is needed, then the in-tree test/e2e/dra/test-driver is used, +with a slight twist: instead of deploying that driver directly in the cluster, +the necessary sockets for interaction with kubelet (registration and dynamic +resource allocation) get proxied into the e2e.test binary. This reuses the work +done for CSI mock testing. The advantage is that no separate images are needed +for the test driver and that the e2e test has full control over all gRPC calls, +in case that it needs that for operations like error injection or checking +calls. diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go new file mode 100644 index 00000000000..7a6bc35f9d0 --- /dev/null +++ b/test/e2e/dra/deploy.go @@ -0,0 +1,339 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dra + +import ( + "bytes" + "context" + "errors" + "fmt" + "net" + "path" + "sort" + "sync" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "google.golang.org/grpc" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/dynamic-resource-allocation/kubeletplugin" + "k8s.io/klog/v2" + "k8s.io/kubernetes/test/e2e/dra/test-driver/app" + "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset" + e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" + "k8s.io/kubernetes/test/e2e/storage/drivers/proxy" + "k8s.io/kubernetes/test/e2e/storage/utils" +) + +const ( + NodePrepareResourceMethod = "/v1alpha1.Node/NodePrepareResource" + NodeUnprepareResourceMethod = "/v1alpha1.Node/NodeUnprepareResource" +) + +type Nodes struct { + NodeNames []string +} + +// NewNodes selects nodes to run the test on. +func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes { + nodes := &Nodes{} + ginkgo.BeforeEach(func() { + ginkgo.By("selecting nodes") + // The kubelet plugin is harder. We deploy the builtin manifest + // after patching in the driver name and all nodes on which we + // want the plugin to run. + // + // Only a subset of the nodes are picked to avoid causing + // unnecessary load on a big cluster. 
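+		// GetBoundedReadySchedulableNodes below returns at most maxNodes
+		// ready, schedulable nodes. If the cluster has fewer than minNodes
+		// of those, the test gets skipped instead of failing.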
+ nodeList, err := e2enode.GetBoundedReadySchedulableNodes(f.ClientSet, maxNodes) + framework.ExpectNoError(err, "get nodes") + numNodes := int32(len(nodeList.Items)) + if int(numNodes) < minNodes { + e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes) + } + nodes.NodeNames = nil + for _, node := range nodeList.Items { + nodes.NodeNames = append(nodes.NodeNames, node.Name) + } + framework.Logf("testing on nodes %v", nodes.NodeNames) + }) + return nodes +} + +// NewDriver sets up controller (as client of the cluster) and +// kubelet plugin (via proxy) before the test runs. It cleans +// up after the test. +func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() app.Resources) *Driver { + d := &Driver{ + f: f, + fail: map[MethodInstance]bool{}, + callCounts: map[MethodInstance]int64{}, + } + + ginkgo.BeforeEach(func() { + resources := configureResources() + if len(resources.Nodes) == 0 { + // This always has to be set because the driver might + // not run on all nodes. + resources.Nodes = nodes.NodeNames + } + d.SetUp(nodes, resources) + ginkgo.DeferCleanup(d.TearDown) + }) + return d +} + +type MethodInstance struct { + Nodename string + FullMethod string +} + +type Driver struct { + f *framework.Framework + ctx context.Context + cleanup []func() // executed first-in-first-out + wg sync.WaitGroup + + NameSuffix string + Controller *app.ExampleController + Name string + Nodes map[string]*app.ExamplePlugin + + mutex sync.Mutex + fail map[MethodInstance]bool + callCounts map[MethodInstance]int64 +} + +func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) { + ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames)) + d.Nodes = map[string]*app.ExamplePlugin{} + d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io" + + ctx, cancel := context.WithCancel(context.Background()) + if d.NameSuffix != "" { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "instance"+d.NameSuffix) + ctx = klog.NewContext(ctx, logger) + } + d.ctx = ctx + d.cleanup = append(d.cleanup, cancel) + + // The controller is easy: we simply connect to the API server. It + // would be slightly nicer if we had a way to wait for all goroutines, but + // SharedInformerFactory has no API for that. At least we can wait + // for our own goroutine to stop once the context gets cancelled. + d.Controller = app.NewController(d.f.ClientSet, d.Name, resources) + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.Controller.Run(d.ctx, 5 /* workers */) + }() + + manifests := []string{ + // The code below matches the content of this manifest (ports, + // container names, etc.). 
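+		// CreateFromManifests below patches the ReplicaSet defined there:
+		// the instance label and the pod anti-affinity term are set to the
+		// unique driver name, the replica count becomes the number of
+		// selected nodes, node affinity restricts the pods to exactly those
+		// nodes, the hostPath volumes point at the kubelet's plugin
+		// directories, and the registrar and plugin containers get
+		// per-driver socket endpoints appended to their args.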
+ "test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml", + } + instanceKey := "app.kubernetes.io/instance" + rsName := "" + draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock") + numNodes := int32(len(nodes.NodeNames)) + undeploy, err := utils.CreateFromManifests(d.f, d.f.Namespace, func(item interface{}) error { + switch item := item.(type) { + case *appsv1.ReplicaSet: + item.Name += d.NameSuffix + rsName = item.Name + item.Spec.Replicas = &numNodes + item.Spec.Selector.MatchLabels[instanceKey] = d.Name + item.Spec.Template.Labels[instanceKey] = d.Name + item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name + item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "kubernetes.io/hostname", + Operator: v1.NodeSelectorOpIn, + Values: nodes.NodeNames, + }, + }, + }, + }, + }, + } + item.Spec.Template.Spec.Volumes[0].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins") + item.Spec.Template.Spec.Volumes[2].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins_registry") + item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint=/plugins_registry/"+d.Name+"-reg.sock") + item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint=/dra/"+d.Name+".sock") + } + return nil + }, manifests...) + framework.ExpectNoError(err, "deploy kubelet plugin replicaset") + d.cleanup = append(d.cleanup, undeploy) + + rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{}) + framework.ExpectNoError(err, "get replicaset") + + // Wait for all pods to be running. + if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(d.f.ClientSet, rs, numNodes); err != nil { + framework.ExpectNoError(err, "all kubelet plugin proxies running") + } + requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name}) + framework.ExpectNoError(err, "create label selector requirement") + selector := labels.NewSelector().Add(*requirement) + pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()}) + framework.ExpectNoError(err, "list proxy pods") + framework.ExpectEqual(numNodes, int32(len(pods.Items)), "number of proxy pods") + + // Run registar and plugin for each of the pods. + for _, pod := range pods.Items { + // Need a local variable, not the loop variable, for the anonymous + // callback functions below. 
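+		// Without the copy, all callbacks would capture the loop variable
+		// itself and end up operating on whatever pod it holds after the
+		// last iteration (Go versions before 1.22 reuse a single variable
+		// for the entire loop).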
+ pod := pod + nodename := pod.Spec.NodeName + logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod)) + plugin, err := app.StartPlugin(logger, "/cdi", d.Name, nodename, + app.FileOperations{ + Create: func(name string, content []byte) error { + ginkgo.By(fmt.Sprintf("creating CDI file %s on node %s:\n%s", name, nodename, string(content))) + return d.createFile(&pod, name, content) + }, + Remove: func(name string) error { + ginkgo.By(fmt.Sprintf("deleting CDI file %s on node %s", name, nodename)) + return d.removeFile(&pod, name) + }, + }, + kubeletplugin.GRPCVerbosity(0), + kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return d.interceptor(nodename, ctx, req, info, handler) + }), + kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)), + kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)), + kubeletplugin.KubeletPluginSocketPath(draAddr), + ) + framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName) + d.cleanup = append(d.cleanup, func() { + // Depends on cancel being called first. + plugin.Stop() + }) + d.Nodes[nodename] = plugin + } + + // Wait for registration. + ginkgo.By("wait for plugin registration") + gomega.Eventually(func() []string { + var notRegistered []string + for nodename, plugin := range d.Nodes { + if !plugin.IsRegistered() { + notRegistered = append(notRegistered, nodename) + } + } + sort.Strings(notRegistered) + return notRegistered + }).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "hosts where the plugin has not been registered yet") +} + +func (d *Driver) createFile(pod *v1.Pod, name string, content []byte) error { + buffer := bytes.NewBuffer(content) + // Writing the content can be slow. Better create a temporary file and + // move it to the final destination once it is complete. 
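+	// The rename at the end publishes the CDI file atomically, so readers
+	// never see a partially written spec. If writing fails, the temporary
+	// file gets removed again.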
+ tmpName := name + ".tmp" + if err := d.podIO(pod).CreateFile(tmpName, buffer); err != nil { + _ = d.podIO(pod).RemoveAll(tmpName) + return err + } + return d.podIO(pod).Rename(tmpName, name) +} + +func (d *Driver) removeFile(pod *v1.Pod, name string) error { + return d.podIO(pod).RemoveAll(name) +} + +func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO { + logger := klog.Background() + return proxy.PodDirIO{ + F: d.f, + Namespace: pod.Namespace, + PodName: pod.Name, + ContainerName: "plugin", + Logger: &logger, + } +} + +func listen(ctx context.Context, f *framework.Framework, podName, containerName string, port int) net.Listener { + addr := proxy.Addr{ + Namespace: f.Namespace.Name, + PodName: podName, + ContainerName: containerName, + Port: port, + } + listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr) + framework.ExpectNoError(err, "listen for connections from %+v", addr) + return listener +} + +func (d *Driver) TearDown() { + for _, c := range d.cleanup { + c() + } + d.cleanup = nil + d.wg.Wait() +} + +func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + d.mutex.Lock() + defer d.mutex.Unlock() + + m := MethodInstance{nodename, info.FullMethod} + d.callCounts[m]++ + if d.fail[m] { + return nil, errors.New("injected error") + } + + return handler(ctx, req) +} + +func (d *Driver) Fail(m MethodInstance, injectError bool) { + d.mutex.Lock() + defer d.mutex.Unlock() + + d.fail[m] = injectError +} + +func (d *Driver) CallCount(m MethodInstance) int64 { + d.mutex.Lock() + defer d.mutex.Unlock() + + return d.callCounts[m] +} + +func (d *Driver) Nodenames() (nodenames []string) { + for nodename := range d.Nodes { + nodenames = append(nodenames, nodename) + } + sort.Strings(nodenames) + return +} diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go new file mode 100644 index 00000000000..ef1a3ebcba2 --- /dev/null +++ b/test/e2e/dra/dra.go @@ -0,0 +1,835 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dra + +import ( + "context" + "errors" + "fmt" + "strings" + "sync/atomic" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + resourcev1alpha1 "k8s.io/api/resource/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "k8s.io/kubernetes/test/e2e/dra/test-driver/app" + "k8s.io/kubernetes/test/e2e/framework" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + admissionapi "k8s.io/pod-security-admission/api" +) + +const ( + // podStartTimeout is how long to wait for the pod to be started. 
+ podStartTimeout = 5 * time.Minute +) + +func networkResources() app.Resources { + return app.Resources{ + Shareable: true, + } +} + +var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", func() { + f := framework.NewDefaultFramework("dra") + ctx := context.Background() + + // The driver containers have to run with sufficient privileges to + // modify /var/lib/kubelet/plugins. + f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged + + ginkgo.Context("kubelet", func() { + nodes := NewNodes(f, 1, 1) + driver := NewDriver(f, nodes, networkResources) // All tests get their own driver instance. + b := newBuilder(f, driver) + ginkgo.It("registers plugin", func() { + ginkgo.By("the driver is running") + }) + + // This test does not pass at the moment because kubelet doesn't retry. + ginkgo.It("must retry NodePrepareResource", func() { + // We have exactly one host. + m := MethodInstance{driver.Nodenames()[0], NodePrepareResourceMethod} + + driver.Fail(m, true) + + ginkgo.By("waiting for container startup to fail") + parameters := b.parameters() + pod, template := b.podInline(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + + b.create(ctx, parameters, pod, template) + + ginkgo.By("wait for NodePrepareResource call") + gomega.Eventually(func() error { + if driver.CallCount(m) == 0 { + return errors.New("NodePrepareResource not called yet") + } + return nil + }).WithTimeout(podStartTimeout).Should(gomega.Succeed()) + + ginkgo.By("allowing container startup to succeed") + callCount := driver.CallCount(m) + driver.Fail(m, false) + err := e2epod.WaitForPodNameRunningInNamespace(f.ClientSet, pod.Name, pod.Namespace) + framework.ExpectNoError(err, "start pod with inline resource claim") + if driver.CallCount(m) == callCount { + framework.Fail("NodePrepareResource should have been called again") + } + }) + ginkgo.It("must not run a pod if a claim is not reserved for it", func() { + parameters := b.parameters() + claim := b.externalClaim(resourcev1alpha1.AllocationModeImmediate) + pod := b.podExternal() + + // This bypasses scheduling and therefore the pod gets + // to run on the node although it never gets added to + // the `ReservedFor` field of the claim. 
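+			// Because nothing reserves the claim for the pod, the kubelet
+			// must not start it. The Consistently check below verifies that
+			// the pod stays in the Pending phase.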
+ pod.Spec.NodeName = nodes.NodeNames[0] + + b.create(ctx, parameters, claim, pod) + + gomega.Consistently(func() error { + testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("expected the test pod %s to exist: %v", pod.Name, err) + } + if testPod.Status.Phase != v1.PodPending { + return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending) + } + return nil + }, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil()) + }) + ginkgo.It("must unprepare resources for force-deleted pod", func() { + parameters := b.parameters() + claim := b.externalClaim(resourcev1alpha1.AllocationModeImmediate) + pod := b.podExternal() + zero := int64(0) + pod.Spec.TerminationGracePeriodSeconds = &zero + + b.create(ctx, parameters, claim, pod) + + b.testPod(f.ClientSet, pod) + + ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name)) + err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}) + if !apierrors.IsNotFound(err) { + framework.ExpectNoError(err, "force delete test pod") + } + + for host, plugin := range b.driver.Nodes { + ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host)) + gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host) + } + }) + }) + + ginkgo.Context("driver", func() { + nodes := NewNodes(f, 1, 1) + driver := NewDriver(f, nodes, networkResources) // All tests get their own driver instance. + b := newBuilder(f, driver) + // We need the parameters name *before* creating it. + b.parametersCounter = 1 + b.classParametersName = b.parametersName() + + ginkgo.It("supports claim and class parameters", func() { + classParameters := b.parameters("x", "y") + claimParameters := b.parameters() + pod, template := b.podInline(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + + b.create(ctx, classParameters, claimParameters, pod, template) + + b.testPod(f.ClientSet, pod, "user_a", "b", "admin_x", "y") + }) + }) + + ginkgo.Context("cluster", func() { + nodes := NewNodes(f, 1, 4) + driver := NewDriver(f, nodes, networkResources) + b := newBuilder(f, driver) + + // claimTests tries out several different combinations of pods with + // claims, both inline and external. 
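+		// These tests get instantiated twice, once per allocation mode, via
+		// the two Contexts at the end of this block.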
+ claimTests := func(allocationMode resourcev1alpha1.AllocationMode) { + ginkgo.It("supports simple pod referencing inline resource claim", func() { + parameters := b.parameters() + pod, template := b.podInline(allocationMode) + b.create(ctx, parameters, pod, template) + + b.testPod(f.ClientSet, pod) + }) + + ginkgo.It("supports inline claim referenced by multiple containers", func() { + parameters := b.parameters() + pod, template := b.podInlineMultiple(allocationMode) + b.create(ctx, parameters, pod, template) + + b.testPod(f.ClientSet, pod) + }) + + ginkgo.It("supports simple pod referencing external resource claim", func() { + parameters := b.parameters() + pod := b.podExternal() + b.create(ctx, parameters, b.externalClaim(allocationMode), pod) + + b.testPod(f.ClientSet, pod) + }) + + ginkgo.It("supports external claim referenced by multiple pods", func() { + parameters := b.parameters() + pod1 := b.podExternal() + pod2 := b.podExternal() + pod3 := b.podExternal() + claim := b.externalClaim(allocationMode) + b.create(ctx, parameters, claim, pod1, pod2, pod3) + + for _, pod := range []*v1.Pod{pod1, pod2, pod3} { + b.testPod(f.ClientSet, pod) + } + }) + + ginkgo.It("supports external claim referenced by multiple containers of multiple pods", func() { + parameters := b.parameters() + pod1 := b.podExternalMultiple() + pod2 := b.podExternalMultiple() + pod3 := b.podExternalMultiple() + claim := b.externalClaim(allocationMode) + b.create(ctx, parameters, claim, pod1, pod2, pod3) + + for _, pod := range []*v1.Pod{pod1, pod2, pod3} { + b.testPod(f.ClientSet, pod) + } + }) + + ginkgo.It("supports init containers", func() { + parameters := b.parameters() + pod, template := b.podInline(allocationMode) + pod.Spec.InitContainers = []v1.Container{pod.Spec.Containers[0]} + pod.Spec.InitContainers[0].Name += "-init" + // This must succeed for the pod to start. 
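+				// The driver injects user_a=b (the default claim parameters)
+				// via CDI, so the grep only succeeds if the resource was
+				// prepared before the init container ran.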
+ pod.Spec.InitContainers[0].Command = []string{"sh", "-c", "env | grep user_a=b"} + b.create(ctx, parameters, pod, template) + + b.testPod(f.ClientSet, pod) + }) + } + + ginkgo.Context("with delayed allocation", func() { + claimTests(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + }) + + ginkgo.Context("with immediate allocation", func() { + claimTests(resourcev1alpha1.AllocationModeImmediate) + }) + }) + + ginkgo.Context("multiple nodes", func() { + nodes := NewNodes(f, 2, 8) + ginkgo.Context("with network-attached resources", func() { + driver := NewDriver(f, nodes, networkResources) + b := newBuilder(f, driver) + + ginkgo.It("schedules onto different nodes", func() { + parameters := b.parameters() + label := "app.kubernetes.io/instance" + instance := f.UniqueName + "-test-app" + antiAffinity := &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + TopologyKey: "kubernetes.io/hostname", + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + label: instance, + }, + }, + }, + }, + }, + } + createPod := func() *v1.Pod { + pod := b.podExternal() + pod.Labels[label] = instance + pod.Spec.Affinity = antiAffinity + return pod + } + pod1 := createPod() + pod2 := createPod() + claim := b.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + b.create(ctx, parameters, claim, pod1, pod2) + + for _, pod := range []*v1.Pod{pod1, pod2} { + err := e2epod.WaitForPodRunningInNamespace(f.ClientSet, pod) + framework.ExpectNoError(err, "start pod") + } + }) + }) + + ginkgo.Context("with node-local resources", func() { + driver := NewDriver(f, nodes, func() app.Resources { + return app.Resources{ + NodeLocal: true, + MaxAllocations: 1, + Nodes: nodes.NodeNames, + } + }) + b := newBuilder(f, driver) + + tests := func(allocationMode resourcev1alpha1.AllocationMode) { + ginkgo.It("uses all resources", func() { + var objs = []klog.KMetadata{ + b.parameters(), + } + var pods []*v1.Pod + for i := 0; i < len(nodes.NodeNames); i++ { + pod, template := b.podInline(allocationMode) + pods = append(pods, pod) + objs = append(objs, pod, template) + } + b.create(ctx, objs...) + + for _, pod := range pods { + err := e2epod.WaitForPodRunningInNamespace(f.ClientSet, pod) + framework.ExpectNoError(err, "start pod") + } + + // The pods all should run on different + // nodes because the maximum number of + // claims per node was limited to 1 for + // this test. + // + // We cannot know for sure why the pods + // ran on two different nodes (could + // also be a coincidence) but if they + // don't cover all nodes, then we have + // a problem. 
+ used := make(map[string]*v1.Pod) + for _, pod := range pods { + pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "get pod") + nodeName := pod.Spec.NodeName + if other, ok := used[nodeName]; ok { + framework.Failf("Pod %s got started on the same node %s as pod %s although claim allocation should have been limited to one claim per node.", pod.Name, nodeName, other.Name) + } + used[nodeName] = pod + } + }) + } + + ginkgo.Context("with delayed allocation", func() { + tests(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + }) + + ginkgo.Context("with immediate allocation", func() { + tests(resourcev1alpha1.AllocationModeImmediate) + }) + }) + + ginkgo.Context("reallocation", func() { + var allocateWrapper app.AllocateWrapperType + driver := NewDriver(f, nodes, func() app.Resources { + return app.Resources{ + NodeLocal: true, + MaxAllocations: 2, + Nodes: nodes.NodeNames, + + AllocateWrapper: func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string, + handler func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error)) (result *resourcev1alpha1.AllocationResult, err error) { + return allocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, handler) + }, + } + }) + b := newBuilder(f, driver) + + ginkgo.It("works", func() { + // A pod with two claims can run on a node, but + // only if allocation of both succeeds. This + // tests simulates the scenario where one claim + // gets allocated but the second doesn't + // because of a race with some other pod. + // + // To ensure the right timing, allocation of the second + // claim gets delayed while creating another pod + // that gets the remaining resource on the node. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + parameters := b.parameters() + claim1 := b.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + claim2 := b.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + pod1 := b.podExternal() + pod1.Spec.ResourceClaims = append(pod1.Spec.ResourceClaims, + v1.PodResourceClaim{ + Name: "claim2", + Source: v1.ClaimSource{ + ResourceClaimName: &claim2.Name, + }, + }, + ) + + // Block on the second external claim that is to be allocated. 
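+				// The wrapper below lets the first allocation through and
+				// then blocks further allocation of an external claim until
+				// cancelBlockClaim gets called. That gives the test time to
+				// start a second pod whose inline claim consumes the
+				// remaining capacity on the selected node.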
+ blockClaim, cancelBlockClaim := context.WithCancel(ctx) + defer cancelBlockClaim() + var allocated int32 + allocateWrapper = func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, + class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string, + handler func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, + class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error), + ) (result *resourcev1alpha1.AllocationResult, err error) { + oldAllocated := atomic.AddInt32(&allocated, 0) + if oldAllocated == 1 && strings.HasPrefix(claim.Name, "external-claim") { + <-blockClaim.Done() + } + result, err = handler(ctx, claim, claimParameters, class, classParameters, selectedNode) + if err == nil { + atomic.AddInt32(&allocated, 1) + } + return + } + b.create(ctx, parameters, claim1, claim2, pod1) + + ginkgo.By("waiting for one claim to be allocated") + var nodeSelector *v1.NodeSelector + gomega.Eventually(func() (int, error) { + claims, err := f.ClientSet.ResourceV1alpha1().ResourceClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + return 0, err + } + allocated := 0 + for _, claim := range claims.Items { + if claim.Status.Allocation != nil { + allocated++ + nodeSelector = claim.Status.Allocation.AvailableOnNodes + } + } + return allocated, nil + }).WithTimeout(time.Minute).Should(gomega.Equal(1), "one claim allocated") + + // Now create a second pod which we force to + // run on the same node that is currently being + // considered for the first one. We know what + // the node selector looks like and can + // directly access the key and value from it. + ginkgo.By(fmt.Sprintf("create second pod on the same node %s", nodeSelector)) + pod2, template2 := b.podInline(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + req := nodeSelector.NodeSelectorTerms[0].MatchExpressions[0] + node := req.Values[0] + pod2.Spec.NodeSelector = map[string]string{req.Key: node} + b.create(ctx, pod2, template2) + framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(f.ClientSet, pod2), "start pod 2") + + // Allow allocation of claim2 to proceed. It should fail now + // and the other node must be used instead, after deallocating + // the first claim. 
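+				// With both allocations on the original node used up, the
+				// only way forward is to deallocate claim1 and schedule pod1
+				// onto some other node. The checks below verify exactly
+				// that: a different node name and one deallocation.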
+ ginkgo.By("move first pod to other node") + cancelBlockClaim() + framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(f.ClientSet, pod1), "start pod 1") + pod1, err := f.ClientSet.CoreV1().Pods(pod1.Namespace).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "get first pod") + if pod1.Spec.NodeName == "" { + framework.Fail("first pod should be running on node, was not scheduled") + } + framework.ExpectNotEqual(pod1.Spec.NodeName, node, "first pod should run on different node than second one") + framework.ExpectEqual(driver.Controller.GetNumDeallocations(), int64(1), "number of deallocations") + }) + }) + }) + + ginkgo.Context("multiple drivers", func() { + nodes := NewNodes(f, 1, 4) + driver1 := NewDriver(f, nodes, func() app.Resources { + return app.Resources{ + NodeLocal: true, + MaxAllocations: 1, + Nodes: nodes.NodeNames, + } + }) + b1 := newBuilder(f, driver1) + driver2 := NewDriver(f, nodes, func() app.Resources { + return app.Resources{ + NodeLocal: true, + MaxAllocations: 1, + Nodes: nodes.NodeNames, + } + }) + driver2.NameSuffix = "-other" + b2 := newBuilder(f, driver2) + + ginkgo.It("work", func() { + parameters1 := b1.parameters() + parameters2 := b2.parameters() + claim1 := b1.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + claim2 := b2.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer) + pod := b1.podExternal() + pod.Spec.ResourceClaims = append(pod.Spec.ResourceClaims, + v1.PodResourceClaim{ + Name: "claim2", + Source: v1.ClaimSource{ + ResourceClaimName: &claim2.Name, + }, + }, + ) + b1.create(ctx, parameters1, parameters2, claim1, claim2, pod) + b1.testPod(f.ClientSet, pod) + }) + }) +}) + +// builder contains a running counter to make objects unique within thir +// namespace. +type builder struct { + f *framework.Framework + driver *Driver + + podCounter int + parametersCounter int + claimCounter int + + classParametersName string +} + +// className returns the default resource class name. +func (b *builder) className() string { + return b.f.UniqueName + b.driver.NameSuffix + "-class" +} + +// class returns the resource class that the builder's other objects +// reference. +func (b *builder) class() *resourcev1alpha1.ResourceClass { + class := &resourcev1alpha1.ResourceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: b.className(), + }, + DriverName: b.driver.Name, + SuitableNodes: b.nodeSelector(), + } + if b.classParametersName != "" { + class.ParametersRef = &resourcev1alpha1.ResourceClassParametersReference{ + Kind: "ConfigMap", + Name: b.classParametersName, + Namespace: b.f.Namespace.Name, + } + } + return class +} + +// nodeSelector returns a node selector that matches all nodes on which the +// kubelet plugin was deployed. +func (b *builder) nodeSelector() *v1.NodeSelector { + return &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "kubernetes.io/hostname", + Operator: v1.NodeSelectorOpIn, + Values: b.driver.Nodenames(), + }, + }, + }, + }, + } +} + +// externalClaim returns external resource claim +// that test pods can reference +func (b *builder) externalClaim(allocationMode resourcev1alpha1.AllocationMode) *resourcev1alpha1.ResourceClaim { + b.claimCounter++ + name := "external-claim" + b.driver.NameSuffix // This is what podExternal expects. 
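+	// Only the first claim gets exactly that name; later claims receive a
+	// numeric suffix and thus are not the claim that podExternal references.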
+ if b.claimCounter > 1 { + name += fmt.Sprintf("-%d", b.claimCounter) + } + return &resourcev1alpha1.ResourceClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: resourcev1alpha1.ResourceClaimSpec{ + ResourceClassName: b.className(), + ParametersRef: &resourcev1alpha1.ResourceClaimParametersReference{ + Kind: "ConfigMap", + Name: b.parametersName(), + }, + AllocationMode: allocationMode, + }, + } +} + +// parametersName returns the current ConfigMap name for resource +// claim or class parameters. +func (b *builder) parametersName() string { + return fmt.Sprintf("parameters%s-%d", b.driver.NameSuffix, b.parametersCounter) +} + +// parametersEnv returns the default env variables. +func (b *builder) parametersEnv() map[string]string { + return map[string]string{ + "a": "b", + } +} + +// parameters returns a config map with the default env variables. +func (b *builder) parameters(kv ...string) *v1.ConfigMap { + b.parametersCounter++ + data := map[string]string{} + for i := 0; i < len(kv); i += 2 { + data[kv[i]] = kv[i+1] + } + if len(data) == 0 { + data = b.parametersEnv() + } + return &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: b.f.Namespace.Name, + Name: b.parametersName(), + }, + Data: data, + } +} + +// makePod returns a simple pod with no resource claims. +// The pod prints its env and waits. +func (b *builder) pod() *v1.Pod { + pod := e2epod.MakePod(b.f.Namespace.Name, nil, nil, false, "env && sleep 100000") + pod.Labels = make(map[string]string) + pod.Spec.RestartPolicy = v1.RestartPolicyNever + // Let kubelet kill the pods quickly. Setting + // TerminationGracePeriodSeconds to zero would bypass kubelet + // completely because then the apiserver enables a force-delete even + // when DeleteOptions for the pod don't ask for it (see + // https://github.com/kubernetes/kubernetes/blob/0f582f7c3f504e807550310d00f130cb5c18c0c3/pkg/registry/core/pod/strategy.go#L151-L171). + // + // We don't do that because it breaks tracking of claim usage: the + // kube-controller-manager assumes that kubelet is done with the pod + // once it got removed or has a grace period of 0. Setting the grace + // period to zero directly in DeletionOptions or indirectly through + // TerminationGracePeriodSeconds causes the controller to remove + // the pod from ReservedFor before it actually has stopped on + // the node. + one := int64(1) + pod.Spec.TerminationGracePeriodSeconds = &one + pod.ObjectMeta.GenerateName = "" + b.podCounter++ + pod.ObjectMeta.Name = fmt.Sprintf("tester%s-%d", b.driver.NameSuffix, b.podCounter) + return pod +} + +// makePodInline adds an inline resource claim with default class name and parameters. 
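+// The claim template is named after the pod and gets referenced by it via
+// PodResourceClaim.Source.ResourceClaimTemplateName.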
+func (b *builder) podInline(allocationMode resourcev1alpha1.AllocationMode) (*v1.Pod, *resourcev1alpha1.ResourceClaimTemplate) { + pod := b.pod() + pod.Spec.Containers[0].Name = "with-resource" + podClaimName := "my-inline-claim" + pod.Spec.Containers[0].Resources.Claims = []v1.ResourceClaim{{Name: podClaimName}} + pod.Spec.ResourceClaims = []v1.PodResourceClaim{ + { + Name: podClaimName, + Source: v1.ClaimSource{ + ResourceClaimTemplateName: &pod.Name, + }, + }, + } + template := &resourcev1alpha1.ResourceClaimTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + Spec: resourcev1alpha1.ResourceClaimTemplateSpec{ + Spec: resourcev1alpha1.ResourceClaimSpec{ + ResourceClassName: b.className(), + ParametersRef: &resourcev1alpha1.ResourceClaimParametersReference{ + Kind: "ConfigMap", + Name: b.parametersName(), + }, + AllocationMode: allocationMode, + }, + }, + } + return pod, template +} + +// podInlineMultiple returns a pod with inline resource claim referenced by 3 containers +func (b *builder) podInlineMultiple(allocationMode resourcev1alpha1.AllocationMode) (*v1.Pod, *resourcev1alpha1.ResourceClaimTemplate) { + pod, template := b.podInline(allocationMode) + pod.Spec.Containers = append(pod.Spec.Containers, *pod.Spec.Containers[0].DeepCopy(), *pod.Spec.Containers[0].DeepCopy()) + pod.Spec.Containers[1].Name = pod.Spec.Containers[1].Name + "-1" + pod.Spec.Containers[2].Name = pod.Spec.Containers[1].Name + "-2" + return pod, template +} + +// podExternal adds a pod that references external resource claim with default class name and parameters. +func (b *builder) podExternal() *v1.Pod { + pod := b.pod() + pod.Spec.Containers[0].Name = "with-resource" + podClaimName := "resource-claim" + externalClaimName := "external-claim" + b.driver.NameSuffix + pod.Spec.ResourceClaims = []v1.PodResourceClaim{ + { + Name: podClaimName, + Source: v1.ClaimSource{ + ResourceClaimName: &externalClaimName, + }, + }, + } + pod.Spec.Containers[0].Resources.Claims = []v1.ResourceClaim{{Name: podClaimName}} + return pod +} + +// podShared returns a pod with 3 containers that reference external resource claim with default class name and parameters. +func (b *builder) podExternalMultiple() *v1.Pod { + pod := b.podExternal() + pod.Spec.Containers = append(pod.Spec.Containers, *pod.Spec.Containers[0].DeepCopy(), *pod.Spec.Containers[0].DeepCopy()) + pod.Spec.Containers[1].Name = pod.Spec.Containers[1].Name + "-1" + pod.Spec.Containers[2].Name = pod.Spec.Containers[1].Name + "-2" + return pod +} + +// create takes a bunch of objects and calls their Create function. 
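+// ResourceClasses are created cluster-scoped, all other objects go into the
+// test namespace. Object types not listed in the switch are a test bug.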
+func (b *builder) create(ctx context.Context, objs ...klog.KMetadata) { + for _, obj := range objs { + ginkgo.By(fmt.Sprintf("creating %T %s", obj, obj.GetName()), func() { + var err error + switch obj := obj.(type) { + case *resourcev1alpha1.ResourceClass: + _, err = b.f.ClientSet.ResourceV1alpha1().ResourceClasses().Create(ctx, obj, metav1.CreateOptions{}) + case *v1.Pod: + _, err = b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{}) + case *v1.ConfigMap: + _, err = b.f.ClientSet.CoreV1().ConfigMaps(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{}) + case *resourcev1alpha1.ResourceClaim: + _, err = b.f.ClientSet.ResourceV1alpha1().ResourceClaims(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{}) + case *resourcev1alpha1.ResourceClaimTemplate: + _, err = b.f.ClientSet.ResourceV1alpha1().ResourceClaimTemplates(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{}) + default: + framework.Fail(fmt.Sprintf("internal error, unsupported type %T", obj), 1) + } + framework.ExpectNoErrorWithOffset(1, err, "create %T", obj) + }) + } +} + +// testPod runs pod and checks if container logs contain expected environment variables +func (b *builder) testPod(clientSet kubernetes.Interface, pod *v1.Pod, env ...string) { + err := e2epod.WaitForPodRunningInNamespace(clientSet, pod) + framework.ExpectNoError(err, "start pod") + + for _, container := range pod.Spec.Containers { + log, err := e2epod.GetPodLogs(clientSet, pod.Namespace, pod.Name, container.Name) + framework.ExpectNoError(err, "get logs") + if len(env) == 0 { + for key, value := range b.parametersEnv() { + envStr := fmt.Sprintf("\nuser_%s=%s\n", key, value) + gomega.Expect(log).To(gomega.ContainSubstring(envStr), "container env variables") + } + } else { + for i := 0; i < len(env); i += 2 { + envStr := fmt.Sprintf("\n%s=%s\n", env[i], env[i+1]) + gomega.Expect(log).To(gomega.ContainSubstring(envStr), "container env variables") + } + } + } +} + +func newBuilder(f *framework.Framework, driver *Driver) *builder { + b := &builder{f: f, driver: driver} + + ginkgo.BeforeEach(b.setUp) + + return b +} + +func (b *builder) setUp() { + b.podCounter = 0 + b.parametersCounter = 0 + b.claimCounter = 0 + b.create(context.Background(), b.class()) + ginkgo.DeferCleanup(b.tearDown) +} + +func (b *builder) tearDown() { + ctx := context.Background() + + err := b.f.ClientSet.ResourceV1alpha1().ResourceClasses().Delete(ctx, b.className(), metav1.DeleteOptions{}) + framework.ExpectNoError(err, "delete resource class") + + // Before we allow the namespace and all objects in it do be deleted by + // the framework, we must ensure that test pods and the claims that + // they use are deleted. Otherwise the driver might get deleted first, + // in which case deleting the claims won't work anymore. 
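+	// The order below therefore is: delete the test pods and wait until they
+	// are gone, delete the claims, wait until the kubelet plugin has
+	// unprepared all resources, and finally wait until the claims have been
+	// deallocated and removed.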
+ ginkgo.By("delete pods and claims") + pods, err := b.listTestPods(ctx) + framework.ExpectNoError(err, "list pods") + for _, pod := range pods { + if pod.DeletionTimestamp != nil { + continue + } + ginkgo.By(fmt.Sprintf("deleting %T %s", &pod, klog.KObj(&pod))) + err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + if !apierrors.IsNotFound(err) { + framework.ExpectNoError(err, "delete pod") + } + } + gomega.Eventually(func() ([]v1.Pod, error) { + return b.listTestPods(ctx) + }).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "remaining pods despite deletion") + + claims, err := b.f.ClientSet.ResourceV1alpha1().ResourceClaims(b.f.Namespace.Name).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err, "get resource claims") + for _, claim := range claims.Items { + if claim.DeletionTimestamp != nil { + continue + } + ginkgo.By(fmt.Sprintf("deleting %T %s", &claim, klog.KObj(&claim))) + err := b.f.ClientSet.ResourceV1alpha1().ResourceClaims(b.f.Namespace.Name).Delete(ctx, claim.Name, metav1.DeleteOptions{}) + if !apierrors.IsNotFound(err) { + framework.ExpectNoError(err, "delete claim") + } + } + + for host, plugin := range b.driver.Nodes { + ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host)) + gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host) + } + + ginkgo.By("waiting for claims to be deallocated and deleted") + gomega.Eventually(func() ([]resourcev1alpha1.ResourceClaim, error) { + claims, err := b.f.ClientSet.ResourceV1alpha1().ResourceClaims(b.f.Namespace.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + return claims.Items, nil + }).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "claims in the namespaces") +} + +func (b *builder) listTestPods(ctx context.Context) ([]v1.Pod, error) { + pods, err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + var testPods []v1.Pod + for _, pod := range pods.Items { + if pod.Labels["app.kubernetes.io/part-of"] == "dra-test-driver" { + continue + } + testPods = append(testPods, pod) + } + return testPods, nil +} diff --git a/test/e2e/dra/test-driver/README.md b/test/e2e/dra/test-driver/README.md new file mode 100644 index 00000000000..b79edda6dd1 --- /dev/null +++ b/test/e2e/dra/test-driver/README.md @@ -0,0 +1,103 @@ +# dra-test-driver + +This driver implements the controller and a resource kubelet plugin for dynamic +resource allocation. This is done in a single binary to minimize the amount of +boilerplate code. "Real" drivers could also implement both in different +binaries. + +## Usage + +The driver could get deployed as a Deployment for the controller, with leader +election. A DaemonSet could get used for the kubelet plugin. The controller can +also run as a Kubernetes client outside of a cluster. The same works for the +kubelet plugin when using port forwarding. This is how it is used during +testing. + +Valid parameters are key/value string pairs stored in a ConfigMap. +Those get copied into the ResourceClaimStatus with "user_" and "admin_" as +prefix, depending on whether they came from the ResourceClaim or ResourceClass. +They get stored in the `ResourceHandle` field as JSON map by the controller. +The kubelet plugin then sets these attributes as environment variables in each +container that uses the resource. 
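+
+For example, a ConfigMap entry `a: b` referenced as claim parameters results in
+a `user_a=b` environment variable in every container that uses the claim, while
+the same entry in the class parameters ConfigMap shows up as `admin_a=b`.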
+ +Resource availability is configurable and can simulate different scenarios: + +- Network-attached resources, available on all nodes where the node driver runs, or + host-local resources, available only on the node whether they were allocated. +- Shared or unshared allocations. +- Unlimited or limited resources. The limit is a simple number of allocations + per cluster or node. + +While the functionality itself is very limited, the code strives to showcase +best practices and supports metrics, leader election, and the same logging +options as Kubernetes. + +## Design + +The binary itself is a Cobra command with two operations, `controller` and +`kubelet-plugin`. Logging is done with [contextual +logging](https://github.com/kubernetes/enhancements/tree/master/keps/sig-instrumentation/3077-contextual-logging). + +The `k8s.io/dynamic-resource-allocation/controller` package implements the +interaction with ResourceClaims. It is generic and relies on an interface to +implement the actual driver logic. Long-term that part could be split out into +a reusable utility package. + +The `k8s.io/dynamic-resource-allocation/kubelet-plugin` package implements the +interaction with kubelet, again relying only on the interface defined for the +kubelet<->dynamic resource allocation plugin interaction. + +`app` is the driver itself with a very simple implementation of the interfaces. + +## Deployment + +### `local-up-cluster.sh` + +To try out the feature, build Kubernetes, then in one console run: +```console +FEATURE_GATES=DynamicResourceAllocation=true ALLOW_PRIVILEGED=1 ./hack/local-up-cluster.sh -O +``` + +In another: +```console +go run ./test/e2e/dra/test-driver --feature-gates ContextualLogging=true -v=5 controller +``` + +In yet another: +```console +sudo mkdir -p /var/run/cdi && sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry +go run ./test/e2e/dra/test-driver --feature-gates ContextualLogging=true -v=5 kubelet-plugin +``` + +And finally: +```console +$ kubectl create -f test/e2e/dra/test-driver/deploy/example/resourceclass.yaml +resourceclass/example created +$ kubectl create -f test/e2e/dra/test-driver/deploy/example/pod-inline.yaml +configmap/pause-claim-parameters created +pod/pause created + +$ kubectl get resourceclaims +NAME CLASSNAME ALLOCATIONMODE STATE AGE +pause-resource example WaitForFirstConsumer allocated,reserved 19s + +$ kubectl get pods +NAME READY STATUS RESTARTS AGE +pause 1/1 Running 0 23s +``` + +There are also examples for other scenarios (multiple pods, multiple claims). + +### multi-node cluster + +At this point there are no container images that contain the test driver and +therefore it cannot be deployed on "normal" clusters. + +## Prior art + +Some of this code was derived from the +[external-resizer](https://github.com/kubernetes-csi/external-resizer/). `controller` +corresponds to the [controller +logic](https://github.com/kubernetes-csi/external-resizer/blob/master/pkg/controller/controller.go), +which in turn is similar to the +[sig-storage-lib-external-provisioner](https://github.com/kubernetes-sigs/sig-storage-lib-external-provisioner). diff --git a/test/e2e/dra/test-driver/app/cdi.go b/test/e2e/dra/test-driver/app/cdi.go new file mode 100644 index 00000000000..41d1976c8a7 --- /dev/null +++ b/test/e2e/dra/test-driver/app/cdi.go @@ -0,0 +1,44 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +// These definitions are sufficient to generate simple CDI files which set env +// variables in a container, which is all that the test driver does. A real +// driver will either use pre-generated CDI files or can use the +// github.com/container-orchestrated-devices/container-device-interface/pkg/cdi +// helper package to generate files. +// +// This is not done in Kubernetes to minimize dependencies. + +// spec is the base configuration for CDI. +type spec struct { + Version string `json:"cdiVersion"` + Kind string `json:"kind"` + + Devices []device `json:"devices"` +} + +// device is a "Device" a container runtime can add to a container. +type device struct { + Name string `json:"name"` + ContainerEdits containerEdits `json:"containerEdits"` +} + +// containerEdits are edits a container runtime must make to the OCI spec to expose the device. +type containerEdits struct { + Env []string `json:"env,omitempty"` +} diff --git a/test/e2e/dra/test-driver/app/controller.go b/test/e2e/dra/test-driver/app/controller.go new file mode 100644 index 00000000000..079e7739dcf --- /dev/null +++ b/test/e2e/dra/test-driver/app/controller.go @@ -0,0 +1,357 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app does all of the work necessary to configure and run a +// Kubernetes app process. +package app + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "strings" + "sync" + + v1 "k8s.io/api/core/v1" + resourcev1alpha1 "k8s.io/api/resource/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/dynamic-resource-allocation/controller" + "k8s.io/klog/v2" +) + +type Resources struct { + NodeLocal bool + Nodes []string + MaxAllocations int + Shareable bool + + // AllocateWrapper, if set, gets called for each Allocate call. 
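+	// The wrapper is responsible for invoking the handler that it receives
+	// when the claim actually should be allocated. Tests use this to delay
+	// or observe individual allocations.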
+ AllocateWrapper AllocateWrapperType +} + +type AllocateWrapperType func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, + class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string, + handler func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, + class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error), +) (result *resourcev1alpha1.AllocationResult, err error) + +type ExampleController struct { + clientset kubernetes.Interface + resources Resources + driverName string + + // mutex must be locked at the gRPC call level. + mutex sync.Mutex + // allocated maps claim.UID to the node (if network-attached) or empty (if not). + allocated map[types.UID]string + + numAllocations, numDeallocations int64 +} + +func NewController(clientset kubernetes.Interface, driverName string, resources Resources) *ExampleController { + c := &ExampleController{ + clientset: clientset, + resources: resources, + driverName: driverName, + + allocated: make(map[types.UID]string), + } + return c +} + +func (c *ExampleController) Run(ctx context.Context, workers int) *ExampleController { + informerFactory := informers.NewSharedInformerFactory(c.clientset, 0 /* resync period */) + ctrl := controller.New(ctx, c.driverName, c, c.clientset, informerFactory) + informerFactory.Start(ctx.Done()) + ctrl.Run(workers) + + return c +} + +type parameters struct { + EnvVars map[string]string + NodeName string +} + +var _ controller.Driver = &ExampleController{} + +func (c *ExampleController) countAllocations(node string) int { + total := 0 + for _, n := range c.allocated { + if n == node { + total++ + } + } + return total +} + +// GetNumAllocations returns the number of times that a claim was allocated. +// Idempotent calls to Allocate that do not need to allocate the claim again do +// not contribute to that counter. +func (c *ExampleController) GetNumAllocations() int64 { + c.mutex.Lock() + defer c.mutex.Unlock() + + return c.numAllocations +} + +// GetNumDeallocations returns the number of times that a claim was allocated. +// Idempotent calls to Allocate that do not need to allocate the claim again do +// not contribute to that counter. 
+func (c *ExampleController) GetNumDeallocations() int64 { + c.mutex.Lock() + defer c.mutex.Unlock() + + return c.numDeallocations +} + +func (c *ExampleController) GetClassParameters(ctx context.Context, class *resourcev1alpha1.ResourceClass) (interface{}, error) { + if class.ParametersRef != nil { + if class.ParametersRef.APIGroup != "" || + class.ParametersRef.Kind != "ConfigMap" { + return nil, fmt.Errorf("class parameters are only supported in APIVersion v1, Kind ConfigMap, got: %v", class.ParametersRef) + } + return c.readParametersFromConfigMap(ctx, class.ParametersRef.Namespace, class.ParametersRef.Name) + } + return nil, nil +} + +func (c *ExampleController) GetClaimParameters(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, class *resourcev1alpha1.ResourceClass, classParameters interface{}) (interface{}, error) { + if claim.Spec.ParametersRef != nil { + if claim.Spec.ParametersRef.APIGroup != "" || + claim.Spec.ParametersRef.Kind != "ConfigMap" { + return nil, fmt.Errorf("claim parameters are only supported in APIVersion v1, Kind ConfigMap, got: %v", claim.Spec.ParametersRef) + } + return c.readParametersFromConfigMap(ctx, claim.Namespace, claim.Spec.ParametersRef.Name) + } + return nil, nil +} + +func (c *ExampleController) readParametersFromConfigMap(ctx context.Context, namespace, name string) (map[string]string, error) { + configMap, err := c.clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("get config map: %v", err) + } + return configMap.Data, nil +} + +func (c *ExampleController) Allocate(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error) { + if c.resources.AllocateWrapper != nil { + return c.resources.AllocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, c.allocate) + } + return c.allocate(ctx, claim, claimParameters, class, classParameters, selectedNode) +} + +// allocate simply copies parameters as JSON map into ResourceHandle. +func (c *ExampleController) allocate(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error) { + logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Allocate"), "claim", klog.KObj(claim), "uid", claim.UID) + defer func() { + logger.Info("done", "result", prettyPrint(result), "err", err) + }() + + c.mutex.Lock() + defer c.mutex.Unlock() + + // Already allocated? Then we don't need to count it again. + node, alreadyAllocated := c.allocated[claim.UID] + if alreadyAllocated { + // Idempotent result - kind of. We don't check whether + // the parameters changed in the meantime. A real + // driver would have to do that. + logger.Info("already allocated") + } else { + logger.Info("starting", "selectedNode", selectedNode) + if c.resources.NodeLocal { + node = selectedNode + if node == "" { + // If none has been selected because we do immediate allocation, + // then we need to pick one ourselves. 
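+				// Only nodes which still have capacity left are viable;
+				// MaxAllocations == 0 means "unlimited".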
+ var viableNodes []string + for _, n := range c.resources.Nodes { + if c.resources.MaxAllocations == 0 || + c.countAllocations(n) < c.resources.MaxAllocations { + viableNodes = append(viableNodes, n) + } + } + if len(viableNodes) == 0 { + return nil, errors.New("resources exhausted on all nodes") + } + // Pick randomly. We could also prefer the one with the least + // number of allocations (even spreading) or the most (packing). + node = viableNodes[rand.Intn(len(viableNodes))] + logger.Info("picked a node ourselves", "selectedNode", selectedNode) + } else if c.resources.MaxAllocations > 0 && + c.countAllocations(node) >= c.resources.MaxAllocations { + return nil, fmt.Errorf("resources exhausted on node %q", node) + } + } else { + if c.resources.MaxAllocations > 0 && + len(c.allocated) >= c.resources.MaxAllocations { + return nil, errors.New("resources exhausted in the cluster") + } + } + } + + allocation := &resourcev1alpha1.AllocationResult{ + Shareable: c.resources.Shareable, + } + p := parameters{ + EnvVars: make(map[string]string), + NodeName: node, + } + toEnvVars("user", claimParameters, p.EnvVars) + toEnvVars("admin", classParameters, p.EnvVars) + data, err := json.Marshal(p) + if err != nil { + return nil, fmt.Errorf("encode parameters: %v", err) + } + allocation.ResourceHandle = string(data) + var nodes []string + if node != "" { + nodes = append(nodes, node) + } else { + nodes = c.resources.Nodes + } + if len(nodes) > 0 { + allocation.AvailableOnNodes = &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "kubernetes.io/hostname", + Operator: v1.NodeSelectorOpIn, + Values: nodes, + }, + }, + }, + }, + } + } + if !alreadyAllocated { + c.numAllocations++ + c.allocated[claim.UID] = node + } + return allocation, nil +} + +func (c *ExampleController) Deallocate(ctx context.Context, claim *resourcev1alpha1.ResourceClaim) error { + logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Deallocate"), "claim", klog.KObj(claim), "uid", claim.UID) + c.mutex.Lock() + defer c.mutex.Unlock() + + if _, ok := c.allocated[claim.UID]; !ok { + logger.Info("already deallocated") + return nil + } + + logger.Info("done") + c.numDeallocations++ + delete(c.allocated, claim.UID) + return nil +} + +func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, claims []*controller.ClaimAllocation, potentialNodes []string) (finalErr error) { + logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "UnsuitableNodes"), "pod", klog.KObj(pod)) + logger.Info("starting", "claim", prettyPrintSlice(claims), "potentialNodes", potentialNodes) + defer func() { + // UnsuitableNodes is the same for all claims. + logger.Info("done", "unsuitableNodes", claims[0].UnsuitableNodes, "err", finalErr) + }() + if c.resources.MaxAllocations == 0 { + // All nodes are suitable. + return nil + } + if c.resources.NodeLocal { + allocationsPerNode := make(map[string]int) + for _, node := range c.resources.Nodes { + allocationsPerNode[node] = c.countAllocations(node) + } + for _, claim := range claims { + claim.UnsuitableNodes = nil + for _, node := range potentialNodes { + // If we have more than one claim, then a + // single pod wants to use all of them. That + // can only work if a node has capacity left + // for all of them. Also, nodes that the driver + // doesn't run on cannot be used. 
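+				// A node becomes unsuitable when the driver has it in its
+				// node list and the existing allocations plus one per pending
+				// claim would exceed the per-node limit.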
+ if contains(c.resources.Nodes, node) && + allocationsPerNode[node]+len(claims) > c.resources.MaxAllocations { + claim.UnsuitableNodes = append(claim.UnsuitableNodes, node) + } + } + } + return nil + } + + allocations := c.countAllocations("") + for _, claim := range claims { + claim.UnsuitableNodes = nil + for _, node := range potentialNodes { + if contains(c.resources.Nodes, node) && + allocations+len(claims) > c.resources.MaxAllocations { + claim.UnsuitableNodes = append(claim.UnsuitableNodes, node) + } + } + } + + return nil +} + +func toEnvVars(what string, from interface{}, to map[string]string) { + if from == nil { + return + } + + env := from.(map[string]string) + for key, value := range env { + to[what+"_"+strings.ToLower(key)] = value + } +} + +func contains[T comparable](list []T, value T) bool { + for _, v := range list { + if v == value { + return true + } + } + + return false +} + +func prettyPrint[T any](obj *T) interface{} { + if obj == nil { + return "" + } + return *obj +} + +// prettyPrintSlice prints the values the slice points to, not the pointers. +func prettyPrintSlice[T any](slice []*T) interface{} { + var values []interface{} + for _, v := range slice { + if v == nil { + values = append(values, "") + } else { + values = append(values, *v) + } + } + return values +} diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go new file mode 100644 index 00000000000..89e9b85734b --- /dev/null +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -0,0 +1,208 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + + "k8s.io/dynamic-resource-allocation/kubeletplugin" + "k8s.io/klog/v2" + drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha1" +) + +type ExamplePlugin struct { + logger klog.Logger + d kubeletplugin.DRAPlugin + fileOps FileOperations + + cdiDir string + driverName string + nodeName string + + mutex sync.Mutex + prepared map[ClaimID]bool +} + +// ClaimID contains both claim name and UID to simplify debugging. The +// namespace is not included because it is random in E2E tests and the UID is +// sufficient to make the ClaimID unique. +type ClaimID struct { + Name string + UID string +} + +var _ drapbv1.NodeServer = &ExamplePlugin{} + +// getJSONFilePath returns the absolute path where CDI file is/should be. +func (ex *ExamplePlugin) getJSONFilePath(claimUID string) string { + return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s.json", ex.driverName, claimUID)) +} + +// FileOperations defines optional callbacks for handling CDI files. +type FileOperations struct { + // Create must overwrite the file. + Create func(name string, content []byte) error + + // Remove must remove the file. It must not return an error when the + // file does not exist. + Remove func(name string) error +} + +// StartPlugin sets up the servers that are necessary for a DRA kubelet plugin. 
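FileOperations is the seam that lets callers decide how CDI files get written. As one possible use, a test could plug in an in-memory implementation; the memFileOps type below is only a sketch and not part of the driver.

```go
package app

import "sync"

// memFileOps is an illustrative, purely in-memory FileOperations
// implementation: a test could pass it to StartPlugin to capture the CDI
// files that NodePrepareResource writes instead of touching the real
// filesystem.
type memFileOps struct {
	mutex sync.Mutex
	files map[string][]byte
}

func newMemFileOps() (*memFileOps, FileOperations) {
	m := &memFileOps{files: map[string][]byte{}}
	return m, FileOperations{
		Create: func(name string, content []byte) error {
			m.mutex.Lock()
			defer m.mutex.Unlock()
			m.files[name] = content
			return nil
		},
		Remove: func(name string) error {
			m.mutex.Lock()
			defer m.mutex.Unlock()
			// Like the default implementation, removing a missing file
			// is not an error.
			delete(m.files, name)
			return nil
		},
	}
}
```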
+func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) { + if fileOps.Create == nil { + fileOps.Create = func(name string, content []byte) error { + return os.WriteFile(name, content, os.FileMode(0644)) + } + } + if fileOps.Remove == nil { + fileOps.Remove = func(name string) error { + if err := os.Remove(name); err != nil && !os.IsNotExist(err) { + return err + } + return nil + } + } + ex := &ExamplePlugin{ + logger: logger, + fileOps: fileOps, + cdiDir: cdiDir, + driverName: driverName, + nodeName: nodeName, + prepared: make(map[ClaimID]bool), + } + + opts = append(opts, + kubeletplugin.Logger(logger), + kubeletplugin.DriverName(driverName), + ) + d, err := kubeletplugin.Start(ex, opts...) + if err != nil { + return nil, fmt.Errorf("start kubelet plugin: %v", err) + } + ex.d = d + + return ex, nil +} + +// stop ensures that all servers are stopped and resources freed. +func (ex *ExamplePlugin) Stop() { + ex.d.Stop() +} + +func (ex *ExamplePlugin) IsRegistered() bool { + status := ex.d.RegistrationStatus() + if status == nil { + return false + } + return status.PluginRegistered +} + +// NodePrepareResource ensures that the CDI file for the claim exists. It uses +// a deterministic name to simplify NodeUnprepareResource (no need to remember +// or discover the name) and idempotency (when called again, the file simply +// gets written again). +func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1.NodePrepareResourceRequest) (*drapbv1.NodePrepareResourceResponse, error) { + logger := klog.FromContext(ctx) + + // Determine environment variables. + var p parameters + if err := json.Unmarshal([]byte(req.ResourceHandle), &p); err != nil { + return nil, fmt.Errorf("unmarshal resource handle: %v", err) + } + + // Sanity check scheduling. + if p.NodeName != "" && ex.nodeName != "" && p.NodeName != ex.nodeName { + return nil, fmt.Errorf("claim was allocated for %q, cannot be prepared on %q", p.NodeName, ex.nodeName) + } + + // CDI wants env variables as set of strings. + envs := []string{} + for key, val := range p.EnvVars { + envs = append(envs, key+"="+val) + } + + deviceName := "claim-" + req.ClaimUid + vendor := ex.driverName + class := "test" + spec := &spec{ + Version: "0.2.0", // This has to be a version accepted by the runtimes. + Kind: vendor + "/" + class, + // At least one device is required and its entry must have more + // than just the name. + Devices: []device{ + { + Name: deviceName, + ContainerEdits: containerEdits{ + Env: envs, + }, + }, + }, + } + filePath := ex.getJSONFilePath(req.ClaimUid) + buffer, err := json.Marshal(spec) + if err != nil { + return nil, fmt.Errorf("marshal spec: %v", err) + } + if err := ex.fileOps.Create(filePath, buffer); err != nil { + return nil, fmt.Errorf("failed to write CDI file %v", err) + } + + dev := vendor + "/" + class + "=" + deviceName + resp := &drapbv1.NodePrepareResourceResponse{CdiDevices: []string{dev}} + + ex.mutex.Lock() + defer ex.mutex.Unlock() + ex.prepared[ClaimID{Name: req.ClaimName, UID: req.ClaimUid}] = true + + logger.V(3).Info("CDI file created", "path", filePath, "device", dev) + return resp, nil +} + +// NodeUnprepareResource removes the CDI file created by +// NodePrepareResource. It's idempotent, therefore it is not an error when that +// file is already gone. 
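To show roughly what NodePrepareResource writes, here is a sketch that marshals an equivalent CDI spec. The cdiSpec, cdiDevice and cdiEdits types and their JSON tags are assumptions based on the CDI file format; the driver's own unexported spec, device and containerEdits types are defined elsewhere in this PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// These structs only mirror what NodePrepareResource marshals.
type cdiSpec struct {
	Version string      `json:"cdiVersion"`
	Kind    string      `json:"kind"`
	Devices []cdiDevice `json:"devices"`
}

type cdiDevice struct {
	Name           string   `json:"name"`
	ContainerEdits cdiEdits `json:"containerEdits"`
}

type cdiEdits struct {
	Env []string `json:"env"`
}

func main() {
	claimUID := "11111111-2222-3333-4444-555555555555" // hypothetical claim UID
	spec := cdiSpec{
		Version: "0.2.0",
		Kind:    "test-driver.cdi.k8s.io/test",
		Devices: []cdiDevice{{
			Name:           "claim-" + claimUID,
			ContainerEdits: cdiEdits{Env: []string{"user_a=b"}},
		}},
	}
	out, _ := json.MarshalIndent(spec, "", "  ")
	fmt.Println(string(out))
	// The gRPC response then references the device as
	// "test-driver.cdi.k8s.io/test=claim-<uid>".
}
```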
+func (ex *ExamplePlugin) NodeUnprepareResource(ctx context.Context, req *drapbv1.NodeUnprepareResourceRequest) (*drapbv1.NodeUnprepareResourceResponse, error) { + logger := klog.FromContext(ctx) + + filePath := ex.getJSONFilePath(req.ClaimUid) + if err := ex.fileOps.Remove(filePath); err != nil { + return nil, fmt.Errorf("error removing CDI file: %v", err) + } + logger.V(3).Info("CDI file removed", "path", filePath) + + ex.mutex.Lock() + defer ex.mutex.Unlock() + delete(ex.prepared, ClaimID{Name: req.ClaimName, UID: req.ClaimUid}) + + return &drapbv1.NodeUnprepareResourceResponse{}, nil +} + +func (ex *ExamplePlugin) GetPreparedResources() []ClaimID { + ex.mutex.Lock() + defer ex.mutex.Unlock() + var prepared []ClaimID + for claimID := range ex.prepared { + prepared = append(prepared, claimID) + } + return prepared +} diff --git a/test/e2e/dra/test-driver/app/server.go b/test/e2e/dra/test-driver/app/server.go new file mode 100644 index 00000000000..4be3ea34f32 --- /dev/null +++ b/test/e2e/dra/test-driver/app/server.go @@ -0,0 +1,319 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app does all of the work necessary to configure and run a +// Kubernetes app process. +package app + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "net/http/pprof" + "os" + "os/signal" + "path" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/spf13/cobra" + "k8s.io/component-base/metrics" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + cliflag "k8s.io/component-base/cli/flag" + "k8s.io/component-base/featuregate" + "k8s.io/component-base/logs" + logsapi "k8s.io/component-base/logs/api/v1" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/term" + "k8s.io/dynamic-resource-allocation/kubeletplugin" + "k8s.io/dynamic-resource-allocation/leaderelection" + "k8s.io/klog/v2" +) + +// NewCommand creates a *cobra.Command object with default parameters. +func NewCommand() *cobra.Command { + o := logsapi.NewLoggingConfiguration() + var clientset kubernetes.Interface + var config *rest.Config + ctx := context.Background() + logger := klog.Background() + + cmd := &cobra.Command{ + Use: "cdi-test-driver", + Long: "cdi-test-driver implements a resource driver controller and kubelet plugin.", + } + sharedFlagSets := cliflag.NamedFlagSets{} + fs := sharedFlagSets.FlagSet("logging") + logsapi.AddFlags(o, fs) + logs.AddFlags(fs, logs.SkipLoggingConfigurationFlags()) + + fs = sharedFlagSets.FlagSet("Kubernetes client") + kubeconfig := fs.String("kubeconfig", "", "Absolute path to the kube.config file. 
Either this or KUBECONFIG needs to be set if the driver is being run out of cluster.")
+	kubeAPIQPS := fs.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver.")
+	kubeAPIBurst := fs.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver.")
+	workers := fs.Int("workers", 10, "Concurrency to process multiple claims.")
+
+	fs = sharedFlagSets.FlagSet("http server")
+	httpEndpoint := fs.String("http-endpoint", "",
+		"The TCP network address where the HTTP server for diagnostics, including pprof, metrics and (if applicable) leader election health check, will listen (example: `:8080`). The default is the empty string, which means the server is disabled.")
+	metricsPath := fs.String("metrics-path", "/metrics", "The HTTP path where Prometheus metrics will be exposed, disabled if empty.")
+	profilePath := fs.String("pprof-path", "", "The HTTP path where pprof profiling will be available, disabled if empty.")
+
+	fs = sharedFlagSets.FlagSet("CDI")
+	driverName := fs.String("drivername", "test-driver.cdi.k8s.io", "Resource driver name.")
+
+	fs = sharedFlagSets.FlagSet("other")
+	featureGate := featuregate.NewFeatureGate()
+	utilruntime.Must(logsapi.AddFeatureGates(featureGate))
+	featureGate.AddFlag(fs)
+
+	fs = cmd.PersistentFlags()
+	for _, f := range sharedFlagSets.FlagSets {
+		fs.AddFlagSet(f)
+	}
+
+	mux := http.NewServeMux()
+
+	cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
+		// Activate logging as soon as possible, after that
+		// show flags with the final logging configuration.
+
+		if err := logsapi.ValidateAndApply(o, featureGate); err != nil {
+			return err
+		}
+
+		// get the KUBECONFIG from env if specified (useful for local/debug cluster)
+		kubeconfigEnv := os.Getenv("KUBECONFIG")
+
+		if kubeconfigEnv != "" {
+			logger.Info("Found KUBECONFIG environment variable set, using that.")
+			*kubeconfig = kubeconfigEnv
+		}
+
+		var err error
+		if *kubeconfig == "" {
+			config, err = rest.InClusterConfig()
+			if err != nil {
+				return fmt.Errorf("create in-cluster client configuration: %v", err)
+			}
+		} else {
+			config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
+			if err != nil {
+				return fmt.Errorf("create out-of-cluster client configuration: %v", err)
+			}
+		}
+		config.QPS = *kubeAPIQPS
+		config.Burst = *kubeAPIBurst
+
+		clientset, err = kubernetes.NewForConfig(config)
+		if err != nil {
+			return fmt.Errorf("create client: %v", err)
+		}
+
+		if *httpEndpoint != "" {
+			if *metricsPath != "" {
+				// For workqueue and leader election metrics, set up via the anonymous imports of:
+				// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go
+				// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
+				//
+				// It also happens to include Go runtime and process metrics:
+				// https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.go#L46-L49
+				gatherer := legacyregistry.DefaultGatherer
+				actualPath := path.Join("/", *metricsPath)
+				logger.Info("Starting metrics", "path", actualPath)
+				mux.Handle(actualPath,
+					metrics.HandlerFor(gatherer, metrics.HandlerOpts{}))
+			}
+
+			if *profilePath != "" {
+				actualPath := path.Join("/", *profilePath)
+				logger.Info("Starting profiling", "path", actualPath)
+				mux.HandleFunc(path.Join("/", *profilePath), pprof.Index)
+
mux.HandleFunc(path.Join("/", *profilePath, "cmdline"), pprof.Cmdline) + mux.HandleFunc(path.Join("/", *profilePath, "profile"), pprof.Profile) + mux.HandleFunc(path.Join("/", *profilePath, "symbol"), pprof.Symbol) + mux.HandleFunc(path.Join("/", *profilePath, "trace"), pprof.Trace) + } + + listener, err := net.Listen("tcp", *httpEndpoint) + if err != nil { + return fmt.Errorf("listen on HTTP endpoint: %v", err) + } + + go func() { + logger.Info("Starting HTTP server", "endpoint", *httpEndpoint) + err := http.Serve(listener, mux) + if err != nil { + logger.Error(err, "HTTP server failed") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + }() + } + + return nil + } + + controller := &cobra.Command{ + Use: "controller", + Short: "run as resource controller", + Long: "cdi-test-driver controller runs as a resource driver controller.", + Args: cobra.ExactArgs(0), + } + controllerFlagSets := cliflag.NamedFlagSets{} + fs = controllerFlagSets.FlagSet("leader election") + enableLeaderElection := fs.Bool("leader-election", false, + "Enables leader election. If leader election is enabled, additional RBAC rules are required.") + leaderElectionNamespace := fs.String("leader-election-namespace", "", + "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.") + leaderElectionLeaseDuration := fs.Duration("leader-election-lease-duration", 15*time.Second, + "Duration, in seconds, that non-leader candidates will wait to force acquire leadership.") + leaderElectionRenewDeadline := fs.Duration("leader-election-renew-deadline", 10*time.Second, + "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up.") + leaderElectionRetryPeriod := fs.Duration("leader-election-retry-period", 5*time.Second, + "Duration, in seconds, the LeaderElector clients should wait between tries of actions.") + resourceConfig := fs.String("resource-config", "", "A JSON file containing a Resources struct. Defaults are unshared, network-attached resources.") + fs = controller.Flags() + for _, f := range controllerFlagSets.FlagSets { + fs.AddFlagSet(f) + } + + controller.RunE = func(cmd *cobra.Command, args []string) error { + resources := Resources{} + if *resourceConfig != "" { + file, err := os.Open(*resourceConfig) + if err != nil { + return fmt.Errorf("open resource config: %v", err) + } + decoder := json.NewDecoder(file) + decoder.DisallowUnknownFields() + if err := decoder.Decode(&resources); err != nil { + return fmt.Errorf("parse resource config %q: %v", *resourceConfig, err) + } + } + + run := func() { + controller := NewController(clientset, *driverName, resources) + controller.Run(ctx, *workers) + } + + if !*enableLeaderElection { + run() + return nil + } + + // This must not change between releases. + lockName := *driverName + + // Create a new clientset for leader election + // to avoid starving it when the normal traffic + // exceeds the QPS+burst limits. 
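The --resource-config file is decoded into the driver's Resources struct with unknown fields rejected. A hedged sketch of such a file and its decoding follows; resourcesConfig only mirrors the fields used by the controller above (NodeLocal, Nodes, MaxAllocations, Shareable), and the JSON spelling is an assumption.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// resourcesConfig is a stand-in for the driver's Resources struct; the
// authoritative type is defined elsewhere in the test driver.
type resourcesConfig struct {
	NodeLocal      bool
	Nodes          []string
	MaxAllocations int
	Shareable      bool
}

func main() {
	// A hypothetical file passed via --resource-config.
	raw := []byte(`{
  "NodeLocal": true,
  "Nodes": ["worker-1", "worker-2"],
  "MaxAllocations": 2,
  "Shareable": false
}`)

	var resources resourcesConfig
	decoder := json.NewDecoder(bytes.NewReader(raw))
	// Same strictness as the controller command: unknown fields are rejected.
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(&resources); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", resources)
}
```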
+ leClientset, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("create leaderelection client: %v", err) + } + + le := leaderelection.New(leClientset, lockName, + func(ctx context.Context) { + run() + }, + leaderelection.LeaseDuration(*leaderElectionLeaseDuration), + leaderelection.RenewDeadline(*leaderElectionRenewDeadline), + leaderelection.RetryPeriod(*leaderElectionRetryPeriod), + leaderelection.Namespace(*leaderElectionNamespace), + ) + if *httpEndpoint != "" { + le.PrepareHealthCheck(mux) + } + if err := le.Run(); err != nil { + return fmt.Errorf("leader election failed: %v", err) + } + + return nil + } + cmd.AddCommand(controller) + + kubeletPlugin := &cobra.Command{ + Use: "kubelet-plugin", + Short: "run as kubelet plugin", + Long: "cdi-test-driver kubelet-plugin runs as a device plugin for kubelet that supports dynamic resource allocation.", + Args: cobra.ExactArgs(0), + } + kubeletPluginFlagSets := cliflag.NamedFlagSets{} + fs = kubeletPluginFlagSets.FlagSet("kubelet") + pluginRegistrationPath := fs.String("plugin-registration-path", "/var/lib/kubelet/plugins_registry", "The directory where kubelet looks for plugin registration sockets, in the filesystem of the driver.") + endpoint := fs.String("endpoint", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket where the driver will listen for kubelet requests, in the filesystem of the driver.") + draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.") + fs = kubeletPluginFlagSets.FlagSet("CDI") + cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files") + fs = kubeletPlugin.Flags() + for _, f := range kubeletPluginFlagSets.FlagSets { + fs.AddFlagSet(f) + } + kubeletPlugin.RunE = func(cmd *cobra.Command, args []string) error { + // Ensure that directories exist, creating them if necessary. We want + // to know early if there is a setup problem that would prevent + // creating those directories. + if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil { + return fmt.Errorf("create CDI directory: %v", err) + } + if err := os.MkdirAll(filepath.Dir(*endpoint), 0750); err != nil { + return fmt.Errorf("create socket directory: %v", err) + } + + plugin, err := StartPlugin(logger, *cdiDir, *driverName, "", FileOperations{}, + kubeletplugin.PluginSocketPath(*endpoint), + kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")), + kubeletplugin.KubeletPluginSocketPath(*draAddress), + ) + if err != nil { + return fmt.Errorf("start example plugin: %v", err) + } + + // Handle graceful shutdown. We need to delete Unix domain + // sockets. + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, os.Interrupt, syscall.SIGTERM) + logger.Info("Waiting for signal.") + sig := <-sigc + logger.Info("Received signal, shutting down.", "signal", sig) + plugin.Stop() + return nil + } + cmd.AddCommand(kubeletPlugin) + + // SetUsageAndHelpFunc takes care of flag grouping. However, + // it doesn't support listing child commands. We add those + // to cmd.Use. 
+	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
+	cliflag.SetUsageAndHelpFunc(cmd, sharedFlagSets, cols)
+	var children []string
+	for _, child := range cmd.Commands() {
+		children = append(children, child.Use)
+	}
+	cmd.Use += " [shared flags] " + strings.Join(children, "|")
+	cliflag.SetUsageAndHelpFunc(controller, controllerFlagSets, cols)
+	cliflag.SetUsageAndHelpFunc(kubeletPlugin, kubeletPluginFlagSets, cols)
+
+	return cmd
+}
diff --git a/test/e2e/dra/test-driver/deploy/example/broken-resourceclass.yaml b/test/e2e/dra/test-driver/deploy/example/broken-resourceclass.yaml
new file mode 100644
index 00000000000..f4ecb6f2eb0
--- /dev/null
+++ b/test/e2e/dra/test-driver/deploy/example/broken-resourceclass.yaml
@@ -0,0 +1,14 @@
+# This resource class intentionally doesn't match any nodes.
+# When using it instead of a functional one, scheduling a pod leads to:
+# Warning FailedScheduling 16s default-scheduler 0/1 nodes are available: 1 excluded via potential node filter in resource class.
+
+apiVersion: resource.k8s.io/v1alpha1
+kind: ResourceClass
+metadata:
+  name: example
+driverName: test-driver.cdi.k8s.io
+suitableNodes:
+  nodeSelectorTerms:
+  - matchExpressions:
+    - key: no-such-label
+      operator: Exists
diff --git a/test/e2e/dra/test-driver/deploy/example/pod-external.yaml b/test/e2e/dra/test-driver/deploy/example/pod-external.yaml
new file mode 100644
index 00000000000..4aea64fd0df
--- /dev/null
+++ b/test/e2e/dra/test-driver/deploy/example/pod-external.yaml
@@ -0,0 +1,40 @@
+# One external resource claim, one pod, two containers.
+# One container uses resource, one does not.
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: external-claim-parameters
+  namespace: default
+data:
+  a: b
+---
+apiVersion: resource.k8s.io/v1alpha1
+kind: ResourceClaim
+metadata:
+  name: external-claim
+spec:
+  resourceClassName: example
+  parametersRef:
+    kind: ConfigMap
+    name: external-claim-parameters
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: test-external-claim
+spec:
+  restartPolicy: Never
+  containers:
+  - name: with-resource
+    image: registry.k8s.io/e2e-test-images/busybox:1.29-2
+    command: ["sh", "-c", "set && mount && ls -la /dev/"]
+    resources:
+      claims:
+      - name: resource
+  - name: without-resource
+    image: registry.k8s.io/e2e-test-images/busybox:1.29-2
+    command: ["sh", "-c", "set && mount && ls -la /dev/"]
+  resourceClaims:
+  - name: resource
+    source:
+      resourceClaimName: external-claim
diff --git a/test/e2e/dra/test-driver/deploy/example/pod-inline-multiple.yaml b/test/e2e/dra/test-driver/deploy/example/pod-inline-multiple.yaml
new file mode 100644
index 00000000000..3a49c9a7de0
--- /dev/null
+++ b/test/e2e/dra/test-driver/deploy/example/pod-inline-multiple.yaml
@@ -0,0 +1,48 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: pause-claim-parameters
+  namespace: default
+data:
+  a: b
+---
+apiVersion: resource.k8s.io/v1alpha1
+kind: ResourceClaimTemplate
+metadata:
+  name: pause-template
+  namespace: default
+spec:
+  metadata:
+    labels:
+      app: inline-resource
+  spec:
+    resourceClassName: example
+    parametersRef:
+      kind: ConfigMap
+      name: pause-claim-parameters
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: pause
+  labels:
+    name: pause
+spec:
+  containers:
+  - name: pause1
+    image: "k8s.gcr.io/pause:3.6"
+    resources:
+      claims:
+      - name: resource1
+  - name: pause2
+    image: "k8s.gcr.io/pause:3.6"
+    resources:
+      claims:
+      - name: resource2
+  resourceClaims:
+  - name: resource1
+    source:
+      resourceClaimTemplateName: pause-template
+  - name: resource2
+    source:
+      resourceClaimTemplateName: pause-template
diff --git a/test/e2e/dra/test-driver/deploy/example/pod-inline.yaml b/test/e2e/dra/test-driver/deploy/example/pod-inline.yaml
new file mode 100644
index 00000000000..d1145e3dae6
--- /dev/null
+++ b/test/e2e/dra/test-driver/deploy/example/pod-inline.yaml
@@ -0,0 +1,45 @@
+# One inline resource claim, one pod, two containers.
+# One container uses resource, one does not.
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: inline-claim-parameters
+  namespace: default
+data:
+  a: b
+---
+apiVersion: resource.k8s.io/v1alpha1
+kind: ResourceClaimTemplate
+metadata:
+  name: test-inline-claim-template
+  namespace: default
+spec:
+  metadata:
+    labels:
+      app: inline-resource
+  spec:
+    resourceClassName: example
+    parametersRef:
+      kind: ConfigMap
+      name: inline-claim-parameters
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: test-inline-claim
+spec:
+  restartPolicy: Never
+  containers:
+  - name: with-resource
+    image: registry.k8s.io/e2e-test-images/busybox:1.29-2
+    command: ["sh", "-c", "set && mount && ls -la /dev/"]
+    resources:
+      claims:
+      - name: resource
+  - name: without-resource
+    image: registry.k8s.io/e2e-test-images/busybox:1.29-2
+    command: ["sh", "-c", "set && mount && ls -la /dev/"]
+  resourceClaims:
+  - name: resource
+    source:
+      resourceClaimTemplateName: test-inline-claim-template
diff --git a/test/e2e/dra/test-driver/deploy/example/pod-shared.yaml b/test/e2e/dra/test-driver/deploy/example/pod-shared.yaml
new file mode 100644
index 00000000000..b32a0e40381
--- /dev/null
+++ b/test/e2e/dra/test-driver/deploy/example/pod-shared.yaml
@@ -0,0 +1,61 @@
+# One external resource claim, two pods, two containers in each pod.
+# Pods share the same resource.
+# One container uses resource, one does not.
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: shared-claim-parameters
+data:
+  a: b
+---
+apiVersion: resource.k8s.io/v1alpha1
+kind: ResourceClaim
+metadata:
+  name: shared-claim
+spec:
+  resourceClassName: example
+  parametersRef:
+    kind: ConfigMap
+    name: shared-claim-parameters
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: test-shared-claim
+spec:
+  restartPolicy: Never
+  containers:
+  - name: with-resource
+    image: registry.k8s.io/e2e-test-images/busybox:1.29-2
+    command: ["sh", "-c", "set && mount && ls -la /dev/"]
+    resources:
+      claims:
+      - name: resource
+  - name: without-resource
+    image: registry.k8s.io/e2e-test-images/busybox:1.29-2
+    command: ["sh", "-c", "set && mount && ls -la /dev/"]
+  resourceClaims:
+  - name: resource
+    source:
+      resourceClaimName: shared-claim
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: test-shared-claim-2
+spec:
+  restartPolicy: Never
+  containers:
+  - name: with-resource
+    image: registry.k8s.io/e2e-test-images/busybox:1.29-2
+    command: ["sh", "-c", "set && mount && ls -la /dev/"]
+    resources:
+      claims:
+      - name: resource
+  - name: without-resource
+    image: registry.k8s.io/e2e-test-images/busybox:1.29-2
+    command: ["sh", "-c", "set && mount && ls -la /dev/"]
+  resourceClaims:
+  - name: resource
+    source:
+      resourceClaimName: shared-claim
diff --git a/test/e2e/dra/test-driver/deploy/example/resourceclaim.yaml b/test/e2e/dra/test-driver/deploy/example/resourceclaim.yaml
new file mode 100644
index 00000000000..ce37440738d
--- /dev/null
+++ b/test/e2e/dra/test-driver/deploy/example/resourceclaim.yaml
@@ -0,0 +1,19 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: example-claim-parameters
+  namespace: default
+data:
+  a: b
+---
+apiVersion: resource.k8s.io/v1alpha1
+kind: ResourceClaim
+metadata:
+ name: example + namespace: default +spec: + allocationMode: Immediate + resourceClassName: example + parametersRef: + kind: ConfigMap + name: example-claim-parameters diff --git a/test/e2e/dra/test-driver/deploy/example/resourceclass.yaml b/test/e2e/dra/test-driver/deploy/example/resourceclass.yaml new file mode 100644 index 00000000000..42d5c2c9689 --- /dev/null +++ b/test/e2e/dra/test-driver/deploy/example/resourceclass.yaml @@ -0,0 +1,7 @@ +apiVersion: resource.k8s.io/v1alpha1 +kind: ResourceClass +metadata: + name: example +driverName: test-driver.cdi.k8s.io +# TODO: +# parameters diff --git a/test/e2e/dra/test-driver/dra-test-driver.go b/test/e2e/dra/test-driver/dra-test-driver.go new file mode 100644 index 00000000000..ad93cc84b1c --- /dev/null +++ b/test/e2e/dra/test-driver/dra-test-driver.go @@ -0,0 +1,35 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "os" + + "k8s.io/component-base/cli" + _ "k8s.io/component-base/logs/json/register" // for JSON log output support + _ "k8s.io/component-base/metrics/prometheus/clientgo/leaderelection" // register leader election in the default legacy registry + _ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration + _ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration + _ "k8s.io/component-base/metrics/prometheus/workqueue" // register work queues in the default legacy registry + "k8s.io/kubernetes/test/e2e/dra/test-driver/app" +) + +func main() { + command := app.NewCommand() + code := cli.Run(command) + os.Exit(code) +} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 93032c185b0..1b124cc831f 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -52,6 +52,7 @@ import ( _ "k8s.io/kubernetes/test/e2e/autoscaling" _ "k8s.io/kubernetes/test/e2e/cloud" _ "k8s.io/kubernetes/test/e2e/common" + _ "k8s.io/kubernetes/test/e2e/dra" _ "k8s.io/kubernetes/test/e2e/instrumentation" _ "k8s.io/kubernetes/test/e2e/kubectl" _ "k8s.io/kubernetes/test/e2e/lifecycle" diff --git a/test/e2e/testing-manifests/dra/OWNERS b/test/e2e/testing-manifests/dra/OWNERS new file mode 100644 index 00000000000..03cc9aff5b4 --- /dev/null +++ b/test/e2e/testing-manifests/dra/OWNERS @@ -0,0 +1,11 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - klueska + - pohly +reviewers: + - klueska + - pohly + - bart0sh +labels: + - sig/node diff --git a/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml new file mode 100644 index 00000000000..377bb2e587b --- /dev/null +++ b/test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml @@ -0,0 +1,85 @@ +# This YAML file deploys the csi-driver-host-path on a number of nodes such +# that it proxies all connections from kubelet (plugin registration and dynamic +# resource allocation). The actual handling of those connections then happens +# inside the e2e.test binary via test/e2e/storage/drivers/proxy. 
This approach +# has the advantage that no separate container image with the test driver is +# needed and that tests have full control over the driver, for example for +# error injection. +# +# The csi-driver-host-path image is used because: +# - it has the necessary proxy mode (https://github.com/kubernetes-csi/csi-driver-host-path/commit/65480fc74d550a9a5aa81e850955cc20403857b1) +# - its base image contains a shell (useful for creating files) +# - the image is already a dependency of e2e.test + +kind: ReplicaSet +apiVersion: apps/v1 +metadata: + name: dra-test-driver + labels: + app.kubernetes.io/instance: test-driver.dra.k8s.io + app.kubernetes.io/part-of: dra-test-driver + app.kubernetes.io/name: dra-test-driver-kubelet-plugin + app.kubernetes.io/component: kubelet-plugin +spec: + selector: + matchLabels: + app.kubernetes.io/instance: test-driver.dra.k8s.io + app.kubernetes.io/part-of: dra-test-driver + app.kubernetes.io/name: dra-test-driver-kubelet-plugin + app.kubernetes.io/component: kubelet-plugin + replicas: 1 + template: + metadata: + labels: + app.kubernetes.io/instance: test-driver.dra.k8s.io + app.kubernetes.io/part-of: dra-test-driver + app.kubernetes.io/name: dra-test-driver-kubelet-plugin + app.kubernetes.io/component: kubelet-plugin + spec: + # Ensure that all pods run on distinct nodes. + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchLabels: + app.kubernetes.io/instance: test-driver.dra.k8s.io + topologyKey: kubernetes.io/hostname + + containers: + - name: registrar + image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 + args: + - "--v=5" + - "--endpoint=/plugins_registry/dra-test-driver-reg.sock" + - "--proxy-endpoint=tcp://:9000" + volumeMounts: + - mountPath: /plugins_registry + name: registration-dir + + - name: plugin + image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3 + args: + - "--v=5" + - "--endpoint=/dra/dra-test-driver.sock" + - "--proxy-endpoint=tcp://:9001" + securityContext: + privileged: true + volumeMounts: + - mountPath: /dra + name: socket-dir + - mountPath: /cdi + name: cdi-dir + + volumes: + - hostPath: + path: /var/lib/kubelet/plugins + type: DirectoryOrCreate + name: socket-dir + - hostPath: + path: /var/run/cdi + type: DirectoryOrCreate + name: cdi-dir + - hostPath: + path: /var/lib/kubelet/plugins_registry + type: DirectoryOrCreate + name: registration-dir diff --git a/vendor/modules.txt b/vendor/modules.txt index 883b8869ccb..4a732e228d6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2055,6 +2055,9 @@ k8s.io/csi-translation-lib k8s.io/csi-translation-lib/plugins # k8s.io/dynamic-resource-allocation v0.0.0 => ./staging/src/k8s.io/dynamic-resource-allocation ## explicit; go 1.19 +k8s.io/dynamic-resource-allocation/controller +k8s.io/dynamic-resource-allocation/kubeletplugin +k8s.io/dynamic-resource-allocation/leaderelection k8s.io/dynamic-resource-allocation/resourceclaim # k8s.io/gengo v0.0.0-20220902162205-c0856e24416d ## explicit; go 1.13
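Coming back to the example manifests: the same pod as in pod-external.yaml can also be built with the typed Go client, assuming the core v1 PodResourceClaim, ClaimSource and ResourceRequirements.Claims fields that ship together with the resource.k8s.io/v1alpha1 API in this release. The sketch below is illustrative only.

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// buildExternalClaimPod sketches the typed equivalent of pod-external.yaml:
// one container consumes the claim, the other does not.
func buildExternalClaimPod() *v1.Pod {
	claimName := "external-claim"
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "test-external-claim"},
		Spec: v1.PodSpec{
			RestartPolicy: v1.RestartPolicyNever,
			Containers: []v1.Container{
				{
					Name:    "with-resource",
					Image:   "registry.k8s.io/e2e-test-images/busybox:1.29-2",
					Command: []string{"sh", "-c", "set && mount && ls -la /dev/"},
					Resources: v1.ResourceRequirements{
						// References the entry in Spec.ResourceClaims by name.
						Claims: []v1.ResourceClaim{{Name: "resource"}},
					},
				},
				{
					Name:    "without-resource",
					Image:   "registry.k8s.io/e2e-test-images/busybox:1.29-2",
					Command: []string{"sh", "-c", "set && mount && ls -la /dev/"},
				},
			},
			ResourceClaims: []v1.PodResourceClaim{
				{
					Name: "resource",
					Source: v1.ClaimSource{
						ResourceClaimName: &claimName,
					},
				},
			},
		},
	}
}

func main() {
	_ = buildExternalClaimPod()
}
```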