Merge pull request #67928 from vikaschoudhary16/e2e-dp

Add stub device plugin for e2e tests
This commit is contained in:
Kubernetes Prow Robot
2019-06-22 13:46:12 -07:00
committed by GitHub
7 changed files with 235 additions and 85 deletions

View File

@@ -17,8 +17,6 @@ limitations under the License.
package e2e_node
import (
"fmt"
"os"
"path/filepath"
"time"
@@ -31,11 +29,12 @@ import (
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/test/e2e/framework"
dputil "k8s.io/kubernetes/test/e2e/framework/deviceplugin"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
kubeletdevicepluginv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
dm "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
resapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -43,7 +42,8 @@ import (
const (
// fake resource name
resourceName = "fake.com/resource"
resourceName = "example.com/resource"
envVarNamePluginSockDir = "PLUGIN_SOCK_DIR"
)
// Serial because the test restarts Kubelet
@@ -63,27 +63,30 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
initialConfig.FeatureGates[string(features.KubeletPodResources)] = true
})
It("Verifies the Kubelet device plugin functionality.", func() {
By("Start stub device plugin")
// fake devices for e2e test
devs := []*kubeletdevicepluginv1beta1.Device{
{ID: "Dev-1", Health: kubeletdevicepluginv1beta1.Healthy},
{ID: "Dev-2", Health: kubeletdevicepluginv1beta1.Healthy},
By("Wait for node is ready to start with")
e2enode.WaitForNodeToBeReady(f.ClientSet, framework.TestContext.NodeName, 5*time.Minute)
dp := dputil.GetSampleDevicePluginPod()
for i := range dp.Spec.Containers[0].Env {
if dp.Spec.Containers[0].Env[i].Name == envVarNamePluginSockDir {
dp.Spec.Containers[0].Env[i].Value = pluginSockDir
}
}
socketPath := pluginSockDir + "dp." + fmt.Sprintf("%d", time.Now().Unix())
e2elog.Logf("socketPath %v", socketPath)
dp1 := dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
dp1.SetAllocFunc(stubAllocFunc)
err := dp1.Start()
e2elog.Logf("env %v", dp.Spec.Containers[0].Env)
dp.Spec.NodeName = framework.TestContext.NodeName
By("Create sample device plugin pod")
devicePluginPod, err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Create(dp)
framework.ExpectNoError(err)
By("Register resources")
err = dp1.Register(kubeletdevicepluginv1beta1.KubeletSocket, resourceName, pluginSockDir)
framework.ExpectNoError(err)
By("Waiting for devices to become available on the local node")
Eventually(func() bool {
return dputil.NumberOfSampleResources(getLocalNode(f)) > 0
}, 5*time.Minute, framework.Poll).Should(BeTrue())
e2elog.Logf("Successfully created device plugin pod")
By("Waiting for the resource exported by the stub device plugin to become available on the local node")
devsLen := int64(len(devs))
By("Waiting for the resource exported by the sample device plugin to become available on the local node")
// TODO(vikasc): Instead of hard-coding number of devices, provide number of devices in the sample-device-plugin using configmap
// and then use the same here
devsLen := int64(2)
Eventually(func() bool {
node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{})
framework.ExpectNoError(err)
@@ -99,15 +102,24 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
Expect(devId1).To(Not(Equal("")))
podResources, err := getNodeDevices()
var resourcesForOurPod *resapi.PodResources
e2elog.Logf("pod resources %v", podResources)
Expect(err).To(BeNil())
Expect(len(podResources.PodResources)).To(Equal(1))
Expect(podResources.PodResources[0].Name).To(Equal(pod1.Name))
Expect(podResources.PodResources[0].Namespace).To(Equal(pod1.Namespace))
Expect(len(podResources.PodResources[0].Containers)).To(Equal(1))
Expect(podResources.PodResources[0].Containers[0].Name).To(Equal(pod1.Spec.Containers[0].Name))
Expect(len(podResources.PodResources[0].Containers[0].Devices)).To(Equal(1))
Expect(podResources.PodResources[0].Containers[0].Devices[0].ResourceName).To(Equal(resourceName))
Expect(len(podResources.PodResources[0].Containers[0].Devices[0].DeviceIds)).To(Equal(1))
Expect(len(podResources.PodResources)).To(Equal(2))
for _, res := range podResources.GetPodResources() {
if res.Name == pod1.Name {
resourcesForOurPod = res
}
}
e2elog.Logf("resourcesForOurPod %v", resourcesForOurPod)
Expect(resourcesForOurPod).NotTo(BeNil())
Expect(resourcesForOurPod.Name).To(Equal(pod1.Name))
Expect(resourcesForOurPod.Namespace).To(Equal(pod1.Namespace))
Expect(len(resourcesForOurPod.Containers)).To(Equal(1))
Expect(resourcesForOurPod.Containers[0].Name).To(Equal(pod1.Spec.Containers[0].Name))
Expect(len(resourcesForOurPod.Containers[0].Devices)).To(Equal(1))
Expect(resourcesForOurPod.Containers[0].Devices[0].ResourceName).To(Equal(resourceName))
Expect(len(resourcesForOurPod.Containers[0].Devices[0].DeviceIds)).To(Equal(1))
pod1, err = f.PodClient().Get(pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
@@ -136,13 +148,20 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
return false
}, 5*time.Minute, framework.Poll).Should(BeTrue())
By("Re-Register resources")
dp1 = dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
dp1.SetAllocFunc(stubAllocFunc)
err = dp1.Start()
By("Re-Register resources and deleting the pods and waiting for container removal")
getOptions := metav1.GetOptions{}
gp := int64(0)
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: &gp,
}
err = f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(dp.Name, &deleteOptions)
framework.ExpectNoError(err)
waitForContainerRemoval(devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace)
_, err = f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Get(dp.Name, getOptions)
e2elog.Logf("Trying to get dp pod after deletion. err must be non-nil. err: %v", err)
framework.ExpectError(err)
err = dp1.Register(kubeletdevicepluginv1beta1.KubeletSocket, resourceName, pluginSockDir)
devicePluginPod, err = f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Create(dp)
framework.ExpectNoError(err)
ensurePodContainerRestart(f, pod1.Name, pod1.Name)
@@ -166,9 +185,10 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
Expect(devId1).To(Not(Equal(devId2)))
By("Deleting device plugin.")
err = dp1.Stop()
By("By deleting the pods and waiting for container removal")
err = f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(dp.Name, &deleteOptions)
framework.ExpectNoError(err)
waitForContainerRemoval(devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace)
By("Waiting for stub device plugin to become unhealthy on the local node")
Eventually(func() int64 {
@@ -187,12 +207,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
Expect(devIdRestart2).To(Equal(devId2))
By("Re-register resources")
dp1 = dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
dp1.SetAllocFunc(stubAllocFunc)
err = dp1.Start()
framework.ExpectNoError(err)
err = dp1.Register(kubeletdevicepluginv1beta1.KubeletSocket, resourceName, pluginSockDir)
devicePluginPod, err = f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Create(dp)
framework.ExpectNoError(err)
By("Waiting for the resource exported by the stub device plugin to become healthy on the local node")
@@ -202,9 +217,10 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
return numberOfDevicesAllocatable(node, resourceName)
}, 30*time.Second, framework.Poll).Should(Equal(devsLen))
By("Deleting device plugin again.")
err = dp1.Stop()
By("by deleting the pods and waiting for container removal")
err = f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(dp.Name, &deleteOptions)
framework.ExpectNoError(err)
waitForContainerRemoval(devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace)
By("Waiting for stub device plugin to become unavailable on the local node")
Eventually(func() bool {
@@ -300,41 +316,3 @@ func numberOfDevicesAllocatable(node *v1.Node, resourceName string) int64 {
return val.Value()
}
// stubAllocFunc is the allocation callback handed to the stub device plugin.
//
// For every container request in r it validates each requested device ID
// against devs (the ID must be present and the device must be Healthy),
// creates an empty placeholder file named after the device ID under /tmp,
// and records a Mount whose host path and container path are both that file.
// It returns one ContainerAllocateResponse per container request, in request
// order, or an error as soon as a device is unknown, unhealthy, or its
// placeholder file cannot be created.
func stubAllocFunc(r *kubeletdevicepluginv1beta1.AllocateRequest, devs map[string]kubeletdevicepluginv1beta1.Device) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
	result := &kubeletdevicepluginv1beta1.AllocateResponse{}

	for _, containerReq := range r.ContainerRequests {
		containerResp := &kubeletdevicepluginv1beta1.ContainerAllocateResponse{}

		for _, id := range containerReq.DevicesIDs {
			device, known := devs[id]
			switch {
			case !known:
				return nil, fmt.Errorf("invalid allocation request with non-existing device %s", id)
			case device.Health != kubeletdevicepluginv1beta1.Healthy:
				return nil, fmt.Errorf("invalid allocation request with unhealthy device: %s", id)
			}

			// The fake device is just a file on the host named after the device ID.
			devicePath := filepath.Join("/tmp", device.ID)

			// Drop anything left at that path first; the removal result is
			// intentionally not checked.
			os.RemoveAll(devicePath)

			handle, err := os.Create(devicePath)
			if !(err == nil || os.IsExist(err)) {
				return nil, fmt.Errorf("failed to create fake device file: %s", err)
			}
			// Only the file's existence matters, so release the handle right away.
			handle.Close()

			mount := &kubeletdevicepluginv1beta1.Mount{
				ContainerPath: devicePath,
				HostPath:      devicePath,
			}
			containerResp.Mounts = append(containerResp.Mounts, mount)
		}

		result.ContainerResponses = append(result.ContainerResponses, containerResp)
	}

	return result, nil
}