From d62bd9ef6571c13b134162da2f7ae41785c6a59e Mon Sep 17 00:00:00 2001 From: vikaschoudhary16 Date: Mon, 20 Nov 2017 03:43:52 -0500 Subject: [PATCH] Node-level Checkpointing manager --- hack/.golint_failures | 3 + pkg/kubelet/BUILD | 1 + pkg/kubelet/checkpointmanager/BUILD | 48 ++++ pkg/kubelet/checkpointmanager/README.md | 24 ++ .../checkpointmanager/checkpoint_manager.go | 106 ++++++++ .../checkpoint_manager_test.go | 246 ++++++++++++++++++ pkg/kubelet/checkpointmanager/checksum/BUILD | 26 ++ .../checkpointmanager/checksum/checksum.go | 46 ++++ .../errors}/BUILD | 4 +- .../checkpointmanager/errors/errors.go | 22 ++ pkg/kubelet/checkpointmanager/testing/BUILD | 28 ++ .../example_checkpoint_formats/v1/BUILD | 23 ++ .../example_checkpoint_formats/v1/types.go | 62 +++++ .../testing/util.go | 0 pkg/kubelet/cm/devicemanager/BUILD | 13 +- pkg/kubelet/cm/devicemanager/checkpoint/BUILD | 26 ++ .../cm/devicemanager/checkpoint/checkpoint.go | 81 ++++++ pkg/kubelet/cm/devicemanager/manager.go | 65 ++--- pkg/kubelet/cm/devicemanager/manager_test.go | 75 +++--- pkg/kubelet/cm/devicemanager/pod_devices.go | 18 +- pkg/kubelet/dockershim/BUILD | 6 +- pkg/kubelet/dockershim/convert.go | 7 +- pkg/kubelet/dockershim/docker_checkpoint.go | 108 ++------ .../dockershim/docker_checkpoint_test.go | 85 +----- pkg/kubelet/dockershim/docker_sandbox.go | 29 ++- pkg/kubelet/dockershim/docker_service.go | 18 +- pkg/kubelet/dockershim/docker_service_test.go | 38 ++- 27 files changed, 927 insertions(+), 281 deletions(-) create mode 100644 pkg/kubelet/checkpointmanager/BUILD create mode 100644 pkg/kubelet/checkpointmanager/README.md create mode 100644 pkg/kubelet/checkpointmanager/checkpoint_manager.go create mode 100644 pkg/kubelet/checkpointmanager/checkpoint_manager_test.go create mode 100644 pkg/kubelet/checkpointmanager/checksum/BUILD create mode 100644 pkg/kubelet/checkpointmanager/checksum/checksum.go rename pkg/kubelet/{dockershim/testing => checkpointmanager/errors}/BUILD (79%) create mode 100644 pkg/kubelet/checkpointmanager/errors/errors.go create mode 100644 pkg/kubelet/checkpointmanager/testing/BUILD create mode 100644 pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/BUILD create mode 100644 pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/types.go rename pkg/kubelet/{dockershim => checkpointmanager}/testing/util.go (100%) create mode 100644 pkg/kubelet/cm/devicemanager/checkpoint/BUILD create mode 100644 pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go diff --git a/hack/.golint_failures b/hack/.golint_failures index 1cec2f3c515..824de2477d9 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -163,6 +163,9 @@ pkg/kubelet/apis/kubeletconfig pkg/kubelet/apis/kubeletconfig/v1beta1 pkg/kubelet/cadvisor pkg/kubelet/cadvisor/testing +pkg/kubelet/checkpoint +pkg/kubelet/checkpointmanager/checksum +pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1 pkg/kubelet/client pkg/kubelet/cm pkg/kubelet/cm/util diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 891a61bdc05..b7657b78239 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -246,6 +246,7 @@ filegroup( "//pkg/kubelet/cadvisor:all-srcs", "//pkg/kubelet/certificate:all-srcs", "//pkg/kubelet/checkpoint:all-srcs", + "//pkg/kubelet/checkpointmanager:all-srcs", "//pkg/kubelet/client:all-srcs", "//pkg/kubelet/cm:all-srcs", "//pkg/kubelet/config:all-srcs", diff --git a/pkg/kubelet/checkpointmanager/BUILD b/pkg/kubelet/checkpointmanager/BUILD new file mode 100644 index 00000000000..c27bb8f5898 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/BUILD @@ -0,0 +1,48 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = ["checkpoint_manager.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager", + deps = [ + "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/util/store:go_default_library", + "//pkg/util/filesystem:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["checkpoint_manager_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/checkpointmanager/testing:go_default_library", + "//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/kubelet/checkpointmanager/checksum:all-srcs", + "//pkg/kubelet/checkpointmanager/errors:all-srcs", + "//pkg/kubelet/checkpointmanager/testing:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/checkpointmanager/README.md b/pkg/kubelet/checkpointmanager/README.md new file mode 100644 index 00000000000..d5b6fd6eb74 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/README.md @@ -0,0 +1,24 @@ +## DISCLAIMER +Sig-Node community has reached a general consensus, as a best practice, to +avoid introducing any new checkpointing support. We reached this understanding +after struggling with some hard-to-debug issues in the production environments +caused by the checkpointing. + +## Introduction +This folder contains a framework & primitives, Checkpointing Manager, which is +used by several other Kubelet submodules, `dockershim`, `devicemanager`, `pods` +and `cpumanager`, to implement checkpointing at each submodule level. As already +explained in above `Disclaimer` section, think twice before introducing any further +checkpointing in Kubelet. If still checkpointing is required, then this folder +provides the common APIs and the framework for implementing checkpointing. +Using same APIs across all the submodules will help maintaining consistency at +Kubelet level. + +Below is the history of checkpointing support in Kubelet. + +| Package | First checkpointing support merged on | PR link | +| ------- | --------------------------------------| ------- | +|kubelet/dockershim | Feb 3, 2017 | [[CRI] Implement Dockershim Checkpoint](https://github.com/kubernetes/kubernetes/pull/39903) +|devicemanager| Sep 6, 2017 | [Deviceplugin checkpoint](https://github.com/kubernetes/kubernetes/pull/51744) +| kubelet/pod | Nov 22, 2017 | [Initial basic bootstrap-checkpoint support](https://github.com/kubernetes/kubernetes/pull/50984) +|cpumanager| Oct 27, 2017 |[Add file backed state to cpu manager ](https://github.com/kubernetes/kubernetes/pull/54408) diff --git a/pkg/kubelet/checkpointmanager/checkpoint_manager.go b/pkg/kubelet/checkpointmanager/checkpoint_manager.go new file mode 100644 index 00000000000..1d078357bff --- /dev/null +++ b/pkg/kubelet/checkpointmanager/checkpoint_manager.go @@ -0,0 +1,106 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkpointmanager + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" + utilfs "k8s.io/kubernetes/pkg/util/filesystem" +) + +// Checkpoint provides the process checkpoint data +type Checkpoint interface { + MarshalCheckpoint() ([]byte, error) + UnmarshalCheckpoint(blob []byte) error + VerifyChecksum() error +} + +// CheckpointManager provides the interface to manage checkpoint +type CheckpointManager interface { + // CreateCheckpoint persists checkpoint in CheckpointStore. checkpointKey is the key for utilstore to locate checkpoint. + // For file backed utilstore, checkpointKey is the file name to write the checkpoint data. + CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error + // GetCheckpoint retrieves checkpoint from CheckpointStore. + GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error + // WARNING: RemoveCheckpoint will not return error if checkpoint does not exist. + RemoveCheckpoint(checkpointKey string) error + // ListCheckpoint returns the list of existing checkpoints. + ListCheckpoints() ([]string, error) +} + +// impl is an implementation of CheckpointManager. It persists checkpoint in CheckpointStore +type impl struct { + path string + store utilstore.Store +} + +func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) { + fstore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{}) + if err != nil { + return nil, err + } + + return &impl{path: checkpointDir, store: fstore}, nil +} + +// CreateCheckpoint persists checkpoint in CheckpointStore. +func (manager *impl) CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error { + manager.mutex.Lock() + defer manager.mutex.Unlock() + blob, err := checkpoint.MarshalCheckpoint() + if err != nil { + return err + } + return manager.store.Write(checkpointKey, blob) +} + +// GetCheckpoint retrieves checkpoint from CheckpointStore. +func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error { + manager.mutex.Lock() + defer manager.mutex.Unlock() + blob, err := manager.store.Read(checkpointKey) + if err != nil { + if err == utilstore.ErrKeyNotFound { + return errors.ErrCheckpointNotFound + } + return err + } + err = checkpoint.UnmarshalCheckpoint(blob) + if err == nil { + err = checkpoint.VerifyChecksum() + } + return err +} + +func (manager *impl) RemoveCheckpoint(checkpointKey string) error { + manager.mutex.Lock() + defer manager.mutex.Unlock() + return manager.store.Delete(checkpointKey) +} + +// ListCheckpoints returns the list of existing checkpoints. +func (manager *impl) ListCheckpoints() ([]string, error) { + manager.mutex.Lock() + defer manager.mutex.Unlock() + keys, err := manager.store.List() + if err != nil { + return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err) + } + return keys, nil +} diff --git a/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go b/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go new file mode 100644 index 00000000000..4d02c4d5ac0 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go @@ -0,0 +1,246 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkpointmanager + +import ( + "encoding/json" + "hash/fnv" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + utilstore "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1" +) + +var testStore *utilstore.MemStore + +type FakeCheckpoint interface { + Checkpoint + GetData() ([]*PortMapping, bool) +} + +// Data contains all types of data that can be stored in the checkpoint. +type Data struct { + PortMappings []*PortMapping `json:"port_mappings,omitempty"` + HostNetwork bool `json:"host_network,omitempty"` +} + +type CheckpointDataV2 struct { + PortMappings []*PortMapping `json:"port_mappings,omitempty"` + HostNetwork bool `json:"host_network,omitempty"` + V2Field string `json:"v2field"` +} + +type protocol string + +// portMapping is the port mapping configurations of a sandbox. +type portMapping struct { + // protocol of the port mapping. + Protocol *protocol + // Port number within the container. + ContainerPort *int32 + // Port number on the host. + HostPort *int32 +} + +// CheckpointData is a sample example structure to be used in test cases for checkpointing +type CheckpointData struct { + Version string + Name string + Data *Data + Checksum checksum.Checksum +} + +func newFakeCheckpointV1(name string, portMappings []*PortMapping, hostNetwork bool) FakeCheckpoint { + return &CheckpointData{ + Version: "v1", + Name: name, + Data: &Data{ + PortMappings: portMappings, + HostNetwork: hostNetwork, + }, + } +} + +func (cp *CheckpointData) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Data) + return json.Marshal(*cp) +} + +func (cp *CheckpointData) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +func (cp *CheckpointData) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Data) +} + +func (cp *CheckpointData) GetData() ([]*PortMapping, bool) { + return cp.Data.PortMappings, cp.Data.HostNetwork +} + +type checkpointDataV2 struct { + Version string + Name string + Data *CheckpointDataV2 + Checksum checksum.Checksum +} + +func newFakeCheckpointV2(name string, portMappings []*PortMapping, hostNetwork bool) FakeCheckpoint { + return &checkpointDataV2{ + Version: "v2", + Name: name, + Data: &CheckpointDataV2{ + PortMappings: portMappings, + HostNetwork: hostNetwork, + }, + } +} + +func newFakeCheckpointRemoteV1(name string, portMappings []*v1.PortMapping, hostNetwork bool) Checkpoint { + return &v1.CheckpointData{ + Version: "v1", + Name: name, + Data: &v1.Data{ + PortMappings: portMappings, + HostNetwork: hostNetwork, + }, + } +} + +func (cp *checkpointDataV2) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Data) + return json.Marshal(*cp) +} + +func (cp *checkpointDataV2) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +func (cp *checkpointDataV2) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Data) +} + +func (cp *checkpointDataV2) GetData() ([]*PortMapping, bool) { + return cp.Data.PortMappings, cp.Data.HostNetwork +} + +func newTestCheckpointManager() CheckpointManager { + return &impl{store: testStore} +} + +func TestCheckpointManager(t *testing.T) { + var err error + testStore = utilstore.NewMemStore() + manager := newTestCheckpointManager() + port80 := int32(80) + port443 := int32(443) + proto := protocol("tcp") + + portMappings := []*portMapping{ + { + &proto, + &port80, + &port80, + }, + { + &proto, + &port443, + &port443, + }, + } + checkpoint1 := newFakeCheckpointV1("check1", portMappings, true) + + checkpoints := []struct { + checkpointKey string + checkpoint FakeCheckpoint + expectHostNetwork bool + }{ + { + "key1", + checkpoint1, + true, + }, + { + "key2", + newFakeCheckpointV1("check2", nil, false), + false, + }, + } + + for _, tc := range checkpoints { + // Test CreateCheckpoints + err = manager.CreateCheckpoint(tc.checkpointKey, tc.checkpoint) + assert.NoError(t, err) + + // Test GetCheckpoints + checkpointOut := newFakeCheckpointV1("", nil, false) + err := manager.GetCheckpoint(tc.checkpointKey, checkpointOut) + assert.NoError(t, err) + actualPortMappings, actualHostNetwork := checkpointOut.GetData() + expPortMappings, expHostNetwork := tc.checkpoint.GetData() + assert.Equal(t, actualPortMappings, expPortMappings) + assert.Equal(t, actualHostNetwork, expHostNetwork) + } + // Test it fails if tried to read V1 structure into V2, a different structure from the structure which is checkpointed + checkpointV2 := newFakeCheckpointV2("", nil, false) + err = manager.GetCheckpoint("key1", checkpointV2) + assert.EqualError(t, err, "checkpoint is corrupted") + + // Test it fails if tried to read V1 structure into the same structure but defined in another package + checkpointRemoteV1 := newFakeCheckpointRemoteV1("", nil, false) + err = manager.GetCheckpoint("key1", checkpointRemoteV1) + assert.EqualError(t, err, "checkpoint is corrupted") + + // Test it works if tried to read V1 structure using into a new V1 structure + checkpointV1 := newFakeCheckpointV1("", nil, false) + err = manager.GetCheckpoint("key1", checkpointV1) + assert.NoError(t, err) + + // Test corrupt checksum case + checkpointOut := newFakeCheckpointV1("", nil, false) + blob, err := checkpointOut.MarshalCheckpoint() + assert.NoError(t, err) + testStore.Write("key1", blob) + err = manager.GetCheckpoint("key1", checkpoint1) + assert.EqualError(t, err, "checkpoint is corrupted") + + // Test ListCheckpoints + keys, err := manager.ListCheckpoints() + assert.NoError(t, err) + sort.Strings(keys) + assert.Equal(t, keys, []string{"key1", "key2"}) + + // Test RemoveCheckpoints + err = manager.RemoveCheckpoint("key1") + assert.NoError(t, err) + // Test Remove Nonexisted Checkpoints + err = manager.RemoveCheckpoint("key1") + assert.NoError(t, err) + + // Test ListCheckpoints + keys, err = manager.ListCheckpoints() + assert.NoError(t, err) + assert.Equal(t, keys, []string{"key2"}) + + // Test Get NonExisted Checkpoint + checkpointNE := newFakeCheckpointV1("NE", nil, false) + err = manager.GetCheckpoint("key1", checkpointNE) + assert.Error(t, err) +} diff --git a/pkg/kubelet/checkpointmanager/checksum/BUILD b/pkg/kubelet/checkpointmanager/checksum/BUILD new file mode 100644 index 00000000000..cb86b212baf --- /dev/null +++ b/pkg/kubelet/checkpointmanager/checksum/BUILD @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["checksum.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/util/hash:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/checkpointmanager/checksum/checksum.go b/pkg/kubelet/checkpointmanager/checksum/checksum.go new file mode 100644 index 00000000000..b2a98e45be4 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/checksum/checksum.go @@ -0,0 +1,46 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checksum + +import ( + "hash/fnv" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + hashutil "k8s.io/kubernetes/pkg/util/hash" +) + +// Data to be stored as checkpoint +type Checksum uint64 + +// VerifyChecksum verifies that passed checksum is same as calculated checksum +func (cs Checksum) Verify(data interface{}) error { + if cs != New(data) { + return errors.ErrCorruptCheckpoint + } + return nil +} + +func New(data interface{}) Checksum { + return Checksum(getChecksum(data)) +} + +// Get returns calculated checksum of checkpoint data +func getChecksum(data interface{}) uint64 { + hash := fnv.New32a() + hashutil.DeepHashObject(hash, data) + return uint64(hash.Sum32()) +} diff --git a/pkg/kubelet/dockershim/testing/BUILD b/pkg/kubelet/checkpointmanager/errors/BUILD similarity index 79% rename from pkg/kubelet/dockershim/testing/BUILD rename to pkg/kubelet/checkpointmanager/errors/BUILD index 0e57c9b081b..76c04fe9014 100644 --- a/pkg/kubelet/dockershim/testing/BUILD +++ b/pkg/kubelet/checkpointmanager/errors/BUILD @@ -7,8 +7,8 @@ load( go_library( name = "go_default_library", - srcs = ["util.go"], - importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim/testing", + srcs = ["errors.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors", ) filegroup( diff --git a/pkg/kubelet/checkpointmanager/errors/errors.go b/pkg/kubelet/checkpointmanager/errors/errors.go new file mode 100644 index 00000000000..25462ffd9c8 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/errors/errors.go @@ -0,0 +1,22 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package errors + +import "fmt" + +var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.") +var CheckpointNotFoundError = fmt.Errorf("checkpoint is not found.") diff --git a/pkg/kubelet/checkpointmanager/testing/BUILD b/pkg/kubelet/checkpointmanager/testing/BUILD new file mode 100644 index 00000000000..51c6764dac9 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/testing/BUILD @@ -0,0 +1,28 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["util.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing", +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/BUILD b/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/BUILD new file mode 100644 index 00000000000..e8a30850f14 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/BUILD @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["types.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1", + visibility = ["//visibility:public"], + deps = ["//pkg/kubelet/checkpointmanager/checksum:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/types.go b/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/types.go new file mode 100644 index 00000000000..7a883d5b5c8 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/types.go @@ -0,0 +1,62 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "encoding/json" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" +) + +type protocol string + +// portMapping is the port mapping configurations of a sandbox. +type PortMapping struct { + // protocol of the port mapping. + Protocol *protocol + // Port number within the container. + ContainerPort *int32 + // Port number on the host. + HostPort *int32 +} + +// CheckpointData contains all types of data that can be stored in the checkpoint. +type Data struct { + PortMappings []*PortMapping `json:"port_mappings,omitempty"` + HostNetwork bool `json:"host_network,omitempty"` +} + +// CheckpointData is a sample example structure to be used in test cases for checkpointing +type CheckpointData struct { + Version string + Name string + Data *Data + Checksum checksum.Checksum +} + +func (cp *CheckpointData) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Data) + return json.Marshal(*cp) +} + +func (cp *CheckpointData) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +func (cp *CheckpointData) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Data) +} diff --git a/pkg/kubelet/dockershim/testing/util.go b/pkg/kubelet/checkpointmanager/testing/util.go similarity index 100% rename from pkg/kubelet/dockershim/testing/util.go rename to pkg/kubelet/checkpointmanager/testing/util.go diff --git a/pkg/kubelet/cm/devicemanager/BUILD b/pkg/kubelet/cm/devicemanager/BUILD index f23bfc488e2..ac2643dbb07 100644 --- a/pkg/kubelet/cm/devicemanager/BUILD +++ b/pkg/kubelet/cm/devicemanager/BUILD @@ -15,13 +15,14 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/metrics:go_default_library", - "//pkg/kubelet/util/store:go_default_library", "//pkg/scheduler/schedulercache:go_default_library", - "//pkg/util/filesystem:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ -39,10 +40,9 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", - "//pkg/kubelet/util/store:go_default_library", "//pkg/scheduler/schedulercache:go_default_library", - "//pkg/util/filesystem:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ -62,7 +62,10 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//pkg/kubelet/cm/devicemanager/checkpoint:all-srcs", + ], tags = ["automanaged"], visibility = ["//visibility:public"], ) diff --git a/pkg/kubelet/cm/devicemanager/checkpoint/BUILD b/pkg/kubelet/cm/devicemanager/checkpoint/BUILD new file mode 100644 index 00000000000..91506a71f7c --- /dev/null +++ b/pkg/kubelet/cm/devicemanager/checkpoint/BUILD @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["checkpoint.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/checkpointmanager/checksum:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go b/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go new file mode 100644 index 00000000000..9f8cac139cd --- /dev/null +++ b/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go @@ -0,0 +1,81 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkpoint + +import ( + "encoding/json" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" +) + +type DeviceManagerCheckpoint interface { + checkpointmanager.Checkpoint + GetData() ([]PodDevicesEntry, map[string][]string) +} + +type PodDevicesEntry struct { + PodUID string + ContainerName string + ResourceName string + DeviceIDs []string + AllocResp []byte +} + +// checkpointData struct is used to store pod to device allocation information +// in a checkpoint file. +// TODO: add version control when we need to change checkpoint format. +type checkpointData struct { + PodDeviceEntries []PodDevicesEntry + RegisteredDevices map[string][]string +} + +type Data struct { + Data checkpointData + Checksum checksum.Checksum +} + +// NewDeviceManagerCheckpoint returns an instance of Checkpoint +func New(devEntries []PodDevicesEntry, + devices map[string][]string) DeviceManagerCheckpoint { + return &Data{ + Data: checkpointData{ + PodDeviceEntries: devEntries, + RegisteredDevices: devices, + }, + } +} + +// MarshalCheckpoint returns marshalled data +func (cp *Data) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(cp.Data) + return json.Marshal(*cp) +} + +// UnmarshalCheckpoint returns unmarshalled data +func (cp *Data) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +// VerifyChecksum verifies that passed checksum is same as calculated checksum +func (cp *Data) VerifyChecksum() error { + return cp.Checksum.Verify(cp.Data) +} + +func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) { + return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices +} diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 6c82fbe53d9..f415181dffa 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -18,7 +18,6 @@ package devicemanager import ( "context" - "encoding/json" "fmt" "net" "os" @@ -34,12 +33,13 @@ import ( "k8s.io/apimachinery/pkg/util/sets" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" - utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" "k8s.io/kubernetes/pkg/scheduler/schedulercache" - utilfs "k8s.io/kubernetes/pkg/util/filesystem" ) // ActivePodsFunc is a function that returns a list of pods to reconcile. @@ -83,9 +83,9 @@ type ManagerImpl struct { allocatedDevices map[string]sets.String // podDevices contains pod to allocated device mapping. - podDevices podDevices - store utilstore.Store - pluginOpts map[string]*pluginapi.DevicePluginOptions + podDevices podDevices + pluginOpts map[string]*pluginapi.DevicePluginOptions + checkpointManager checkpointmanager.CheckpointManager } type sourcesReadyStub struct{} @@ -122,11 +122,11 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) { // Before that, initializes them to perform no-op operations. manager.activePods = func() []*v1.Pod { return []*v1.Pod{} } manager.sourcesReady = &sourcesReadyStub{} - var err error - manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{}) + checkpointManager, err := checkpointmanager.NewCheckpointManager(dir) if err != nil { - return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err) + return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err) } + manager.checkpointManager = checkpointManager return manager, nil } @@ -454,33 +454,19 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) return capacity, allocatable, deletedResources.UnsortedList() } -// checkpointData struct is used to store pod to device allocation information -// and registered device information in a checkpoint file. -// TODO: add version control when we need to change checkpoint format. -type checkpointData struct { - PodDeviceEntries []podDevicesCheckpointEntry - RegisteredDevices map[string][]string -} - // Checkpoints device to container allocation information to disk. func (m *ManagerImpl) writeCheckpoint() error { m.mutex.Lock() - data := checkpointData{ - PodDeviceEntries: m.podDevices.toCheckpointData(), - RegisteredDevices: make(map[string][]string), - } + registeredDevs := make(map[string][]string) for resource, devices := range m.healthyDevices { - data.RegisteredDevices[resource] = devices.UnsortedList() + registeredDevs[resource] = devices.UnsortedList() } + data := checkpoint.New(m.podDevices.toCheckpointData(), + registeredDevs) m.mutex.Unlock() - - dataJSON, err := json.Marshal(data) + err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data) if err != nil { - return err - } - err = m.store.Write(kubeletDeviceManagerCheckpoint, dataJSON) - if err != nil { - return fmt.Errorf("failed to write deviceplugin checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err) + return fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err) } return nil } @@ -488,24 +474,23 @@ func (m *ManagerImpl) writeCheckpoint() error { // Reads device to container allocation information from disk, and populates // m.allocatedDevices accordingly. func (m *ManagerImpl) readCheckpoint() error { - content, err := m.store.Read(kubeletDeviceManagerCheckpoint) + registeredDevs := make(map[string][]string) + devEntries := make([]checkpoint.PodDevicesEntry, 0) + cp := checkpoint.New(devEntries, registeredDevs) + err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp) if err != nil { - if err == utilstore.ErrKeyNotFound { + if err == errors.ErrCheckpointNotFound { + glog.Warningf("Failed to retrieve checkpoint for %q: %v", kubeletDeviceManagerCheckpoint, err) return nil } - return fmt.Errorf("failed to read checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err) + return err } - glog.V(4).Infof("Read checkpoint file %s\n", kubeletDeviceManagerCheckpoint) - var data checkpointData - if err := json.Unmarshal(content, &data); err != nil { - return fmt.Errorf("failed to unmarshal deviceplugin checkpoint data: %v", err) - } - m.mutex.Lock() defer m.mutex.Unlock() - m.podDevices.fromCheckpointData(data.PodDeviceEntries) + podDevices, registeredDevs := cp.GetData() + m.podDevices.fromCheckpointData(podDevices) m.allocatedDevices = m.podDevices.devices() - for resource := range data.RegisteredDevices { + for resource := range registeredDevs { // During start up, creates empty healthyDevices list so that the resource capacity // will stay zero till the corresponding device plugin re-registers. m.healthyDevices[resource] = sets.NewString() diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index b0ef2857fd1..66b63999ae3 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -34,10 +34,9 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/lifecycle" - utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" "k8s.io/kubernetes/pkg/scheduler/schedulercache" - utilfs "k8s.io/kubernetes/pkg/util/filesystem" ) const ( @@ -347,20 +346,19 @@ func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.Cont func TestCheckpoint(t *testing.T) { resourceName1 := "domain1.com/resource1" resourceName2 := "domain2.com/resource2" - as := assert.New(t) tmpDir, err := ioutil.TempDir("", "checkpoint") as.Nil(err) - defer os.RemoveAll(tmpDir) + ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) + as.Nil(err) testManager := &ManagerImpl{ - socketdir: tmpDir, - endpoints: make(map[string]endpoint), - healthyDevices: make(map[string]sets.String), - unhealthyDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - podDevices: make(podDevices), + endpoints: make(map[string]endpoint), + healthyDevices: make(map[string]sets.String), + unhealthyDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), + checkpointManager: ckm, } - testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{}) testManager.podDevices.insert("pod1", "con1", resourceName1, constructDevices([]string{"dev1", "dev2"}), @@ -479,21 +477,25 @@ func makePod(limits v1.ResourceList) *v1.Pod { } } -func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) *ManagerImpl { +func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) { monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} - testManager := &ManagerImpl{ - socketdir: tmpDir, - callback: monitorCallback, - healthyDevices: make(map[string]sets.String), - unhealthyDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - endpoints: make(map[string]endpoint), - pluginOpts: opts, - podDevices: make(podDevices), - activePods: activePods, - sourcesReady: &sourcesReadyStub{}, + ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) + if err != nil { + return nil, err + } + testManager := &ManagerImpl{ + socketdir: tmpDir, + callback: monitorCallback, + healthyDevices: make(map[string]sets.String), + unhealthyDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + endpoints: make(map[string]endpoint), + pluginOpts: opts, + podDevices: make(podDevices), + activePods: activePods, + sourcesReady: &sourcesReadyStub{}, + checkpointManager: ckm, } - testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{}) for _, res := range testRes { testManager.healthyDevices[res.resourceName] = sets.NewString() for _, dev := range res.devs { @@ -525,7 +527,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso } } } - return testManager + return testManager, nil } func getTestNodeInfo(allocatable v1.ResourceList) *schedulercache.NodeInfo { @@ -569,7 +571,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) { defer os.RemoveAll(tmpDir) nodeInfo := getTestNodeInfo(v1.ResourceList{}) pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) - testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + as.Nil(err) testPods := []*v1.Pod{ makePod(v1.ResourceList{ @@ -664,7 +667,8 @@ func TestInitContainerDeviceAllocation(t *testing.T) { as.Nil(err) defer os.RemoveAll(tmpDir) pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) - testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + as.Nil(err) podWithPluginResourcesInInitContainers := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -742,14 +746,18 @@ func TestSanitizeNodeAllocatable(t *testing.T) { as := assert.New(t) monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + tmpDir, err := ioutil.TempDir("", "checkpoint") + as.Nil(err) + ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) + as.Nil(err) testManager := &ManagerImpl{ - callback: monitorCallback, - healthyDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - podDevices: make(podDevices), + callback: monitorCallback, + allDevices: make(map[string]sets.String), + healthyDevices: make(map[string]sets.String), + podDevices: make(podDevices), + checkpointManager: ckm, } - testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{}) // require one of resource1 and one of resource2 testManager.allocatedDevices[resourceName1] = sets.NewString() testManager.allocatedDevices[resourceName1].Insert(devID1) @@ -796,7 +804,8 @@ func TestDevicePreStartContainer(t *testing.T) { pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) pluginOpts[res1.resourceName] = &pluginapi.DevicePluginOptions{PreStartRequired: true} - testManager := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts) + testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts) + as.Nil(err) ch := make(chan []string, 1) testManager.endpoints[res1.resourceName] = &MockEndpoint{ diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index eb20dc0a6e8..6dcb3a51a56 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) @@ -126,18 +127,9 @@ func (pdev podDevices) devices() map[string]sets.String { return ret } -// podDevicesCheckpointEntry is used to record to device allocation information. -type podDevicesCheckpointEntry struct { - PodUID string - ContainerName string - ResourceName string - DeviceIDs []string - AllocResp []byte -} - // Turns podDevices to checkpointData. -func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry { - var data []podDevicesCheckpointEntry +func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry { + var data []checkpoint.PodDevicesEntry for podUID, containerDevices := range pdev { for conName, resources := range containerDevices { for resource, devices := range resources { @@ -152,7 +144,7 @@ func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry { glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err) continue } - data = append(data, podDevicesCheckpointEntry{podUID, conName, resource, devIds, allocResp}) + data = append(data, checkpoint.PodDevicesEntry{podUID, conName, resource, devIds, allocResp}) } } } @@ -160,7 +152,7 @@ func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry { } // Populates podDevices from the passed in checkpointData. -func (pdev podDevices) fromCheckpointData(data []podDevicesCheckpointEntry) { +func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) { for _, entry := range data { glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n", entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp) diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index ccdc7ece9ea..fa8ab7d6a9c 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -82,6 +82,8 @@ go_library( "//pkg/credentialprovider:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/checkpointmanager/checksum:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/dockershim/cm:go_default_library", @@ -100,8 +102,6 @@ go_library( "//pkg/kubelet/util/ioutils:go_default_library", "//pkg/kubelet/util/store:go_default_library", "//pkg/security/apparmor:go_default_library", - "//pkg/util/filesystem:go_default_library", - "//pkg/util/hash:go_default_library", "//pkg/util/parsers:go_default_library", "//vendor/github.com/armon/circbuf:go_default_library", "//vendor/github.com/blang/semver:go_default_library", @@ -149,6 +149,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/dockershim/libdocker:go_default_library", @@ -186,7 +187,6 @@ filegroup( "//pkg/kubelet/dockershim/metrics:all-srcs", "//pkg/kubelet/dockershim/network:all-srcs", "//pkg/kubelet/dockershim/remote:all-srcs", - "//pkg/kubelet/dockershim/testing:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/kubelet/dockershim/convert.go b/pkg/kubelet/dockershim/convert.go index f1c9b0e852f..4f5621d24d9 100644 --- a/pkg/kubelet/dockershim/convert.go +++ b/pkg/kubelet/dockershim/convert.go @@ -164,13 +164,14 @@ func containerToRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSand }, nil } -func checkpointToRuntimeAPISandbox(id string, checkpoint *PodSandboxCheckpoint) *runtimeapi.PodSandbox { +func checkpointToRuntimeAPISandbox(id string, checkpoint DockershimCheckpoint) *runtimeapi.PodSandbox { state := runtimeapi.PodSandboxState_SANDBOX_NOTREADY + _, name, namespace, _, _ := checkpoint.GetData() return &runtimeapi.PodSandbox{ Id: id, Metadata: &runtimeapi.PodSandboxMetadata{ - Name: checkpoint.Name, - Namespace: checkpoint.Namespace, + Name: name, + Namespace: namespace, }, State: state, } diff --git a/pkg/kubelet/dockershim/docker_checkpoint.go b/pkg/kubelet/dockershim/docker_checkpoint.go index f474696995b..8bfa1a77822 100644 --- a/pkg/kubelet/dockershim/docker_checkpoint.go +++ b/pkg/kubelet/dockershim/docker_checkpoint.go @@ -18,14 +18,9 @@ package dockershim import ( "encoding/json" - "fmt" - "hash/fnv" - "path/filepath" - "github.com/golang/glog" - utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" - utilfs "k8s.io/kubernetes/pkg/util/filesystem" - hashutil "k8s.io/kubernetes/pkg/util/hash" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" ) const ( @@ -36,6 +31,11 @@ const ( schemaVersion = "v1" ) +type DockershimCheckpoint interface { + checkpointmanager.Checkpoint + GetData() (string, string, string, []*PortMapping, bool) +} + type Protocol string // PortMapping is the port mapping configurations of a sandbox. @@ -65,89 +65,31 @@ type PodSandboxCheckpoint struct { // Data to checkpoint for pod sandbox. Data *CheckpointData `json:"data,omitempty"` // Checksum is calculated with fnv hash of the checkpoint object with checksum field set to be zero - CheckSum uint64 `json:"checksum"` + Checksum checksum.Checksum `json:"checksum"` } -// CheckpointHandler provides the interface to manage PodSandbox checkpoint -type CheckpointHandler interface { - // CreateCheckpoint persists sandbox checkpoint in CheckpointStore. - CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error - // GetCheckpoint retrieves sandbox checkpoint from CheckpointStore. - GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) - // RemoveCheckpoint removes sandbox checkpoint form CheckpointStore. - // WARNING: RemoveCheckpoint will not return error if checkpoint does not exist. - RemoveCheckpoint(podSandboxID string) error - // ListCheckpoint returns the list of existing checkpoints. - ListCheckpoints() ([]string, error) -} - -// PersistentCheckpointHandler is an implementation of CheckpointHandler. It persists checkpoint in CheckpointStore -type PersistentCheckpointHandler struct { - store utilstore.Store -} - -func NewPersistentCheckpointHandler(dockershimRootDir string) (CheckpointHandler, error) { - fstore, err := utilstore.NewFileStore(filepath.Join(dockershimRootDir, sandboxCheckpointDir), utilfs.DefaultFs{}) - if err != nil { - return nil, err - } - return &PersistentCheckpointHandler{store: fstore}, nil -} - -func (handler *PersistentCheckpointHandler) CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error { - checkpoint.CheckSum = calculateChecksum(*checkpoint) - blob, err := json.Marshal(checkpoint) - if err != nil { - return err - } - return handler.store.Write(podSandboxID, blob) -} - -func (handler *PersistentCheckpointHandler) GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) { - blob, err := handler.store.Read(podSandboxID) - if err != nil { - return nil, err - } - var checkpoint PodSandboxCheckpoint - //TODO: unmarhsal into a struct with just Version, check version, unmarshal into versioned type. - err = json.Unmarshal(blob, &checkpoint) - if err != nil { - glog.Errorf("Failed to unmarshal checkpoint %q, removing checkpoint. Checkpoint content: %q. ErrMsg: %v", podSandboxID, string(blob), err) - handler.RemoveCheckpoint(podSandboxID) - return nil, fmt.Errorf("failed to unmarshal checkpoint") - } - if checkpoint.CheckSum != calculateChecksum(checkpoint) { - glog.Errorf("Checksum of checkpoint %q is not valid, removing checkpoint", podSandboxID) - handler.RemoveCheckpoint(podSandboxID) - return nil, fmt.Errorf("checkpoint is corrupted") - } - return &checkpoint, nil -} - -func (handler *PersistentCheckpointHandler) RemoveCheckpoint(podSandboxID string) error { - return handler.store.Delete(podSandboxID) -} - -func (handler *PersistentCheckpointHandler) ListCheckpoints() ([]string, error) { - keys, err := handler.store.List() - if err != nil { - return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err) - } - return keys, nil -} - -func NewPodSandboxCheckpoint(namespace, name string) *PodSandboxCheckpoint { +func NewPodSandboxCheckpoint(namespace, name string, data *CheckpointData) DockershimCheckpoint { return &PodSandboxCheckpoint{ Version: schemaVersion, Namespace: namespace, Name: name, - Data: &CheckpointData{}, + Data: data, } } -func calculateChecksum(checkpoint PodSandboxCheckpoint) uint64 { - checkpoint.CheckSum = 0 - hash := fnv.New32a() - hashutil.DeepHashObject(hash, checkpoint) - return uint64(hash.Sum32()) +func (cp *PodSandboxCheckpoint) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Data) + return json.Marshal(*cp) +} + +func (cp *PodSandboxCheckpoint) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +func (cp *PodSandboxCheckpoint) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Data) +} + +func (cp *PodSandboxCheckpoint) GetData() (string, string, string, []*PortMapping, bool) { + return cp.Version, cp.Name, cp.Namespace, cp.Data.PortMappings, cp.Data.HostNetwork } diff --git a/pkg/kubelet/dockershim/docker_checkpoint_test.go b/pkg/kubelet/dockershim/docker_checkpoint_test.go index c10b8f1e502..49f4e4d174a 100644 --- a/pkg/kubelet/dockershim/docker_checkpoint_test.go +++ b/pkg/kubelet/dockershim/docker_checkpoint_test.go @@ -17,86 +17,17 @@ limitations under the License. package dockershim import ( - "sort" "testing" "github.com/stretchr/testify/assert" - utilstore "k8s.io/kubernetes/pkg/kubelet/dockershim/testing" ) -func NewTestPersistentCheckpointHandler() CheckpointHandler { - return &PersistentCheckpointHandler{store: utilstore.NewMemStore()} -} - -func TestPersistentCheckpointHandler(t *testing.T) { - var err error - handler := NewTestPersistentCheckpointHandler() - port80 := int32(80) - port443 := int32(443) - proto := protocolTCP - - checkpoint1 := NewPodSandboxCheckpoint("ns1", "sandbox1") - checkpoint1.Data.PortMappings = []*PortMapping{ - { - &proto, - &port80, - &port80, - }, - { - &proto, - &port443, - &port443, - }, - } - checkpoint1.Data.HostNetwork = true - - checkpoints := []struct { - podSandboxID string - checkpoint *PodSandboxCheckpoint - expectHostNetwork bool - }{ - { - "id1", - checkpoint1, - true, - }, - { - "id2", - NewPodSandboxCheckpoint("ns2", "sandbox2"), - false, - }, - } - - for _, tc := range checkpoints { - // Test CreateCheckpoints - err = handler.CreateCheckpoint(tc.podSandboxID, tc.checkpoint) - assert.NoError(t, err) - - // Test GetCheckpoints - checkpoint, err := handler.GetCheckpoint(tc.podSandboxID) - assert.NoError(t, err) - assert.Equal(t, *checkpoint, *tc.checkpoint) - assert.Equal(t, checkpoint.Data.HostNetwork, tc.expectHostNetwork) - } - // Test ListCheckpoints - keys, err := handler.ListCheckpoints() - assert.NoError(t, err) - sort.Strings(keys) - assert.Equal(t, keys, []string{"id1", "id2"}) - - // Test RemoveCheckpoints - err = handler.RemoveCheckpoint("id1") - assert.NoError(t, err) - // Test Remove Nonexisted Checkpoints - err = handler.RemoveCheckpoint("id1") - assert.NoError(t, err) - - // Test ListCheckpoints - keys, err = handler.ListCheckpoints() - assert.NoError(t, err) - assert.Equal(t, keys, []string{"id2"}) - - // Test Get NonExisted Checkpoint - _, err = handler.GetCheckpoint("id1") - assert.Error(t, err) +func TestPodSandboxCheckpoint(t *testing.T) { + data := &CheckpointData{HostNetwork: true} + checkpoint := NewPodSandboxCheckpoint("ns1", "sandbox1", data) + version, name, namespace, _, hostNetwork := checkpoint.GetData() + assert.Equal(t, schemaVersion, version) + assert.Equal(t, "ns1", namespace) + assert.Equal(t, "sandbox1", name) + assert.Equal(t, true, hostNetwork) } diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index 3e71d5506b4..a9bd36789b4 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ b/pkg/kubelet/dockershim/docker_sandbox.go @@ -30,6 +30,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/qos" @@ -118,7 +119,7 @@ func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPod }(&err) // Step 3: Create Sandbox Checkpoint. - if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { + if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { return nil, err } @@ -189,7 +190,8 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP name = metadata.Name hostNetwork = (networkNamespaceMode(inspectResult) == runtimeapi.NamespaceMode_NODE) } else { - checkpoint, checkpointErr := ds.checkpointHandler.GetCheckpoint(podSandboxID) + checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{}) + checkpointErr := ds.checkpointManager.GetCheckpoint(podSandboxID, checkpoint) // Proceed if both sandbox container and checkpoint could not be found. This means that following // actions will only have sandbox ID and not have pod namespace and name information. @@ -204,9 +206,7 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP fmt.Errorf("failed to get sandbox status: %v", statusErr)}) } } else { - namespace = checkpoint.Namespace - name = checkpoint.Name - hostNetwork = checkpoint.Data != nil && checkpoint.Data.HostNetwork + _, name, namespace, _, hostNetwork = checkpoint.GetData() } } @@ -237,7 +237,7 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP errList = append(errList, err) } else { // remove the checkpoint for any sandbox that is not found in the runtime - ds.checkpointHandler.RemoveCheckpoint(podSandboxID) + ds.checkpointManager.RemoveCheckpoint(podSandboxID) } } @@ -284,7 +284,7 @@ func (ds *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.Rem } // Remove the checkpoint of the sandbox. - if err := ds.checkpointHandler.RemoveCheckpoint(podSandboxID); err != nil { + if err := ds.checkpointManager.RemoveCheckpoint(podSandboxID); err != nil { errs = append(errs, err) } if len(errs) == 0 { @@ -465,7 +465,7 @@ func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPod var err error checkpoints := []string{} if filter == nil { - checkpoints, err = ds.checkpointHandler.ListCheckpoints() + checkpoints, err = ds.checkpointManager.ListCheckpoints() if err != nil { glog.Errorf("Failed to list checkpoints: %v", err) } @@ -501,7 +501,8 @@ func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPod if _, ok := sandboxIDs[id]; ok { continue } - checkpoint, err := ds.checkpointHandler.GetCheckpoint(id) + checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{}) + err := ds.checkpointManager.GetCheckpoint(id, checkpoint) if err != nil { glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err) continue @@ -624,20 +625,20 @@ func ipcNamespaceMode(container *dockertypes.ContainerJSON) runtimeapi.Namespace return runtimeapi.NamespaceMode_POD } -func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) *PodSandboxCheckpoint { - checkpoint := NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name) +func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) checkpointmanager.Checkpoint { + data := CheckpointData{} for _, pm := range config.GetPortMappings() { proto := toCheckpointProtocol(pm.Protocol) - checkpoint.Data.PortMappings = append(checkpoint.Data.PortMappings, &PortMapping{ + data.PortMappings = append(data.PortMappings, &PortMapping{ HostPort: &pm.HostPort, ContainerPort: &pm.ContainerPort, Protocol: &proto, }) } if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE { - checkpoint.Data.HostNetwork = true + data.HostNetwork = true } - return checkpoint + return NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name, &data) } func toCheckpointProtocol(protocol runtimeapi.Protocol) Protocol { diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index fe8e84d1240..1b6f886d94a 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "path/filepath" "sync" "time" @@ -30,6 +31,7 @@ import ( "k8s.io/api/core/v1" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" kubecm "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockershim/cm" @@ -191,7 +193,8 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon client := NewDockerClientFromConfig(config) c := libdocker.NewInstrumentedInterface(client) - checkpointHandler, err := NewPersistentCheckpointHandler(dockershimRootDir) + + checkpointManager, err := checkpointmanager.NewCheckpointManager(filepath.Join(dockershimRootDir, sandboxCheckpointDir)) if err != nil { return nil, err } @@ -205,7 +208,7 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon execHandler: &NativeExecHandler{}, }, containerManager: cm.NewContainerManager(cgroupsName, client), - checkpointHandler: checkpointHandler, + checkpointManager: checkpointManager, disableSharedPID: disableSharedPID, networkReady: make(map[string]bool), } @@ -293,7 +296,7 @@ type dockerService struct { containerManager cm.ContainerManager // cgroup driver used by Docker runtime. cgroupDriver string - checkpointHandler CheckpointHandler + checkpointManager checkpointmanager.CheckpointManager // caches the version of the runtime. // To be compatible with multiple docker versions, we need to perform // version checking for some operations. Use this cache to avoid querying @@ -365,7 +368,8 @@ func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) { // GetPodPortMappings returns the port mappings of the given podSandbox ID. func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) { // TODO: get portmappings from docker labels for backward compatibility - checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID) + checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{}) + err := ds.checkpointManager.GetCheckpoint(podSandboxID, checkpoint) // Return empty portMappings if checkpoint is not found if err != nil { if err == utilstore.ErrKeyNotFound { @@ -373,9 +377,9 @@ func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.Po } return nil, err } - - portMappings := make([]*hostport.PortMapping, 0, len(checkpoint.Data.PortMappings)) - for _, pm := range checkpoint.Data.PortMappings { + _, _, _, checkpointedPortMappings, _ := checkpoint.GetData() + portMappings := make([]*hostport.PortMapping, 0, len(checkpointedPortMappings)) + for _, pm := range checkpointedPortMappings { proto := toAPIProtocol(*pm.Protocol) portMappings = append(portMappings, &hostport.PortMapping{ HostPort: *pm.HostPort, diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index 0f9724e224d..2b223389205 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/dockershim/network" @@ -43,15 +44,50 @@ func newTestNetworkPlugin(t *testing.T) *nettest.MockNetworkPlugin { return nettest.NewMockNetworkPlugin(ctrl) } +type mockCheckpointManager struct { + checkpoint map[string]*PodSandboxCheckpoint +} + +func (ckm *mockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error { + ckm.checkpoint[checkpointKey] = checkpoint.(*PodSandboxCheckpoint) + return nil +} + +func (ckm *mockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error { + *(checkpoint.(*PodSandboxCheckpoint)) = *(ckm.checkpoint[checkpointKey]) + return nil +} + +func (ckm *mockCheckpointManager) RemoveCheckpoint(checkpointKey string) error { + _, ok := ckm.checkpoint[checkpointKey] + if ok { + delete(ckm.checkpoint, "moo") + } + return nil +} + +func (ckm *mockCheckpointManager) ListCheckpoints() ([]string, error) { + var keys []string + for key, _ := range ckm.checkpoint { + keys = append(keys, key) + } + return keys, nil +} + +func newMockCheckpointManager() checkpointmanager.CheckpointManager { + return &mockCheckpointManager{checkpoint: make(map[string]*PodSandboxCheckpoint)} +} + func newTestDockerService() (*dockerService, *libdocker.FakeDockerClient, *clock.FakeClock) { fakeClock := clock.NewFakeClock(time.Time{}) c := libdocker.NewFakeDockerClient().WithClock(fakeClock).WithVersion("1.11.2", "1.23").WithRandSource(rand.NewSource(0)) pm := network.NewPluginManager(&network.NoopNetworkPlugin{}) + ckm := newMockCheckpointManager() return &dockerService{ client: c, os: &containertest.FakeOS{}, network: pm, - checkpointHandler: NewTestPersistentCheckpointHandler(), + checkpointManager: ckm, networkReady: make(map[string]bool), }, c, fakeClock }