devicemanager: checkpoint: support pre-1.20 data

The commit a8b8995ef2
changed the content of the data kubelet writes in the checkpoint.
Unfortunately, the checkpoint restore code was not updated,
so if we upgrade kubelet from pre-1.20 to 1.20+, the
device manager cannot anymore restore its state correctly.

The only trace of this misbehaviour is this line in the
kubelet logs:
```
W0615 07:31:49.744770    4852 manager.go:244] Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: json: cannot unmarshal array into Go struct field PodDevicesEntry.Data.PodDeviceEntries.DeviceIDs of type checkpoint.DevicesPerNUMA
```

If we hit this bug, the device allocation info is
indeed NOT up-to-date up until the device plugins register
themselves again. This can take up to few minutes, depending
on the specific device plugin.

While the device manager state is inconsistent:
1. the kubelet will NOT update the device availability to zero, so
   the scheduler will send pods towards the inconsistent kubelet.
2. at pod admission time, the device manager allocation will not
   trigger, so pods will be admitted without devices actually
   being allocated to them.

To fix these issues, we add support to the device manager to
read pre-1.20 checkpoint data. We retroactively call this
format "v1".

Signed-off-by: Francesco Romani <fromani@redhat.com>
This commit is contained in:
Francesco Romani 2021-06-15 16:29:08 +02:00
parent dba9975e3e
commit 2f426fdba6
4 changed files with 193 additions and 13 deletions

View File

@ -27,7 +27,7 @@ import (
// DeviceManagerCheckpoint defines the operations to retrieve pod devices
type DeviceManagerCheckpoint interface {
checkpointmanager.Checkpoint
GetData() ([]PodDevicesEntry, map[string][]string)
GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string)
}
// DevicesPerNUMA represents device ids obtained from device plugin per NUMA node id
@ -72,9 +72,12 @@ func (dev DevicesPerNUMA) Devices() sets.String {
return result
}
// New returns an instance of Checkpoint
func New(devEntries []PodDevicesEntry,
devices map[string][]string) DeviceManagerCheckpoint {
// New returns an instance of Checkpoint - must be an alias for the most recent version
func New(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return NewV2(devEntries, devices)
}
func NewV2(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return &Data{
Data: checkpointData{
PodDeviceEntries: devEntries,
@ -99,7 +102,8 @@ func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(cp.Data)
}
// GetData returns device entries and registered devices
func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) {
// GetDataInLatestFormat returns device entries and registered devices in the *most recent*
// checkpoint format, *not* in the original format stored on disk.
func (cp *Data) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) {
return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices
}

View File

@ -0,0 +1,124 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"encoding/json"
"hash/fnv"
"strings"
"github.com/davecgh/go-spew/spew"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)
// PodDevicesEntry connects pod information to devices, without topology information (k8s <= 1.19)
type PodDevicesEntryV1 struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
AllocResp []byte
}
// checkpointData struct is used to store pod to device allocation information
// in a checkpoint file, without topology information (k8s <= 1.19)
type checkpointDataV1 struct {
PodDeviceEntries []PodDevicesEntryV1
RegisteredDevices map[string][]string
}
// checksum compute the checksum using the same algorithms (and data type names) k8s 1.19 used.
// We need this special code path to be able to correctly validate the checksum k8s 1.19 wrote.
// credits to https://github.com/kubernetes/kubernetes/pull/102717/commits/353f93895118d2ffa2d59a29a1fbc225160ea1d6
func (cp checkpointDataV1) checksum() checksum.Checksum {
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}
object := printer.Sprintf("%#v", cp)
object = strings.Replace(object, "checkpointDataV1", "checkpointData", 1)
object = strings.Replace(object, "PodDevicesEntryV1", "PodDevicesEntry", -1)
hash := fnv.New32a()
printer.Fprintf(hash, "%v", object)
return checksum.Checksum(hash.Sum32())
}
// Data holds checkpoint data and its checksum, in V1 (k8s <= 1.19) format
type DataV1 struct {
Data checkpointDataV1
Checksum checksum.Checksum
}
// New returns an instance of Checkpoint, in V1 (k8s <= 1.19) format.
// Users should avoid creating checkpoints in formats different than the most recent one,
// use the old formats only to validate existing checkpoint and convert them to most recent
// format. The only exception should be test code.
func NewV1(devEntries []PodDevicesEntryV1,
devices map[string][]string) DeviceManagerCheckpoint {
return &DataV1{
Data: checkpointDataV1{
PodDeviceEntries: devEntries,
RegisteredDevices: devices,
},
}
}
// MarshalCheckpoint is needed to implement the Checkpoint interface, but should not be called anymore
func (cp *DataV1) MarshalCheckpoint() ([]byte, error) {
klog.InfoS("Marshalling a device manager V1 checkpoint")
cp.Checksum = cp.Data.checksum()
return json.Marshal(*cp)
}
// MarshalCheckpoint returns marshalled data
func (cp *DataV1) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *DataV1) VerifyChecksum() error {
if cp.Checksum != cp.Data.checksum() {
return errors.ErrCorruptCheckpoint
}
return nil
}
// GetDataInLatestFormat returns device entries and registered devices in the *most recent*
// checkpoint format, *not* in the original format stored on disk.
func (cp *DataV1) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) {
var podDevs []PodDevicesEntry
for _, entryV1 := range cp.Data.PodDeviceEntries {
devsPerNuma := NewDevicesPerNUMA()
// no NUMA cell affinity was recorded. The only possible choice
// is to set all the devices affine to node 0.
devsPerNuma[0] = entryV1.DeviceIDs
podDevs = append(podDevs, PodDevicesEntry{
PodUID: entryV1.PodUID,
ContainerName: entryV1.ContainerName,
ResourceName: entryV1.ResourceName,
DeviceIDs: devsPerNuma,
AllocResp: entryV1.AllocResp,
})
}
return podDevs, cp.Data.RegisteredDevices
}

View File

@ -599,20 +599,33 @@ func (m *ManagerImpl) writeCheckpoint() error {
// Reads device to container allocation information from disk, and populates
// m.allocatedDevices accordingly.
func (m *ManagerImpl) readCheckpoint() error {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntry, 0)
cp := checkpoint.New(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
// the vast majority of time we restore a compatible checkpoint, so we try
// the current version first. Trying to restore older format checkpoints is
// relevant only in the kubelet upgrade flow, which happens once in a
// (long) while.
cp, err := m.getCheckpointV2()
if err != nil {
if err == errors.ErrCheckpointNotFound {
klog.InfoS("Failed to retrieve checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err)
// no point in trying anything else
klog.InfoS("Failed to read data from checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err)
return nil
}
return err
var errv1 error
// one last try: maybe it's a old format checkpoint?
cp, errv1 = m.getCheckpointV1()
if errv1 != nil {
klog.InfoS("Failed to read checkpoint V1 file", "err", errv1)
// intentionally return the parent error. We expect to restore V1 checkpoints
// a tiny fraction of time, so what matters most is the current checkpoint read error.
return err
}
klog.InfoS("Read data from a V1 checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint)
}
m.mutex.Lock()
defer m.mutex.Unlock()
podDevices, registeredDevs := cp.GetData()
podDevices, registeredDevs := cp.GetDataInLatestFormat()
m.podDevices.fromCheckpointData(podDevices)
m.allocatedDevices = m.podDevices.devices()
for resource := range registeredDevs {
@ -625,6 +638,22 @@ func (m *ManagerImpl) readCheckpoint() error {
return nil
}
func (m *ManagerImpl) getCheckpointV2() (checkpoint.DeviceManagerCheckpoint, error) {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntry, 0)
cp := checkpoint.New(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
return cp, err
}
func (m *ManagerImpl) getCheckpointV1() (checkpoint.DeviceManagerCheckpoint, error) {
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntryV1, 0)
cp := checkpoint.NewV1(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
return cp, err
}
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() {
if !m.sourcesReady.AllReady() {

View File

@ -25,6 +25,7 @@ import (
"testing"
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
@ -1288,3 +1289,25 @@ func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA, topology bool) map[string]p
}
return res
}
const deviceManagerCheckpointFilename = "kubelet_internal_checkpoint"
var oldCheckpoint string = `{"Data":{"PodDeviceEntries":[{"PodUID":"13ac2284-0d19-44b7-b94f-055b032dba9b","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA3"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkEzX1RUWTEwEgEwGhwKCi9kZXYvdHR5MTASCi9kZXYvdHR5MTAaAnJ3"},{"PodUID":"86b9a017-c9ca-4069-815f-46ca3e53c1e4","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA4"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkE0X1RUWTExEgEwGhwKCi9kZXYvdHR5MTESCi9kZXYvdHR5MTEaAnJ3"}],"RegisteredDevices":{"example.com/deviceA":["DevA1","DevA2","DevA3","DevA4"]}},"Checksum":405612085}`
func TestReadPreNUMACheckpoint(t *testing.T) {
socketDir, socketName, _, err := tmpSocketDir()
require.NoError(t, err)
defer os.RemoveAll(socketDir)
err = ioutil.WriteFile(filepath.Join(socketDir, deviceManagerCheckpointFilename), []byte(oldCheckpoint), 0644)
require.NoError(t, err)
topologyStore := topologymanager.NewFakeManager()
nodes := []cadvisorapi.Node{{Id: 0}}
m, err := newManagerImpl(socketName, nodes, topologyStore)
require.NoError(t, err)
// TODO: we should not calling private methods, but among the existing tests we do anyway
err = m.readCheckpoint()
require.NoError(t, err)
}