Implement TopologyInfo and cpu_ids in podresources

It covers the device plugin and the CPU manager.

It has a drawback: cpuset and all the other structures (including
cAdvisor's) keep the cpu id as a plain int, but a protobuf-based
interface is better served by a fixed-width integer, hence the int64
in the API.
This patch also introduces an additional CPUsProvider interface,
although the existing DevicesProvider could have been extended instead.

The checkpoint changes are not covered by a unit test.

Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
Signed-off-by: Alexey Perevalov <alexey.perevalov@huawei.com>
Alexey Perevalov 2020-07-14 18:03:56 +03:00
parent 62326a1846
commit a8b8995ef2
20 changed files with 183 additions and 39 deletions
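
For illustration only (not part of this commit): a minimal consumer of the v1 podresources API, showing where the new CpuIds and Topology fields surface. The socket path and connection options below are assumptions about a typical kubelet deployment, not something this patch defines.

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"

	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// Assumed default kubelet podresources socket; deployments may differ.
	conn, err := grpc.DialContext(ctx, "unix:///var/lib/kubelet/pod-resources/kubelet.sock",
		grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := podresourcesapi.NewPodResourcesListerClient(conn)
	resp, err := client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
	if err != nil {
		panic(err)
	}
	for _, pod := range resp.PodResources {
		for _, container := range pod.Containers {
			// CpuIds is the field added by this commit.
			fmt.Printf("%s/%s cpus=%v\n", pod.Name, container.Name, container.CpuIds)
			for _, dev := range container.Devices {
				// Topology carries the per-device NUMA nodes added by this commit.
				fmt.Printf("  %s %v topology=%v\n", dev.ResourceName, dev.DeviceIds, dev.Topology)
			}
		}
	}
}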

@@ -28,14 +28,16 @@ import (
type v1PodResourcesServer struct {
podsProvider PodsProvider
devicesProvider DevicesProvider
cpusProvider CPUsProvider
}
// NewV1PodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// with device information provided by the DevicesProvider and cpu information provided by the CPUsProvider
func NewV1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider) v1.PodResourcesListerServer {
func NewV1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider, cpusProvider CPUsProvider) v1.PodResourcesListerServer {
return &v1PodResourcesServer{
podsProvider: podsProvider,
devicesProvider: devicesProvider,
cpusProvider: cpusProvider,
}
}
@@ -58,6 +60,7 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource
pRes.Containers[j] = &v1.ContainerResources{
Name: container.Name,
Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name),
CpuIds: p.cpusProvider.GetCPUs(string(pod.UID), container.Name),
}
}
podResources[i] = &pRes

@@ -31,24 +31,30 @@ func TestListPodResourcesV1(t *testing.T) {
podNamespace := "pod-namespace"
podUID := types.UID("pod-uid")
containerName := "container-name"
numaID := int64(1)
devs := []*podresourcesapi.ContainerDevices{
{
ResourceName: "resource",
DeviceIds: []string{"dev0", "dev1"},
Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaID}}},
},
}
cpus := []int64{12, 23, 30}
for _, tc := range []struct {
desc string
pods []*v1.Pod
devices []*podresourcesapi.ContainerDevices
cpus []int64
expectedResponse *podresourcesapi.ListPodResourcesResponse
}{
{
desc: "no pods",
pods: []*v1.Pod{},
devices: []*podresourcesapi.ContainerDevices{},
cpus: []int64{},
expectedResponse: &podresourcesapi.ListPodResourcesResponse{},
},
{
@@ -70,6 +76,7 @@ func TestListPodResourcesV1(t *testing.T) {
},
},
devices: []*podresourcesapi.ContainerDevices{},
cpus: []int64{},
expectedResponse: &podresourcesapi.ListPodResourcesResponse{
PodResources: []*podresourcesapi.PodResources{
{
@@ -104,6 +111,7 @@ func TestListPodResourcesV1(t *testing.T) {
},
},
devices: devs,
cpus: cpus,
expectedResponse: &podresourcesapi.ListPodResourcesResponse{
PodResources: []*podresourcesapi.PodResources{
{
@@ -113,6 +121,7 @@ func TestListPodResourcesV1(t *testing.T) {
{
Name: containerName,
Devices: devs,
CpuIds: cpus,
},
},
},
@@ -124,8 +133,9 @@ func TestListPodResourcesV1(t *testing.T) {
m := new(mockProvider)
m.On("GetPods").Return(tc.pods)
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
m.On("GetCPUs", string(podUID), containerName).Return(tc.cpus)
m.On("UpdateAllocatedDevices").Return()
server := NewV1PodResourcesServer(m, m)
server := NewV1PodResourcesServer(m, m, m)
resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)

@@ -43,6 +43,11 @@ func (m *mockProvider) GetDevices(podUID, containerName string) []*podresourcesv
return args.Get(0).([]*podresourcesv1.ContainerDevices)
}
func (m *mockProvider) GetCPUs(podUID, containerName string) []int64 {
args := m.Called(podUID, containerName)
return args.Get(0).([]int64)
}
func (m *mockProvider) UpdateAllocatedDevices() {
m.Called()
}

@@ -31,3 +31,8 @@ type DevicesProvider interface {
type PodsProvider interface {
GetPods() []*v1.Pod
}
// CPUsProvider knows how to provide the cpus used by the given container
type CPUsProvider interface {
GetCPUs(podUID, containerName string) []int64
}
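
For reference, any type with a matching GetCPUs method satisfies this interface; in the kubelet it is the container manager, as the hunks below show. A minimal hypothetical implementation, e.g. for a test double (staticCPUsProvider is an invented name, not part of this commit):

// staticCPUsProvider returns a fixed cpu assignment per container.
type staticCPUsProvider struct {
	// assignments is keyed by podUID + "/" + containerName.
	assignments map[string][]int64
}

// Compile-time check that the example satisfies CPUsProvider.
var _ CPUsProvider = &staticCPUsProvider{}

func (p *staticCPUsProvider) GetCPUs(podUID, containerName string) []int64 {
	return p.assignments[podUID+"/"+containerName]
}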

@@ -106,6 +106,9 @@ type ContainerManager interface {
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
// GetCPUs returns information about the cpus assigned to pods and containers
GetCPUs(podUID, containerName string) []int64
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool

@@ -1027,6 +1027,10 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr
return cm.deviceManager.GetDevices(podUID, containerName)
}
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
return cm.cpuManager.GetCPUs(podUID, containerName)
}
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}

@@ -125,6 +125,10 @@ func (cm *containerManagerStub) UpdateAllocatedDevices() {
return
}
func (cm *containerManagerStub) GetCPUs(_, _ string) []int64 {
return nil
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}

@@ -192,3 +192,7 @@ func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.
func (cm *containerManagerImpl) UpdateAllocatedDevices() {
return
}
func (cm *containerManagerImpl) GetCPUs(_, _ string) []int64 {
return nil
}

@@ -77,6 +77,10 @@ type Manager interface {
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
// GetCPUs implements the podresources.CPUsProvider interface to provide allocated
// cpus for the container
GetCPUs(podUID, containerName string) []int64
}
type manager struct {
@@ -461,3 +465,12 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet)
CpusetCpus: cpus.String(),
})
}
func (m *manager) GetCPUs(podUID, containerName string) []int64 {
cpus := m.state.GetCPUSetOrDefault(string(podUID), containerName)
result := []int64{}
for _, cpu := range cpus.ToSliceNoSort() {
result = append(result, int64(cpu))
}
return result
}

@@ -64,6 +64,11 @@ func (m *fakeManager) State() state.Reader {
return m.state
}
func (m *fakeManager) GetCPUs(podUID, containerName string) []int64 {
klog.Infof("[fake cpumanager] GetCPUs(podUID: %s, containerName: %s)", podUID, containerName)
return nil
}
// NewFakeManager creates empty/fake cpu manager
func NewFakeManager() Manager {
return &fakeManager{

@@ -48,11 +48,13 @@ go_test(
srcs = [
"endpoint_test.go",
"manager_test.go",
"pod_devices_test.go",
"topology_hints_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/cm/topologymanager/bitmask:go_default_library",
"//pkg/kubelet/config:go_default_library",

@@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)

@@ -19,6 +19,7 @@ package checkpoint
import (
"encoding/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
@@ -29,12 +30,15 @@ type DeviceManagerCheckpoint interface {
GetData() ([]PodDevicesEntry, map[string][]string)
}
// DevicesPerNUMA maps NUMA node ids to the device ids obtained from the device plugin
type DevicesPerNUMA map[int64][]string
// PodDevicesEntry connects pod information to devices
type PodDevicesEntry struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
DeviceIDs DevicesPerNUMA
AllocResp []byte
}
@@ -52,6 +56,22 @@ type Data struct {
Checksum checksum.Checksum
}
// NewDevicesPerNUMA creates an empty DevicesPerNUMA map
func NewDevicesPerNUMA() DevicesPerNUMA {
return make(DevicesPerNUMA)
}
// Devices returns the device ids for all NUMA nodes, represented as a sets.String
func (dev DevicesPerNUMA) Devices() sets.String {
result := sets.NewString()
for _, devs := range dev {
result.Insert(devs...)
}
return result
}
// New returns an instance of Checkpoint
func New(devEntries []PodDevicesEntry,
devices map[string][]string) DeviceManagerCheckpoint {
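
To make the new checkpoint shape concrete, a small example (device ids and node ids invented for illustration) of how DevicesPerNUMA groups devices and how Devices() flattens the grouping:

perNUMA := checkpoint.DevicesPerNUMA{
	0: {"dev1", "dev2"}, // devices attached to NUMA node 0
	1: {"dev3"},         // device attached to NUMA node 1
}
// Devices() discards the NUMA split and returns the union of all ids.
all := perNUMA.Devices() // sets.String containing "dev1", "dev2", "dev3"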

@@ -900,8 +900,17 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
return fmt.Errorf("no containers return in allocation response %v", resp)
}
allocDevicesWithNUMA := checkpoint.NewDevicesPerNUMA()
// Update internal cached podDevices state.
m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
m.mutex.Lock()
for dev := range allocDevices {
for idx := range m.allDevices[resource][dev].Topology.Nodes {
node := m.allDevices[resource][dev].Topology.Nodes[idx]
allocDevicesWithNUMA[node.ID] = append(allocDevicesWithNUMA[node.ID], dev)
}
}
m.mutex.Unlock()
m.podDevices.insert(podUID, contName, resource, allocDevicesWithNUMA, resp.ContainerResponses[0])
}
if needsUpdateCheckpoint {
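
Schematically, the loop above records each allocated device under every NUMA node its plugin reported; a sketch with invented device and node values:

// One allocated device whose plugin reported NUMA node 1.
dev := pluginapi.Device{ID: "dev3", Topology: &pluginapi.TopologyInfo{
	Nodes: []*pluginapi.NUMANode{{ID: 1}},
}}
allocDevicesWithNUMA := checkpoint.NewDevicesPerNUMA()
for _, node := range dev.Topology.Nodes {
	allocDevicesWithNUMA[node.ID] = append(allocDevicesWithNUMA[node.ID], dev.ID)
}
// allocDevicesWithNUMA is now checkpoint.DevicesPerNUMA{1: {"dev3"}}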

@@ -38,6 +38,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -432,10 +433,10 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
as.True(testManager.isDevicePluginResource(resourceName2))
}
func constructDevices(devices []string) sets.String {
ret := sets.NewString()
func constructDevices(devices []string) checkpoint.DevicesPerNUMA {
ret := checkpoint.DevicesPerNUMA{}
for _, dev := range devices {
ret.Insert(dev)
ret[0] = append(ret[0], dev)
}
return ret
}
@@ -621,13 +622,11 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
activePods: activePods,
sourcesReady: &sourcesReadyStub{},
checkpointManager: ckm,
allDevices: make(map[string]map[string]pluginapi.Device),
}
for _, res := range testRes {
testManager.healthyDevices[res.resourceName] = sets.NewString()
for _, dev := range res.devs {
testManager.healthyDevices[res.resourceName].Insert(dev)
}
testManager.healthyDevices[res.resourceName] = sets.NewString(res.devs.Devices().UnsortedList()...)
if res.resourceName == "domain1.com/resource1" {
testManager.endpoints[res.resourceName] = endpointInfo{
e: &MockEndpoint{allocateFunc: allocateStubFunc()},
@@ -657,6 +656,8 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
opts: nil,
}
}
testManager.allDevices[res.resourceName] = makeDevice(res.devs)
}
return testManager, nil
}
@@ -664,19 +665,19 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
type TestResource struct {
resourceName string
resourceQuantity resource.Quantity
devs []string
devs checkpoint.DevicesPerNUMA
}
func TestPodContainerDeviceAllocation(t *testing.T) {
res1 := TestResource{
resourceName: "domain1.com/resource1",
resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
devs: []string{"dev1", "dev2"},
devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
}
res2 := TestResource{
resourceName: "domain2.com/resource2",
resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
devs: []string{"dev3", "dev4"},
devs: checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}},
}
testResources := make([]TestResource, 2)
testResources = append(testResources, res1)
@@ -767,12 +768,12 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
res1 := TestResource{
resourceName: "domain1.com/resource1",
resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
devs: []string{"dev1", "dev2"},
devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
}
res2 := TestResource{
resourceName: "domain2.com/resource2",
resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
devs: []string{"dev3", "dev4"},
devs: checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}},
}
testResources := make([]TestResource, 2)
testResources = append(testResources, res1)
@@ -920,7 +921,7 @@ func TestDevicePreStartContainer(t *testing.T) {
res1 := TestResource{
resourceName: "domain1.com/resource1",
resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
devs: []string{"dev1", "dev2"},
devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
}
as := require.New(t)
podsStub := activePodsStub{
@@ -960,7 +961,7 @@ func TestDevicePreStartContainer(t *testing.T) {
as.Contains(initializedDevs, "dev1")
as.Contains(initializedDevs, "dev2")
as.Equal(len(initializedDevs), len(res1.devs))
as.Equal(len(initializedDevs), res1.devs.Devices().Len())
expectedResps, err := allocateStubFunc()([]string{"dev1", "dev2"})
as.Nil(err)
@@ -1057,3 +1058,13 @@ func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error)
return resps, nil
}
}
func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA) map[string]pluginapi.Device {
res := make(map[string]pluginapi.Device)
for node, devs := range devOnNUMA {
for idx := range devs {
res[devs[idx]] = pluginapi.Device{ID: devs[idx], Topology: &pluginapi.TopologyInfo{Nodes: []*pluginapi.NUMANode{{ID: node}}}}
}
}
return res
}

@@ -30,7 +30,7 @@ import (
type deviceAllocateInfo struct {
// deviceIds contains device Ids allocated to this container for the given resourceName.
deviceIds sets.String
deviceIds checkpoint.DevicesPerNUMA
// allocResp contains cached rpc AllocateResponse.
allocResp *pluginapi.ContainerAllocateResponse
}
@@ -70,7 +70,7 @@ func (pdev *podDevices) hasPod(podUID string) bool {
return podExists
}
func (pdev *podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
func (pdev *podDevices) insert(podUID, contName, resource string, devices checkpoint.DevicesPerNUMA, resp *pluginapi.ContainerAllocateResponse) {
pdev.Lock()
defer pdev.Unlock()
if _, podExists := pdev.devs[podUID]; !podExists {
@@ -108,7 +108,7 @@ func (pdev *podDevices) containerDevices(podUID, contName, resource string) sets
if !resourceExists {
return nil
}
return devs.deviceIds
return devs.deviceIds.Devices()
}
// Populates allocatedResources with the device resources allocated to the specified <podUID, contName>.
@@ -124,7 +124,7 @@ func (pdev *podDevices) addContainerAllocatedResources(podUID, contName string,
return
}
for resource, devices := range resources {
allocatedResources[resource] = allocatedResources[resource].Union(devices.deviceIds)
allocatedResources[resource] = allocatedResources[resource].Union(devices.deviceIds.Devices())
}
}
@@ -141,7 +141,7 @@ func (pdev *podDevices) removeContainerAllocatedResources(podUID, contName strin
return
}
for resource, devices := range resources {
allocatedResources[resource] = allocatedResources[resource].Difference(devices.deviceIds)
allocatedResources[resource] = allocatedResources[resource].Difference(devices.deviceIds.Devices())
}
}
@@ -157,7 +157,7 @@ func (pdev *podDevices) devices() map[string]sets.String {
ret[resource] = sets.NewString()
}
if devices.allocResp != nil {
ret[resource] = ret[resource].Union(devices.deviceIds)
ret[resource] = ret[resource].Union(devices.deviceIds.Devices())
}
}
}
@@ -173,7 +173,6 @@ func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
for podUID, containerDevices := range pdev.devs {
for conName, resources := range containerDevices {
for resource, devices := range resources {
devIds := devices.deviceIds.UnsortedList()
if devices.allocResp == nil {
klog.Errorf("Can't marshal allocResp for %v %v %v: allocation response is missing", podUID, conName, resource)
continue
@@ -188,7 +187,7 @@ func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
PodUID: podUID,
ContainerName: conName,
ResourceName: resource,
DeviceIDs: devIds,
DeviceIDs: devices.deviceIds,
AllocResp: allocResp})
}
}
@@ -201,17 +200,13 @@ func (pdev *podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
for _, entry := range data {
klog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)
devIDs := sets.NewString()
for _, devID := range entry.DeviceIDs {
devIDs.Insert(devID)
}
allocResp := &pluginapi.ContainerAllocateResponse{}
err := allocResp.Unmarshal(entry.AllocResp)
if err != nil {
klog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err)
continue
}
pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, devIDs, allocResp)
pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, allocResp)
}
}
@@ -328,10 +323,13 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresou
}
cDev := []*podresourcesapi.ContainerDevices{}
for resource, allocateInfo := range pdev.devs[podUID][contName] {
cDev = append(cDev, &podresourcesapi.ContainerDevices{
ResourceName: resource,
DeviceIds: allocateInfo.deviceIds.UnsortedList(),
})
for numaid, devlist := range allocateInfo.deviceIds {
cDev = append(cDev, &podresourcesapi.ContainerDevices{
ResourceName: resource,
DeviceIds: devlist,
Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaid}}},
})
}
}
return cDev
}

@@ -0,0 +1,47 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devicemanager
import (
"testing"
"github.com/stretchr/testify/require"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
)
func TestGetContainerDevices(t *testing.T) {
podDevices := newPodDevices()
resourceName1 := "domain1.com/resource1"
podID := "pod1"
contID := "con1"
devices := checkpoint.DevicesPerNUMA{0: []string{"dev1"}, 1: []string{"dev1"}}
podDevices.insert(podID, contID, resourceName1,
devices,
constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}, map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
contDevices := podDevices.getContainerDevices(podID, contID)
require.Equal(t, len(devices), len(contDevices), "Incorrect container devices")
for _, contDev := range contDevices {
for _, node := range contDev.Topology.Nodes {
dev, ok := devices[node.ID]
require.True(t, ok, "NUMA id %v doesn't exist in result", node.ID)
require.Equal(t, contDev.DeviceIds[0], dev[0], "Can't find device %s in result", dev[0])
}
}
}

@@ -404,7 +404,7 @@ func TestGetTopologyHints(t *testing.T) {
for p := range tc.allocatedDevices {
for c := range tc.allocatedDevices[p] {
for r, devices := range tc.allocatedDevices[p][c] {
m.podDevices.insert(p, c, r, sets.NewString(devices...), nil)
m.podDevices.insert(p, c, r, constructDevices(devices), nil)
m.allocatedDevices[r] = sets.NewString()
for _, d := range devices {

@@ -2193,7 +2193,7 @@ func (kl *Kubelet) ListenAndServePodResources() {
klog.V(2).Infof("Failed to get local endpoint for PodResources endpoint: %v", err)
return
}
server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager)
server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager)
}
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.

@@ -179,10 +179,10 @@ func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer st
}
// ListenAndServePodResources initializes a gRPC server to serve the PodResources service
func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) {
func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider, cpusProvider podresources.CPUsProvider) {
server := grpc.NewServer()
podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(podsProvider, devicesProvider))
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider))
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider, cpusProvider))
l, err := util.CreateListener(socket)
if err != nil {
klog.Fatalf("Failed to create listener for podResources endpoint: %v", err)