CRI: implement Controller.Wait for SandboxAPI

Rework sandbox monitoring, we should rely on Controller.Wait instead of
CRIService.StartSandboxExitMonitor

Signed-off-by: WangLei <wllenyj@linux.alibaba.com>
This commit is contained in:
wanglei01 2022-09-21 22:26:34 +08:00
parent e14dca4a40
commit 927906992f
5 changed files with 172 additions and 10 deletions

View File

@ -18,15 +18,21 @@ package podsandbox
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd"
api "github.com/containerd/containerd/api/services/sandbox/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/oci"
criconfig "github.com/containerd/containerd/pkg/cri/config"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
osinterface "github.com/containerd/containerd/pkg/os"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/sandbox"
"github.com/sirupsen/logrus"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
@ -52,6 +58,8 @@ type Controller struct {
cri CRIService
// baseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec`
baseOCISpecs map[string]*oci.Spec
store *Store
}
func New(
@ -69,6 +77,7 @@ func New(
os: os,
cri: cri,
baseOCISpecs: baseOCISpecs,
store: NewStore(),
}
}
@ -84,10 +93,89 @@ func (c *Controller) Delete(ctx context.Context, sandboxID string) (*api.Control
panic("implement me")
}
func (c *Controller) Wait(ctx context.Context, sandboxID string) (*api.ControllerWaitResponse, error) {
func (c *Controller) Status(ctx context.Context, sandboxID string) (*api.ControllerStatusResponse, error) {
//TODO implement me
panic("implement me")
}
func (c *Controller) Status(ctx context.Context, sandboxID string) (*api.ControllerStatusResponse, error) {
panic("implement me")
func (c *Controller) Wait(ctx context.Context, sandboxID string) (*api.ControllerWaitResponse, error) {
status := c.store.Get(sandboxID)
if status == nil {
return nil, fmt.Errorf("failed to get exit channel. %q", sandboxID)
}
exitStatus, exitedAt, err := c.waitSandboxExit(ctx, sandboxID, status.Waiter)
return &api.ControllerWaitResponse{
ExitStatus: exitStatus,
ExitedAt: protobuf.ToTimestamp(exitedAt),
}, err
}
func (c *Controller) waitSandboxExit(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) {
exitStatus = unknownExitCode
exitedAt = time.Now()
select {
case exitRes := <-exitCh:
logrus.Debugf("received sandbox exit %+v", exitRes)
exitStatus, exitedAt, err = exitRes.Result()
if err != nil {
logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
exitStatus = unknownExitCode
exitedAt = time.Now()
}
err = func() error {
dctx := ctrdutil.NamespacedContext()
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel()
sb, err := c.sandboxStore.Get(id)
if err == nil {
if err := handleSandboxExit(dctx, sb); err != nil {
return err
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to get sandbox %s: %w", id, err)
}
return nil
}()
if err != nil {
logrus.WithError(err).Errorf("failed to handle sandbox TaskExit %s", id)
// Don't backoff, the caller is responsible for.
return
}
case <-ctx.Done():
return exitStatus, exitedAt, ctx.Err()
}
return
}
// handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox) error {
// No stream attached to sandbox container.
task, err := sb.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to load task for sandbox: %w", err)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop sandbox: %w", err)
}
// Move on to make sure container status is updated.
}
}
sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.State = sandboxstore.StateNotReady
status.Pid = 0
return status, nil
})
// Using channel to propagate the information of sandbox stop
sb.Stop()
return nil
}

View File

@ -23,6 +23,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
@ -64,6 +65,15 @@ const (
runtimeRunhcsV1 = "io.containerd.runhcs.v1"
)
const (
// unknownExitCode is the exit code when exit reason is unknown.
unknownExitCode = 255
)
const (
handleEventTimeout = 10 * time.Second
)
// getSandboxRootDir returns the root directory for managing sandbox files,
// e.g. hosts files.
func (c *Controller) getSandboxRootDir(id string) string {

View File

@ -225,6 +225,7 @@ func (c *Controller) Start(ctx context.Context, id string) (_ *api.ControllerSta
if err != nil {
return nil, fmt.Errorf("failed to wait for sandbox container task: %w", err)
}
c.store.Save(id, exitCh)
nric, err := nri.New()
if err != nil {
@ -244,13 +245,6 @@ func (c *Controller) Start(ctx context.Context, id string) (_ *api.ControllerSta
return nil, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)
}
// start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event.
//
// TaskOOM from containerd may come before sandbox is added to store,
// but we don't care about sandbox TaskOOM right now, so it is fine.
c.cri.StartSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh) // TODO: Move back to CRI service.
resp := &api.ControllerStartResponse{
SandboxID: id,
Pid: task.Pid(),

View File

@ -0,0 +1,49 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podsandbox
import (
"sync"
"github.com/containerd/containerd"
)
type Status struct {
Waiter <-chan containerd.ExitStatus
}
type Store struct {
sync.Map
}
func NewStore() *Store {
return &Store{}
}
func (s *Store) Save(id string, exitCh <-chan containerd.ExitStatus) {
s.Store(id, &Status{Waiter: exitCh})
}
func (s *Store) Get(id string) *Status {
i, ok := s.LoadAndDelete(id)
if !ok {
// not exist
return nil
}
// Only save *Status
return i.(*Status)
}

View File

@ -27,6 +27,7 @@ import (
"strings"
"time"
eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/protobuf"
sb "github.com/containerd/containerd/sandbox"
"github.com/containerd/go-cni"
@ -203,6 +204,26 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
}
// start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event.
//
// TaskOOM from containerd may come before sandbox is added to store,
// but we don't care about sandbox TaskOOM right now, so it is fine.
go func() {
resp, err := c.sandboxController.Wait(context.Background(), id)
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
e := &eventtypes.TaskExit{
ContainerID: id,
ID: id,
// Pid is not used
Pid: 0,
ExitStatus: resp.ExitStatus,
ExitedAt: resp.ExitedAt,
}
c.eventMonitor.backOff.enBackOff(id, e)
}
}()
sandboxRuntimeCreateTimer.WithValues(labels["oci_runtime_type"]).UpdateSince(runtimeStart)
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil