e2e dra: add test driver and tests for dynamic resource allocation

The driver can be used manually against a cluster started with
local-up-cluster.sh and is also used for E2E testing. Because the tests proxy
connections from the nodes into the e2e.test binary and create/delete files via
the equivalent of "kubectl exec dd/rm", they can be run against arbitrary
clusters. Each test gets its own driver instance and resource class, so the
tests can run in parallel.
Patrick Ohly 2022-08-15 13:43:28 +02:00
parent b2c39798f4
commit 14db9d1f92
21 changed files with 2597 additions and 0 deletions

test/e2e/dra/OWNERS
@@ -0,0 +1,11 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- klueska
- pohly
reviewers:
- klueska
- pohly
- bart0sh
labels:
- sig/node

test/e2e/dra/README.md
@@ -0,0 +1,12 @@
The tests in this directory cover dynamic resource allocation support in
Kubernetes. They do not test the correct behavior of arbitrary dynamic resource
allocation drivers.
If such a driver is needed, then the in-tree test/e2e/dra/test-driver is used,
with a slight twist: instead of deploying that driver directly in the cluster,
the necessary sockets for interaction with kubelet (registration and dynamic
resource allocation) get proxied into the e2e.test binary. This reuses the work
done for CSI mock testing. The advantage is that no separate images are needed
for the test driver and that the e2e test has full control over all gRPC calls,
in case it needs that for operations like error injection or call checking.
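
As a hedged sketch (not actual test code), the proxying boils down to roughly the
following, condensed from deploy.go below; the pod name, ports, container name,
and socket path are placeholders taken from the proxy manifest:

```go
// Condensed sketch of the proxy setup in deploy.go: gRPC listeners are opened
// via port forwarding into the driver proxy pod and handed to the kubelet
// plugin, which runs inside the e2e.test binary itself.
package dra

import (
	"context"
	"net"

	"k8s.io/dynamic-resource-allocation/kubeletplugin"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
	"k8s.io/kubernetes/test/e2e/framework"
	"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
)

func startProxiedPlugin(ctx context.Context, f *framework.Framework, podName, nodeName, driverName string) (*app.ExamplePlugin, error) {
	listen := func(port int) (net.Listener, error) {
		listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), proxy.Addr{
			Namespace:     f.Namespace.Name,
			PodName:       podName,
			ContainerName: "plugin", // container name used by the proxy manifest
			Port:          port,
		})
		if err != nil {
			return nil, err
		}
		return listener, nil
	}
	pluginListener, err := listen(9001) // DRA gRPC service
	if err != nil {
		return nil, err
	}
	registrarListener, err := listen(9000) // plugin registration service
	if err != nil {
		return nil, err
	}
	return app.StartPlugin(klog.Background(), "/cdi", driverName, nodeName,
		// deploy.go passes Create/Remove callbacks here that write and delete
		// the CDI file inside the proxy pod (the "kubectl exec dd/rm"
		// equivalent); leaving them empty would fall back to local files.
		app.FileOperations{},
		kubeletplugin.PluginListener(pluginListener),
		kubeletplugin.RegistrarListener(registrarListener),
		kubeletplugin.KubeletPluginSocketPath("/var/lib/kubelet/plugins/"+driverName+".sock"),
	)
}
```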

test/e2e/dra/deploy.go
@@ -0,0 +1,339 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dra
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"path"
"sort"
"sync"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"google.golang.org/grpc"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
"k8s.io/kubernetes/test/e2e/storage/utils"
)
const (
NodePrepareResourceMethod = "/v1alpha1.Node/NodePrepareResource"
NodeUnprepareResourceMethod = "/v1alpha1.Node/NodeUnprepareResource"
)
type Nodes struct {
NodeNames []string
}
// NewNodes selects nodes to run the test on.
func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
nodes := &Nodes{}
ginkgo.BeforeEach(func() {
ginkgo.By("selecting nodes")
// The kubelet plugin is harder. We deploy the builtin manifest
// after patching in the driver name and all nodes on which we
// want the plugin to run.
//
// Only a subset of the nodes are picked to avoid causing
// unnecessary load on a big cluster.
nodeList, err := e2enode.GetBoundedReadySchedulableNodes(f.ClientSet, maxNodes)
framework.ExpectNoError(err, "get nodes")
numNodes := int32(len(nodeList.Items))
if int(numNodes) < minNodes {
e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes)
}
nodes.NodeNames = nil
for _, node := range nodeList.Items {
nodes.NodeNames = append(nodes.NodeNames, node.Name)
}
framework.Logf("testing on nodes %v", nodes.NodeNames)
})
return nodes
}
// NewDriver sets up controller (as client of the cluster) and
// kubelet plugin (via proxy) before the test runs. It cleans
// up after the test.
func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() app.Resources) *Driver {
d := &Driver{
f: f,
fail: map[MethodInstance]bool{},
callCounts: map[MethodInstance]int64{},
}
ginkgo.BeforeEach(func() {
resources := configureResources()
if len(resources.Nodes) == 0 {
// This always has to be set because the driver might
// not run on all nodes.
resources.Nodes = nodes.NodeNames
}
d.SetUp(nodes, resources)
ginkgo.DeferCleanup(d.TearDown)
})
return d
}
type MethodInstance struct {
Nodename string
FullMethod string
}
type Driver struct {
f *framework.Framework
ctx context.Context
cleanup []func() // executed first-in-first-out
wg sync.WaitGroup
NameSuffix string
Controller *app.ExampleController
Name string
Nodes map[string]*app.ExamplePlugin
mutex sync.Mutex
fail map[MethodInstance]bool
callCounts map[MethodInstance]int64
}
func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
d.Nodes = map[string]*app.ExamplePlugin{}
d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
ctx, cancel := context.WithCancel(context.Background())
if d.NameSuffix != "" {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "instance"+d.NameSuffix)
ctx = klog.NewContext(ctx, logger)
}
d.ctx = ctx
d.cleanup = append(d.cleanup, cancel)
// The controller is easy: we simply connect to the API server. It
// would be slightly nicer if we had a way to wait for all goroutines, but
// SharedInformerFactory has no API for that. At least we can wait
// for our own goroutine to stop once the context gets cancelled.
d.Controller = app.NewController(d.f.ClientSet, d.Name, resources)
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.Controller.Run(d.ctx, 5 /* workers */)
}()
manifests := []string{
// The code below matches the content of this manifest (ports,
// container names, etc.).
"test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml",
}
instanceKey := "app.kubernetes.io/instance"
rsName := ""
draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock")
numNodes := int32(len(nodes.NodeNames))
undeploy, err := utils.CreateFromManifests(d.f, d.f.Namespace, func(item interface{}) error {
switch item := item.(type) {
case *appsv1.ReplicaSet:
item.Name += d.NameSuffix
rsName = item.Name
item.Spec.Replicas = &numNodes
item.Spec.Selector.MatchLabels[instanceKey] = d.Name
item.Spec.Template.Labels[instanceKey] = d.Name
item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name
item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: nodes.NodeNames,
},
},
},
},
},
}
item.Spec.Template.Spec.Volumes[0].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins")
item.Spec.Template.Spec.Volumes[2].HostPath.Path = path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint=/plugins_registry/"+d.Name+"-reg.sock")
item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint=/dra/"+d.Name+".sock")
}
return nil
}, manifests...)
framework.ExpectNoError(err, "deploy kubelet plugin replicaset")
d.cleanup = append(d.cleanup, undeploy)
rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
framework.ExpectNoError(err, "get replicaset")
// Wait for all pods to be running.
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(d.f.ClientSet, rs, numNodes); err != nil {
framework.ExpectNoError(err, "all kubelet plugin proxies running")
}
requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name})
framework.ExpectNoError(err, "create label selector requirement")
selector := labels.NewSelector().Add(*requirement)
pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
framework.ExpectNoError(err, "list proxy pods")
framework.ExpectEqual(numNodes, int32(len(pods.Items)), "number of proxy pods")
// Run registrar and plugin for each of the pods.
for _, pod := range pods.Items {
// Need a local variable, not the loop variable, for the anonymous
// callback functions below.
pod := pod
nodename := pod.Spec.NodeName
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
plugin, err := app.StartPlugin(logger, "/cdi", d.Name, nodename,
app.FileOperations{
Create: func(name string, content []byte) error {
ginkgo.By(fmt.Sprintf("creating CDI file %s on node %s:\n%s", name, nodename, string(content)))
return d.createFile(&pod, name, content)
},
Remove: func(name string) error {
ginkgo.By(fmt.Sprintf("deleting CDI file %s on node %s", name, nodename))
return d.removeFile(&pod, name)
},
},
kubeletplugin.GRPCVerbosity(0),
kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
return d.interceptor(nodename, ctx, req, info, handler)
}),
kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
kubeletplugin.KubeletPluginSocketPath(draAddr),
)
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
d.cleanup = append(d.cleanup, func() {
// Depends on cancel being called first.
plugin.Stop()
})
d.Nodes[nodename] = plugin
}
// Wait for registration.
ginkgo.By("wait for plugin registration")
gomega.Eventually(func() []string {
var notRegistered []string
for nodename, plugin := range d.Nodes {
if !plugin.IsRegistered() {
notRegistered = append(notRegistered, nodename)
}
}
sort.Strings(notRegistered)
return notRegistered
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "hosts where the plugin has not been registered yet")
}
func (d *Driver) createFile(pod *v1.Pod, name string, content []byte) error {
buffer := bytes.NewBuffer(content)
// Writing the content can be slow. Better create a temporary file and
// move it to the final destination once it is complete.
tmpName := name + ".tmp"
if err := d.podIO(pod).CreateFile(tmpName, buffer); err != nil {
_ = d.podIO(pod).RemoveAll(tmpName)
return err
}
return d.podIO(pod).Rename(tmpName, name)
}
func (d *Driver) removeFile(pod *v1.Pod, name string) error {
return d.podIO(pod).RemoveAll(name)
}
func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
logger := klog.Background()
return proxy.PodDirIO{
F: d.f,
Namespace: pod.Namespace,
PodName: pod.Name,
ContainerName: "plugin",
Logger: &logger,
}
}
func listen(ctx context.Context, f *framework.Framework, podName, containerName string, port int) net.Listener {
addr := proxy.Addr{
Namespace: f.Namespace.Name,
PodName: podName,
ContainerName: containerName,
Port: port,
}
listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr)
framework.ExpectNoError(err, "listen for connections from %+v", addr)
return listener
}
func (d *Driver) TearDown() {
for _, c := range d.cleanup {
c()
}
d.cleanup = nil
d.wg.Wait()
}
func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
d.mutex.Lock()
defer d.mutex.Unlock()
m := MethodInstance{nodename, info.FullMethod}
d.callCounts[m]++
if d.fail[m] {
return nil, errors.New("injected error")
}
return handler(ctx, req)
}
func (d *Driver) Fail(m MethodInstance, injectError bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.fail[m] = injectError
}
func (d *Driver) CallCount(m MethodInstance) int64 {
d.mutex.Lock()
defer d.mutex.Unlock()
return d.callCounts[m]
}
func (d *Driver) Nodenames() (nodenames []string) {
for nodename := range d.Nodes {
nodenames = append(nodenames, nodename)
}
sort.Strings(nodenames)
return
}

test/e2e/dra/dra.go
@@ -0,0 +1,835 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dra
import (
"context"
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
admissionapi "k8s.io/pod-security-admission/api"
)
const (
// podStartTimeout is how long to wait for the pod to be started.
podStartTimeout = 5 * time.Minute
)
func networkResources() app.Resources {
return app.Resources{
Shareable: true,
}
}
var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", func() {
f := framework.NewDefaultFramework("dra")
ctx := context.Background()
// The driver containers have to run with sufficient privileges to
// modify /var/lib/kubelet/plugins.
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
ginkgo.Context("kubelet", func() {
nodes := NewNodes(f, 1, 1)
driver := NewDriver(f, nodes, networkResources) // All tests get their own driver instance.
b := newBuilder(f, driver)
ginkgo.It("registers plugin", func() {
ginkgo.By("the driver is running")
})
// This test does not pass at the moment because kubelet doesn't retry.
ginkgo.It("must retry NodePrepareResource", func() {
// We have exactly one host.
m := MethodInstance{driver.Nodenames()[0], NodePrepareResourceMethod}
driver.Fail(m, true)
ginkgo.By("waiting for container startup to fail")
parameters := b.parameters()
pod, template := b.podInline(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
b.create(ctx, parameters, pod, template)
ginkgo.By("wait for NodePrepareResource call")
gomega.Eventually(func() error {
if driver.CallCount(m) == 0 {
return errors.New("NodePrepareResource not called yet")
}
return nil
}).WithTimeout(podStartTimeout).Should(gomega.Succeed())
ginkgo.By("allowing container startup to succeed")
callCount := driver.CallCount(m)
driver.Fail(m, false)
err := e2epod.WaitForPodNameRunningInNamespace(f.ClientSet, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "start pod with inline resource claim")
if driver.CallCount(m) == callCount {
framework.Fail("NodePrepareResource should have been called again")
}
})
ginkgo.It("must not run a pod if a claim is not reserved for it", func() {
parameters := b.parameters()
claim := b.externalClaim(resourcev1alpha1.AllocationModeImmediate)
pod := b.podExternal()
// This bypasses scheduling and therefore the pod gets
// to run on the node although it never gets added to
// the `ReservedFor` field of the claim.
pod.Spec.NodeName = nodes.NodeNames[0]
b.create(ctx, parameters, claim, pod)
gomega.Consistently(func() error {
testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("expected the test pod %s to exist: %v", pod.Name, err)
}
if testPod.Status.Phase != v1.PodPending {
return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending)
}
return nil
}, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil())
})
ginkgo.It("must unprepare resources for force-deleted pod", func() {
parameters := b.parameters()
claim := b.externalClaim(resourcev1alpha1.AllocationModeImmediate)
pod := b.podExternal()
zero := int64(0)
pod.Spec.TerminationGracePeriodSeconds = &zero
b.create(ctx, parameters, claim, pod)
b.testPod(f.ClientSet, pod)
ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name))
err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, "force delete test pod")
}
for host, plugin := range b.driver.Nodes {
ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host))
gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host)
}
})
})
ginkgo.Context("driver", func() {
nodes := NewNodes(f, 1, 1)
driver := NewDriver(f, nodes, networkResources) // All tests get their own driver instance.
b := newBuilder(f, driver)
// We need the parameters name *before* creating it.
b.parametersCounter = 1
b.classParametersName = b.parametersName()
ginkgo.It("supports claim and class parameters", func() {
classParameters := b.parameters("x", "y")
claimParameters := b.parameters()
pod, template := b.podInline(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
b.create(ctx, classParameters, claimParameters, pod, template)
b.testPod(f.ClientSet, pod, "user_a", "b", "admin_x", "y")
})
})
ginkgo.Context("cluster", func() {
nodes := NewNodes(f, 1, 4)
driver := NewDriver(f, nodes, networkResources)
b := newBuilder(f, driver)
// claimTests tries out several different combinations of pods with
// claims, both inline and external.
claimTests := func(allocationMode resourcev1alpha1.AllocationMode) {
ginkgo.It("supports simple pod referencing inline resource claim", func() {
parameters := b.parameters()
pod, template := b.podInline(allocationMode)
b.create(ctx, parameters, pod, template)
b.testPod(f.ClientSet, pod)
})
ginkgo.It("supports inline claim referenced by multiple containers", func() {
parameters := b.parameters()
pod, template := b.podInlineMultiple(allocationMode)
b.create(ctx, parameters, pod, template)
b.testPod(f.ClientSet, pod)
})
ginkgo.It("supports simple pod referencing external resource claim", func() {
parameters := b.parameters()
pod := b.podExternal()
b.create(ctx, parameters, b.externalClaim(allocationMode), pod)
b.testPod(f.ClientSet, pod)
})
ginkgo.It("supports external claim referenced by multiple pods", func() {
parameters := b.parameters()
pod1 := b.podExternal()
pod2 := b.podExternal()
pod3 := b.podExternal()
claim := b.externalClaim(allocationMode)
b.create(ctx, parameters, claim, pod1, pod2, pod3)
for _, pod := range []*v1.Pod{pod1, pod2, pod3} {
b.testPod(f.ClientSet, pod)
}
})
ginkgo.It("supports external claim referenced by multiple containers of multiple pods", func() {
parameters := b.parameters()
pod1 := b.podExternalMultiple()
pod2 := b.podExternalMultiple()
pod3 := b.podExternalMultiple()
claim := b.externalClaim(allocationMode)
b.create(ctx, parameters, claim, pod1, pod2, pod3)
for _, pod := range []*v1.Pod{pod1, pod2, pod3} {
b.testPod(f.ClientSet, pod)
}
})
ginkgo.It("supports init containers", func() {
parameters := b.parameters()
pod, template := b.podInline(allocationMode)
pod.Spec.InitContainers = []v1.Container{pod.Spec.Containers[0]}
pod.Spec.InitContainers[0].Name += "-init"
// This must succeed for the pod to start.
pod.Spec.InitContainers[0].Command = []string{"sh", "-c", "env | grep user_a=b"}
b.create(ctx, parameters, pod, template)
b.testPod(f.ClientSet, pod)
})
}
ginkgo.Context("with delayed allocation", func() {
claimTests(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
})
ginkgo.Context("with immediate allocation", func() {
claimTests(resourcev1alpha1.AllocationModeImmediate)
})
})
ginkgo.Context("multiple nodes", func() {
nodes := NewNodes(f, 2, 8)
ginkgo.Context("with network-attached resources", func() {
driver := NewDriver(f, nodes, networkResources)
b := newBuilder(f, driver)
ginkgo.It("schedules onto different nodes", func() {
parameters := b.parameters()
label := "app.kubernetes.io/instance"
instance := f.UniqueName + "-test-app"
antiAffinity := &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
TopologyKey: "kubernetes.io/hostname",
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
label: instance,
},
},
},
},
},
}
createPod := func() *v1.Pod {
pod := b.podExternal()
pod.Labels[label] = instance
pod.Spec.Affinity = antiAffinity
return pod
}
pod1 := createPod()
pod2 := createPod()
claim := b.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
b.create(ctx, parameters, claim, pod1, pod2)
for _, pod := range []*v1.Pod{pod1, pod2} {
err := e2epod.WaitForPodRunningInNamespace(f.ClientSet, pod)
framework.ExpectNoError(err, "start pod")
}
})
})
ginkgo.Context("with node-local resources", func() {
driver := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 1,
Nodes: nodes.NodeNames,
}
})
b := newBuilder(f, driver)
tests := func(allocationMode resourcev1alpha1.AllocationMode) {
ginkgo.It("uses all resources", func() {
var objs = []klog.KMetadata{
b.parameters(),
}
var pods []*v1.Pod
for i := 0; i < len(nodes.NodeNames); i++ {
pod, template := b.podInline(allocationMode)
pods = append(pods, pod)
objs = append(objs, pod, template)
}
b.create(ctx, objs...)
for _, pod := range pods {
err := e2epod.WaitForPodRunningInNamespace(f.ClientSet, pod)
framework.ExpectNoError(err, "start pod")
}
// The pods all should run on different
// nodes because the maximum number of
// claims per node was limited to 1 for
// this test.
//
// We cannot know for sure why the pods
// ran on two different nodes (could
// also be a coincidence) but if they
// don't cover all nodes, then we have
// a problem.
used := make(map[string]*v1.Pod)
for _, pod := range pods {
pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get pod")
nodeName := pod.Spec.NodeName
if other, ok := used[nodeName]; ok {
framework.Failf("Pod %s got started on the same node %s as pod %s although claim allocation should have been limited to one claim per node.", pod.Name, nodeName, other.Name)
}
used[nodeName] = pod
}
})
}
ginkgo.Context("with delayed allocation", func() {
tests(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
})
ginkgo.Context("with immediate allocation", func() {
tests(resourcev1alpha1.AllocationModeImmediate)
})
})
ginkgo.Context("reallocation", func() {
var allocateWrapper app.AllocateWrapperType
driver := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 2,
Nodes: nodes.NodeNames,
AllocateWrapper: func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string,
handler func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error)) (result *resourcev1alpha1.AllocationResult, err error) {
return allocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, handler)
},
}
})
b := newBuilder(f, driver)
ginkgo.It("works", func() {
// A pod with two claims can run on a node, but
// only if allocation of both succeeds. This
// test simulates the scenario where one claim
// gets allocated but the second doesn't
// because of a race with some other pod.
//
// To ensure the right timing, allocation of the second
// claim gets delayed while creating another pod
// that gets the remaining resource on the node.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
parameters := b.parameters()
claim1 := b.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
claim2 := b.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
pod1 := b.podExternal()
pod1.Spec.ResourceClaims = append(pod1.Spec.ResourceClaims,
v1.PodResourceClaim{
Name: "claim2",
Source: v1.ClaimSource{
ResourceClaimName: &claim2.Name,
},
},
)
// Block on the second external claim that is to be allocated.
blockClaim, cancelBlockClaim := context.WithCancel(ctx)
defer cancelBlockClaim()
var allocated int32
allocateWrapper = func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{},
class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string,
handler func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{},
class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error),
) (result *resourcev1alpha1.AllocationResult, err error) {
oldAllocated := atomic.AddInt32(&allocated, 0)
if oldAllocated == 1 && strings.HasPrefix(claim.Name, "external-claim") {
<-blockClaim.Done()
}
result, err = handler(ctx, claim, claimParameters, class, classParameters, selectedNode)
if err == nil {
atomic.AddInt32(&allocated, 1)
}
return
}
b.create(ctx, parameters, claim1, claim2, pod1)
ginkgo.By("waiting for one claim to be allocated")
var nodeSelector *v1.NodeSelector
gomega.Eventually(func() (int, error) {
claims, err := f.ClientSet.ResourceV1alpha1().ResourceClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{})
if err != nil {
return 0, err
}
allocated := 0
for _, claim := range claims.Items {
if claim.Status.Allocation != nil {
allocated++
nodeSelector = claim.Status.Allocation.AvailableOnNodes
}
}
return allocated, nil
}).WithTimeout(time.Minute).Should(gomega.Equal(1), "one claim allocated")
// Now create a second pod which we force to
// run on the same node that is currently being
// considered for the first one. We know what
// the node selector looks like and can
// directly access the key and value from it.
ginkgo.By(fmt.Sprintf("create second pod on the same node %s", nodeSelector))
pod2, template2 := b.podInline(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
req := nodeSelector.NodeSelectorTerms[0].MatchExpressions[0]
node := req.Values[0]
pod2.Spec.NodeSelector = map[string]string{req.Key: node}
b.create(ctx, pod2, template2)
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(f.ClientSet, pod2), "start pod 2")
// Allow allocation of claim2 to proceed. It should fail now
// and the other node must be used instead, after deallocating
// the first claim.
ginkgo.By("move first pod to other node")
cancelBlockClaim()
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(f.ClientSet, pod1), "start pod 1")
pod1, err := f.ClientSet.CoreV1().Pods(pod1.Namespace).Get(ctx, pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get first pod")
if pod1.Spec.NodeName == "" {
framework.Fail("first pod should be running on node, was not scheduled")
}
framework.ExpectNotEqual(pod1.Spec.NodeName, node, "first pod should run on different node than second one")
framework.ExpectEqual(driver.Controller.GetNumDeallocations(), int64(1), "number of deallocations")
})
})
})
ginkgo.Context("multiple drivers", func() {
nodes := NewNodes(f, 1, 4)
driver1 := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 1,
Nodes: nodes.NodeNames,
}
})
b1 := newBuilder(f, driver1)
driver2 := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 1,
Nodes: nodes.NodeNames,
}
})
driver2.NameSuffix = "-other"
b2 := newBuilder(f, driver2)
ginkgo.It("work", func() {
parameters1 := b1.parameters()
parameters2 := b2.parameters()
claim1 := b1.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
claim2 := b2.externalClaim(resourcev1alpha1.AllocationModeWaitForFirstConsumer)
pod := b1.podExternal()
pod.Spec.ResourceClaims = append(pod.Spec.ResourceClaims,
v1.PodResourceClaim{
Name: "claim2",
Source: v1.ClaimSource{
ResourceClaimName: &claim2.Name,
},
},
)
b1.create(ctx, parameters1, parameters2, claim1, claim2, pod)
b1.testPod(f.ClientSet, pod)
})
})
})
// builder contains a running counter to make objects unique within their
// namespace.
type builder struct {
f *framework.Framework
driver *Driver
podCounter int
parametersCounter int
claimCounter int
classParametersName string
}
// className returns the default resource class name.
func (b *builder) className() string {
return b.f.UniqueName + b.driver.NameSuffix + "-class"
}
// class returns the resource class that the builder's other objects
// reference.
func (b *builder) class() *resourcev1alpha1.ResourceClass {
class := &resourcev1alpha1.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: b.className(),
},
DriverName: b.driver.Name,
SuitableNodes: b.nodeSelector(),
}
if b.classParametersName != "" {
class.ParametersRef = &resourcev1alpha1.ResourceClassParametersReference{
Kind: "ConfigMap",
Name: b.classParametersName,
Namespace: b.f.Namespace.Name,
}
}
return class
}
// nodeSelector returns a node selector that matches all nodes on which the
// kubelet plugin was deployed.
func (b *builder) nodeSelector() *v1.NodeSelector {
return &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: b.driver.Nodenames(),
},
},
},
},
}
}
// externalClaim returns an external resource claim
// that test pods can reference.
func (b *builder) externalClaim(allocationMode resourcev1alpha1.AllocationMode) *resourcev1alpha1.ResourceClaim {
b.claimCounter++
name := "external-claim" + b.driver.NameSuffix // This is what podExternal expects.
if b.claimCounter > 1 {
name += fmt.Sprintf("-%d", b.claimCounter)
}
return &resourcev1alpha1.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: resourcev1alpha1.ResourceClaimSpec{
ResourceClassName: b.className(),
ParametersRef: &resourcev1alpha1.ResourceClaimParametersReference{
Kind: "ConfigMap",
Name: b.parametersName(),
},
AllocationMode: allocationMode,
},
}
}
// parametersName returns the current ConfigMap name for resource
// claim or class parameters.
func (b *builder) parametersName() string {
return fmt.Sprintf("parameters%s-%d", b.driver.NameSuffix, b.parametersCounter)
}
// parametersEnv returns the default env variables.
func (b *builder) parametersEnv() map[string]string {
return map[string]string{
"a": "b",
}
}
// parameters returns a config map with the default env variables.
func (b *builder) parameters(kv ...string) *v1.ConfigMap {
b.parametersCounter++
data := map[string]string{}
for i := 0; i < len(kv); i += 2 {
data[kv[i]] = kv[i+1]
}
if len(data) == 0 {
data = b.parametersEnv()
}
return &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: b.f.Namespace.Name,
Name: b.parametersName(),
},
Data: data,
}
}
// pod returns a simple pod with no resource claims.
// The pod prints its env and waits.
func (b *builder) pod() *v1.Pod {
pod := e2epod.MakePod(b.f.Namespace.Name, nil, nil, false, "env && sleep 100000")
pod.Labels = make(map[string]string)
pod.Spec.RestartPolicy = v1.RestartPolicyNever
// Let kubelet kill the pods quickly. Setting
// TerminationGracePeriodSeconds to zero would bypass kubelet
// completely because then the apiserver enables a force-delete even
// when DeleteOptions for the pod don't ask for it (see
// https://github.com/kubernetes/kubernetes/blob/0f582f7c3f504e807550310d00f130cb5c18c0c3/pkg/registry/core/pod/strategy.go#L151-L171).
//
// We don't do that because it breaks tracking of claim usage: the
// kube-controller-manager assumes that kubelet is done with the pod
// once it got removed or has a grace period of 0. Setting the grace
// period to zero directly in DeletionOptions or indirectly through
// TerminationGracePeriodSeconds causes the controller to remove
// the pod from ReservedFor before it actually has stopped on
// the node.
one := int64(1)
pod.Spec.TerminationGracePeriodSeconds = &one
pod.ObjectMeta.GenerateName = ""
b.podCounter++
pod.ObjectMeta.Name = fmt.Sprintf("tester%s-%d", b.driver.NameSuffix, b.podCounter)
return pod
}
// podInline returns a pod with an inline resource claim and the corresponding claim template, using the default class name and parameters.
func (b *builder) podInline(allocationMode resourcev1alpha1.AllocationMode) (*v1.Pod, *resourcev1alpha1.ResourceClaimTemplate) {
pod := b.pod()
pod.Spec.Containers[0].Name = "with-resource"
podClaimName := "my-inline-claim"
pod.Spec.Containers[0].Resources.Claims = []v1.ResourceClaim{{Name: podClaimName}}
pod.Spec.ResourceClaims = []v1.PodResourceClaim{
{
Name: podClaimName,
Source: v1.ClaimSource{
ResourceClaimTemplateName: &pod.Name,
},
},
}
template := &resourcev1alpha1.ResourceClaimTemplate{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
Spec: resourcev1alpha1.ResourceClaimTemplateSpec{
Spec: resourcev1alpha1.ResourceClaimSpec{
ResourceClassName: b.className(),
ParametersRef: &resourcev1alpha1.ResourceClaimParametersReference{
Kind: "ConfigMap",
Name: b.parametersName(),
},
AllocationMode: allocationMode,
},
},
}
return pod, template
}
// podInlineMultiple returns a pod with an inline resource claim referenced by 3 containers.
func (b *builder) podInlineMultiple(allocationMode resourcev1alpha1.AllocationMode) (*v1.Pod, *resourcev1alpha1.ResourceClaimTemplate) {
pod, template := b.podInline(allocationMode)
pod.Spec.Containers = append(pod.Spec.Containers, *pod.Spec.Containers[0].DeepCopy(), *pod.Spec.Containers[0].DeepCopy())
pod.Spec.Containers[1].Name = pod.Spec.Containers[1].Name + "-1"
pod.Spec.Containers[2].Name = pod.Spec.Containers[1].Name + "-2"
return pod, template
}
// podExternal returns a pod that references an external resource claim with the default class name and parameters.
func (b *builder) podExternal() *v1.Pod {
pod := b.pod()
pod.Spec.Containers[0].Name = "with-resource"
podClaimName := "resource-claim"
externalClaimName := "external-claim" + b.driver.NameSuffix
pod.Spec.ResourceClaims = []v1.PodResourceClaim{
{
Name: podClaimName,
Source: v1.ClaimSource{
ResourceClaimName: &externalClaimName,
},
},
}
pod.Spec.Containers[0].Resources.Claims = []v1.ResourceClaim{{Name: podClaimName}}
return pod
}
// podExternalMultiple returns a pod with 3 containers that reference an external resource claim with the default class name and parameters.
func (b *builder) podExternalMultiple() *v1.Pod {
pod := b.podExternal()
pod.Spec.Containers = append(pod.Spec.Containers, *pod.Spec.Containers[0].DeepCopy(), *pod.Spec.Containers[0].DeepCopy())
pod.Spec.Containers[1].Name = pod.Spec.Containers[1].Name + "-1"
pod.Spec.Containers[2].Name = pod.Spec.Containers[1].Name + "-2"
return pod
}
// create takes a bunch of objects and calls their Create function.
func (b *builder) create(ctx context.Context, objs ...klog.KMetadata) {
for _, obj := range objs {
ginkgo.By(fmt.Sprintf("creating %T %s", obj, obj.GetName()), func() {
var err error
switch obj := obj.(type) {
case *resourcev1alpha1.ResourceClass:
_, err = b.f.ClientSet.ResourceV1alpha1().ResourceClasses().Create(ctx, obj, metav1.CreateOptions{})
case *v1.Pod:
_, err = b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
case *v1.ConfigMap:
_, err = b.f.ClientSet.CoreV1().ConfigMaps(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
case *resourcev1alpha1.ResourceClaim:
_, err = b.f.ClientSet.ResourceV1alpha1().ResourceClaims(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
case *resourcev1alpha1.ResourceClaimTemplate:
_, err = b.f.ClientSet.ResourceV1alpha1().ResourceClaimTemplates(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
default:
framework.Fail(fmt.Sprintf("internal error, unsupported type %T", obj), 1)
}
framework.ExpectNoErrorWithOffset(1, err, "create %T", obj)
})
}
}
// testPod runs the pod and checks that the container logs contain the expected environment variables.
func (b *builder) testPod(clientSet kubernetes.Interface, pod *v1.Pod, env ...string) {
err := e2epod.WaitForPodRunningInNamespace(clientSet, pod)
framework.ExpectNoError(err, "start pod")
for _, container := range pod.Spec.Containers {
log, err := e2epod.GetPodLogs(clientSet, pod.Namespace, pod.Name, container.Name)
framework.ExpectNoError(err, "get logs")
if len(env) == 0 {
for key, value := range b.parametersEnv() {
envStr := fmt.Sprintf("\nuser_%s=%s\n", key, value)
gomega.Expect(log).To(gomega.ContainSubstring(envStr), "container env variables")
}
} else {
for i := 0; i < len(env); i += 2 {
envStr := fmt.Sprintf("\n%s=%s\n", env[i], env[i+1])
gomega.Expect(log).To(gomega.ContainSubstring(envStr), "container env variables")
}
}
}
}
func newBuilder(f *framework.Framework, driver *Driver) *builder {
b := &builder{f: f, driver: driver}
ginkgo.BeforeEach(b.setUp)
return b
}
func (b *builder) setUp() {
b.podCounter = 0
b.parametersCounter = 0
b.claimCounter = 0
b.create(context.Background(), b.class())
ginkgo.DeferCleanup(b.tearDown)
}
func (b *builder) tearDown() {
ctx := context.Background()
err := b.f.ClientSet.ResourceV1alpha1().ResourceClasses().Delete(ctx, b.className(), metav1.DeleteOptions{})
framework.ExpectNoError(err, "delete resource class")
// Before we allow the namespace and all objects in it to be deleted by
// the framework, we must ensure that test pods and the claims that
// they use are deleted. Otherwise the driver might get deleted first,
// in which case deleting the claims won't work anymore.
ginkgo.By("delete pods and claims")
pods, err := b.listTestPods(ctx)
framework.ExpectNoError(err, "list pods")
for _, pod := range pods {
if pod.DeletionTimestamp != nil {
continue
}
ginkgo.By(fmt.Sprintf("deleting %T %s", &pod, klog.KObj(&pod)))
err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, "delete pod")
}
}
gomega.Eventually(func() ([]v1.Pod, error) {
return b.listTestPods(ctx)
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "remaining pods despite deletion")
claims, err := b.f.ClientSet.ResourceV1alpha1().ResourceClaims(b.f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err, "get resource claims")
for _, claim := range claims.Items {
if claim.DeletionTimestamp != nil {
continue
}
ginkgo.By(fmt.Sprintf("deleting %T %s", &claim, klog.KObj(&claim)))
err := b.f.ClientSet.ResourceV1alpha1().ResourceClaims(b.f.Namespace.Name).Delete(ctx, claim.Name, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, "delete claim")
}
}
for host, plugin := range b.driver.Nodes {
ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host))
gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host)
}
ginkgo.By("waiting for claims to be deallocated and deleted")
gomega.Eventually(func() ([]resourcev1alpha1.ResourceClaim, error) {
claims, err := b.f.ClientSet.ResourceV1alpha1().ResourceClaims(b.f.Namespace.Name).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
return claims.Items, nil
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "claims in the namespaces")
}
func (b *builder) listTestPods(ctx context.Context) ([]v1.Pod, error) {
pods, err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
var testPods []v1.Pod
for _, pod := range pods.Items {
if pod.Labels["app.kubernetes.io/part-of"] == "dra-test-driver" {
continue
}
testPods = append(testPods, pod)
}
return testPods, nil
}

@@ -0,0 +1,103 @@
# dra-test-driver
This driver implements the controller and a resource kubelet plugin for dynamic
resource allocation. This is done in a single binary to minimize the amount of
boilerplate code. "Real" drivers could also implement both in different
binaries.
## Usage
In a cluster, the controller could be deployed as a Deployment with leader
election, and the kubelet plugin as a DaemonSet. The controller can also run as
a Kubernetes client outside of a cluster, and the same works for the kubelet
plugin when using port forwarding. This is how the driver is used during E2E
testing.
Valid parameters are key/value string pairs stored in a ConfigMap.
Those get copied into the ResourceClaimStatus with a "user_" or "admin_" prefix,
depending on whether they came from the ResourceClaim or the ResourceClass; the
controller stores them as a JSON map in the `ResourceHandle` field.
The kubelet plugin then sets these attributes as environment variables in each
container that uses the resource.
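
For illustration, the round trip looks roughly like this (a self-contained
sketch; the `parameters` struct mirrors the unexported one in the `app` package,
and the key/value pairs are placeholders):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parameters mirrors the struct that the test driver's controller marshals
// into the ResourceHandle and that the kubelet plugin unmarshals again.
type parameters struct {
	EnvVars  map[string]string
	NodeName string
}

func main() {
	// Controller side: claim parameters get a "user_" prefix, class
	// parameters an "admin_" prefix.
	p := parameters{EnvVars: map[string]string{"user_a": "b", "admin_x": "y"}}
	handle, _ := json.Marshal(p) // stored in AllocationResult.ResourceHandle

	// Kubelet plugin side: decode the handle and turn the entries into
	// container environment variables (via a generated CDI file).
	var decoded parameters
	_ = json.Unmarshal(handle, &decoded)
	for key, value := range decoded.EnvVars {
		fmt.Printf("%s=%s\n", key, value)
	}
}
```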
Resource availability is configurable and can simulate different scenarios:
- Network-attached resources, available on all nodes where the node driver runs, or
host-local resources, available only on the node where they were allocated.
- Shared or unshared allocations.
- Unlimited or limited resources. The limit is a simple number of allocations
per cluster or node.
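
These scenarios map onto the fields of the test driver's `app.Resources` struct
(see the `app` package below); a small sketch with placeholder node names:

```go
package example

import "k8s.io/kubernetes/test/e2e/dra/test-driver/app"

// Network-attached, shareable resources without a limit; this is what the
// E2E tests use by default.
var networkAttached = app.Resources{
	Shareable: true,
}

// Node-local resources with at most one allocation per node; the node names
// are placeholders and normally come from the cluster under test.
var nodeLocal = app.Resources{
	NodeLocal:      true,
	MaxAllocations: 1,
	Nodes:          []string{"node-1", "node-2"},
}
```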
While the functionality itself is very limited, the code strives to showcase
best practices and supports metrics, leader election, and the same logging
options as Kubernetes.
## Design
The binary itself is a Cobra command with two operations, `controller` and
`kubelet-plugin`. Logging is done with [contextual
logging](https://github.com/kubernetes/enhancements/tree/master/keps/sig-instrumentation/3077-contextual-logging).
The `k8s.io/dynamic-resource-allocation/controller` package implements the
interaction with ResourceClaims. It is generic and relies on an interface to
implement the actual driver logic. Long-term that part could be split out into
a reusable utility package.
The `k8s.io/dynamic-resource-allocation/kubeletplugin` package implements the
interaction with kubelet, again relying only on the interface defined for the
kubelet<->dynamic resource allocation plugin interaction.
`app` is the driver itself with a very simple implementation of the interfaces.
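
Judging from the methods that `ExampleController` implements below, the
driver-specific part boils down to an interface roughly like the following
sketch (the authoritative definition lives in the `controller` package):

```go
package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
	"k8s.io/dynamic-resource-allocation/controller"
)

// driver is a local approximation of the contract that a DRA driver has to
// fulfill for the controller package; method names and signatures are taken
// from ExampleController in the app package.
type driver interface {
	GetClassParameters(ctx context.Context, class *resourcev1alpha1.ResourceClass) (interface{}, error)
	GetClaimParameters(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, class *resourcev1alpha1.ResourceClass, classParameters interface{}) (interface{}, error)
	Allocate(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (*resourcev1alpha1.AllocationResult, error)
	Deallocate(ctx context.Context, claim *resourcev1alpha1.ResourceClaim) error
	UnsuitableNodes(ctx context.Context, pod *v1.Pod, claims []*controller.ClaimAllocation, potentialNodes []string) error
}
```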
## Deployment
### `local-up-cluster.sh`
To try out the feature, build Kubernetes, then in one console run:
```console
FEATURE_GATES=DynamicResourceAllocation=true ALLOW_PRIVILEGED=1 ./hack/local-up-cluster.sh -O
```
In another:
```console
go run ./test/e2e/dra/test-driver --feature-gates ContextualLogging=true -v=5 controller
```
In yet another:
```console
sudo mkdir -p /var/run/cdi && sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry
go run ./test/e2e/dra/test-driver --feature-gates ContextualLogging=true -v=5 kubelet-plugin
```
And finally:
```console
$ kubectl create -f test/e2e/dra/test-driver/deploy/example/resourceclass.yaml
resourceclass/example created
$ kubectl create -f test/e2e/dra/test-driver/deploy/example/pod-inline.yaml
configmap/pause-claim-parameters created
pod/pause created
$ kubectl get resourceclaims
NAME CLASSNAME ALLOCATIONMODE STATE AGE
pause-resource example WaitForFirstConsumer allocated,reserved 19s
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
pause 1/1 Running 0 23s
```
There are also examples for other scenarios (multiple pods, multiple claims).
### multi-node cluster
At this point there are no container images that contain the test driver and
therefore it cannot be deployed on "normal" clusters.
## Prior art
Some of this code was derived from the
[external-resizer](https://github.com/kubernetes-csi/external-resizer/). `controller`
corresponds to the [controller
logic](https://github.com/kubernetes-csi/external-resizer/blob/master/pkg/controller/controller.go),
which in turn is similar to the
[sig-storage-lib-external-provisioner](https://github.com/kubernetes-sigs/sig-storage-lib-external-provisioner).

@@ -0,0 +1,44 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
// These definitions are sufficient to generate simple CDI files which set env
// variables in a container, which is all that the test driver does. A real
driver will either use pre-generated CDI files or use the
// github.com/container-orchestrated-devices/container-device-interface/pkg/cdi
// helper package to generate files.
//
// This is not done in Kubernetes to minimize dependencies.
// spec is the base configuration for CDI.
type spec struct {
Version string `json:"cdiVersion"`
Kind string `json:"kind"`
Devices []device `json:"devices"`
}
// device is a "Device" a container runtime can add to a container.
type device struct {
Name string `json:"name"`
ContainerEdits containerEdits `json:"containerEdits"`
}
// containerEdits are edits a container runtime must make to the OCI spec to expose the device.
type containerEdits struct {
Env []string `json:"env,omitempty"`
}
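// As a hedged illustration (not part of this commit), the types above could be
// serialized into the CDI JSON file that NodePrepareResource writes, roughly
// like this hypothetical helper. It would additionally need "encoding/json"
// and "os" imports; version, kind, device name and env entry are placeholders.
func writeExampleCDI(path string) error {
	s := spec{
		Version: "0.3.0",
		Kind:    "test-driver.dra.k8s.io/test",
		Devices: []device{{
			Name:           "claim-1234",
			ContainerEdits: containerEdits{Env: []string{"user_a=b"}},
		}},
	}
	data, err := json.MarshalIndent(s, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o644)
}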

@@ -0,0 +1,357 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app does all of the work necessary to configure and run a
// Kubernetes app process.
package app
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"strings"
"sync"
v1 "k8s.io/api/core/v1"
resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/controller"
"k8s.io/klog/v2"
)
type Resources struct {
NodeLocal bool
Nodes []string
MaxAllocations int
Shareable bool
// AllocateWrapper, if set, gets called for each Allocate call.
AllocateWrapper AllocateWrapperType
}
type AllocateWrapperType func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{},
class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string,
handler func(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{},
class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error),
) (result *resourcev1alpha1.AllocationResult, err error)
type ExampleController struct {
clientset kubernetes.Interface
resources Resources
driverName string
// mutex must be locked at the gRPC call level.
mutex sync.Mutex
// allocated maps claim.UID to the name of the selected node (for node-local resources) or to an empty string (for network-attached resources).
allocated map[types.UID]string
numAllocations, numDeallocations int64
}
func NewController(clientset kubernetes.Interface, driverName string, resources Resources) *ExampleController {
c := &ExampleController{
clientset: clientset,
resources: resources,
driverName: driverName,
allocated: make(map[types.UID]string),
}
return c
}
func (c *ExampleController) Run(ctx context.Context, workers int) *ExampleController {
informerFactory := informers.NewSharedInformerFactory(c.clientset, 0 /* resync period */)
ctrl := controller.New(ctx, c.driverName, c, c.clientset, informerFactory)
informerFactory.Start(ctx.Done())
ctrl.Run(workers)
return c
}
type parameters struct {
EnvVars map[string]string
NodeName string
}
var _ controller.Driver = &ExampleController{}
func (c *ExampleController) countAllocations(node string) int {
total := 0
for _, n := range c.allocated {
if n == node {
total++
}
}
return total
}
// GetNumAllocations returns the number of times that a claim was allocated.
// Idempotent calls to Allocate that do not need to allocate the claim again do
// not contribute to that counter.
func (c *ExampleController) GetNumAllocations() int64 {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.numAllocations
}
// GetNumDeallocations returns the number of times that a claim was deallocated.
// Idempotent calls to Deallocate that do not need to deallocate the claim again do
// not contribute to that counter.
func (c *ExampleController) GetNumDeallocations() int64 {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.numDeallocations
}
func (c *ExampleController) GetClassParameters(ctx context.Context, class *resourcev1alpha1.ResourceClass) (interface{}, error) {
if class.ParametersRef != nil {
if class.ParametersRef.APIGroup != "" ||
class.ParametersRef.Kind != "ConfigMap" {
return nil, fmt.Errorf("class parameters are only supported in APIVersion v1, Kind ConfigMap, got: %v", class.ParametersRef)
}
return c.readParametersFromConfigMap(ctx, class.ParametersRef.Namespace, class.ParametersRef.Name)
}
return nil, nil
}
func (c *ExampleController) GetClaimParameters(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, class *resourcev1alpha1.ResourceClass, classParameters interface{}) (interface{}, error) {
if claim.Spec.ParametersRef != nil {
if claim.Spec.ParametersRef.APIGroup != "" ||
claim.Spec.ParametersRef.Kind != "ConfigMap" {
return nil, fmt.Errorf("claim parameters are only supported in APIVersion v1, Kind ConfigMap, got: %v", claim.Spec.ParametersRef)
}
return c.readParametersFromConfigMap(ctx, claim.Namespace, claim.Spec.ParametersRef.Name)
}
return nil, nil
}
func (c *ExampleController) readParametersFromConfigMap(ctx context.Context, namespace, name string) (map[string]string, error) {
configMap, err := c.clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("get config map: %v", err)
}
return configMap.Data, nil
}
func (c *ExampleController) Allocate(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error) {
if c.resources.AllocateWrapper != nil {
return c.resources.AllocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, c.allocate)
}
return c.allocate(ctx, claim, claimParameters, class, classParameters, selectedNode)
}
// allocate simply copies parameters as JSON map into ResourceHandle.
func (c *ExampleController) allocate(ctx context.Context, claim *resourcev1alpha1.ResourceClaim, claimParameters interface{}, class *resourcev1alpha1.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha1.AllocationResult, err error) {
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Allocate"), "claim", klog.KObj(claim), "uid", claim.UID)
defer func() {
logger.Info("done", "result", prettyPrint(result), "err", err)
}()
c.mutex.Lock()
defer c.mutex.Unlock()
// Already allocated? Then we don't need to count it again.
node, alreadyAllocated := c.allocated[claim.UID]
if alreadyAllocated {
// Idempotent result - kind of. We don't check whether
// the parameters changed in the meantime. A real
// driver would have to do that.
logger.Info("already allocated")
} else {
logger.Info("starting", "selectedNode", selectedNode)
if c.resources.NodeLocal {
node = selectedNode
if node == "" {
// If none has been selected because we do immediate allocation,
// then we need to pick one ourselves.
var viableNodes []string
for _, n := range c.resources.Nodes {
if c.resources.MaxAllocations == 0 ||
c.countAllocations(n) < c.resources.MaxAllocations {
viableNodes = append(viableNodes, n)
}
}
if len(viableNodes) == 0 {
return nil, errors.New("resources exhausted on all nodes")
}
// Pick randomly. We could also prefer the one with the least
// number of allocations (even spreading) or the most (packing).
node = viableNodes[rand.Intn(len(viableNodes))]
logger.Info("picked a node ourselves", "selectedNode", selectedNode)
} else if c.resources.MaxAllocations > 0 &&
c.countAllocations(node) >= c.resources.MaxAllocations {
return nil, fmt.Errorf("resources exhausted on node %q", node)
}
} else {
if c.resources.MaxAllocations > 0 &&
len(c.allocated) >= c.resources.MaxAllocations {
return nil, errors.New("resources exhausted in the cluster")
}
}
}
allocation := &resourcev1alpha1.AllocationResult{
Shareable: c.resources.Shareable,
}
p := parameters{
EnvVars: make(map[string]string),
NodeName: node,
}
toEnvVars("user", claimParameters, p.EnvVars)
toEnvVars("admin", classParameters, p.EnvVars)
data, err := json.Marshal(p)
if err != nil {
return nil, fmt.Errorf("encode parameters: %v", err)
}
allocation.ResourceHandle = string(data)
var nodes []string
if node != "" {
nodes = append(nodes, node)
} else {
nodes = c.resources.Nodes
}
if len(nodes) > 0 {
allocation.AvailableOnNodes = &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: nodes,
},
},
},
},
}
}
if !alreadyAllocated {
c.numAllocations++
c.allocated[claim.UID] = node
}
return allocation, nil
}
func (c *ExampleController) Deallocate(ctx context.Context, claim *resourcev1alpha1.ResourceClaim) error {
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Deallocate"), "claim", klog.KObj(claim), "uid", claim.UID)
c.mutex.Lock()
defer c.mutex.Unlock()
if _, ok := c.allocated[claim.UID]; !ok {
logger.Info("already deallocated")
return nil
}
logger.Info("done")
c.numDeallocations++
delete(c.allocated, claim.UID)
return nil
}
func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, claims []*controller.ClaimAllocation, potentialNodes []string) (finalErr error) {
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "UnsuitableNodes"), "pod", klog.KObj(pod))
logger.Info("starting", "claim", prettyPrintSlice(claims), "potentialNodes", potentialNodes)
defer func() {
// UnsuitableNodes is the same for all claims.
logger.Info("done", "unsuitableNodes", claims[0].UnsuitableNodes, "err", finalErr)
}()
if c.resources.MaxAllocations == 0 {
// All nodes are suitable.
return nil
}
if c.resources.NodeLocal {
allocationsPerNode := make(map[string]int)
for _, node := range c.resources.Nodes {
allocationsPerNode[node] = c.countAllocations(node)
}
for _, claim := range claims {
claim.UnsuitableNodes = nil
for _, node := range potentialNodes {
// If we have more than one claim, then a
// single pod wants to use all of them. That
// can only work if a node has capacity left
// for all of them. Also, nodes that the driver
// doesn't run on cannot be used.
if contains(c.resources.Nodes, node) &&
allocationsPerNode[node]+len(claims) > c.resources.MaxAllocations {
claim.UnsuitableNodes = append(claim.UnsuitableNodes, node)
}
}
}
return nil
}
allocations := c.countAllocations("")
for _, claim := range claims {
claim.UnsuitableNodes = nil
for _, node := range potentialNodes {
if contains(c.resources.Nodes, node) &&
allocations+len(claims) > c.resources.MaxAllocations {
claim.UnsuitableNodes = append(claim.UnsuitableNodes, node)
}
}
}
return nil
}
func toEnvVars(what string, from interface{}, to map[string]string) {
if from == nil {
return
}
env := from.(map[string]string)
for key, value := range env {
to[what+"_"+strings.ToLower(key)] = value
}
}
func contains[T comparable](list []T, value T) bool {
for _, v := range list {
if v == value {
return true
}
}
return false
}
func prettyPrint[T any](obj *T) interface{} {
if obj == nil {
return "<nil>"
}
return *obj
}
// prettyPrintSlice prints the values the slice points to, not the pointers.
func prettyPrintSlice[T any](slice []*T) interface{} {
var values []interface{}
for _, v := range slice {
if v == nil {
values = append(values, "<nil>")
} else {
values = append(values, *v)
}
}
return values
}

@@ -0,0 +1,208 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha1"
)
type ExamplePlugin struct {
logger klog.Logger
d kubeletplugin.DRAPlugin
fileOps FileOperations
cdiDir string
driverName string
nodeName string
mutex sync.Mutex
prepared map[ClaimID]bool
}
// ClaimID contains both claim name and UID to simplify debugging. The
// namespace is not included because it is random in E2E tests and the UID is
// sufficient to make the ClaimID unique.
type ClaimID struct {
Name string
UID string
}
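// Compile-time check that ExamplePlugin implements the DRA node service.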
var _ drapbv1.NodeServer = &ExamplePlugin{}
// getJSONFilePath returns the absolute path where CDI file is/should be.
func (ex *ExamplePlugin) getJSONFilePath(claimUID string) string {
return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s.json", ex.driverName, claimUID))
}
// FileOperations defines optional callbacks for handling CDI files.
type FileOperations struct {
// Create must overwrite the file.
Create func(name string, content []byte) error
// Remove must remove the file. It must not return an error when the
// file does not exist.
Remove func(name string) error
}
// StartPlugin sets up the servers that are necessary for a DRA kubelet plugin.
func StartPlugin(logger klog.Logger, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
if fileOps.Create == nil {
fileOps.Create = func(name string, content []byte) error {
return os.WriteFile(name, content, os.FileMode(0644))
}
}
if fileOps.Remove == nil {
fileOps.Remove = func(name string) error {
if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
}
ex := &ExamplePlugin{
logger: logger,
fileOps: fileOps,
cdiDir: cdiDir,
driverName: driverName,
nodeName: nodeName,
prepared: make(map[ClaimID]bool),
}
opts = append(opts,
kubeletplugin.Logger(logger),
kubeletplugin.DriverName(driverName),
)
d, err := kubeletplugin.Start(ex, opts...)
if err != nil {
return nil, fmt.Errorf("start kubelet plugin: %v", err)
}
ex.d = d
return ex, nil
}
// Stop ensures that all servers are stopped and resources freed.
func (ex *ExamplePlugin) Stop() {
ex.d.Stop()
}
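// IsRegistered reports whether the plugin has been successfully registered with kubelet.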
func (ex *ExamplePlugin) IsRegistered() bool {
status := ex.d.RegistrationStatus()
if status == nil {
return false
}
return status.PluginRegistered
}
// NodePrepareResource ensures that the CDI file for the claim exists. It uses
// a deterministic name to simplify NodeUnprepareResource (no need to remember
// or discover the name) and idempotency (when called again, the file simply
// gets written again).
func (ex *ExamplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1.NodePrepareResourceRequest) (*drapbv1.NodePrepareResourceResponse, error) {
logger := klog.FromContext(ctx)
// Determine environment variables.
var p parameters
if err := json.Unmarshal([]byte(req.ResourceHandle), &p); err != nil {
return nil, fmt.Errorf("unmarshal resource handle: %v", err)
}
// Sanity check scheduling.
if p.NodeName != "" && ex.nodeName != "" && p.NodeName != ex.nodeName {
return nil, fmt.Errorf("claim was allocated for %q, cannot be prepared on %q", p.NodeName, ex.nodeName)
}
// CDI wants env variables as a set of strings.
envs := []string{}
for key, val := range p.EnvVars {
envs = append(envs, key+"="+val)
}
deviceName := "claim-" + req.ClaimUid
vendor := ex.driverName
class := "test"
spec := &spec{
Version: "0.2.0", // This has to be a version accepted by the runtimes.
Kind: vendor + "/" + class,
// At least one device is required and its entry must have more
// than just the name.
Devices: []device{
{
Name: deviceName,
ContainerEdits: containerEdits{
Env: envs,
},
},
},
}
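// A sketch of the resulting file content for a claim with UID "1234" and
// parameter a=b (the exact JSON keys depend on the spec types, which are
// defined elsewhere): version 0.2.0, kind "<driver name>/test", one device
// named "claim-1234" whose container edits set the environment variable a=b.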
filePath := ex.getJSONFilePath(req.ClaimUid)
buffer, err := json.Marshal(spec)
if err != nil {
return nil, fmt.Errorf("marshal spec: %v", err)
}
if err := ex.fileOps.Create(filePath, buffer); err != nil {
return nil, fmt.Errorf("failed to write CDI file %v", err)
}
dev := vendor + "/" + class + "=" + deviceName
resp := &drapbv1.NodePrepareResourceResponse{CdiDevices: []string{dev}}
ex.mutex.Lock()
defer ex.mutex.Unlock()
ex.prepared[ClaimID{Name: req.ClaimName, UID: req.ClaimUid}] = true
logger.V(3).Info("CDI file created", "path", filePath, "device", dev)
return resp, nil
}
// NodeUnprepareResource removes the CDI file created by
// NodePrepareResource. It's idempotent, therefore it is not an error when that
// file is already gone.
func (ex *ExamplePlugin) NodeUnprepareResource(ctx context.Context, req *drapbv1.NodeUnprepareResourceRequest) (*drapbv1.NodeUnprepareResourceResponse, error) {
logger := klog.FromContext(ctx)
filePath := ex.getJSONFilePath(req.ClaimUid)
if err := ex.fileOps.Remove(filePath); err != nil {
return nil, fmt.Errorf("error removing CDI file: %v", err)
}
logger.V(3).Info("CDI file removed", "path", filePath)
ex.mutex.Lock()
defer ex.mutex.Unlock()
delete(ex.prepared, ClaimID{Name: req.ClaimName, UID: req.ClaimUid})
return &drapbv1.NodeUnprepareResourceResponse{}, nil
}
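// GetPreparedResources returns the claims that are currently prepared on the
// node, in no particular order.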
func (ex *ExamplePlugin) GetPreparedResources() []ClaimID {
ex.mutex.Lock()
defer ex.mutex.Unlock()
var prepared []ClaimID
for claimID := range ex.prepared {
prepared = append(prepared, claimID)
}
return prepared
}

View File

@ -0,0 +1,319 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app does all of the work necessary to configure and run a
// Kubernetes app process.
package app
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"path"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/spf13/cobra"
"k8s.io/component-base/metrics"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/term"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/dynamic-resource-allocation/leaderelection"
"k8s.io/klog/v2"
)
// NewCommand creates a *cobra.Command object with default parameters.
func NewCommand() *cobra.Command {
o := logsapi.NewLoggingConfiguration()
var clientset kubernetes.Interface
var config *rest.Config
ctx := context.Background()
logger := klog.Background()
cmd := &cobra.Command{
Use: "cdi-test-driver",
Long: "cdi-test-driver implements a resource driver controller and kubelet plugin.",
}
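// A sketch of typical invocations (the flags are defined in detail below):
//
//	cdi-test-driver controller --kubeconfig=$HOME/.kube/config --resource-config=resources.json
//	cdi-test-driver kubelet-plugin --cdi-dir=/var/run/cdi --endpoint=/var/lib/kubelet/plugins/test-driver/dra.sock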
sharedFlagSets := cliflag.NamedFlagSets{}
fs := sharedFlagSets.FlagSet("logging")
logsapi.AddFlags(o, fs)
logs.AddFlags(fs, logs.SkipLoggingConfigurationFlags())
fs = sharedFlagSets.FlagSet("Kubernetes client")
kubeconfig := fs.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or the KUBECONFIG environment variable needs to be set if the driver is being run out of cluster.")
kubeAPIQPS := fs.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver.")
kubeAPIBurst := fs.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver.")
workers := fs.Int("workers", 10, "Concurrency to process multiple claims")
fs = sharedFlagSets.FlagSet("http server")
httpEndpoint := fs.String("http-endpoint", "",
"The TCP network address where the HTTP server for diagnostics, including pprof, metrics and (if applicable) leader election health check, will listen (example: `:8080`). The default is the empty string, which means the server is disabled.")
metricsPath := fs.String("metrics-path", "/metrics", "The HTTP path where Prometheus metrics will be exposed, disabled if empty.")
profilePath := fs.String("pprof-path", "", "The HTTP path where pprof profiling will be available, disabled if empty.")
fs = sharedFlagSets.FlagSet("CDI")
driverName := fs.String("drivername", "test-driver.cdi.k8s.io", "Resource driver name.")
fs = sharedFlagSets.FlagSet("other")
featureGate := featuregate.NewFeatureGate()
utilruntime.Must(logsapi.AddFeatureGates(featureGate))
featureGate.AddFlag(fs)
fs = cmd.PersistentFlags()
for _, f := range sharedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
mux := http.NewServeMux()
cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := logsapi.ValidateAndApply(o, featureGate); err != nil {
return err
}
// get the KUBECONFIG from env if specified (useful for local/debug cluster)
kubeconfigEnv := os.Getenv("KUBECONFIG")
if kubeconfigEnv != "" {
logger.Info("Found KUBECONFIG environment variable set, using that..")
*kubeconfig = kubeconfigEnv
}
var err error
if *kubeconfig == "" {
config, err = rest.InClusterConfig()
if err != nil {
return fmt.Errorf("create in-cluster client configuration: %v", err)
}
} else {
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
return fmt.Errorf("create out-of-cluster client configuration: %v", err)
}
}
config.QPS = *kubeAPIQPS
config.Burst = *kubeAPIBurst
clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("create client: %v", err)
}
if *httpEndpoint != "" {
if *metricsPath != "" {
// For workqueue and leader election metrics, set up via the anonymous imports of:
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
//
// It also happens to include Go runtime and process metrics:
// https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.go#L46-L49
gatherer := legacyregistry.DefaultGatherer
actualPath := path.Join("/", *metricsPath)
logger.Info("Starting metrics", "path", actualPath)
mux.Handle(actualPath,
metrics.HandlerFor(gatherer, metrics.HandlerOpts{}))
}
if *profilePath != "" {
actualPath := path.Join("/", *profilePath)
logger.Info("Starting profiling", "path", actualPath)
mux.HandleFunc(path.Join("/", *profilePath), pprof.Index)
mux.HandleFunc(path.Join("/", *profilePath, "cmdline"), pprof.Cmdline)
mux.HandleFunc(path.Join("/", *profilePath, "profile"), pprof.Profile)
mux.HandleFunc(path.Join("/", *profilePath, "symbol"), pprof.Symbol)
mux.HandleFunc(path.Join("/", *profilePath, "trace"), pprof.Trace)
}
listener, err := net.Listen("tcp", *httpEndpoint)
if err != nil {
return fmt.Errorf("listen on HTTP endpoint: %v", err)
}
go func() {
logger.Info("Starting HTTP server", "endpoint", *httpEndpoint)
err := http.Serve(listener, mux)
if err != nil {
logger.Error(err, "HTTP server failed")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}()
}
return nil
}
controller := &cobra.Command{
Use: "controller",
Short: "run as resource controller",
Long: "cdi-test-driver controller runs as a resource driver controller.",
Args: cobra.ExactArgs(0),
}
controllerFlagSets := cliflag.NamedFlagSets{}
fs = controllerFlagSets.FlagSet("leader election")
enableLeaderElection := fs.Bool("leader-election", false,
"Enables leader election. If leader election is enabled, additional RBAC rules are required.")
leaderElectionNamespace := fs.String("leader-election-namespace", "",
"Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
leaderElectionLeaseDuration := fs.Duration("leader-election-lease-duration", 15*time.Second,
"Duration, in seconds, that non-leader candidates will wait to force acquire leadership.")
leaderElectionRenewDeadline := fs.Duration("leader-election-renew-deadline", 10*time.Second,
"Duration, in seconds, that the acting leader will retry refreshing leadership before giving up.")
leaderElectionRetryPeriod := fs.Duration("leader-election-retry-period", 5*time.Second,
"Duration, in seconds, the LeaderElector clients should wait between tries of actions.")
resourceConfig := fs.String("resource-config", "", "A JSON file containing a Resources struct. Defaults are unshared, network-attached resources.")
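// A minimal sketch of such a file, assuming that the exported fields of the
// Resources struct used by the controller (NodeLocal, Nodes, MaxAllocations)
// are marshalled under their Go names:
//
//	{"NodeLocal": true, "Nodes": ["worker-1"], "MaxAllocations": 1}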
fs = controller.Flags()
for _, f := range controllerFlagSets.FlagSets {
fs.AddFlagSet(f)
}
controller.RunE = func(cmd *cobra.Command, args []string) error {
resources := Resources{}
if *resourceConfig != "" {
file, err := os.Open(*resourceConfig)
if err != nil {
return fmt.Errorf("open resource config: %v", err)
}
decoder := json.NewDecoder(file)
decoder.DisallowUnknownFields()
if err := decoder.Decode(&resources); err != nil {
return fmt.Errorf("parse resource config %q: %v", *resourceConfig, err)
}
}
run := func() {
controller := NewController(clientset, *driverName, resources)
controller.Run(ctx, *workers)
}
if !*enableLeaderElection {
run()
return nil
}
// This must not change between releases.
lockName := *driverName
// Create a new clientset for leader election
// to avoid starving it when the normal traffic
// exceeds the QPS+burst limits.
leClientset, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("create leaderelection client: %v", err)
}
le := leaderelection.New(leClientset, lockName,
func(ctx context.Context) {
run()
},
leaderelection.LeaseDuration(*leaderElectionLeaseDuration),
leaderelection.RenewDeadline(*leaderElectionRenewDeadline),
leaderelection.RetryPeriod(*leaderElectionRetryPeriod),
leaderelection.Namespace(*leaderElectionNamespace),
)
if *httpEndpoint != "" {
le.PrepareHealthCheck(mux)
}
if err := le.Run(); err != nil {
return fmt.Errorf("leader election failed: %v", err)
}
return nil
}
cmd.AddCommand(controller)
kubeletPlugin := &cobra.Command{
Use: "kubelet-plugin",
Short: "run as kubelet plugin",
Long: "cdi-test-driver kubelet-plugin runs as a device plugin for kubelet that supports dynamic resource allocation.",
Args: cobra.ExactArgs(0),
}
kubeletPluginFlagSets := cliflag.NamedFlagSets{}
fs = kubeletPluginFlagSets.FlagSet("kubelet")
pluginRegistrationPath := fs.String("plugin-registration-path", "/var/lib/kubelet/plugins_registry", "The directory where kubelet looks for plugin registration sockets, in the filesystem of the driver.")
endpoint := fs.String("endpoint", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket where the driver will listen for kubelet requests, in the filesystem of the driver.")
draAddress := fs.String("dra-address", "/var/lib/kubelet/plugins/test-driver/dra.sock", "The Unix domain socket that kubelet will connect to for dynamic resource allocation requests, in the filesystem of kubelet.")
fs = kubeletPluginFlagSets.FlagSet("CDI")
cdiDir := fs.String("cdi-dir", "/var/run/cdi", "directory for dynamically created CDI JSON files")
fs = kubeletPlugin.Flags()
for _, f := range kubeletPluginFlagSets.FlagSets {
fs.AddFlagSet(f)
}
kubeletPlugin.RunE = func(cmd *cobra.Command, args []string) error {
// Ensure that directories exist, creating them if necessary. We want
// to know early if there is a setup problem that would prevent
// creating those directories.
if err := os.MkdirAll(*cdiDir, os.FileMode(0750)); err != nil {
return fmt.Errorf("create CDI directory: %v", err)
}
if err := os.MkdirAll(filepath.Dir(*endpoint), 0750); err != nil {
return fmt.Errorf("create socket directory: %v", err)
}
plugin, err := StartPlugin(logger, *cdiDir, *driverName, "", FileOperations{},
kubeletplugin.PluginSocketPath(*endpoint),
kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")),
kubeletplugin.KubeletPluginSocketPath(*draAddress),
)
if err != nil {
return fmt.Errorf("start example plugin: %v", err)
}
// Handle graceful shutdown. We need to delete Unix domain
// sockets.
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)
logger.Info("Waiting for signal.")
sig := <-sigc
logger.Info("Received signal, shutting down.", "signal", sig)
plugin.Stop()
return nil
}
cmd.AddCommand(kubeletPlugin)
// SetUsageAndHelpFunc takes care of flag grouping. However,
// it doesn't support listing child commands. We add those
// to cmd.Use.
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, sharedFlagSets, cols)
var children []string
for _, child := range cmd.Commands() {
children = append(children, child.Use)
}
cmd.Use += " [shared flags] " + strings.Join(children, "|")
cliflag.SetUsageAndHelpFunc(controller, controllerFlagSets, cols)
cliflag.SetUsageAndHelpFunc(kubeletPlugin, kubeletPluginFlagSets, cols)
return cmd
}

View File

@ -0,0 +1,14 @@
# This resource class intentionally doesn't match any nodes.
# When using it instead of a functional one, scheduling a pod leads to:
# Warning FailedScheduling 16s default-scheduler 0/1 nodes are available: 1 excluded via potential node filter in resource class.
apiVersion: resource.k8s.io/v1alpha1
kind: ResourceClass
metadata:
name: example
driverName: test-driver.cdi.k8s.io
suitableNodes:
nodeSelectorTerms:
- matchExpressions:
- key: no-such-label
operator: Exists

View File

@ -0,0 +1,40 @@
# One external resource claim, one pod, two containers.
# One container uses resource, one does not.
apiVersion: v1
kind: ConfigMap
metadata:
name: external-claim-parameters
namespace: default
data:
a: b
---
apiVersion: resource.k8s.io/v1alpha1
kind: ResourceClaim
metadata:
name: external-claim
spec:
resourceClassName: example
parametersRef:
kind: ConfigMap
name: external-claim-parameters
---
apiVersion: v1
kind: Pod
metadata:
name: test-external-claim
spec:
restartPolicy: Never
containers:
- name: with-resource
image: registry.k8s.io/e2e-test-images/busybox:1.29-2
command: ["sh", "-c", "set && mount && ls -la /dev/"]
resources:
claims:
- name: resource
- name: without-resource
image: registry.k8s.io/e2e-test-images/busybox:1.29-2
command: ["sh", "-c", "set && mount && ls -la /dev/"]
resourceClaims:
- name: resource
source:
resourceClaimName: external-claim

View File

@ -0,0 +1,48 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: pause-claim-parameters
namespace: default
data:
a: b
---
apiVersion: resource.k8s.io/v1alpha1
kind: ResourceClaimTemplate
metadata:
name: pause-template
namespace: default
spec:
metadata:
labels:
app: inline-resource
spec:
resourceClassName: example
parametersRef:
kind: ConfigMap
name: pause-claim-parameters
---
apiVersion: v1
kind: Pod
metadata:
name: pause
labels:
name: pause
spec:
containers:
- name: pause1
image: "k8s.gcr.io/pause:3.6"
resources:
claims:
- name: resource1
- name: pause2
image: "k8s.gcr.io/pause:3.6"
resources:
claims:
- name: resource2
resourceClaims:
- name: resource1
source:
resourceClaimTemplateName: pause-template
- name: resource2
source:
resourceClaimTemplateName: pause-template

View File

@ -0,0 +1,45 @@
# One inline resource claim, one pod, two containers.
# One container uses resource, one does not.
apiVersion: v1
kind: ConfigMap
metadata:
name: inline-claim-parameters
namespace: default
data:
a: b
---
apiVersion: resource.k8s.io/v1alpha1
kind: ResourceClaimTemplate
metadata:
name: test-inline-claim-template
namespace: default
spec:
metadata:
labels:
app: inline-resource
spec:
resourceClassName: example
parametersRef:
kind: ConfigMap
name: inline-claim-parameters
---
apiVersion: v1
kind: Pod
metadata:
name: test-inline-claim
spec:
restartPolicy: Never
containers:
- name: with-resource
image: registry.k8s.io/e2e-test-images/busybox:1.29-2
command: ["sh", "-c", "set && mount && ls -la /dev/"]
resources:
claims:
- name: resource
- name: without-resource
image: registry.k8s.io/e2e-test-images/busybox:1.29-2
command: ["sh", "-c", "set && mount && ls -la /dev/"]
resourceClaims:
- name: resource
source:
resourceClaimTemplateName: test-inline-claim-template

View File

@ -0,0 +1,61 @@
# One external resource claim, two pods, two containers in each pod.
# Pods share the same resource.
# One container uses resource, one does not.
apiVersion: v1
kind: ConfigMap
metadata:
name: shared-claim-parameters
data:
a: b
---
apiVersion: resource.k8s.io/v1alpha1
kind: ResourceClaim
metadata:
name: shared-claim
spec:
resourceClassName: example
parametersRef:
kind: ConfigMap
name: shared-claim-parameters
---
apiVersion: v1
kind: Pod
metadata:
name: test-shared-claim
spec:
restartPolicy: Never
containers:
- name: with-resource
image: registry.k8s.io/e2e-test-images/busybox:1.29-2
command: ["sh", "-c", "set && mount && ls -la /dev/"]
resources:
claims:
- name: resource
- name: without-resource
image: registry.k8s.io/e2e-test-images/busybox:1.29-2
command: ["sh", "-c", "set && mount && ls -la /dev/"]
resourceClaims:
- name: resource
source:
resourceClaimName: shared-claim
---
apiVersion: v1
kind: Pod
metadata:
name: test-shared-claim-2
spec:
restartPolicy: Never
containers:
- name: with-resource
image: registry.k8s.io/e2e-test-images/busybox:1.29-2
command: ["sh", "-c", "set && mount && ls -la /dev/"]
resources:
claims:
- name: resource
- name: without-resource
image: registry.k8s.io/e2e-test-images/busybox:1.29-2
command: ["sh", "-c", "set && mount && ls -la /dev/"]
resourceClaims:
- name: resource
source:
resourceClaimName: shared-claim

View File

@ -0,0 +1,19 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: example-claim-parameters
namespace: default
data:
a: b
---
apiVersion: resource.k8s.io/v1alpha1
kind: ResourceClaim
metadata:
name: example
namespace: default
spec:
allocationMode: Immediate
resourceClassName: example
parametersRef:
kind: ConfigMap
name: example-claim-parameters

View File

@ -0,0 +1,7 @@
apiVersion: resource.k8s.io/v1alpha1
kind: ResourceClass
metadata:
name: example
driverName: test-driver.cdi.k8s.io
# TODO:
# parameters

View File

@ -0,0 +1,35 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"os"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log output support
_ "k8s.io/component-base/metrics/prometheus/clientgo/leaderelection" // register leader election in the default legacy registry
_ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
_ "k8s.io/component-base/metrics/prometheus/workqueue" // register work queues in the default legacy registry
"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
)
func main() {
command := app.NewCommand()
code := cli.Run(command)
os.Exit(code)
}

View File

@ -52,6 +52,7 @@ import (
_ "k8s.io/kubernetes/test/e2e/autoscaling"
_ "k8s.io/kubernetes/test/e2e/cloud"
_ "k8s.io/kubernetes/test/e2e/common"
_ "k8s.io/kubernetes/test/e2e/dra"
_ "k8s.io/kubernetes/test/e2e/instrumentation"
_ "k8s.io/kubernetes/test/e2e/kubectl"
_ "k8s.io/kubernetes/test/e2e/lifecycle"

View File

@ -0,0 +1,11 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- klueska
- pohly
reviewers:
- klueska
- pohly
- bart0sh
labels:
- sig/node

View File

@ -0,0 +1,85 @@
# This YAML file deploys the csi-driver-host-path on a number of nodes such
# that it proxies all connections from kubelet (plugin registration and dynamic
# resource allocation). The actual handling of those connections then happens
# inside the e2e.test binary via test/e2e/storage/drivers/proxy. This approach
# has the advantage that no separate container image with the test driver is
# needed and that tests have full control over the driver, for example for
# error injection.
#
# The csi-driver-host-path image is used because:
# - it has the necessary proxy mode (https://github.com/kubernetes-csi/csi-driver-host-path/commit/65480fc74d550a9a5aa81e850955cc20403857b1)
# - its base image contains a shell (useful for creating files)
# - the image is already a dependency of e2e.test
kind: ReplicaSet
apiVersion: apps/v1
metadata:
name: dra-test-driver
labels:
app.kubernetes.io/instance: test-driver.dra.k8s.io
app.kubernetes.io/part-of: dra-test-driver
app.kubernetes.io/name: dra-test-driver-kubelet-plugin
app.kubernetes.io/component: kubelet-plugin
spec:
selector:
matchLabels:
app.kubernetes.io/instance: test-driver.dra.k8s.io
app.kubernetes.io/part-of: dra-test-driver
app.kubernetes.io/name: dra-test-driver-kubelet-plugin
app.kubernetes.io/component: kubelet-plugin
replicas: 1
template:
metadata:
labels:
app.kubernetes.io/instance: test-driver.dra.k8s.io
app.kubernetes.io/part-of: dra-test-driver
app.kubernetes.io/name: dra-test-driver-kubelet-plugin
app.kubernetes.io/component: kubelet-plugin
spec:
# Ensure that all pods run on distinct nodes.
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchLabels:
app.kubernetes.io/instance: test-driver.dra.k8s.io
topologyKey: kubernetes.io/hostname
containers:
- name: registrar
image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
args:
- "--v=5"
- "--endpoint=/plugins_registry/dra-test-driver-reg.sock"
- "--proxy-endpoint=tcp://:9000"
volumeMounts:
- mountPath: /plugins_registry
name: registration-dir
- name: plugin
image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
args:
- "--v=5"
- "--endpoint=/dra/dra-test-driver.sock"
- "--proxy-endpoint=tcp://:9001"
securityContext:
privileged: true
volumeMounts:
- mountPath: /dra
name: socket-dir
- mountPath: /cdi
name: cdi-dir
volumes:
- hostPath:
path: /var/lib/kubelet/plugins
type: DirectoryOrCreate
name: socket-dir
- hostPath:
path: /var/run/cdi
type: DirectoryOrCreate
name: cdi-dir
- hostPath:
path: /var/lib/kubelet/plugins_registry
type: DirectoryOrCreate
name: registration-dir

3
vendor/modules.txt vendored
View File

@ -2055,6 +2055,9 @@ k8s.io/csi-translation-lib
k8s.io/csi-translation-lib/plugins
# k8s.io/dynamic-resource-allocation v0.0.0 => ./staging/src/k8s.io/dynamic-resource-allocation
## explicit; go 1.19
k8s.io/dynamic-resource-allocation/controller
k8s.io/dynamic-resource-allocation/kubeletplugin
k8s.io/dynamic-resource-allocation/leaderelection
k8s.io/dynamic-resource-allocation/resourceclaim
# k8s.io/gengo v0.0.0-20220902162205-c0856e24416d
## explicit; go 1.13