Node-level Checkpointing manager

This commit is contained in:
vikaschoudhary16 2017-11-20 03:43:52 -05:00 committed by vikaschoudhary16
parent 2ef566d0c3
commit d62bd9ef65
27 changed files with 927 additions and 281 deletions

View File

@ -163,6 +163,9 @@ pkg/kubelet/apis/kubeletconfig
pkg/kubelet/apis/kubeletconfig/v1beta1
pkg/kubelet/cadvisor
pkg/kubelet/cadvisor/testing
pkg/kubelet/checkpoint
pkg/kubelet/checkpointmanager/checksum
pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1
pkg/kubelet/client
pkg/kubelet/cm
pkg/kubelet/cm/util

View File

@ -246,6 +246,7 @@ filegroup(
"//pkg/kubelet/cadvisor:all-srcs",
"//pkg/kubelet/certificate:all-srcs",
"//pkg/kubelet/checkpoint:all-srcs",
"//pkg/kubelet/checkpointmanager:all-srcs",
"//pkg/kubelet/client:all-srcs",
"//pkg/kubelet/cm:all-srcs",
"//pkg/kubelet/config:all-srcs",

View File

@ -0,0 +1,48 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["checkpoint_manager.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager",
deps = [
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/util/filesystem:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["checkpoint_manager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/checkpointmanager/testing:go_default_library",
"//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/checkpointmanager/checksum:all-srcs",
"//pkg/kubelet/checkpointmanager/errors:all-srcs",
"//pkg/kubelet/checkpointmanager/testing:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,24 @@
## DISCLAIMER
Sig-Node community has reached a general consensus, as a best practice, to
avoid introducing any new checkpointing support. We reached this understanding
after struggling with some hard-to-debug issues in the production environments
caused by the checkpointing.
## Introduction
This folder contains a framework & primitives, Checkpointing Manager, which is
used by several other Kubelet submodules, `dockershim`, `devicemanager`, `pods`
and `cpumanager`, to implement checkpointing at each submodule level. As already
explained in above `Disclaimer` section, think twice before introducing any further
checkpointing in Kubelet. If still checkpointing is required, then this folder
provides the common APIs and the framework for implementing checkpointing.
Using same APIs across all the submodules will help maintaining consistency at
Kubelet level.
Below is the history of checkpointing support in Kubelet.
| Package | First checkpointing support merged on | PR link |
| ------- | --------------------------------------| ------- |
|kubelet/dockershim | Feb 3, 2017 | [[CRI] Implement Dockershim Checkpoint](https://github.com/kubernetes/kubernetes/pull/39903)
|devicemanager| Sep 6, 2017 | [Deviceplugin checkpoint](https://github.com/kubernetes/kubernetes/pull/51744)
| kubelet/pod | Nov 22, 2017 | [Initial basic bootstrap-checkpoint support](https://github.com/kubernetes/kubernetes/pull/50984)
|cpumanager| Oct 27, 2017 |[Add file backed state to cpu manager ](https://github.com/kubernetes/kubernetes/pull/54408)

View File

@ -0,0 +1,106 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpointmanager
import (
"fmt"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
// Checkpoint provides the process checkpoint data
type Checkpoint interface {
MarshalCheckpoint() ([]byte, error)
UnmarshalCheckpoint(blob []byte) error
VerifyChecksum() error
}
// CheckpointManager provides the interface to manage checkpoint
type CheckpointManager interface {
// CreateCheckpoint persists checkpoint in CheckpointStore. checkpointKey is the key for utilstore to locate checkpoint.
// For file backed utilstore, checkpointKey is the file name to write the checkpoint data.
CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error
// GetCheckpoint retrieves checkpoint from CheckpointStore.
GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error
// WARNING: RemoveCheckpoint will not return error if checkpoint does not exist.
RemoveCheckpoint(checkpointKey string) error
// ListCheckpoint returns the list of existing checkpoints.
ListCheckpoints() ([]string, error)
}
// impl is an implementation of CheckpointManager. It persists checkpoint in CheckpointStore
type impl struct {
path string
store utilstore.Store
}
func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) {
fstore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{})
if err != nil {
return nil, err
}
return &impl{path: checkpointDir, store: fstore}, nil
}
// CreateCheckpoint persists checkpoint in CheckpointStore.
func (manager *impl) CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
manager.mutex.Lock()
defer manager.mutex.Unlock()
blob, err := checkpoint.MarshalCheckpoint()
if err != nil {
return err
}
return manager.store.Write(checkpointKey, blob)
}
// GetCheckpoint retrieves checkpoint from CheckpointStore.
func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
manager.mutex.Lock()
defer manager.mutex.Unlock()
blob, err := manager.store.Read(checkpointKey)
if err != nil {
if err == utilstore.ErrKeyNotFound {
return errors.ErrCheckpointNotFound
}
return err
}
err = checkpoint.UnmarshalCheckpoint(blob)
if err == nil {
err = checkpoint.VerifyChecksum()
}
return err
}
func (manager *impl) RemoveCheckpoint(checkpointKey string) error {
manager.mutex.Lock()
defer manager.mutex.Unlock()
return manager.store.Delete(checkpointKey)
}
// ListCheckpoints returns the list of existing checkpoints.
func (manager *impl) ListCheckpoints() ([]string, error) {
manager.mutex.Lock()
defer manager.mutex.Unlock()
keys, err := manager.store.List()
if err != nil {
return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err)
}
return keys, nil
}

View File

@ -0,0 +1,246 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpointmanager
import (
"encoding/json"
"hash/fnv"
"sort"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
utilstore "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1"
)
var testStore *utilstore.MemStore
type FakeCheckpoint interface {
Checkpoint
GetData() ([]*PortMapping, bool)
}
// Data contains all types of data that can be stored in the checkpoint.
type Data struct {
PortMappings []*PortMapping `json:"port_mappings,omitempty"`
HostNetwork bool `json:"host_network,omitempty"`
}
type CheckpointDataV2 struct {
PortMappings []*PortMapping `json:"port_mappings,omitempty"`
HostNetwork bool `json:"host_network,omitempty"`
V2Field string `json:"v2field"`
}
type protocol string
// portMapping is the port mapping configurations of a sandbox.
type portMapping struct {
// protocol of the port mapping.
Protocol *protocol
// Port number within the container.
ContainerPort *int32
// Port number on the host.
HostPort *int32
}
// CheckpointData is a sample example structure to be used in test cases for checkpointing
type CheckpointData struct {
Version string
Name string
Data *Data
Checksum checksum.Checksum
}
func newFakeCheckpointV1(name string, portMappings []*PortMapping, hostNetwork bool) FakeCheckpoint {
return &CheckpointData{
Version: "v1",
Name: name,
Data: &Data{
PortMappings: portMappings,
HostNetwork: hostNetwork,
},
}
}
func (cp *CheckpointData) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Data)
return json.Marshal(*cp)
}
func (cp *CheckpointData) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
func (cp *CheckpointData) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Data)
}
func (cp *CheckpointData) GetData() ([]*PortMapping, bool) {
return cp.Data.PortMappings, cp.Data.HostNetwork
}
type checkpointDataV2 struct {
Version string
Name string
Data *CheckpointDataV2
Checksum checksum.Checksum
}
func newFakeCheckpointV2(name string, portMappings []*PortMapping, hostNetwork bool) FakeCheckpoint {
return &checkpointDataV2{
Version: "v2",
Name: name,
Data: &CheckpointDataV2{
PortMappings: portMappings,
HostNetwork: hostNetwork,
},
}
}
func newFakeCheckpointRemoteV1(name string, portMappings []*v1.PortMapping, hostNetwork bool) Checkpoint {
return &v1.CheckpointData{
Version: "v1",
Name: name,
Data: &v1.Data{
PortMappings: portMappings,
HostNetwork: hostNetwork,
},
}
}
func (cp *checkpointDataV2) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Data)
return json.Marshal(*cp)
}
func (cp *checkpointDataV2) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
func (cp *checkpointDataV2) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Data)
}
func (cp *checkpointDataV2) GetData() ([]*PortMapping, bool) {
return cp.Data.PortMappings, cp.Data.HostNetwork
}
func newTestCheckpointManager() CheckpointManager {
return &impl{store: testStore}
}
func TestCheckpointManager(t *testing.T) {
var err error
testStore = utilstore.NewMemStore()
manager := newTestCheckpointManager()
port80 := int32(80)
port443 := int32(443)
proto := protocol("tcp")
portMappings := []*portMapping{
{
&proto,
&port80,
&port80,
},
{
&proto,
&port443,
&port443,
},
}
checkpoint1 := newFakeCheckpointV1("check1", portMappings, true)
checkpoints := []struct {
checkpointKey string
checkpoint FakeCheckpoint
expectHostNetwork bool
}{
{
"key1",
checkpoint1,
true,
},
{
"key2",
newFakeCheckpointV1("check2", nil, false),
false,
},
}
for _, tc := range checkpoints {
// Test CreateCheckpoints
err = manager.CreateCheckpoint(tc.checkpointKey, tc.checkpoint)
assert.NoError(t, err)
// Test GetCheckpoints
checkpointOut := newFakeCheckpointV1("", nil, false)
err := manager.GetCheckpoint(tc.checkpointKey, checkpointOut)
assert.NoError(t, err)
actualPortMappings, actualHostNetwork := checkpointOut.GetData()
expPortMappings, expHostNetwork := tc.checkpoint.GetData()
assert.Equal(t, actualPortMappings, expPortMappings)
assert.Equal(t, actualHostNetwork, expHostNetwork)
}
// Test it fails if tried to read V1 structure into V2, a different structure from the structure which is checkpointed
checkpointV2 := newFakeCheckpointV2("", nil, false)
err = manager.GetCheckpoint("key1", checkpointV2)
assert.EqualError(t, err, "checkpoint is corrupted")
// Test it fails if tried to read V1 structure into the same structure but defined in another package
checkpointRemoteV1 := newFakeCheckpointRemoteV1("", nil, false)
err = manager.GetCheckpoint("key1", checkpointRemoteV1)
assert.EqualError(t, err, "checkpoint is corrupted")
// Test it works if tried to read V1 structure using into a new V1 structure
checkpointV1 := newFakeCheckpointV1("", nil, false)
err = manager.GetCheckpoint("key1", checkpointV1)
assert.NoError(t, err)
// Test corrupt checksum case
checkpointOut := newFakeCheckpointV1("", nil, false)
blob, err := checkpointOut.MarshalCheckpoint()
assert.NoError(t, err)
testStore.Write("key1", blob)
err = manager.GetCheckpoint("key1", checkpoint1)
assert.EqualError(t, err, "checkpoint is corrupted")
// Test ListCheckpoints
keys, err := manager.ListCheckpoints()
assert.NoError(t, err)
sort.Strings(keys)
assert.Equal(t, keys, []string{"key1", "key2"})
// Test RemoveCheckpoints
err = manager.RemoveCheckpoint("key1")
assert.NoError(t, err)
// Test Remove Nonexisted Checkpoints
err = manager.RemoveCheckpoint("key1")
assert.NoError(t, err)
// Test ListCheckpoints
keys, err = manager.ListCheckpoints()
assert.NoError(t, err)
assert.Equal(t, keys, []string{"key2"})
// Test Get NonExisted Checkpoint
checkpointNE := newFakeCheckpointV1("NE", nil, false)
err = manager.GetCheckpoint("key1", checkpointNE)
assert.Error(t, err)
}

View File

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["checksum.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/util/hash:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,46 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checksum
import (
"hash/fnv"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
// Data to be stored as checkpoint
type Checksum uint64
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cs Checksum) Verify(data interface{}) error {
if cs != New(data) {
return errors.ErrCorruptCheckpoint
}
return nil
}
func New(data interface{}) Checksum {
return Checksum(getChecksum(data))
}
// Get returns calculated checksum of checkpoint data
func getChecksum(data interface{}) uint64 {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, data)
return uint64(hash.Sum32())
}

View File

@ -7,8 +7,8 @@ load(
go_library(
name = "go_default_library",
srcs = ["util.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim/testing",
srcs = ["errors.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors",
)
filegroup(

View File

@ -0,0 +1,22 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package errors
import "fmt"
var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.")
var CheckpointNotFoundError = fmt.Errorf("checkpoint is not found.")

View File

@ -0,0 +1,28 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["util.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing",
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["types.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1",
visibility = ["//visibility:public"],
deps = ["//pkg/kubelet/checkpointmanager/checksum:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,62 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"encoding/json"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
type protocol string
// portMapping is the port mapping configurations of a sandbox.
type PortMapping struct {
// protocol of the port mapping.
Protocol *protocol
// Port number within the container.
ContainerPort *int32
// Port number on the host.
HostPort *int32
}
// CheckpointData contains all types of data that can be stored in the checkpoint.
type Data struct {
PortMappings []*PortMapping `json:"port_mappings,omitempty"`
HostNetwork bool `json:"host_network,omitempty"`
}
// CheckpointData is a sample example structure to be used in test cases for checkpointing
type CheckpointData struct {
Version string
Name string
Data *Data
Checksum checksum.Checksum
}
func (cp *CheckpointData) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Data)
return json.Marshal(*cp)
}
func (cp *CheckpointData) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
func (cp *CheckpointData) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Data)
}

View File

@ -15,13 +15,14 @@ go_library(
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
@ -39,10 +40,9 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
@ -62,7 +62,10 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/kubelet/cm/devicemanager/checkpoint:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["checkpoint.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,81 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"encoding/json"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
type DeviceManagerCheckpoint interface {
checkpointmanager.Checkpoint
GetData() ([]PodDevicesEntry, map[string][]string)
}
type PodDevicesEntry struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
AllocResp []byte
}
// checkpointData struct is used to store pod to device allocation information
// in a checkpoint file.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
PodDeviceEntries []PodDevicesEntry
RegisteredDevices map[string][]string
}
type Data struct {
Data checkpointData
Checksum checksum.Checksum
}
// NewDeviceManagerCheckpoint returns an instance of Checkpoint
func New(devEntries []PodDevicesEntry,
devices map[string][]string) DeviceManagerCheckpoint {
return &Data{
Data: checkpointData{
PodDeviceEntries: devEntries,
RegisteredDevices: devices,
},
}
}
// MarshalCheckpoint returns marshalled data
func (cp *Data) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(cp.Data)
return json.Marshal(*cp)
}
// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(cp.Data)
}
func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) {
return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices
}

View File

@ -18,7 +18,6 @@ package devicemanager
import (
"context"
"encoding/json"
"fmt"
"net"
"os"
@ -34,12 +33,13 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
// ActivePodsFunc is a function that returns a list of pods to reconcile.
@ -84,8 +84,8 @@ type ManagerImpl struct {
// podDevices contains pod to allocated device mapping.
podDevices podDevices
store utilstore.Store
pluginOpts map[string]*pluginapi.DevicePluginOptions
checkpointManager checkpointmanager.CheckpointManager
}
type sourcesReadyStub struct{}
@ -122,11 +122,11 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
// Before that, initializes them to perform no-op operations.
manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
manager.sourcesReady = &sourcesReadyStub{}
var err error
manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{})
checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
if err != nil {
return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err)
return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
}
manager.checkpointManager = checkpointManager
return manager, nil
}
@ -454,33 +454,19 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
return capacity, allocatable, deletedResources.UnsortedList()
}
// checkpointData struct is used to store pod to device allocation information
// and registered device information in a checkpoint file.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
PodDeviceEntries []podDevicesCheckpointEntry
RegisteredDevices map[string][]string
}
// Checkpoints device to container allocation information to disk.
func (m *ManagerImpl) writeCheckpoint() error {
m.mutex.Lock()
data := checkpointData{
PodDeviceEntries: m.podDevices.toCheckpointData(),
RegisteredDevices: make(map[string][]string),
}
registeredDevs := make(map[string][]string)
for resource, devices := range m.healthyDevices {
data.RegisteredDevices[resource] = devices.UnsortedList()
registeredDevs[resource] = devices.UnsortedList()
}
data := checkpoint.New(m.podDevices.toCheckpointData(),
registeredDevs)
m.mutex.Unlock()
dataJSON, err := json.Marshal(data)
err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
if err != nil {
return err
}
err = m.store.Write(kubeletDeviceManagerCheckpoint, dataJSON)
if err != nil {
return fmt.Errorf("failed to write deviceplugin checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
return fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
}
return nil
}
@ -488,24 +474,23 @@ func (m *ManagerImpl) writeCheckpoint() error {
// Reads device to container allocation information from disk, and populates
// m.allocatedDevices accordingly.
func (m *ManagerImpl) readCheckpoint() error {
content, err := m.store.Read(kubeletDeviceManagerCheckpoint)
registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntry, 0)
cp := checkpoint.New(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
if err != nil {
if err == utilstore.ErrKeyNotFound {
if err == errors.ErrCheckpointNotFound {
glog.Warningf("Failed to retrieve checkpoint for %q: %v", kubeletDeviceManagerCheckpoint, err)
return nil
}
return fmt.Errorf("failed to read checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
return err
}
glog.V(4).Infof("Read checkpoint file %s\n", kubeletDeviceManagerCheckpoint)
var data checkpointData
if err := json.Unmarshal(content, &data); err != nil {
return fmt.Errorf("failed to unmarshal deviceplugin checkpoint data: %v", err)
}
m.mutex.Lock()
defer m.mutex.Unlock()
m.podDevices.fromCheckpointData(data.PodDeviceEntries)
podDevices, registeredDevs := cp.GetData()
m.podDevices.fromCheckpointData(podDevices)
m.allocatedDevices = m.podDevices.devices()
for resource := range data.RegisteredDevices {
for resource := range registeredDevs {
// During start up, creates empty healthyDevices list so that the resource capacity
// will stay zero till the corresponding device plugin re-registers.
m.healthyDevices[resource] = sets.NewString()

View File

@ -34,10 +34,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
@ -347,20 +346,19 @@ func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.Cont
func TestCheckpoint(t *testing.T) {
resourceName1 := "domain1.com/resource1"
resourceName2 := "domain2.com/resource2"
as := assert.New(t)
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
defer os.RemoveAll(tmpDir)
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
as.Nil(err)
testManager := &ManagerImpl{
socketdir: tmpDir,
endpoints: make(map[string]endpoint),
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
checkpointManager: ckm,
}
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
testManager.podDevices.insert("pod1", "con1", resourceName1,
constructDevices([]string{"dev1", "dev2"}),
@ -479,8 +477,12 @@ func makePod(limits v1.ResourceList) *v1.Pod {
}
}
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) *ManagerImpl {
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) {
monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
if err != nil {
return nil, err
}
testManager := &ManagerImpl{
socketdir: tmpDir,
callback: monitorCallback,
@ -492,8 +494,8 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
podDevices: make(podDevices),
activePods: activePods,
sourcesReady: &sourcesReadyStub{},
checkpointManager: ckm,
}
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
for _, res := range testRes {
testManager.healthyDevices[res.resourceName] = sets.NewString()
for _, dev := range res.devs {
@ -525,7 +527,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
}
}
}
return testManager
return testManager, nil
}
func getTestNodeInfo(allocatable v1.ResourceList) *schedulercache.NodeInfo {
@ -569,7 +571,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
as.Nil(err)
testPods := []*v1.Pod{
makePod(v1.ResourceList{
@ -664,7 +667,8 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
as.Nil(err)
defer os.RemoveAll(tmpDir)
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
as.Nil(err)
podWithPluginResourcesInInitContainers := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -742,14 +746,18 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
as := assert.New(t)
monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
as.Nil(err)
testManager := &ManagerImpl{
callback: monitorCallback,
allDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
checkpointManager: ckm,
}
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
// require one of resource1 and one of resource2
testManager.allocatedDevices[resourceName1] = sets.NewString()
testManager.allocatedDevices[resourceName1].Insert(devID1)
@ -796,7 +804,8 @@ func TestDevicePreStartContainer(t *testing.T) {
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
pluginOpts[res1.resourceName] = &pluginapi.DevicePluginOptions{PreStartRequired: true}
testManager := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts)
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts)
as.Nil(err)
ch := make(chan []string, 1)
testManager.endpoints[res1.resourceName] = &MockEndpoint{

View File

@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@ -126,18 +127,9 @@ func (pdev podDevices) devices() map[string]sets.String {
return ret
}
// podDevicesCheckpointEntry is used to record <pod, container> to device allocation information.
type podDevicesCheckpointEntry struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
AllocResp []byte
}
// Turns podDevices to checkpointData.
func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry {
var data []podDevicesCheckpointEntry
func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
var data []checkpoint.PodDevicesEntry
for podUID, containerDevices := range pdev {
for conName, resources := range containerDevices {
for resource, devices := range resources {
@ -152,7 +144,7 @@ func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry {
glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
continue
}
data = append(data, podDevicesCheckpointEntry{podUID, conName, resource, devIds, allocResp})
data = append(data, checkpoint.PodDevicesEntry{podUID, conName, resource, devIds, allocResp})
}
}
}
@ -160,7 +152,7 @@ func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry {
}
// Populates podDevices from the passed in checkpointData.
func (pdev podDevices) fromCheckpointData(data []podDevicesCheckpointEntry) {
func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
for _, entry := range data {
glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)

View File

@ -82,6 +82,8 @@ go_library(
"//pkg/credentialprovider:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim/cm:go_default_library",
@ -100,8 +102,6 @@ go_library(
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//pkg/util/hash:go_default_library",
"//pkg/util/parsers:go_default_library",
"//vendor/github.com/armon/circbuf:go_default_library",
"//vendor/github.com/blang/semver:go_default_library",
@ -149,6 +149,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/dockershim/libdocker:go_default_library",
@ -186,7 +187,6 @@ filegroup(
"//pkg/kubelet/dockershim/metrics:all-srcs",
"//pkg/kubelet/dockershim/network:all-srcs",
"//pkg/kubelet/dockershim/remote:all-srcs",
"//pkg/kubelet/dockershim/testing:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],

View File

@ -164,13 +164,14 @@ func containerToRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSand
}, nil
}
func checkpointToRuntimeAPISandbox(id string, checkpoint *PodSandboxCheckpoint) *runtimeapi.PodSandbox {
func checkpointToRuntimeAPISandbox(id string, checkpoint DockershimCheckpoint) *runtimeapi.PodSandbox {
state := runtimeapi.PodSandboxState_SANDBOX_NOTREADY
_, name, namespace, _, _ := checkpoint.GetData()
return &runtimeapi.PodSandbox{
Id: id,
Metadata: &runtimeapi.PodSandboxMetadata{
Name: checkpoint.Name,
Namespace: checkpoint.Namespace,
Name: name,
Namespace: namespace,
},
State: state,
}

View File

@ -18,14 +18,9 @@ package dockershim
import (
"encoding/json"
"fmt"
"hash/fnv"
"path/filepath"
"github.com/golang/glog"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
const (
@ -36,6 +31,11 @@ const (
schemaVersion = "v1"
)
type DockershimCheckpoint interface {
checkpointmanager.Checkpoint
GetData() (string, string, string, []*PortMapping, bool)
}
type Protocol string
// PortMapping is the port mapping configurations of a sandbox.
@ -65,89 +65,31 @@ type PodSandboxCheckpoint struct {
// Data to checkpoint for pod sandbox.
Data *CheckpointData `json:"data,omitempty"`
// Checksum is calculated with fnv hash of the checkpoint object with checksum field set to be zero
CheckSum uint64 `json:"checksum"`
Checksum checksum.Checksum `json:"checksum"`
}
// CheckpointHandler provides the interface to manage PodSandbox checkpoint
type CheckpointHandler interface {
// CreateCheckpoint persists sandbox checkpoint in CheckpointStore.
CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error
// GetCheckpoint retrieves sandbox checkpoint from CheckpointStore.
GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error)
// RemoveCheckpoint removes sandbox checkpoint form CheckpointStore.
// WARNING: RemoveCheckpoint will not return error if checkpoint does not exist.
RemoveCheckpoint(podSandboxID string) error
// ListCheckpoint returns the list of existing checkpoints.
ListCheckpoints() ([]string, error)
}
// PersistentCheckpointHandler is an implementation of CheckpointHandler. It persists checkpoint in CheckpointStore
type PersistentCheckpointHandler struct {
store utilstore.Store
}
func NewPersistentCheckpointHandler(dockershimRootDir string) (CheckpointHandler, error) {
fstore, err := utilstore.NewFileStore(filepath.Join(dockershimRootDir, sandboxCheckpointDir), utilfs.DefaultFs{})
if err != nil {
return nil, err
}
return &PersistentCheckpointHandler{store: fstore}, nil
}
func (handler *PersistentCheckpointHandler) CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error {
checkpoint.CheckSum = calculateChecksum(*checkpoint)
blob, err := json.Marshal(checkpoint)
if err != nil {
return err
}
return handler.store.Write(podSandboxID, blob)
}
func (handler *PersistentCheckpointHandler) GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) {
blob, err := handler.store.Read(podSandboxID)
if err != nil {
return nil, err
}
var checkpoint PodSandboxCheckpoint
//TODO: unmarhsal into a struct with just Version, check version, unmarshal into versioned type.
err = json.Unmarshal(blob, &checkpoint)
if err != nil {
glog.Errorf("Failed to unmarshal checkpoint %q, removing checkpoint. Checkpoint content: %q. ErrMsg: %v", podSandboxID, string(blob), err)
handler.RemoveCheckpoint(podSandboxID)
return nil, fmt.Errorf("failed to unmarshal checkpoint")
}
if checkpoint.CheckSum != calculateChecksum(checkpoint) {
glog.Errorf("Checksum of checkpoint %q is not valid, removing checkpoint", podSandboxID)
handler.RemoveCheckpoint(podSandboxID)
return nil, fmt.Errorf("checkpoint is corrupted")
}
return &checkpoint, nil
}
func (handler *PersistentCheckpointHandler) RemoveCheckpoint(podSandboxID string) error {
return handler.store.Delete(podSandboxID)
}
func (handler *PersistentCheckpointHandler) ListCheckpoints() ([]string, error) {
keys, err := handler.store.List()
if err != nil {
return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err)
}
return keys, nil
}
func NewPodSandboxCheckpoint(namespace, name string) *PodSandboxCheckpoint {
func NewPodSandboxCheckpoint(namespace, name string, data *CheckpointData) DockershimCheckpoint {
return &PodSandboxCheckpoint{
Version: schemaVersion,
Namespace: namespace,
Name: name,
Data: &CheckpointData{},
Data: data,
}
}
func calculateChecksum(checkpoint PodSandboxCheckpoint) uint64 {
checkpoint.CheckSum = 0
hash := fnv.New32a()
hashutil.DeepHashObject(hash, checkpoint)
return uint64(hash.Sum32())
func (cp *PodSandboxCheckpoint) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Data)
return json.Marshal(*cp)
}
func (cp *PodSandboxCheckpoint) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
func (cp *PodSandboxCheckpoint) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Data)
}
func (cp *PodSandboxCheckpoint) GetData() (string, string, string, []*PortMapping, bool) {
return cp.Version, cp.Name, cp.Namespace, cp.Data.PortMappings, cp.Data.HostNetwork
}

View File

@ -17,86 +17,17 @@ limitations under the License.
package dockershim
import (
"sort"
"testing"
"github.com/stretchr/testify/assert"
utilstore "k8s.io/kubernetes/pkg/kubelet/dockershim/testing"
)
func NewTestPersistentCheckpointHandler() CheckpointHandler {
return &PersistentCheckpointHandler{store: utilstore.NewMemStore()}
}
func TestPersistentCheckpointHandler(t *testing.T) {
var err error
handler := NewTestPersistentCheckpointHandler()
port80 := int32(80)
port443 := int32(443)
proto := protocolTCP
checkpoint1 := NewPodSandboxCheckpoint("ns1", "sandbox1")
checkpoint1.Data.PortMappings = []*PortMapping{
{
&proto,
&port80,
&port80,
},
{
&proto,
&port443,
&port443,
},
}
checkpoint1.Data.HostNetwork = true
checkpoints := []struct {
podSandboxID string
checkpoint *PodSandboxCheckpoint
expectHostNetwork bool
}{
{
"id1",
checkpoint1,
true,
},
{
"id2",
NewPodSandboxCheckpoint("ns2", "sandbox2"),
false,
},
}
for _, tc := range checkpoints {
// Test CreateCheckpoints
err = handler.CreateCheckpoint(tc.podSandboxID, tc.checkpoint)
assert.NoError(t, err)
// Test GetCheckpoints
checkpoint, err := handler.GetCheckpoint(tc.podSandboxID)
assert.NoError(t, err)
assert.Equal(t, *checkpoint, *tc.checkpoint)
assert.Equal(t, checkpoint.Data.HostNetwork, tc.expectHostNetwork)
}
// Test ListCheckpoints
keys, err := handler.ListCheckpoints()
assert.NoError(t, err)
sort.Strings(keys)
assert.Equal(t, keys, []string{"id1", "id2"})
// Test RemoveCheckpoints
err = handler.RemoveCheckpoint("id1")
assert.NoError(t, err)
// Test Remove Nonexisted Checkpoints
err = handler.RemoveCheckpoint("id1")
assert.NoError(t, err)
// Test ListCheckpoints
keys, err = handler.ListCheckpoints()
assert.NoError(t, err)
assert.Equal(t, keys, []string{"id2"})
// Test Get NonExisted Checkpoint
_, err = handler.GetCheckpoint("id1")
assert.Error(t, err)
func TestPodSandboxCheckpoint(t *testing.T) {
data := &CheckpointData{HostNetwork: true}
checkpoint := NewPodSandboxCheckpoint("ns1", "sandbox1", data)
version, name, namespace, _, hostNetwork := checkpoint.GetData()
assert.Equal(t, schemaVersion, version)
assert.Equal(t, "ns1", namespace)
assert.Equal(t, "sandbox1", name)
assert.Equal(t, true, hostNetwork)
}

View File

@ -30,6 +30,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
"k8s.io/kubernetes/pkg/kubelet/qos"
@ -118,7 +119,7 @@ func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPod
}(&err)
// Step 3: Create Sandbox Checkpoint.
if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
return nil, err
}
@ -189,7 +190,8 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP
name = metadata.Name
hostNetwork = (networkNamespaceMode(inspectResult) == runtimeapi.NamespaceMode_NODE)
} else {
checkpoint, checkpointErr := ds.checkpointHandler.GetCheckpoint(podSandboxID)
checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{})
checkpointErr := ds.checkpointManager.GetCheckpoint(podSandboxID, checkpoint)
// Proceed if both sandbox container and checkpoint could not be found. This means that following
// actions will only have sandbox ID and not have pod namespace and name information.
@ -204,9 +206,7 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP
fmt.Errorf("failed to get sandbox status: %v", statusErr)})
}
} else {
namespace = checkpoint.Namespace
name = checkpoint.Name
hostNetwork = checkpoint.Data != nil && checkpoint.Data.HostNetwork
_, name, namespace, _, hostNetwork = checkpoint.GetData()
}
}
@ -237,7 +237,7 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP
errList = append(errList, err)
} else {
// remove the checkpoint for any sandbox that is not found in the runtime
ds.checkpointHandler.RemoveCheckpoint(podSandboxID)
ds.checkpointManager.RemoveCheckpoint(podSandboxID)
}
}
@ -284,7 +284,7 @@ func (ds *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.Rem
}
// Remove the checkpoint of the sandbox.
if err := ds.checkpointHandler.RemoveCheckpoint(podSandboxID); err != nil {
if err := ds.checkpointManager.RemoveCheckpoint(podSandboxID); err != nil {
errs = append(errs, err)
}
if len(errs) == 0 {
@ -465,7 +465,7 @@ func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPod
var err error
checkpoints := []string{}
if filter == nil {
checkpoints, err = ds.checkpointHandler.ListCheckpoints()
checkpoints, err = ds.checkpointManager.ListCheckpoints()
if err != nil {
glog.Errorf("Failed to list checkpoints: %v", err)
}
@ -501,7 +501,8 @@ func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPod
if _, ok := sandboxIDs[id]; ok {
continue
}
checkpoint, err := ds.checkpointHandler.GetCheckpoint(id)
checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{})
err := ds.checkpointManager.GetCheckpoint(id, checkpoint)
if err != nil {
glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err)
continue
@ -624,20 +625,20 @@ func ipcNamespaceMode(container *dockertypes.ContainerJSON) runtimeapi.Namespace
return runtimeapi.NamespaceMode_POD
}
func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) *PodSandboxCheckpoint {
checkpoint := NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name)
func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) checkpointmanager.Checkpoint {
data := CheckpointData{}
for _, pm := range config.GetPortMappings() {
proto := toCheckpointProtocol(pm.Protocol)
checkpoint.Data.PortMappings = append(checkpoint.Data.PortMappings, &PortMapping{
data.PortMappings = append(data.PortMappings, &PortMapping{
HostPort: &pm.HostPort,
ContainerPort: &pm.ContainerPort,
Protocol: &proto,
})
}
if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
checkpoint.Data.HostNetwork = true
data.HostNetwork = true
}
return checkpoint
return NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name, &data)
}
func toCheckpointProtocol(protocol runtimeapi.Protocol) Protocol {

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/http"
"path/filepath"
"sync"
"time"
@ -30,6 +31,7 @@ import (
"k8s.io/api/core/v1"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecm "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim/cm"
@ -191,7 +193,8 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon
client := NewDockerClientFromConfig(config)
c := libdocker.NewInstrumentedInterface(client)
checkpointHandler, err := NewPersistentCheckpointHandler(dockershimRootDir)
checkpointManager, err := checkpointmanager.NewCheckpointManager(filepath.Join(dockershimRootDir, sandboxCheckpointDir))
if err != nil {
return nil, err
}
@ -205,7 +208,7 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon
execHandler: &NativeExecHandler{},
},
containerManager: cm.NewContainerManager(cgroupsName, client),
checkpointHandler: checkpointHandler,
checkpointManager: checkpointManager,
disableSharedPID: disableSharedPID,
networkReady: make(map[string]bool),
}
@ -293,7 +296,7 @@ type dockerService struct {
containerManager cm.ContainerManager
// cgroup driver used by Docker runtime.
cgroupDriver string
checkpointHandler CheckpointHandler
checkpointManager checkpointmanager.CheckpointManager
// caches the version of the runtime.
// To be compatible with multiple docker versions, we need to perform
// version checking for some operations. Use this cache to avoid querying
@ -365,7 +368,8 @@ func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) {
// GetPodPortMappings returns the port mappings of the given podSandbox ID.
func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) {
// TODO: get portmappings from docker labels for backward compatibility
checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID)
checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{})
err := ds.checkpointManager.GetCheckpoint(podSandboxID, checkpoint)
// Return empty portMappings if checkpoint is not found
if err != nil {
if err == utilstore.ErrKeyNotFound {
@ -373,9 +377,9 @@ func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.Po
}
return nil, err
}
portMappings := make([]*hostport.PortMapping, 0, len(checkpoint.Data.PortMappings))
for _, pm := range checkpoint.Data.PortMappings {
_, _, _, checkpointedPortMappings, _ := checkpoint.GetData()
portMappings := make([]*hostport.PortMapping, 0, len(checkpointedPortMappings))
for _, pm := range checkpointedPortMappings {
proto := toAPIProtocol(*pm.Protocol)
portMappings = append(portMappings, &hostport.PortMapping{
HostPort: *pm.HostPort,

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
"k8s.io/kubernetes/pkg/kubelet/dockershim/network"
@ -43,15 +44,50 @@ func newTestNetworkPlugin(t *testing.T) *nettest.MockNetworkPlugin {
return nettest.NewMockNetworkPlugin(ctrl)
}
type mockCheckpointManager struct {
checkpoint map[string]*PodSandboxCheckpoint
}
func (ckm *mockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
ckm.checkpoint[checkpointKey] = checkpoint.(*PodSandboxCheckpoint)
return nil
}
func (ckm *mockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
*(checkpoint.(*PodSandboxCheckpoint)) = *(ckm.checkpoint[checkpointKey])
return nil
}
func (ckm *mockCheckpointManager) RemoveCheckpoint(checkpointKey string) error {
_, ok := ckm.checkpoint[checkpointKey]
if ok {
delete(ckm.checkpoint, "moo")
}
return nil
}
func (ckm *mockCheckpointManager) ListCheckpoints() ([]string, error) {
var keys []string
for key, _ := range ckm.checkpoint {
keys = append(keys, key)
}
return keys, nil
}
func newMockCheckpointManager() checkpointmanager.CheckpointManager {
return &mockCheckpointManager{checkpoint: make(map[string]*PodSandboxCheckpoint)}
}
func newTestDockerService() (*dockerService, *libdocker.FakeDockerClient, *clock.FakeClock) {
fakeClock := clock.NewFakeClock(time.Time{})
c := libdocker.NewFakeDockerClient().WithClock(fakeClock).WithVersion("1.11.2", "1.23").WithRandSource(rand.NewSource(0))
pm := network.NewPluginManager(&network.NoopNetworkPlugin{})
ckm := newMockCheckpointManager()
return &dockerService{
client: c,
os: &containertest.FakeOS{},
network: pm,
checkpointHandler: NewTestPersistentCheckpointHandler(),
checkpointManager: ckm,
networkReady: make(map[string]bool),
}, c, fakeClock
}