From fc37a7a990da6f17850a1352e66cfdcce64a3ff1 Mon Sep 17 00:00:00 2001 From: Adrian Reber Date: Fri, 10 Sep 2021 12:38:08 +0000 Subject: [PATCH] kubelet: wire checkpoint container support through This adds the last pieces to wire through the container checkpoint support in the kubelet. Signed-off-by: Adrian Reber --- pkg/features/kube_features.go | 9 ++ pkg/kubelet/cri/remote/fake/fake_runtime.go | 10 ++ pkg/kubelet/kubelet.go | 40 ++++++ pkg/kubelet/kubelet_test.go | 116 ++++++++++++++++++ pkg/kubelet/server/auth_test.go | 1 + pkg/kubelet/server/server.go | 90 ++++++++++++++ pkg/kubelet/server/server_test.go | 89 +++++++++++++- .../src/k8s.io/cri-api/pkg/apis/services.go | 4 +- .../pkg/apis/testing/fake_runtime_service.go | 14 +++ 9 files changed, 369 insertions(+), 4 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 3472371ecd8..80e9bf42321 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -190,6 +190,13 @@ const ( // Allows clients to request a duration for certificates issued via the Kubernetes CSR API. CSRDuration featuregate.Feature = "CSRDuration" + // owner: @adrianreber + // kep: http://kep.k8s.io/2008 + // alpha: v1.25 + // + // Enables container Checkpoint support in the kubelet + ContainerCheckpoint featuregate.Feature = "ContainerCheckpoint" + // owner: @jiahuif // alpha: v1.21 // beta: v1.22 @@ -846,6 +853,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS CSRDuration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26 + ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha}, + ControllerManagerLeaderMigration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26 CronJobTimeZone: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/pkg/kubelet/cri/remote/fake/fake_runtime.go b/pkg/kubelet/cri/remote/fake/fake_runtime.go index 54b31fe2a8f..f217f0592d1 100644 --- a/pkg/kubelet/cri/remote/fake/fake_runtime.go +++ b/pkg/kubelet/cri/remote/fake/fake_runtime.go @@ -322,3 +322,13 @@ func (f *RemoteRuntime) ReopenContainerLog(ctx context.Context, req *kubeapi.Reo return &kubeapi.ReopenContainerLogResponse{}, nil } + +// CheckpointContainer checkpoints the given container. +func (f *RemoteRuntime) CheckpointContainer(ctx context.Context, req *kubeapi.CheckpointContainerRequest) (*kubeapi.CheckpointContainerResponse, error) { + err := f.RuntimeService.CheckpointContainer(&kubeapi.CheckpointContainerRequest{}) + if err != nil { + return nil, err + } + + return &kubeapi.CheckpointContainerResponse{}, nil +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 5cb582dcedb..4caf8f73ce5 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -25,6 +25,7 @@ import ( "net/http" "os" "path" + "path/filepath" sysruntime "runtime" "sort" "strings" @@ -60,6 +61,7 @@ import ( cloudprovider "k8s.io/cloud-provider" "k8s.io/component-helpers/apimachinery/lease" internalapi "k8s.io/cri-api/pkg/apis" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" @@ -2445,6 +2447,44 @@ func (kl *Kubelet) fastStatusUpdateOnce() { } } +// CheckpointContainer tries to checkpoint a container. The parameters are used to +// look up the specified container. If the container specified by the given parameters +// cannot be found an error is returned. If the container is found the container +// engine will be asked to checkpoint the given container into the kubelet's default +// checkpoint directory. +func (kl *Kubelet) CheckpointContainer( + podUID types.UID, + podFullName, + containerName string, + options *runtimeapi.CheckpointContainerRequest, +) error { + container, err := kl.findContainer(podFullName, podUID, containerName) + if err != nil { + return err + } + if container == nil { + return fmt.Errorf("container %v not found", containerName) + } + + options.Location = filepath.Join( + kl.getCheckpointsDir(), + fmt.Sprintf( + "checkpoint-%s-%s-%s.tar", + podFullName, + containerName, + time.Now().Format(time.RFC3339), + ), + ) + + options.ContainerId = string(container.ID.ID) + + if err := kl.containerRuntime.CheckpointContainer(options); err != nil { + return err + } + + return nil +} + // isSyncPodWorthy filters out events that are not worthy of pod syncing func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool { // ContainerRemoved doesn't affect pod state diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 32aa2facc25..9995f1e04cc 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -21,10 +21,12 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" "reflect" goruntime "runtime" "sort" "strconv" + "strings" "testing" "time" @@ -44,6 +46,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2/ktesting" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" @@ -1583,6 +1586,119 @@ func TestFilterOutInactivePods(t *testing.T) { assert.Equal(t, expected, actual) } +func TestCheckpointContainer(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + + fakeRuntime := testKubelet.fakeRuntime + containerID := kubecontainer.ContainerID{ + Type: "test", + ID: "abc1234", + } + + fakePod := &containertest.FakePod{ + Pod: &kubecontainer.Pod{ + ID: "12345678", + Name: "podFoo", + Namespace: "nsFoo", + Containers: []*kubecontainer.Container{ + { + Name: "containerFoo", + ID: containerID, + }, + }, + }, + } + + fakeRuntime.PodList = []*containertest.FakePod{fakePod} + wrongContainerName := "wrongContainerName" + + tests := []struct { + name string + containerName string + checkpointLocation string + expectedStatus error + expectedLocation string + }{ + { + name: "Checkpoint with wrong container name", + containerName: wrongContainerName, + checkpointLocation: "", + expectedStatus: fmt.Errorf("container %s not found", wrongContainerName), + expectedLocation: "", + }, + { + name: "Checkpoint with default checkpoint location", + containerName: fakePod.Pod.Containers[0].Name, + checkpointLocation: "", + expectedStatus: nil, + expectedLocation: filepath.Join( + kubelet.getCheckpointsDir(), + fmt.Sprintf( + "checkpoint-%s_%s-%s", + fakePod.Pod.Name, + fakePod.Pod.Namespace, + fakePod.Pod.Containers[0].Name, + ), + ), + }, + { + name: "Checkpoint with ignored location", + containerName: fakePod.Pod.Containers[0].Name, + checkpointLocation: "somethingThatWillBeIgnored", + expectedStatus: nil, + expectedLocation: filepath.Join( + kubelet.getCheckpointsDir(), + fmt.Sprintf( + "checkpoint-%s_%s-%s", + fakePod.Pod.Name, + fakePod.Pod.Namespace, + fakePod.Pod.Containers[0].Name, + ), + ), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + options := &runtimeapi.CheckpointContainerRequest{} + if test.checkpointLocation != "" { + options.Location = test.checkpointLocation + } + status := kubelet.CheckpointContainer( + fakePod.Pod.ID, + fmt.Sprintf( + "%s_%s", + fakePod.Pod.Name, + fakePod.Pod.Namespace, + ), + test.containerName, + options, + ) + require.Equal(t, status, test.expectedStatus) + + if status != nil { + return + } + + require.True( + t, + strings.HasPrefix( + options.Location, + test.expectedLocation, + ), + ) + require.Equal( + t, + options.ContainerId, + containerID.ID, + ) + + }) + } +} + func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() diff --git a/pkg/kubelet/server/auth_test.go b/pkg/kubelet/server/auth_test.go index dc827ffd105..d5f2b6928f4 100644 --- a/pkg/kubelet/server/auth_test.go +++ b/pkg/kubelet/server/auth_test.go @@ -110,6 +110,7 @@ func AuthzTestCases() []AuthzTestCase { testPaths := map[string]string{ "/attach/{podNamespace}/{podID}/{containerName}": "proxy", "/attach/{podNamespace}/{podID}/{uid}/{containerName}": "proxy", + "/checkpoint/{podNamespace}/{podID}/{containerName}": "proxy", "/configz": "proxy", "/containerLogs/{podNamespace}/{podID}/{containerName}": "proxy", "/debug/flags/v": "proxy", diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 42a74475475..4c31bb6a4ac 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -62,6 +62,7 @@ import ( "k8s.io/component-base/logs" compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -224,6 +225,7 @@ type HostInterface interface { GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) GetRunningPods() ([]*v1.Pod, error) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) + CheckpointContainer(podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error ServeLogs(w http.ResponseWriter, req *http.Request) ResyncInterval() time.Duration @@ -403,6 +405,17 @@ func (s *Server) InstallDefaultHandlers() { s.restfulCont.Handle(proberMetricsPath, compbasemetrics.HandlerFor(p, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}), ) + + // Only enable checkpoint API if the feature is enabled + if utilfeature.DefaultFeatureGate.Enabled(features.ContainerCheckpoint) { + s.addMetricsBucketMatcher("checkpoint") + ws = &restful.WebService{} + ws.Path("/checkpoint").Produces(restful.MIME_JSON) + ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}"). + To(s.checkpoint). + Operation("checkpoint")) + s.restfulCont.Add(ws) + } } // InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers @@ -878,6 +891,83 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp proxyStream(response.ResponseWriter, request.Request, url) } +// checkpoint handles the checkpoint API request. It checks if the requested +// podNamespace, pod and container actually exist and only then calls out +// to the runtime to actually checkpoint the container. +func (s *Server) checkpoint(request *restful.Request, response *restful.Response) { + pod, ok := s.host.GetPodByName(request.PathParameter("podNamespace"), request.PathParameter("podID")) + if !ok { + response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) + return + } + + containerName := request.PathParameter("containerName") + + found := false + for _, container := range pod.Spec.Containers { + if container.Name == containerName { + found = true + } + } + if !found { + for _, container := range pod.Spec.InitContainers { + if container.Name == containerName { + found = true + } + } + } + if !found && utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) { + for _, container := range pod.Spec.EphemeralContainers { + if container.Name == containerName { + found = true + } + } + } + if !found { + response.WriteError( + http.StatusNotFound, + fmt.Errorf("container %v does not exist", containerName), + ) + return + } + + options := &runtimeapi.CheckpointContainerRequest{} + // Query parameter to select an optional timeout. Without the timeout parameter + // the checkpoint command will use the default CRI timeout. + timeouts := request.Request.URL.Query()["timeout"] + if len(timeouts) > 0 { + // If the user specified one or multiple values for timeouts we + // are using the last available value. + timeout, err := strconv.ParseInt(timeouts[len(timeouts)-1], 10, 64) + if err != nil { + response.WriteError( + http.StatusNotFound, + fmt.Errorf("cannot parse value of timeout parameter"), + ) + return + } + options.Timeout = timeout + } + + if err := s.host.CheckpointContainer(pod.UID, kubecontainer.GetPodFullName(pod), containerName, options); err != nil { + response.WriteError( + http.StatusInternalServerError, + fmt.Errorf( + "checkpointing of %v/%v/%v failed (%v)", + request.PathParameter("podNamespace"), + request.PathParameter("podID"), + containerName, + err, + ), + ) + return + } + writeJSONResponse( + response, + []byte(fmt.Sprintf("{\"items\":[\"%s\"]}", options.Location)), + ) +} + // getURLRootPath trims a URL path. // For paths in the format of "/metrics/xxx", "metrics/xxx" is returned; // For all other paths, the first part of the path is returned. diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 5df58ad988a..521716aa59c 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -53,7 +53,10 @@ import ( "k8s.io/utils/pointer" // Do some initialization to decode the query parameters correctly. + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" _ "k8s.io/kubernetes/pkg/apis/core/install" + "k8s.io/kubernetes/pkg/features" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cri/streaming" @@ -146,6 +149,13 @@ func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, contain return fk.runFunc(podFullName, uid, containerName, cmd) } +func (fk *fakeKubelet) CheckpointContainer(podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error { + if containerName == "checkpointingFailure" { + return fmt.Errorf("Returning error for test") + } + return nil +} + type fakeRuntime struct { execFunc func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error @@ -348,7 +358,8 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo fw.fakeKubelet, stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}), fw.fakeAuth, - kubeCfg) + kubeCfg, + ) fw.serverUnderTest = &server fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) return fw @@ -459,7 +470,6 @@ func TestServeRunInContainerWithUID(t *testing.T) { } resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+testUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil) - if err != nil { t.Fatalf("Got error POSTing: %v", err) } @@ -546,6 +556,9 @@ func TestAuthzCoverage(t *testing.T) { } func TestAuthFilters(t *testing.T) { + // Enable features.ContainerCheckpoint during test + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, true)() + fw := newServerTest() defer fw.testHTTPServer.Close() @@ -840,6 +853,78 @@ func TestContainerLogsWithInvalidTail(t *testing.T) { } } +func TestCheckpointContainer(t *testing.T) { + // Enable features.ContainerCheckpoint during test + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, true)() + + fw := newServerTest() + defer fw.testHTTPServer.Close() + podNamespace := "other" + podName := "foo" + expectedContainerName := "baz" + // GetPodByName() should always fail + fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) { + return nil, false + } + t.Run("wrong pod namespace", func(t *testing.T) { + resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil) + if err != nil { + t.Errorf("Got error POSTing: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNotFound { + t.Errorf("Unexpected non-error checkpointing container: %#v", resp) + } + }) + // let GetPodByName() return a result, but our container "wrongContainerName" is not part of the Pod + setPodByNameFunc(fw, podNamespace, podName, expectedContainerName) + t.Run("wrong container name", func(t *testing.T) { + resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/wrongContainerName", "", nil) + if err != nil { + t.Errorf("Got error POSTing: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNotFound { + t.Errorf("Unexpected non-error checkpointing container: %#v", resp) + } + }) + // Now the checkpointing of the container fails + fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: podNamespace, + Name: podName, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "checkpointingFailure", + }, + }, + }, + }, true + } + t.Run("checkpointing fails", func(t *testing.T) { + resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/checkpointingFailure", "", nil) + if err != nil { + t.Errorf("Got error POSTing: %v", err) + } + defer resp.Body.Close() + assert.Equal(t, resp.StatusCode, 500) + body, _ := ioutil.ReadAll(resp.Body) + assert.Equal(t, string(body), "checkpointing of other/foo/checkpointingFailure failed (Returning error for test)") + }) + // Now test a successful checkpoint succeeds + setPodByNameFunc(fw, podNamespace, podName, expectedContainerName) + t.Run("checkpointing succeeds", func(t *testing.T) { + resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil) + if err != nil { + t.Errorf("Got error POSTing: %v", err) + } + assert.Equal(t, resp.StatusCode, 200) + }) +} + func makeReq(t *testing.T, method, url, clientProtocol string) *http.Request { req, err := http.NewRequest(method, url, nil) if err != nil { diff --git a/staging/src/k8s.io/cri-api/pkg/apis/services.go b/staging/src/k8s.io/cri-api/pkg/apis/services.go index ac7cc59fbf4..4a3efd75e78 100644 --- a/staging/src/k8s.io/cri-api/pkg/apis/services.go +++ b/staging/src/k8s.io/cri-api/pkg/apis/services.go @@ -56,6 +56,8 @@ type ContainerManager interface { // for the container. If it returns error, new container log file MUST NOT // be created. ReopenContainerLog(ContainerID string) error + // CheckpointContainer checkpoints a container + CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error } // PodSandboxManager contains methods for operating on PodSandboxes. The methods @@ -76,8 +78,6 @@ type PodSandboxManager interface { ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) - // CheckpointContainer checkpoints a container - CheckpointContainer(containerID, checkpointDir string) error } // ContainerStatsManager contains methods for retrieving the container diff --git a/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go b/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go index 8e9c3a2ce61..fd82c79d8a4 100644 --- a/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go +++ b/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go @@ -694,3 +694,17 @@ func (r *FakeRuntimeService) ReopenContainerLog(containerID string) error { return nil } + +// CheckpointContainer emulates call to checkpoint a container in the FakeRuntimeService. +func (r *FakeRuntimeService) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error { + r.Lock() + defer r.Unlock() + + r.Called = append(r.Called, "CheckpointContainer") + + if err := r.popError("CheckpointContainer"); err != nil { + return err + } + + return nil +}