update unit test for plugin resources allocation reinforcement
@@ -209,6 +209,7 @@ go_test(
         "//pkg/volume/host_path:go_default_library",
         "//pkg/volume/testing:go_default_library",
         "//pkg/volume/util/volumehelper:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//vendor/github.com/google/cadvisor/info/v1:go_default_library",
         "//vendor/github.com/google/cadvisor/info/v2:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",
@@ -57,6 +57,8 @@ go_test(
     library = ":go_default_library",
     deps = [
         "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
+        "//pkg/kubelet/lifecycle:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",
         "//vendor/github.com/stretchr/testify/require:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
@@ -30,6 +30,8 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
+	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
 func TestUpdateCapacity(t *testing.T) {
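Not part of the commit, but useful for orientation: the new imports and the calls later in this file suggest the handler's Allocate entry point now follows the kubelet admission flow instead of taking a pod, container, and active-pod list directly. A minimal sketch of the assumed surface, with the interface name and method set inferred only from the test code below:

package deviceplugin

import (
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// Handler (sketch): assumed shape of the reworked device plugin handler
// exercised by these tests; the production definition may differ in detail.
type Handler interface {
	// Allocate checks whether the pod in attrs can have its device requests
	// satisfied against node, and may adjust node's allocatable resources.
	Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
}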
@@ -224,13 +226,24 @@ func TestCheckpoint(t *testing.T) {
 	as.True(reflect.DeepEqual(expectedAllocatedDevices, testHandler.allocatedDevices))
 }
 
+type activePodsStub struct {
+	activePods []*v1.Pod
+}
+
+func (a *activePodsStub) getActivePods() []*v1.Pod {
+	return a.activePods
+}
+
+func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
+	a.activePods = newPods
+}
+
 func TestPodContainerDeviceAllocation(t *testing.T) {
 	flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
 	var logLevel string
 	flag.StringVar(&logLevel, "logLevel", "4", "test")
 	flag.Lookup("v").Value.Set(logLevel)
 
-	var activePods []*v1.Pod
 	resourceName1 := "domain1.com/resource1"
 	resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI)
 	devID1 := "dev1"
@@ -244,6 +257,16 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 	as := assert.New(t)
 	as.Nil(err)
 	monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+	podsStub := activePodsStub{
+		activePods: []*v1.Pod{},
+	}
+	cachedNode := &v1.Node{
+		Status: v1.NodeStatus{
+			Allocatable: v1.ResourceList{},
+		},
+	}
+	nodeInfo := &schedulercache.NodeInfo{}
+	nodeInfo.SetNode(cachedNode)
 
 	testHandler := &HandlerImpl{
 		devicePluginManager: m,
@@ -251,6 +274,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 		allDevices:       make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		podDevices:       make(podDevices),
+		activePods:       podsStub.getActivePods,
 	}
 	testHandler.allDevices[resourceName1] = sets.NewString()
 	testHandler.allDevices[resourceName1].Insert(devID1)
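The activePods field wired in above is presumably a callback the handler uses to learn which pods are still running, so it can reclaim devices held by terminated pods. A hedged sketch of the assumed callback type, with the stub standing in for the kubelet's real accessor:

package deviceplugin

import "k8s.io/api/core/v1"

// ActivePodsFunc (assumed): the type of HandlerImpl.activePods, a callback
// returning the pods the kubelet currently considers active.
type ActivePodsFunc func() []*v1.Pod

// In the test, podsStub.getActivePods satisfies this type, so each case can
// control exactly which pods the handler treats as holding devices.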
@@ -288,8 +312,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 		},
 	}
 
-	activePods = append(activePods, pod)
-	err = testHandler.Allocate(pod, &pod.Spec.Containers[0], activePods)
+	podsStub.updateActivePods([]*v1.Pod{pod})
+	err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
 	as.Nil(err)
 	runContainerOpts := testHandler.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
 	as.Equal(len(runContainerOpts.Devices), 3)
@@ -315,7 +339,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 			},
 		},
 	}
-	err = testHandler.Allocate(failPod, &failPod.Spec.Containers[0], activePods)
+	err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod})
 	as.NotNil(err)
 	runContainerOpts2 := testHandler.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0])
 	as.Nil(runContainerOpts2)
@@ -338,8 +362,53 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 			},
 		},
 	}
-	err = testHandler.Allocate(newPod, &newPod.Spec.Containers[0], activePods)
+	err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod})
 	as.Nil(err)
 	runContainerOpts3 := testHandler.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0])
 	as.Equal(1, len(runContainerOpts3.Envs))
 }
+
+func TestSanitizeNodeAllocatable(t *testing.T) {
+	resourceName1 := "domain1.com/resource1"
+	devID1 := "dev1"
+
+	resourceName2 := "domain2.com/resource2"
+	devID2 := "dev2"
+
+	m, err := NewDevicePluginManagerTestStub()
+	as := assert.New(t)
+	as.Nil(err)
+	monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+
+	testHandler := &HandlerImpl{
+		devicePluginManager:                m,
+		devicePluginManagerMonitorCallback: monitorCallback,
+		allDevices:                         make(map[string]sets.String),
+		allocatedDevices:                   make(map[string]sets.String),
+		podDevices:                         make(podDevices),
+	}
+	// require one of resource1 and one of resource2
+	testHandler.allocatedDevices[resourceName1] = sets.NewString()
+	testHandler.allocatedDevices[resourceName1].Insert(devID1)
+	testHandler.allocatedDevices[resourceName2] = sets.NewString()
+	testHandler.allocatedDevices[resourceName2].Insert(devID2)
+
+	cachedNode := &v1.Node{
+		Status: v1.NodeStatus{
+			Allocatable: v1.ResourceList{
+				// has no resource1 and two of resource2
+				v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
+			},
+		},
+	}
+	nodeInfo := &schedulercache.NodeInfo{}
+	nodeInfo.SetNode(cachedNode)
+
+	testHandler.sanitizeNodeAllocatable(nodeInfo)
+
+	allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
+	// allocatable in nodeInfo is less than needed, should update
+	as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
+	// allocatable in nodeInfo is more than needed, should skip updating
+	as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
+}
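For readers without the non-test half of this change: the assertions above imply that sanitizeNodeAllocatable raises each device-plugin resource in the cached NodeInfo to at least the number of devices already allocated, and leaves resources alone when the node already reports enough. A plausible reconstruction under that assumption; the function name and the free-function form are hypothetical, the real code is a method on HandlerImpl:

package deviceplugin

import (
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// sanitizeNodeAllocatableSketch: for every resource with devices already
// allocated, ensure the cached node reports at least that many as allocatable;
// skip resources the node already reports in sufficient quantity.
func sanitizeNodeAllocatableSketch(node *schedulercache.NodeInfo, allocatedDevices map[string]sets.String) {
	allocatable := node.AllocatableResource()
	updated := allocatable.Clone()
	if updated.ScalarResources == nil {
		updated.ScalarResources = make(map[v1.ResourceName]int64)
	}
	for resourceName, devices := range allocatedDevices {
		needed := int64(devices.Len())
		current, ok := updated.ScalarResources[v1.ResourceName(resourceName)]
		if ok && current >= needed {
			continue // like resource2 above: 2 allocatable >= 1 allocated, skip updating
		}
		updated.ScalarResources[v1.ResourceName(resourceName)] = needed // like resource1: 0 -> 1
	}
	node.SetAllocatableResource(updated)
}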
@@ -72,6 +72,7 @@ import (
 	_ "k8s.io/kubernetes/pkg/volume/host_path"
 	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 	"k8s.io/kubernetes/pkg/volume/util/volumehelper"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
 func init() {
@@ -284,7 +285,7 @@ func newTestKubeletWithImageList(
 	kubelet.evictionManager = evictionManager
 	kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
 	// Add this as cleanup predicate pod admitter
-	kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub()))
+	kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources))
 
 	plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
 	var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
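The extra argument passed here shows that lifecycle.NewPredicateAdmitHandler now takes a plugin-resource update callback alongside the node getter and the admission-failure handler; the tests hand it either kubelet.containerManager.UpdatePluginResources or a hand-written function of the same shape. A sketch of the assumed callback signature, with the type name and package chosen only for illustration:

package example

import (
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// pluginResourceUpdateFunc mirrors the shape of the new third argument to
// lifecycle.NewPredicateAdmitHandler: given the cached node and the pod being
// admitted, adjust the node's allocatable plugin resources, or return an error
// to reject the pod.
type pluginResourceUpdateFunc func(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error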
@@ -573,6 +574,116 @@ func TestHandleMemExceeded(t *testing.T) {
 	checkPodStatus(t, kl, fittingPod, v1.PodPending)
 }
 
+// Tests that we handle result of interface UpdatePluginResources correctly
+// by setting corresponding status in status map.
+func TestHandlePluginResources(t *testing.T) {
+	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+	defer testKubelet.Cleanup()
+	testKubelet.chainMock()
+	kl := testKubelet.kubelet
+
+	adjustedResource := v1.ResourceName("domain1.com/adjustedResource")
+	unadjustedResouce := v1.ResourceName("domain2.com/unadjustedResouce")
+	failedResource := v1.ResourceName("domain2.com/failedResource")
+	resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI)
+	resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI)
+	resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI)
+	allowedPodQuantity := *resource.NewQuantity(int64(10), resource.DecimalSI)
+	nodes := []*v1.Node{
+		{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
+			Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
+				adjustedResource:  resourceQuantity1,
+				unadjustedResouce: resourceQuantity1,
+				v1.ResourcePods:   allowedPodQuantity,
+			}}},
+	}
+	kl.nodeInfo = testNodeInfo{nodes: nodes}
+
+	updatePluginResourcesFunc := func(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+		// Maps from resourceName to the value we use to set node.allocatableResource[resourceName].
+		// A resource with invalid value (< 0) causes the function to return an error
+		// to emulate resource Allocation failure.
+		// Resources not contained in this map will have their node.allocatableResource
+		// quantity unchanged.
+		updateResourceMap := map[v1.ResourceName]resource.Quantity{
+			adjustedResource: resourceQuantity2,
+			failedResource:   resourceQuantityInvalid,
+		}
+		pod := attrs.Pod
+		allocatableResource := node.AllocatableResource()
+		newAllocatableResource := allocatableResource.Clone()
+		for _, container := range pod.Spec.Containers {
+			for resource := range container.Resources.Requests {
+				newQuantity, exist := updateResourceMap[resource]
+				if !exist {
+					continue
+				}
+				if newQuantity.Value() < 0 {
+					return fmt.Errorf("Allocation failed")
+				}
+				newAllocatableResource.ScalarResources[resource] = newQuantity.Value()
+			}
+		}
+		node.SetAllocatableResource(newAllocatableResource)
+		return nil
+	}
+
+	// add updatePluginResourcesFunc to admission handler, to test it's behavior.
+	kl.admitHandlers = lifecycle.PodAdmitHandlers{}
+	kl.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kl.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), updatePluginResourcesFunc))
+
+	// pod requiring adjustedResource can be successfully allocated because updatePluginResourcesFunc
+	// adjusts node.allocatableResource for this resource to a sufficient value.
+	fittingPodspec := v1.PodSpec{NodeName: string(kl.nodeName),
+		Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+			Limits: v1.ResourceList{
+				adjustedResource: resourceQuantity2,
+			},
+			Requests: v1.ResourceList{
+				adjustedResource: resourceQuantity2,
+			},
+		}}},
+	}
+	// pod requiring unadjustedResouce with insufficient quantity will still fail PredicateAdmit.
+	exceededPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
+		Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+			Limits: v1.ResourceList{
+				unadjustedResouce: resourceQuantity2,
+			},
+			Requests: v1.ResourceList{
+				unadjustedResouce: resourceQuantity2,
+			},
+		}}},
+	}
+	// pod requiring failedResource will fail with the resource failed to be allocated.
+	failedPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
+		Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+			Limits: v1.ResourceList{
+				failedResource: resourceQuantity1,
+			},
+			Requests: v1.ResourceList{
+				failedResource: resourceQuantity1,
+			},
+		}}},
+	}
+	pods := []*v1.Pod{
+		podWithUIDNameNsSpec("123", "fittingpod", "foo", fittingPodspec),
+		podWithUIDNameNsSpec("456", "exceededpod", "foo", exceededPodSpec),
+		podWithUIDNameNsSpec("789", "failedpod", "foo", failedPodSpec),
+	}
+	// The latter two pod should be rejected.
+	fittingPod := pods[0]
+	exceededPod := pods[1]
+	failedPod := pods[2]
+
+	kl.HandlePodAdditions(pods)
+
+	// Check pod status stored in the status map.
+	checkPodStatus(t, kl, fittingPod, v1.PodPending)
+	checkPodStatus(t, kl, exceededPod, v1.PodFailed)
+	checkPodStatus(t, kl, failedPod, v1.PodFailed)
+}
+
 // TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal.
 func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
 	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
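Taken together, the three pods above pin down the expected admission order: the plugin-resource callback runs before the fit check, an error from it rejects the pod outright, and a successful update lets a pod fit that would otherwise exceed the node's reported allocatable. A rough sketch of that control flow; the function, its parameters, and the reason strings are hypothetical illustrations, not the real predicate admit handler:

package example

import (
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// admitSketch illustrates the order of operations the assertions imply.
func admitSketch(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes,
	updatePluginResources func(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) error,
	fitsNode func(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) bool) lifecycle.PodAdmitResult {
	// 1. Let plugin resource managers adjust node allocatable for this pod.
	//    The failedResource pod is rejected here, so it ends up PodFailed.
	if err := updatePluginResources(node, attrs); err != nil {
		return lifecycle.PodAdmitResult{Admit: false, Reason: "AllocationError", Message: err.Error()}
	}
	// 2. Run the usual fit check against the (possibly adjusted) node.
	//    adjustedResource now fits; unadjustedResouce still does not.
	if !fitsNode(node, attrs) {
		return lifecycle.PodAdmitResult{Admit: false, Reason: "OutOfResources", Message: "pod does not fit on node"}
	}
	return lifecycle.PodAdmitResult{Admit: true}
}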