Merge pull request #9617 from abel-von/sandbox-plugin-0109

sandbox: use sandboxService in CRI plugin instead of calling controller API directly
This commit is contained in:
Fu Wei 2024-02-28 15:49:12 +00:00 committed by GitHub
commit 2cdf012387
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 308 additions and 170 deletions

View File

@ -129,7 +129,6 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string, tweakInitFn tweakPl
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace), containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
containerd.WithDefaultPlatform(platforms.Default()), containerd.WithDefaultPlatform(platforms.Default()),
containerd.WithInMemoryServices(lastInitContext), containerd.WithInMemoryServices(lastInitContext),
containerd.WithInMemorySandboxControllers(lastInitContext),
) )
require.NoError(t, err) require.NoError(t, err)

View File

@ -277,7 +277,7 @@ type ImageConfig struct {
// "base": "docker.io/library/ubuntu:latest" // "base": "docker.io/library/ubuntu:latest"
// Migrated from: // Migrated from:
// (PluginConfig).SandboxImage string `toml:"sandbox_image" json:"sandboxImage"` // (PluginConfig).SandboxImage string `toml:"sandbox_image" json:"sandboxImage"`
PinnedImages map[string]string PinnedImages map[string]string `toml:"pinned_images" json:"pinned_images"`
// RuntimePlatforms is map between the runtime and the image platform to // RuntimePlatforms is map between the runtime and the image platform to
// use for that runtime. When resolving an image for a runtime, this // use for that runtime. When resolving an image for a runtime, this

View File

@ -63,12 +63,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
return nil, fmt.Errorf("failed to find sandbox id %q: %w", r.GetPodSandboxId(), err) return nil, fmt.Errorf("failed to find sandbox id %q: %w", r.GetPodSandboxId(), err)
} }
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler) cstatus, err := c.sandboxService.SandboxStatus(ctx, sandbox.Sandboxer, sandbox.ID, false)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
cstatus, err := controller.Status(ctx, sandbox.ID, false)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get controller status: %w", err) return nil, fmt.Errorf("failed to get controller status: %w", err)
} }
@ -150,7 +145,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
} }
}() }()
platform, err := controller.Platform(ctx, sandboxID) platform, err := c.sandboxService.SandboxPlatform(ctx, sandbox.Sandboxer, sandboxID)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to query sandbox platform: %w", err) return nil, fmt.Errorf("failed to query sandbox platform: %w", err)
} }

View File

@ -68,15 +68,12 @@ func (c *criService) getMetricsHandler(ctx context.Context, sandboxID string) (m
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to find sandbox id %q: %w", sandboxID, err) return nil, fmt.Errorf("failed to find sandbox id %q: %w", sandboxID, err)
} }
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
// Grab the platform that this containers sandbox advertises. Reason being, even if // Grab the platform that this containers sandbox advertises. Reason being, even if
// the host may be {insert platform}, if it virtualizes or emulates a different platform // the host may be {insert platform}, if it virtualizes or emulates a different platform
// it will return stats in that format, and we need to handle the conversion logic based // it will return stats in that format, and we need to handle the conversion logic based
// off of this info. // off of this info.
p, err := controller.Platform(ctx, sandboxID) p, err := c.sandboxService.SandboxPlatform(ctx, sandbox.Sandboxer, sandboxID)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -158,10 +158,10 @@ func (c *Controller) Wait(ctx context.Context, sandboxID string) (sandbox.ExitSt
} }
func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) { func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) error {
select { select {
case e := <-exitCh: case e := <-exitCh:
exitStatus, exitedAt, err = e.Result() exitStatus, exitedAt, err := e.Result()
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("failed to get task exit status for %q", p.ID) log.G(ctx).WithError(err).Errorf("failed to get task exit status for %q", p.ID)
exitStatus = unknownExitCode exitStatus = unknownExitCode
@ -171,12 +171,16 @@ func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, e
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout) dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel() defer dcancel()
event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)} event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)}
if cleanErr := handleSandboxTaskExit(dctx, p, event); cleanErr != nil { if err := handleSandboxTaskExit(dctx, p, event); err != nil {
// TODO will backoff the event to the controller's own EventMonitor, but not cri's,
// because we should call handleSandboxTaskExit again the next time
// eventMonitor handle this event. but now it goes into cri's EventMonitor,
// the handleSandboxTaskExit will not be called anymore
c.cri.BackOffEvent(p.ID, e) c.cri.BackOffEvent(p.ID, e)
} }
return return nil
case <-ctx.Done(): case <-ctx.Done():
return unknownExitCode, time.Now(), ctx.Err() return ctx.Err()
} }
} }
@ -196,5 +200,8 @@ func handleSandboxTaskExit(ctx context.Context, sb *types.PodSandbox, e *eventty
} }
} }
} }
if err := sb.Exit(e.ExitStatus, protobuf.FromTimestamp(e.ExitedAt)); err != nil {
return err
}
return nil return nil
} }

View File

@ -23,7 +23,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
containerd "github.com/containerd/containerd/v2/client"
criconfig "github.com/containerd/containerd/v2/internal/cri/config" criconfig "github.com/containerd/containerd/v2/internal/cri/config"
"github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types" "github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types"
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
@ -63,10 +62,7 @@ func Test_Status(t *testing.T) {
CreatedAt: createdAt, CreatedAt: createdAt,
}) })
sb.Metadata = sandboxstore.Metadata{ID: sandboxID} sb.Metadata = sandboxstore.Metadata{ID: sandboxID}
err := controller.store.Save(sb) assert.NoError(t, controller.store.Save(sb))
if err != nil {
t.Fatal(err)
}
s, err := controller.Status(context.Background(), sandboxID, false) s, err := controller.Status(context.Background(), sandboxID, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -75,7 +71,8 @@ func Test_Status(t *testing.T) {
assert.Equal(t, s.CreatedAt, createdAt) assert.Equal(t, s.CreatedAt, createdAt)
assert.Equal(t, s.State, sandboxstore.StateReady.String()) assert.Equal(t, s.State, sandboxstore.StateReady.String())
sb.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, nil)) assert.NoError(t, sb.Exit(exitStatus, exitedAt))
exit, err := controller.Wait(context.Background(), sandboxID) exit, err := controller.Wait(context.Background(), sandboxID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -22,16 +22,17 @@ import (
goruntime "runtime" goruntime "runtime"
"time" "time"
"github.com/containerd/errdefs"
"github.com/containerd/log" "github.com/containerd/log"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
sandbox2 "github.com/containerd/containerd/v2/core/sandbox" sandbox2 "github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/internal/cri/config"
"github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types" "github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types"
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
"github.com/containerd/containerd/v2/pkg/netns" "github.com/containerd/containerd/v2/pkg/netns"
"github.com/containerd/errdefs"
) )
// loadContainerTimeout is the default timeout for loading a container/sandbox. // loadContainerTimeout is the default timeout for loading a container/sandbox.
@ -133,8 +134,9 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta
} }
if ch != nil { if ch != nil {
go func() { go func() {
code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch) if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch); err != nil {
podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err)) log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err)
}
}() }()
} }
@ -144,6 +146,7 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta
sandbox = sandboxstore.NewSandbox(*meta, s) sandbox = sandboxstore.NewSandbox(*meta, s)
sandbox.Container = cntr sandbox.Container = cntr
sandbox.Sandboxer = string(config.ModePodSandbox)
// Load network namespace. // Load network namespace.
sandbox.NetNS = getNetNS(meta) sandbox.NetNS = getNetNS(meta)

View File

@ -403,7 +403,7 @@ func TestRecoverContainer(t *testing.T) {
pSb := controller.store.Get(cont.ID()) pSb := controller.store.Get(cont.ID())
assert.NotNil(t, pSb) assert.NotNil(t, pSb)
assert.Equal(t, c.expectedState, pSb.State, "%s state is not expected", cont.ID()) assert.Equal(t, c.expectedState, pSb.Status.Get().State, "%s state is not expected", cont.ID())
if c.expectedExitCode > 0 { if c.expectedExitCode > 0 {
cont.t.waitExitCh <- struct{}{} cont.t.waitExitCh <- struct{}{}

View File

@ -216,7 +216,6 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
if err != nil { if err != nil {
return cin, fmt.Errorf("failed to get sandbox container info: %w", err) return cin, fmt.Errorf("failed to get sandbox container info: %w", err)
} }
podSandbox.CreatedAt = info.CreatedAt
// Create sandbox task in containerd. // Create sandbox task in containerd.
log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name) log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name)
@ -242,7 +241,6 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
} }
} }
}() }()
podSandbox.Pid = task.Pid()
// wait is a long running background request, no timeout needed. // wait is a long running background request, no timeout needed.
exitCh, err := task.Wait(ctrdutil.NamespacedContext()) exitCh, err := task.Wait(ctrdutil.NamespacedContext())
@ -267,7 +265,15 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
if err := task.Start(ctx); err != nil { if err := task.Start(ctx); err != nil {
return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err) return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)
} }
podSandbox.State = sandboxstore.StateReady pid := task.Pid()
if err := podSandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.Pid = pid
status.State = sandboxstore.StateReady
status.CreatedAt = info.CreatedAt
return status, nil
}); err != nil {
return cin, fmt.Errorf("failed to update status of pod sandbox %q: %w", id, err)
}
cin.SandboxID = id cin.SandboxID = id
cin.Pid = task.Pid() cin.Pid = task.Pid()
@ -275,8 +281,9 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
cin.Labels = labels cin.Labels = labels
go func() { go func() {
code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh) if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh); err != nil {
podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err)) log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err)
}
}() }()
return return

View File

@ -36,18 +36,15 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool)
if sb == nil { if sb == nil {
return sandbox.ControllerStatus{}, fmt.Errorf("unable to find sandbox %q: %w", sandboxID, errdefs.ErrNotFound) return sandbox.ControllerStatus{}, fmt.Errorf("unable to find sandbox %q: %w", sandboxID, errdefs.ErrNotFound)
} }
status := sb.Status.Get()
cstatus := sandbox.ControllerStatus{ cstatus := sandbox.ControllerStatus{
SandboxID: sandboxID, SandboxID: sandboxID,
Pid: sb.Pid, Pid: status.Pid,
State: sb.State.String(), State: status.State.String(),
CreatedAt: sb.CreatedAt, CreatedAt: status.CreatedAt,
ExitedAt: status.ExitedAt,
Extra: nil, Extra: nil,
} }
exitStatus := sb.GetExitStatus()
if exitStatus != nil {
cstatus.ExitedAt = exitStatus.ExitTime()
}
if verbose { if verbose {
info, err := toCRISandboxInfo(ctx, sb) info, err := toCRISandboxInfo(ctx, sb)
@ -64,7 +61,7 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool)
// toCRISandboxInfo converts internal container object information to CRI sandbox status response info map. // toCRISandboxInfo converts internal container object information to CRI sandbox status response info map.
func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) { func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) {
si := &critypes.SandboxInfo{ si := &critypes.SandboxInfo{
Pid: sb.Pid, Pid: sb.Status.Get().Pid,
Config: sb.Metadata.Config, Config: sb.Metadata.Config,
RuntimeHandler: sb.Metadata.RuntimeHandler, RuntimeHandler: sb.Metadata.RuntimeHandler,
CNIResult: sb.Metadata.CNIResult, CNIResult: sb.Metadata.CNIResult,

View File

@ -22,16 +22,15 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/containerd/errdefs"
"github.com/containerd/log" "github.com/containerd/log"
eventtypes "github.com/containerd/containerd/v2/api/events" eventtypes "github.com/containerd/containerd/v2/api/events"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types" "github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types"
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
"github.com/containerd/containerd/v2/protobuf" "github.com/containerd/containerd/v2/protobuf"
"github.com/containerd/errdefs"
) )
func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error { func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error {
@ -46,7 +45,7 @@ func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.St
if err != nil { if err != nil {
return err return err
} }
state := podSandbox.State state := podSandbox.Status.Get().State
if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown { if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown {
if err := c.stopSandboxContainer(ctx, podSandbox); err != nil { if err := c.stopSandboxContainer(ctx, podSandbox); err != nil {
return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, err) return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, err)
@ -64,7 +63,7 @@ func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.St
func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types.PodSandbox) error { func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types.PodSandbox) error {
id := podSandbox.ID id := podSandbox.ID
container := podSandbox.Container container := podSandbox.Container
state := podSandbox.State state := podSandbox.Status.Get().State
task, err := container.Task(ctx, nil) task, err := container.Task(ctx, nil)
if err != nil { if err != nil {
if !errdefs.IsNotFound(err) { if !errdefs.IsNotFound(err) {
@ -95,12 +94,8 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types
stopCh := make(chan struct{}) stopCh := make(chan struct{})
go func() { go func() {
defer close(stopCh) defer close(stopCh)
exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, podSandbox, exitCh) err := c.waitSandboxExit(exitCtx, podSandbox, exitCh)
if err != context.Canceled && err != context.DeadlineExceeded { if err != nil {
// The error of context.Canceled or context.DeadlineExceeded indicates the task.Wait is not finished,
// so we can not set the exit status of the pod sandbox.
podSandbox.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, err))
} else {
log.G(ctx).WithError(err).Errorf("Failed to wait pod sandbox exit %+v", err) log.G(ctx).WithError(err).Errorf("Failed to wait pod sandbox exit %+v", err)
} }
}() }()

View File

@ -18,7 +18,6 @@ package types
import ( import (
"context" "context"
"sync"
"time" "time"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
@ -28,16 +27,12 @@ import (
) )
type PodSandbox struct { type PodSandbox struct {
mu sync.Mutex
ID string ID string
Container containerd.Container Container containerd.Container
State sandboxstore.State
Metadata sandboxstore.Metadata Metadata sandboxstore.Metadata
Runtime sandbox.RuntimeOpts Runtime sandbox.RuntimeOpts
Pid uint32 Status sandboxstore.StatusStorage
CreatedAt time.Time
stopChan *store.StopCh stopChan *store.StopCh
exitStatus *containerd.ExitStatus
} }
func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox { func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox {
@ -45,39 +40,34 @@ func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox {
ID: id, ID: id,
Container: nil, Container: nil,
stopChan: store.NewStopCh(), stopChan: store.NewStopCh(),
CreatedAt: status.CreatedAt, Status: sandboxstore.StoreStatus(status),
State: status.State,
Pid: status.Pid,
} }
if status.State == sandboxstore.StateNotReady { if status.State == sandboxstore.StateNotReady {
podSandbox.Exit(*containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil)) podSandbox.stopChan.Stop()
} }
return podSandbox return podSandbox
} }
func (p *PodSandbox) Exit(status containerd.ExitStatus) { func (p *PodSandbox) Exit(code uint32, exitTime time.Time) error {
p.mu.Lock() if err := p.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
defer p.mu.Unlock() status.State = sandboxstore.StateNotReady
p.exitStatus = &status status.ExitStatus = code
p.State = sandboxstore.StateNotReady status.ExitedAt = exitTime
status.Pid = 0
return status, nil
}); err != nil {
return err
}
p.stopChan.Stop() p.stopChan.Stop()
return nil
} }
func (p *PodSandbox) Wait(ctx context.Context) (*containerd.ExitStatus, error) { func (p *PodSandbox) Wait(ctx context.Context) (containerd.ExitStatus, error) {
s := p.GetExitStatus()
if s != nil {
return s, nil
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return containerd.ExitStatus{}, ctx.Err()
case <-p.stopChan.Stopped(): case <-p.stopChan.Stopped():
return p.GetExitStatus(), nil status := p.Status.Get()
return *containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil), nil
} }
} }
func (p *PodSandbox) GetExitStatus() *containerd.ExitStatus {
p.mu.Lock()
defer p.mu.Unlock()
return p.exitStatus
}

View File

@ -0,0 +1,73 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/containerd/containerd/v2/internal/cri/store/sandbox"
)
func Test_PodSandbox(t *testing.T) {
p := NewPodSandbox("test", sandbox.Status{State: sandbox.StateUnknown})
assert.Equal(t, p.Status.Get().State, sandbox.StateUnknown)
assert.Equal(t, p.ID, "test")
p.Metadata = sandbox.Metadata{ID: "test", NetNSPath: "/test"}
createAt := time.Now()
assert.NoError(t, p.Status.Update(func(status sandbox.Status) (sandbox.Status, error) {
status.State = sandbox.StateReady
status.Pid = uint32(100)
status.CreatedAt = createAt
return status, nil
}))
status := p.Status.Get()
assert.Equal(t, status.State, sandbox.StateReady)
assert.Equal(t, status.Pid, uint32(100))
assert.Equal(t, status.CreatedAt, createAt)
exitAt := time.Now().Add(time.Second)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
_, err := p.Wait(ctx)
assert.Equal(t, err, ctx.Err())
}()
wg.Add(1)
go func() {
defer wg.Done()
exitStatus, err := p.Wait(context.Background())
assert.Equal(t, err, nil)
code, exitTime, err := exitStatus.Result()
assert.Equal(t, err, nil)
assert.Equal(t, code, uint32(128))
assert.Equal(t, exitTime, exitAt)
}()
time.Sleep(time.Second)
if err := p.Exit(uint32(128), exitAt); err != nil {
t.Fatalf("failed to set exit of pod sandbox %v", err)
}
wg.Wait()
}

View File

@ -58,8 +58,10 @@ func (c *criService) recover(ctx context.Context) error {
return fmt.Errorf("failed to list sandbox containers: %w", err) return fmt.Errorf("failed to list sandbox containers: %w", err)
} }
podSandboxController := c.client.SandboxController(string(criconfig.ModePodSandbox)) podSandboxController, err := c.sandboxService.SandboxController(string(criconfig.ModePodSandbox))
if err != nil {
return fmt.Errorf("failed to get podsanbox controller %v", err)
}
podSandboxLoader, ok := podSandboxController.(podSandboxRecover) podSandboxLoader, ok := podSandboxController.(podSandboxRecover)
if !ok { if !ok {
log.G(ctx).Fatal("pod sandbox controller doesn't support recovery") log.G(ctx).Fatal("pod sandbox controller doesn't support recovery")
@ -134,6 +136,7 @@ func (c *criService) recover(ctx context.Context) error {
} }
sb := sandboxstore.NewSandbox(metadata, sandboxstore.Status{State: state}) sb := sandboxstore.NewSandbox(metadata, sandboxstore.Status{State: state})
sb.Sandboxer = sbx.Sandboxer
// Load network namespace. // Load network namespace.
sb.NetNS = getNetNS(&metadata) sb.NetNS = getNetNS(&metadata)
@ -149,20 +152,11 @@ func (c *criService) recover(ctx context.Context) error {
if status.State == sandboxstore.StateNotReady { if status.State == sandboxstore.StateNotReady {
continue continue
} }
controller, err := c.sandboxService.SandboxController(sb.Config, sb.RuntimeHandler) exitCh, err := c.sandboxService.WaitSandbox(ctrdutil.NamespacedContext(), sb.Sandboxer, sb.ID)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Error("failed to get sandbox controller while waiting sandbox") log.G(ctx).WithError(err).Error("failed to wait sandbox")
continue continue
} }
exitCh := make(chan containerd.ExitStatus, 1)
go func() {
exit, err := controller.Wait(ctrdutil.NamespacedContext(), sb.ID)
if err != nil {
log.G(ctx).WithError(err).Error("failed to wait for sandbox exit")
exitCh <- *containerd.NewExitStatus(containerd.UnknownExitStatus, time.Time{}, err)
}
exitCh <- *containerd.NewExitStatus(exit.ExitStatus, exit.ExitedAt, nil)
}()
c.eventMonitor.startSandboxExitMonitor(context.Background(), sb.ID, exitCh) c.eventMonitor.startSandboxExitMonitor(context.Background(), sb.ID, exitCh)
} }
// Recover all containers. // Recover all containers.

View File

@ -79,13 +79,7 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
} }
} }
// Use sandbox controller to delete sandbox if err := c.sandboxService.ShutdownSandbox(ctx, sandbox.Sandboxer, id); err != nil && !errdefs.IsNotFound(err) {
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
if err := controller.Shutdown(ctx, id); err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err) return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err)
} }

View File

@ -31,7 +31,6 @@ import (
"github.com/containerd/typeurl/v2" "github.com/containerd/typeurl/v2"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client"
sb "github.com/containerd/containerd/v2/core/sandbox" sb "github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/internal/cri/annotations" "github.com/containerd/containerd/v2/internal/cri/annotations"
"github.com/containerd/containerd/v2/internal/cri/bandwidth" "github.com/containerd/containerd/v2/internal/cri/bandwidth"
@ -124,6 +123,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
CreatedAt: time.Now().UTC(), CreatedAt: time.Now().UTC(),
}, },
) )
sandbox.Sandboxer = ociRuntime.Sandboxer
if _, err := c.client.SandboxStore().Create(ctx, sandboxInfo); err != nil { if _, err := c.client.SandboxStore().Create(ctx, sandboxInfo); err != nil {
return nil, fmt.Errorf("failed to save sandbox metadata: %w", err) return nil, fmt.Errorf("failed to save sandbox metadata: %w", err)
@ -240,21 +240,16 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err) return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err)
} }
controller, err := c.sandboxService.SandboxController(config, r.GetRuntimeHandler())
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
// Save sandbox metadata to store // Save sandbox metadata to store
if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil { if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err) return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
} }
if err := controller.Create(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil { if err := c.sandboxService.CreateSandbox(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil {
return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err) return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err)
} }
ctrl, err := controller.Start(ctx, id) ctrl, err := c.sandboxService.StartSandbox(ctx, sandbox.Sandboxer, id)
if err != nil { if err != nil {
var cerr podsandbox.CleanupErr var cerr podsandbox.CleanupErr
if errors.As(err, &cerr) { if errors.As(err, &cerr) {
@ -401,22 +396,11 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
// SandboxStatus from the store and include it in the event. // SandboxStatus from the store and include it in the event.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
// TODO: Use sandbox client instead exitCh, err := c.sandboxService.WaitSandbox(util.NamespacedContext(), sandbox.Sandboxer, id)
exitCh := make(chan containerd.ExitStatus, 1)
go func() {
defer close(exitCh)
ctx := util.NamespacedContext()
resp, err := controller.Wait(ctx, id)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Error("failed to wait for sandbox exit") return nil, fmt.Errorf("failed to wait sandbox %s: %v", id, err)
exitCh <- *containerd.NewExitStatus(containerd.UnknownExitStatus, time.Time{}, err)
return
} }
exitCh <- *containerd.NewExitStatus(resp.ExitStatus, resp.ExitedAt, nil)
}()
// start the monitor after adding sandbox into the store, this ensures // start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event. // that sandbox is in the store, when event monitor receives the TaskExit event.
// //

View File

@ -17,9 +17,11 @@
package server package server
import ( import (
"context"
"fmt" "fmt"
"time"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" "github.com/containerd/platforms"
"github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/core/sandbox"
@ -27,21 +29,91 @@ import (
) )
type criSandboxService struct { type criSandboxService struct {
cli *client.Client sandboxControllers map[string]sandbox.Controller
config *criconfig.Config config *criconfig.Config
} }
func newCriSandboxService(config *criconfig.Config, c *client.Client) *criSandboxService { func newCriSandboxService(config *criconfig.Config, sandboxers map[string]sandbox.Controller) *criSandboxService {
return &criSandboxService{ return &criSandboxService{
cli: c, sandboxControllers: sandboxers,
config: config, config: config,
} }
} }
func (c *criSandboxService) SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) { func (c *criSandboxService) SandboxController(sandboxer string) (sandbox.Controller, error) {
ociRuntime, err := c.config.GetSandboxRuntime(config, runtimeHandler) sbController, ok := c.sandboxControllers[sandboxer]
if err != nil { if !ok {
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err) return nil, fmt.Errorf("failed to get sandbox controller by %s", sandboxer)
} }
return c.cli.SandboxController(ociRuntime.Sandboxer), nil return sbController, nil
}
func (c *criSandboxService) CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error {
ctrl, err := c.SandboxController(info.Sandboxer)
if err != nil {
return err
}
return ctrl.Create(ctx, info, opts...)
}
func (c *criSandboxService) StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error) {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return sandbox.ControllerInstance{}, err
}
return ctrl.Start(ctx, sandboxID)
}
func (c *criSandboxService) WaitSandbox(ctx context.Context, sandboxer string, sandboxID string) (<-chan client.ExitStatus, error) {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return nil, err
}
ch := make(chan client.ExitStatus, 1)
go func() {
defer close(ch)
exitStatus, err := ctrl.Wait(ctx, sandboxID)
if err != nil {
ch <- *client.NewExitStatus(client.UnknownExitStatus, time.Time{}, err)
return
}
ch <- *client.NewExitStatus(exitStatus.ExitStatus, exitStatus.ExitedAt, nil)
}()
return ch, nil
}
func (c *criSandboxService) SandboxStatus(ctx context.Context, sandboxer string, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return sandbox.ControllerStatus{}, err
}
return ctrl.Status(ctx, sandboxID, verbose)
}
func (c *criSandboxService) SandboxPlatform(ctx context.Context, sandboxer string, sandboxID string) (platforms.Platform, error) {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return platforms.Platform{}, err
}
return ctrl.Platform(ctx, sandboxID)
}
func (c *criSandboxService) ShutdownSandbox(ctx context.Context, sandboxer string, sandboxID string) error {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return err
}
return ctrl.Shutdown(ctx, sandboxID)
}
func (c *criSandboxService) StopSandbox(ctx context.Context, sandboxer, sandboxID string, opts ...sandbox.StopOpt) error {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return err
}
return ctrl.Stop(ctx, sandboxID, opts...)
} }

View File

@ -41,17 +41,12 @@ func (c *criService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandbox
return nil, fmt.Errorf("failed to get sandbox ip: %w", err) return nil, fmt.Errorf("failed to get sandbox ip: %w", err)
} }
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
var ( var (
createdAt time.Time createdAt time.Time
state string state string
info map[string]string info map[string]string
) )
cstatus, err := controller.Status(ctx, sandbox.ID, r.GetVerbose()) cstatus, err := c.sandboxService.SandboxStatus(ctx, sandbox.Sandboxer, sandbox.ID, r.GetVerbose())
if err != nil { if err != nil {
// If the shim died unexpectedly (segfault etc.) let's set the state as // If the shim died unexpectedly (segfault etc.) let's set the state as
// NOTREADY and not just error out to make k8s and clients like crictl // NOTREADY and not just error out to make k8s and clients like crictl

View File

@ -68,13 +68,7 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
// Only stop sandbox container when it's running or unknown. // Only stop sandbox container when it's running or unknown.
state := sandbox.Status.Get().State state := sandbox.Status.Get().State
if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown { if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown {
// Use sandbox controller to stop sandbox if err := c.sandboxService.StopSandbox(ctx, sandbox.Sandboxer, id); err != nil {
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler)
if err != nil {
return fmt.Errorf("failed to get sandbox controller: %w", err)
}
if err := controller.Stop(ctx, id); err != nil {
// Log and ignore the error if controller already removed the sandbox // Log and ignore the error if controller already removed the sandbox
if errdefs.IsNotFound(err) { if errdefs.IsNotFound(err) {
log.G(ctx).Warnf("sandbox %q is not found when stopping it", id) log.G(ctx).Warnf("sandbox %q is not found when stopping it", id)

View File

@ -28,9 +28,9 @@ import (
"github.com/containerd/go-cni" "github.com/containerd/go-cni"
"github.com/containerd/log" "github.com/containerd/log"
"github.com/containerd/platforms"
"github.com/containerd/typeurl/v2" "github.com/containerd/typeurl/v2"
"github.com/opencontainers/runtime-spec/specs-go/features" "github.com/opencontainers/runtime-spec/specs-go/features"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubelet/pkg/cri/streaming" "k8s.io/kubelet/pkg/cri/streaming"
@ -74,7 +74,14 @@ type CRIService interface {
} }
type sandboxService interface { type sandboxService interface {
SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error
StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error)
WaitSandbox(ctx context.Context, sandboxer string, sandboxID string) (<-chan containerd.ExitStatus, error)
StopSandbox(ctx context.Context, sandboxer, sandboxID string, opts ...sandbox.StopOpt) error
ShutdownSandbox(ctx context.Context, sandboxer string, sandboxID string) error
SandboxStatus(ctx context.Context, sandboxer string, sandboxID string, verbose bool) (sandbox.ControllerStatus, error)
SandboxPlatform(ctx context.Context, sandboxer string, sandboxID string) (platforms.Platform, error)
SandboxController(sandboxer string) (sandbox.Controller, error)
} }
// RuntimeService specifies dependencies to runtime service which provides // RuntimeService specifies dependencies to runtime service which provides
@ -188,7 +195,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
sandboxNameIndex: registrar.NewRegistrar(), sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(), containerNameIndex: registrar.NewRegistrar(),
netPlugin: make(map[string]cni.CNI), netPlugin: make(map[string]cni.CNI),
sandboxService: newCriSandboxService(&config, options.Client), sandboxService: newCriSandboxService(&config, options.SandboxControllers),
} }
// TODO: Make discard time configurable // TODO: Make discard time configurable
@ -231,8 +238,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
} }
// Initialize pod sandbox controller // Initialize pod sandbox controller
// TODO: Get this from options, NOT client podSandboxController := options.SandboxControllers[string(criconfig.ModePodSandbox)].(*podsandbox.Controller)
podSandboxController := options.Client.SandboxController(string(criconfig.ModePodSandbox)).(*podsandbox.Controller)
podSandboxController.Init(c) podSandboxController.Init(c)
c.nri = options.NRI c.nri = options.NRI

View File

@ -19,10 +19,12 @@ package server
import ( import (
"context" "context"
"github.com/containerd/errdefs"
"github.com/containerd/go-cni" "github.com/containerd/go-cni"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" "github.com/containerd/platforms"
"github.com/containerd/containerd/v2/api/types" "github.com/containerd/containerd/v2/api/types"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/core/sandbox"
criconfig "github.com/containerd/containerd/v2/internal/cri/config" criconfig "github.com/containerd/containerd/v2/internal/cri/config"
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container" containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
@ -32,13 +34,39 @@ import (
"github.com/containerd/containerd/v2/internal/registrar" "github.com/containerd/containerd/v2/internal/registrar"
"github.com/containerd/containerd/v2/pkg/oci" "github.com/containerd/containerd/v2/pkg/oci"
ostesting "github.com/containerd/containerd/v2/pkg/os/testing" ostesting "github.com/containerd/containerd/v2/pkg/os/testing"
"github.com/containerd/errdefs"
"github.com/containerd/platforms"
) )
type fakeSandboxService struct{} type fakeSandboxService struct{}
func (f *fakeSandboxService) SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) { func (f *fakeSandboxService) CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error {
return errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error) {
return sandbox.ControllerInstance{}, errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) StopSandbox(ctx context.Context, sandboxer, sandboxID string, opts ...sandbox.StopOpt) error {
return errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) ShutdownSandbox(ctx context.Context, sandboxer string, sandboxID string) error {
return errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) WaitSandbox(ctx context.Context, sandboxer string, sandboxID string) (<-chan containerd.ExitStatus, error) {
return nil, errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) SandboxStatus(ctx context.Context, sandboxer string, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) {
return sandbox.ControllerStatus{}, errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) SandboxPlatform(ctx context.Context, sandboxer string, sandboxID string) (platforms.Platform, error) {
return platforms.DefaultSpec(), nil
}
func (f *fakeSandboxService) SandboxController(sandboxer string) (sandbox.Controller, error) {
return &fakeSandboxController{}, nil return &fakeSandboxController{}, nil
} }

View File

@ -37,6 +37,8 @@ type Sandbox struct {
Status StatusStorage Status StatusStorage
// Container is the containerd sandbox container client. // Container is the containerd sandbox container client.
Container containerd.Container Container containerd.Container
// Sandboxer is the sandbox controller name of the sandbox
Sandboxer string
// CNI network namespace client. // CNI network namespace client.
// For hostnetwork pod, this is always nil; // For hostnetwork pod, this is always nil;
// For non hostnetwork pod, this should never be nil. // For non hostnetwork pod, this should never be nil.

View File

@ -24,6 +24,8 @@ import (
"github.com/containerd/log" "github.com/containerd/log"
"github.com/containerd/plugin" "github.com/containerd/plugin"
"github.com/containerd/plugin/registry" "github.com/containerd/plugin/registry"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/core/sandbox"
@ -37,10 +39,6 @@ import (
"github.com/containerd/containerd/v2/plugins/services/warning" "github.com/containerd/containerd/v2/plugins/services/warning"
"github.com/containerd/containerd/v2/version" "github.com/containerd/containerd/v2/version"
"github.com/containerd/platforms" "github.com/containerd/platforms"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
) )
// Register CRI service plugin // Register CRI service plugin
@ -127,10 +125,9 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
return nil, fmt.Errorf("failed to create containerd client: %w", err) return nil, fmt.Errorf("failed to create containerd client: %w", err)
} }
// TODO(dmcgowan): Get the full list directly from configured plugins sbControllers, err := getSandboxControllers(ic)
sbControllers := map[string]sandbox.Controller{ if err != nil {
string(criconfig.ModePodSandbox): client.SandboxController(string(criconfig.ModePodSandbox)), return nil, fmt.Errorf("failed to get sandbox controllers from plugins %v", err)
string(criconfig.ModeShim): client.SandboxController(string(criconfig.ModeShim)),
} }
streamingConfig, err := config.StreamingConfig() streamingConfig, err := config.StreamingConfig()
@ -240,3 +237,15 @@ func getNRIAPI(ic *plugin.InitContext) *nri.API {
return nri.NewAPI(api) return nri.NewAPI(api)
} }
func getSandboxControllers(ic *plugin.InitContext) (map[string]sandbox.Controller, error) {
sandboxers, err := ic.GetByType(plugins.SandboxControllerPlugin)
if err != nil {
return nil, err
}
sc := make(map[string]sandbox.Controller)
for name, p := range sandboxers {
sc[name] = p.(sandbox.Controller)
}
return sc, nil
}