CRI: implement Controller.Stop for SandboxAPI
Signed-off-by: WangLei <wllenyj@linux.alibaba.com>
This commit is contained in:
		| @@ -41,8 +41,8 @@ import ( | ||||
| type CRIService interface { | ||||
| 	EnsureImageExists(ctx context.Context, ref string, config *runtime.PodSandboxConfig) (*imagestore.Image, error) | ||||
|  | ||||
| 	// TODO: we should implement Controller.Wait and use it instead of this to monitor sandbox exit. | ||||
| 	StartSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} | ||||
| 	// TODO: we should implement Event backoff in Controller. | ||||
| 	BackOffEvent(id string, event interface{}) | ||||
| } | ||||
|  | ||||
| type Controller struct { | ||||
| @@ -83,11 +83,6 @@ func New( | ||||
|  | ||||
| var _ sandbox.Controller = (*Controller)(nil) | ||||
|  | ||||
| func (c *Controller) Stop(ctx context.Context, sandboxID string) (*api.ControllerStopResponse, error) { | ||||
| 	// TODO: implement me; until then any call to Stop panics. | ||||
| 	panic("implement me") | ||||
| } | ||||
|  | ||||
| func (c *Controller) Delete(ctx context.Context, sandboxID string) (*api.ControllerDeleteResponse, error) { | ||||
| 	//TODO implement me | ||||
| 	panic("implement me") | ||||
|   | ||||
							
								
								
									
										138
									
								
								pkg/cri/sbserver/podsandbox/sandbox_stop.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										138
									
								
								pkg/cri/sbserver/podsandbox/sandbox_stop.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,138 @@ | ||||
| /* | ||||
|    Copyright The containerd Authors. | ||||
|  | ||||
|    Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|    you may not use this file except in compliance with the License. | ||||
|    You may obtain a copy of the License at | ||||
|  | ||||
|        http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
|    Unless required by applicable law or agreed to in writing, software | ||||
|    distributed under the License is distributed on an "AS IS" BASIS, | ||||
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|    See the License for the specific language governing permissions and | ||||
|    limitations under the License. | ||||
| */ | ||||
|  | ||||
| package podsandbox | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"syscall" | ||||
|  | ||||
| 	eventtypes "github.com/containerd/containerd/api/events" | ||||
| 	api "github.com/containerd/containerd/api/services/sandbox/v1" | ||||
| 	"github.com/containerd/containerd/errdefs" | ||||
| 	sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" | ||||
| 	ctrdutil "github.com/containerd/containerd/pkg/cri/util" | ||||
| 	"github.com/containerd/containerd/protobuf" | ||||
| 	"github.com/sirupsen/logrus" | ||||
| ) | ||||
|  | ||||
| // Stop looks up the sandbox with the given ID in the sandbox store, cleans up | ||||
| // its sandbox files and, if the sandbox is in the ready or unknown state, stops | ||||
| // its sandbox container. An empty response is returned on success; a lookup, | ||||
| // cleanup or stop failure is returned as a wrapped error. | ||||
| func (c *Controller) Stop(ctx context.Context, sandboxID string) (*api.ControllerStopResponse, error) { | ||||
| 	sb, getErr := c.sandboxStore.Get(sandboxID) | ||||
| 	if getErr != nil { | ||||
| 		return nil, fmt.Errorf("an error occurred when try to find sandbox %q: %w", sandboxID, getErr) | ||||
| 	} | ||||
|  | ||||
| 	cleanupErr := c.cleanupSandboxFiles(sandboxID, sb.Config) | ||||
| 	if cleanupErr != nil { | ||||
| 		return nil, fmt.Errorf("failed to cleanup sandbox files: %w", cleanupErr) | ||||
| 	} | ||||
|  | ||||
| 	// TODO: The Controller maintains its own Status instead of CRI's sandboxStore. | ||||
| 	// The sandbox container is only stopped from the ready or unknown state. | ||||
| 	switch state := sb.Status.Get().State; state { | ||||
| 	case sandboxstore.StateReady, sandboxstore.StateUnknown: | ||||
| 		if stopErr := c.stopSandboxContainer(ctx, sb); stopErr != nil { | ||||
| 			return nil, fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, stopErr) | ||||
| 		} | ||||
| 	} | ||||
| 	return &api.ControllerStopResponse{}, nil | ||||
| } | ||||
|  | ||||
| // stopSandboxContainer sends SIGKILL to the sandbox container's task and then waits, via waitSandboxStop, until the sandbox has stopped or ctx is done. | ||||
| // `task.Delete` is not called here because it will be called when | ||||
| // the event monitor handles the `TaskExit` event. | ||||
| func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error { | ||||
| 	id := sandbox.ID | ||||
| 	container := sandbox.Container | ||||
| 	state := sandbox.Status.Get().State | ||||
| 	task, err := container.Task(ctx, nil) | ||||
| 	if err != nil { | ||||
| 		if !errdefs.IsNotFound(err) { | ||||
| 			return fmt.Errorf("failed to get sandbox container: %w", err) | ||||
| 		} | ||||
| 		// The task was not found, which is not an error. Don't return for unknown state, some cleanup needs to be done. | ||||
| 		if state == sandboxstore.StateUnknown { | ||||
| 			return cleanupUnknownSandbox(ctx, id, sandbox) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// Handle unknown state. | ||||
| 	// The cleanup logic is the same as for a container in unknown state. | ||||
| 	if state == sandboxstore.StateUnknown { | ||||
| 		// Start an exit handler for the sandbox task in unknown state; task.Wait gets its own cancellable namespaced context instead of ctx. | ||||
| 		waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext()) | ||||
| 		defer waitCancel() | ||||
| 		exitCh, err := task.Wait(waitCtx) | ||||
| 		if err != nil { | ||||
| 			if !errdefs.IsNotFound(err) { | ||||
| 				return fmt.Errorf("failed to wait for task: %w", err) | ||||
| 			} | ||||
| 			return cleanupUnknownSandbox(ctx, id, sandbox) | ||||
| 		} | ||||
|  | ||||
| 		exitCtx, exitCancel := context.WithCancel(context.Background()) | ||||
| 		stopCh := make(chan struct{}) | ||||
| 		go func() { | ||||
| 			defer close(stopCh) | ||||
| 			exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, id, exitCh) | ||||
| 			if err != nil && err != context.Canceled && err != context.DeadlineExceeded { | ||||
| 				e := &eventtypes.TaskExit{ | ||||
| 					ContainerID: id, | ||||
| 					ID:          id, | ||||
| 					Pid:         task.Pid(), | ||||
| 					ExitStatus:  exitStatus, | ||||
| 					ExitedAt:    protobuf.ToTimestamp(exitedAt), | ||||
| 				} | ||||
| 				logrus.WithError(err).Errorf("Failed to wait sandbox exit %+v", e) | ||||
| 				// TODO: how to backoff; for now the event is handed to c.cri.BackOffEvent. | ||||
| 				c.cri.BackOffEvent(id, e) | ||||
| 			} | ||||
| 		}() | ||||
| 		defer func() { | ||||
| 			exitCancel() | ||||
| 			// Deferred calls run last-in-first-out, so this runs before waitCancel. This ensures that exit monitor is stopped before | ||||
| 			// `Wait` is cancelled, so no exit event is generated | ||||
| 			// because of the `Wait` cancellation. | ||||
| 			<-stopCh | ||||
| 		}() | ||||
| 	} | ||||
|  | ||||
| 	// Kill the sandbox container; a not-found error from Kill is ignored. | ||||
| 	if err = task.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) { | ||||
| 		return fmt.Errorf("failed to kill sandbox container: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return c.waitSandboxStop(ctx, sandbox) | ||||
| } | ||||
|  | ||||
| // waitSandboxStop blocks until the sandbox reports that it has stopped, in which | ||||
| // case nil is returned, or until ctx is cancelled or its deadline is exceeded, in | ||||
| // which case the context error is returned wrapped with the sandbox ID. | ||||
| func (c *Controller) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox) error { | ||||
| 	ctxDone, sandboxStopped := ctx.Done(), sandbox.Stopped() | ||||
| 	select { | ||||
| 	case <-sandboxStopped: | ||||
| 		return nil | ||||
| 	case <-ctxDone: | ||||
| 		return fmt.Errorf("wait sandbox container %q: %w", sandbox.ID, ctx.Err()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // cleanupUnknownSandbox cleans up a stopped sandbox that is in the unknown state. The id argument is not used by the body. | ||||
| func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error { | ||||
| 	// Reuse handleSandboxExit to do the cleanup; its error is returned unchanged. | ||||
| 	return handleSandboxExit(ctx, sandbox) | ||||
| } | ||||
| @@ -17,31 +17,10 @@ | ||||
| package sbserver | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"os" | ||||
|  | ||||
| 	"github.com/containerd/containerd" | ||||
| 	"github.com/containerd/containerd/plugin" | ||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||
|  | ||||
| 	osinterface "github.com/containerd/containerd/pkg/os" | ||||
| ) | ||||
|  | ||||
| // cleanupSandboxFiles unmounts sandbox files; removing the files themselves is | ||||
| // left to the removal of the sandbox root directory. Unless the sandbox uses the | ||||
| // node IPC namespace, the path returned by getSandboxDevShm is resolved through | ||||
| // symlinks and unmounted. Unmount should *NOT* return error if the mount point | ||||
| // is already unmounted; a not-exist error from Unmount is ignored here. | ||||
| func (c *criService) cleanupSandboxFiles(id string, config *runtime.PodSandboxConfig) error { | ||||
| 	ipcMode := config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetIpc() | ||||
| 	if ipcMode == runtime.NamespaceMode_NODE { | ||||
| 		// Nothing to unmount when the node IPC namespace is used. | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	shmPath, err := c.os.FollowSymlinkInScope(c.getSandboxDevShm(id), "/") | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to follow symlink: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	unmountErr := c.os.(osinterface.UNIX).Unmount(shmPath) | ||||
| 	if unmountErr != nil && !os.IsNotExist(unmountErr) { | ||||
| 		return fmt.Errorf("failed to unmount %q: %w", shmPath, unmountErr) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // taskOpts generates task options for a (sandbox) container. | ||||
| func (c *criService) taskOpts(runtimeType string) []containerd.NewTaskOpts { | ||||
| 	// TODO(random-liu): Remove this after shim v1 is deprecated. | ||||
|   | ||||
| @@ -21,15 +21,8 @@ package sbserver | ||||
|  | ||||
| import ( | ||||
| 	"github.com/containerd/containerd" | ||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||
| ) | ||||
|  | ||||
| // cleanupSandboxFiles is a no-op in this variant of the file: nothing is unmounted or removed here, | ||||
| // the id and config arguments are ignored, and nil is always returned. | ||||
| func (c *criService) cleanupSandboxFiles(id string, config *runtime.PodSandboxConfig) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // taskOpts generates task options for a (sandbox) container. | ||||
| func (c *criService) taskOpts(runtimeType string) []containerd.NewTaskOpts { | ||||
| 	return []containerd.NewTaskOpts{} | ||||
|   | ||||
| @@ -18,14 +18,8 @@ package sbserver | ||||
|  | ||||
| import ( | ||||
| 	"github.com/containerd/containerd" | ||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||
| ) | ||||
|  | ||||
| // cleanupSandboxFiles is a no-op: no sandbox files are needed for windows, so the arguments are ignored and nil is always returned. | ||||
| func (c *criService) cleanupSandboxFiles(id string, config *runtime.PodSandboxConfig) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // No task options needed for windows. | ||||
| func (c *criService) taskOpts(runtimeType string) []containerd.NewTaskOpts { | ||||
| 	return nil | ||||
|   | ||||
| @@ -20,17 +20,12 @@ import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"syscall" | ||||
| 	"time" | ||||
|  | ||||
| 	eventtypes "github.com/containerd/containerd/api/events" | ||||
| 	"github.com/containerd/containerd/errdefs" | ||||
| 	"github.com/containerd/containerd/log" | ||||
| 	"github.com/containerd/containerd/protobuf" | ||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||
|  | ||||
| 	sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" | ||||
| 	ctrdutil "github.com/containerd/containerd/pkg/cri/util" | ||||
| ) | ||||
|  | ||||
| // StopPodSandbox stops the sandbox. If there are any running containers in the | ||||
| @@ -69,17 +64,10 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if err := c.cleanupSandboxFiles(id, sandbox.Config); err != nil { | ||||
| 		return fmt.Errorf("failed to cleanup sandbox files: %w", err) | ||||
| 	if _, err := c.sandboxController.Stop(ctx, id); err != nil { | ||||
| 		return fmt.Errorf("failed to stop sandbox %q: %w", id, err) | ||||
| 	} | ||||
|  | ||||
| 	// Only stop sandbox container when it's running or unknown. | ||||
| 	state := sandbox.Status.Get().State | ||||
| 	if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown { | ||||
| 		if err := c.stopSandboxContainer(ctx, sandbox); err != nil { | ||||
| 			return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", id, state, err) | ||||
| 		} | ||||
| 	} | ||||
| 	sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop) | ||||
|  | ||||
| 	// Teardown network for sandbox. | ||||
| @@ -106,58 +94,6 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // stopSandboxContainer sends SIGKILL to the sandbox container's task and then waits, via waitSandboxStop, until the sandbox has stopped or ctx is done. | ||||
| // `task.Delete` is not called here because it will be called when | ||||
| // the event monitor handles the `TaskExit` event. | ||||
| func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error { | ||||
| 	id := sandbox.ID | ||||
| 	container := sandbox.Container | ||||
| 	state := sandbox.Status.Get().State | ||||
| 	task, err := container.Task(ctx, nil) | ||||
| 	if err != nil { | ||||
| 		if !errdefs.IsNotFound(err) { | ||||
| 			return fmt.Errorf("failed to get sandbox container: %w", err) | ||||
| 		} | ||||
| 		// The task was not found, which is not an error. Don't return for unknown state, some cleanup needs to be done. | ||||
| 		if state == sandboxstore.StateUnknown { | ||||
| 			return cleanupUnknownSandbox(ctx, id, sandbox) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// Handle unknown state. | ||||
| 	// The cleanup logic is the same as for a container in unknown state. | ||||
| 	if state == sandboxstore.StateUnknown { | ||||
| 		// Start an exit handler for the sandbox task in unknown state; task.Wait gets its own cancellable namespaced context instead of ctx. | ||||
| 		waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext()) | ||||
| 		defer waitCancel() | ||||
| 		exitCh, err := task.Wait(waitCtx) | ||||
| 		if err != nil { | ||||
| 			if !errdefs.IsNotFound(err) { | ||||
| 				return fmt.Errorf("failed to wait for task: %w", err) | ||||
| 			} | ||||
| 			return cleanupUnknownSandbox(ctx, id, sandbox) | ||||
| 		} | ||||
|  | ||||
| 		exitCtx, exitCancel := context.WithCancel(context.Background()) | ||||
| 		stopCh := c.eventMonitor.startSandboxExitMonitor(exitCtx, id, task.Pid(), exitCh) | ||||
| 		defer func() { | ||||
| 			exitCancel() | ||||
| 			// Deferred calls run last-in-first-out, so this runs before waitCancel. This ensures that exit monitor is stopped before | ||||
| 			// `Wait` is cancelled, so no exit event is generated | ||||
| 			// because of the `Wait` cancellation. | ||||
| 			<-stopCh | ||||
| 		}() | ||||
| 	} | ||||
|  | ||||
| 	// Kill the sandbox container; a not-found error from Kill is ignored. | ||||
| 	if err = task.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) { | ||||
| 		return fmt.Errorf("failed to kill sandbox container: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return c.waitSandboxStop(ctx, sandbox) | ||||
| } | ||||
|  | ||||
| // waitSandboxStop waits for sandbox to be stopped until context is cancelled or | ||||
| // the context deadline is exceeded. | ||||
| func (c *criService) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox) error { | ||||
| @@ -188,15 +124,3 @@ func (c *criService) teardownPodNetwork(ctx context.Context, sandbox sandboxstor | ||||
|  | ||||
| 	return netPlugin.Remove(ctx, id, path, opts...) | ||||
| } | ||||
|  | ||||
| // cleanupUnknownSandbox cleans up a stopped sandbox that is in the unknown state | ||||
| // by passing a synthetic TaskExit event for it to handleSandboxExit. | ||||
| func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error { | ||||
| 	// No real exit event is available here, so build one with a zero pid, the | ||||
| 	// unknown exit code and the current time as the exit time. | ||||
| 	exitEvent := &eventtypes.TaskExit{ | ||||
| 		ContainerID: id, | ||||
| 		ID:          id, | ||||
| 		Pid:         0, | ||||
| 		ExitStatus:  unknownExitCode, | ||||
| 		ExitedAt:    protobuf.ToTimestamp(time.Now()), | ||||
| 	} | ||||
| 	return handleSandboxExit(ctx, exitEvent, sandbox) | ||||
| } | ||||
|   | ||||
| @@ -17,7 +17,6 @@ | ||||
| package sbserver | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| @@ -191,10 +190,10 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi | ||||
| 	return c, nil | ||||
| } | ||||
|  | ||||
| // StartSandboxExitMonitor is a temporary workaround to call monitor from pause controller. | ||||
| // BackOffEvent is a temporary workaround to call eventMonitor from controller.Stop. | ||||
| // TODO: get rid of this. | ||||
| func (c *criService) StartSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} { | ||||
| 	return c.eventMonitor.startSandboxExitMonitor(ctx, id, pid, exitCh) | ||||
| func (c *criService) BackOffEvent(id string, event interface{}) { | ||||
| 	c.eventMonitor.backOff.enBackOff(id, event) | ||||
| } | ||||
|  | ||||
| // Register registers all required services onto a specific grpc server. | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 wanglei01
					wanglei01