Merge pull request #9463 from abel-von/sandbox-plugin-1205
sandbox: remove sandboxStore from podsandbox controller
This commit is contained in:
commit
9f8b845334
@ -34,16 +34,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/containers"
|
||||
cri "github.com/containerd/containerd/v2/integration/cri-api/pkg/apis"
|
||||
_ "github.com/containerd/containerd/v2/integration/images" // Keep this around to parse `imageListFile` command line var
|
||||
"github.com/containerd/containerd/v2/integration/remote"
|
||||
dialer "github.com/containerd/containerd/v2/integration/remote/util"
|
||||
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/constants"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/util"
|
||||
"github.com/containerd/log"
|
||||
"github.com/opencontainers/selinux/go-selinux"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -52,6 +42,17 @@ import (
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/containers"
|
||||
cri "github.com/containerd/containerd/v2/integration/cri-api/pkg/apis"
|
||||
_ "github.com/containerd/containerd/v2/integration/images" // Keep this around to parse `imageListFile` command line var
|
||||
"github.com/containerd/containerd/v2/integration/remote"
|
||||
dialer "github.com/containerd/containerd/v2/integration/remote/util"
|
||||
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/constants"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/base"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/util"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -676,7 +677,7 @@ func CRIConfig() (*criconfig.Config, error) {
|
||||
}
|
||||
|
||||
// SandboxInfo gets sandbox info.
|
||||
func SandboxInfo(id string) (*runtime.PodSandboxStatus, *server.SandboxInfo, error) {
|
||||
func SandboxInfo(id string) (*runtime.PodSandboxStatus, *base.SandboxInfo, error) {
|
||||
client, err := RawRuntimeClient()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err)
|
||||
@ -689,7 +690,7 @@ func SandboxInfo(id string) (*runtime.PodSandboxStatus, *server.SandboxInfo, err
|
||||
return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err)
|
||||
}
|
||||
status := resp.GetStatus()
|
||||
var info server.SandboxInfo
|
||||
var info base.SandboxInfo
|
||||
if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err)
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
criapiv1 "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/base"
|
||||
"github.com/containerd/containerd/v2/pkg/failpoint"
|
||||
)
|
||||
|
||||
@ -299,7 +299,7 @@ func TestRunPodSandboxAndTeardownCNISlow(t *testing.T) {
|
||||
}
|
||||
|
||||
// sbserverSandboxInfo gets sandbox info.
|
||||
func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *podsandbox.SandboxInfo, error) {
|
||||
func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *base.SandboxInfo, error) {
|
||||
client, err := RawRuntimeClient()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err)
|
||||
@ -312,7 +312,7 @@ func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *podsandbox.San
|
||||
return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err)
|
||||
}
|
||||
status := resp.GetStatus()
|
||||
var info podsandbox.SandboxInfo
|
||||
var info base.SandboxInfo
|
||||
if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err)
|
||||
}
|
||||
|
47
pkg/cri/server/base/sandbox_info.go
Normal file
47
pkg/cri/server/base/sandbox_info.go
Normal file
@ -0,0 +1,47 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package base
|
||||
|
||||
import (
|
||||
"github.com/containerd/go-cni"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
)
|
||||
|
||||
// SandboxInfo is extra information for sandbox.
|
||||
// TODO (mikebrow): discuss predefining constants structures for some or all of these field names in CRI
|
||||
type SandboxInfo struct {
|
||||
Pid uint32 `json:"pid"`
|
||||
Status string `json:"processStatus"`
|
||||
NetNSClosed bool `json:"netNamespaceClosed"`
|
||||
Image string `json:"image"`
|
||||
SnapshotKey string `json:"snapshotKey"`
|
||||
Snapshotter string `json:"snapshotter"`
|
||||
// Note: a new field `RuntimeHandler` has been added into the CRI PodSandboxStatus struct, and
|
||||
// should be set. This `RuntimeHandler` field will be deprecated after containerd 1.3 (tracked
|
||||
// in https://github.com/containerd/cri/issues/1064).
|
||||
RuntimeHandler string `json:"runtimeHandler"` // see the Note above
|
||||
RuntimeType string `json:"runtimeType"`
|
||||
RuntimeOptions interface{} `json:"runtimeOptions"`
|
||||
Config *runtime.PodSandboxConfig `json:"config"`
|
||||
// Note: RuntimeSpec may not be populated if the sandbox has not been fully created.
|
||||
RuntimeSpec *specs.Spec `json:"runtimeSpec"`
|
||||
CNIResult *cni.Result `json:"cniResult"`
|
||||
Metadata *sandbox.Metadata `json:"sandboxMetadata"`
|
||||
}
|
@ -23,6 +23,11 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/typeurl/v2"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/utils/clock"
|
||||
|
||||
eventtypes "github.com/containerd/containerd/v2/api/events"
|
||||
apitasks "github.com/containerd/containerd/v2/api/services/tasks/v1"
|
||||
containerdio "github.com/containerd/containerd/v2/cio"
|
||||
@ -34,10 +39,6 @@ import (
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
|
||||
"github.com/containerd/containerd/v2/protobuf"
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/typeurl/v2"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -108,7 +109,7 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
|
||||
}
|
||||
|
||||
// startSandboxExitMonitor starts an exit monitor for a given sandbox.
|
||||
func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
|
||||
func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
|
||||
stopCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stopCh)
|
||||
|
@ -34,8 +34,8 @@ import (
|
||||
"github.com/containerd/containerd/v2/pkg/cri/constants"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/base"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/images"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types"
|
||||
imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
|
||||
osinterface "github.com/containerd/containerd/v2/pkg/os"
|
||||
"github.com/containerd/containerd/v2/platforms"
|
||||
@ -116,8 +116,6 @@ type Controller struct {
|
||||
client *containerd.Client
|
||||
// imageService is a dependency to CRI image service.
|
||||
imageService ImageService
|
||||
// sandboxStore stores all resources associated with sandboxes.
|
||||
sandboxStore *sandboxstore.Store
|
||||
// os is an interface for all required os operations.
|
||||
os osinterface.OS
|
||||
// cri is CRI service that provides missing gaps needed by controller.
|
||||
@ -129,11 +127,9 @@ type Controller struct {
|
||||
}
|
||||
|
||||
func (c *Controller) Init(
|
||||
sandboxStore *sandboxstore.Store,
|
||||
cri CRIService,
|
||||
) {
|
||||
c.cri = cri
|
||||
c.sandboxStore = sandboxStore
|
||||
}
|
||||
|
||||
var _ sandbox.Controller = (*Controller)(nil)
|
||||
@ -143,63 +139,46 @@ func (c *Controller) Platform(_ctx context.Context, _sandboxID string) (platform
|
||||
}
|
||||
|
||||
func (c *Controller) Wait(ctx context.Context, sandboxID string) (sandbox.ExitStatus, error) {
|
||||
status := c.store.Get(sandboxID)
|
||||
if status == nil {
|
||||
podSandbox := c.store.Get(sandboxID)
|
||||
if podSandbox == nil {
|
||||
return sandbox.ExitStatus{}, fmt.Errorf("failed to get exit channel. %q", sandboxID)
|
||||
|
||||
}
|
||||
exit, err := podSandbox.Wait(ctx)
|
||||
if err != nil {
|
||||
return sandbox.ExitStatus{}, fmt.Errorf("failed to wait pod sandbox, %w", err)
|
||||
}
|
||||
|
||||
exitStatus, exitedAt, err := c.waitSandboxExit(ctx, sandboxID, status.Waiter)
|
||||
|
||||
return sandbox.ExitStatus{
|
||||
ExitStatus: exitStatus,
|
||||
ExitedAt: exitedAt,
|
||||
ExitStatus: exit.ExitCode(),
|
||||
ExitedAt: exit.ExitTime(),
|
||||
}, err
|
||||
|
||||
}
|
||||
|
||||
func (c *Controller) waitSandboxExit(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) {
|
||||
exitStatus = unknownExitCode
|
||||
exitedAt = time.Now()
|
||||
func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) {
|
||||
select {
|
||||
case exitRes := <-exitCh:
|
||||
log.G(ctx).Debugf("received sandbox exit %+v", exitRes)
|
||||
|
||||
exitStatus, exitedAt, err = exitRes.Result()
|
||||
case e := <-exitCh:
|
||||
exitStatus, exitedAt, err = e.Result()
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("failed to get task exit status for %q", id)
|
||||
log.G(ctx).WithError(err).Errorf("failed to get task exit status for %q", p.ID)
|
||||
exitStatus = unknownExitCode
|
||||
exitedAt = time.Now()
|
||||
}
|
||||
|
||||
err = func() error {
|
||||
dctx := ctrdutil.NamespacedContext()
|
||||
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
|
||||
defer dcancel()
|
||||
|
||||
sb, err := c.sandboxStore.Get(id)
|
||||
if err == nil {
|
||||
if err := handleSandboxExit(dctx, sb, &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)}); err != nil {
|
||||
return err
|
||||
event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)}
|
||||
if cleanErr := handleSandboxTaskExit(dctx, p, event); cleanErr != nil {
|
||||
c.cri.BackOffEvent(p.ID, e)
|
||||
}
|
||||
return nil
|
||||
} else if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to get sandbox %s: %w", id, err)
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("failed to handle sandbox TaskExit %s", id)
|
||||
// Don't backoff, the caller is responsible for.
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return exitStatus, exitedAt, ctx.Err()
|
||||
return unknownExitCode, time.Now(), ctx.Err()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// handleSandboxExit handles TaskExit event for sandbox.
|
||||
// TODO https://github.com/containerd/containerd/issues/7548
|
||||
func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, e *eventtypes.TaskExit) error {
|
||||
// handleSandboxTaskExit handles TaskExit event for sandbox.
|
||||
func handleSandboxTaskExit(ctx context.Context, sb *types.PodSandbox, e *eventtypes.TaskExit) error {
|
||||
// No stream attached to sandbox container.
|
||||
task, err := sb.Container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
@ -212,17 +191,7 @@ func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, e *eventtyp
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to stop sandbox: %w", err)
|
||||
}
|
||||
// Move on to make sure container status is updated.
|
||||
}
|
||||
}
|
||||
sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
|
||||
status.State = sandboxstore.StateNotReady
|
||||
status.Pid = 0
|
||||
status.ExitStatus = e.ExitStatus
|
||||
status.ExitedAt = e.ExitedAt.AsTime()
|
||||
return status, nil
|
||||
})
|
||||
// Using channel to propagate the information of sandbox stop
|
||||
sb.Stop()
|
||||
return nil
|
||||
}
|
||||
|
@ -21,11 +21,13 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/store/label"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
ostesting "github.com/containerd/containerd/v2/pkg/os/testing"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -48,11 +50,10 @@ var testConfig = criconfig.Config{
|
||||
|
||||
// newControllerService creates a fake criService for test.
|
||||
func newControllerService() *Controller {
|
||||
labels := label.NewStore()
|
||||
return &Controller{
|
||||
config: testConfig,
|
||||
os: ostesting.NewFakeOS(),
|
||||
sandboxStore: sandboxstore.NewStore(labels),
|
||||
store: NewStore(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -60,20 +61,14 @@ func Test_Status(t *testing.T) {
|
||||
sandboxID, pid, exitStatus := "1", uint32(1), uint32(0)
|
||||
createdAt, exitedAt := time.Now(), time.Now()
|
||||
controller := newControllerService()
|
||||
status := sandboxstore.Status{
|
||||
|
||||
sb := types.NewPodSandbox(sandboxID, sandboxstore.Status{
|
||||
State: sandboxstore.StateReady,
|
||||
Pid: pid,
|
||||
CreatedAt: createdAt,
|
||||
ExitStatus: exitStatus,
|
||||
ExitedAt: exitedAt,
|
||||
State: sandboxstore.StateReady,
|
||||
}
|
||||
sb := sandboxstore.Sandbox{
|
||||
Metadata: sandboxstore.Metadata{
|
||||
ID: sandboxID,
|
||||
},
|
||||
Status: sandboxstore.StoreStatus(status),
|
||||
}
|
||||
err := controller.sandboxStore.Add(sb)
|
||||
})
|
||||
sb.Metadata = sandboxstore.Metadata{ID: sandboxID}
|
||||
err := controller.store.Save(sb)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -82,6 +77,20 @@ func Test_Status(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.Equal(t, s.Pid, pid)
|
||||
assert.Equal(t, s.ExitedAt, exitedAt)
|
||||
assert.Equal(t, s.CreatedAt, createdAt)
|
||||
assert.Equal(t, s.State, sandboxstore.StateReady.String())
|
||||
|
||||
sb.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, nil))
|
||||
exit, err := controller.Wait(context.Background(), sandboxID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.Equal(t, exit.ExitStatus, exitStatus)
|
||||
assert.Equal(t, exit.ExitedAt, exitedAt)
|
||||
|
||||
s, err = controller.Status(context.Background(), sandboxID, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.Equal(t, s.State, sandboxstore.StateNotReady.String())
|
||||
}
|
||||
|
@ -23,6 +23,12 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/typeurl/v2"
|
||||
docker "github.com/distribution/reference"
|
||||
imagedigest "github.com/opencontainers/go-digest"
|
||||
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/containers"
|
||||
clabels "github.com/containerd/containerd/v2/labels"
|
||||
@ -30,12 +36,8 @@ import (
|
||||
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
|
||||
crilabels "github.com/containerd/containerd/v2/pkg/cri/labels"
|
||||
imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
|
||||
"github.com/containerd/log"
|
||||
docker "github.com/distribution/reference"
|
||||
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
|
||||
|
||||
imagedigest "github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -196,3 +198,24 @@ func (c *Controller) runtimeSnapshotter(ctx context.Context, ociRuntime criconfi
|
||||
log.G(ctx).Debugf("Set snapshotter for runtime %s to %s", ociRuntime.Type, ociRuntime.Snapshotter)
|
||||
return ociRuntime.Snapshotter
|
||||
}
|
||||
|
||||
func getMetadata(ctx context.Context, container containerd.Container) (*sandboxstore.Metadata, error) {
|
||||
// Load sandbox metadata.
|
||||
exts, err := container.Extensions(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get sandbox container extensions: %w", err)
|
||||
}
|
||||
ext, ok := exts[crilabels.SandboxMetadataExtension]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("metadata extension %q not found", crilabels.SandboxMetadataExtension)
|
||||
}
|
||||
data, err := typeurl.UnmarshalAny(ext)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err)
|
||||
}
|
||||
meta, ok := data.(*sandboxstore.Metadata)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to convert the extension to sandbox metadata")
|
||||
}
|
||||
return meta, nil
|
||||
}
|
||||
|
@ -22,16 +22,16 @@ import (
|
||||
goruntime "runtime"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/v2/pkg/netns"
|
||||
"github.com/containerd/typeurl/v2"
|
||||
"github.com/containerd/log"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/errdefs"
|
||||
crilabels "github.com/containerd/containerd/v2/pkg/cri/labels"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/containerd/v2/pkg/netns"
|
||||
sandbox2 "github.com/containerd/containerd/v2/sandbox"
|
||||
)
|
||||
|
||||
// loadContainerTimeout is the default timeout for loading a container/sandbox.
|
||||
@ -51,36 +51,29 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta
|
||||
ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout)
|
||||
defer cancel()
|
||||
var sandbox sandboxstore.Sandbox
|
||||
// Load sandbox metadata.
|
||||
exts, err := cntr.Extensions(ctx)
|
||||
meta, err := getMetadata(ctx, cntr)
|
||||
if err != nil {
|
||||
return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err)
|
||||
return sandbox, err
|
||||
}
|
||||
ext, ok := exts[crilabels.SandboxMetadataExtension]
|
||||
if !ok {
|
||||
return sandbox, fmt.Errorf("metadata extension %q not found", crilabels.SandboxMetadataExtension)
|
||||
}
|
||||
data, err := typeurl.UnmarshalAny(ext)
|
||||
if err != nil {
|
||||
return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err)
|
||||
}
|
||||
meta := data.(*sandboxstore.Metadata)
|
||||
|
||||
s, err := func() (sandboxstore.Status, error) {
|
||||
status := sandboxstore.Status{
|
||||
State: sandboxstore.StateUnknown,
|
||||
}
|
||||
// Load sandbox created timestamp.
|
||||
info, err := cntr.Info(ctx)
|
||||
if err != nil {
|
||||
return status, fmt.Errorf("failed to get sandbox container info: %w", err)
|
||||
return sandbox, fmt.Errorf("failed to get sandbox container info: %w", err)
|
||||
}
|
||||
|
||||
s, ch, err := func() (sandboxstore.Status, <-chan containerd.ExitStatus, error) {
|
||||
status := sandboxstore.Status{
|
||||
State: sandboxstore.StateUnknown,
|
||||
}
|
||||
var channel <-chan containerd.ExitStatus
|
||||
|
||||
status.CreatedAt = info.CreatedAt
|
||||
|
||||
// Load sandbox state.
|
||||
t, err := cntr.Task(ctx, nil)
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
return status, fmt.Errorf("failed to load task: %w", err)
|
||||
return status, channel, fmt.Errorf("failed to load task: %w", err)
|
||||
}
|
||||
var taskStatus containerd.Status
|
||||
var notFound bool
|
||||
@ -93,7 +86,7 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta
|
||||
if err != nil {
|
||||
// It's still possible that task is deleted during this window.
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return status, fmt.Errorf("failed to get task status: %w", err)
|
||||
return status, channel, fmt.Errorf("failed to get task status: %w", err)
|
||||
}
|
||||
notFound = true
|
||||
}
|
||||
@ -103,37 +96,48 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta
|
||||
status.State = sandboxstore.StateNotReady
|
||||
} else {
|
||||
if taskStatus.Status == containerd.Running {
|
||||
// Wait for the task for sandbox monitor.
|
||||
// wait is a long running background request, no timeout needed.
|
||||
exitCh, err := t.Wait(ctrdutil.NamespacedContext())
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return status, fmt.Errorf("failed to wait for task: %w", err)
|
||||
}
|
||||
status.State = sandboxstore.StateNotReady
|
||||
} else {
|
||||
// Task is running, set sandbox state as READY.
|
||||
status.State = sandboxstore.StateReady
|
||||
status.Pid = t.Pid()
|
||||
|
||||
go func() {
|
||||
c.waitSandboxExit(context.Background(), meta.ID, exitCh)
|
||||
}()
|
||||
exitCh, err := t.Wait(ctrdutil.NamespacedContext())
|
||||
if err != nil {
|
||||
return status, channel, fmt.Errorf("failed to wait for sandbox container task: %w", err)
|
||||
}
|
||||
channel = exitCh
|
||||
} else {
|
||||
// Task is not running. Delete the task and set sandbox state as NOTREADY.
|
||||
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
|
||||
return status, fmt.Errorf("failed to delete task: %w", err)
|
||||
return status, channel, fmt.Errorf("failed to delete task: %w", err)
|
||||
}
|
||||
status.State = sandboxstore.StateNotReady
|
||||
}
|
||||
}
|
||||
return status, nil
|
||||
return status, channel, nil
|
||||
}()
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID())
|
||||
}
|
||||
|
||||
// save it to cache in the podsandbox controller
|
||||
podSandbox := types.NewPodSandbox(cntr.ID(), s)
|
||||
podSandbox.Container = cntr
|
||||
if meta != nil {
|
||||
podSandbox.Metadata = *meta
|
||||
}
|
||||
podSandbox.Runtime = sandbox2.RuntimeOpts{
|
||||
Name: info.Runtime.Name,
|
||||
Options: info.Runtime.Options,
|
||||
}
|
||||
if ch != nil {
|
||||
go func() {
|
||||
code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch)
|
||||
podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err))
|
||||
}()
|
||||
}
|
||||
|
||||
if err := c.store.Save(podSandbox); err != nil {
|
||||
return sandbox, fmt.Errorf("failed to save pod sandbox container in mem store: %w", err)
|
||||
}
|
||||
|
||||
sandbox = sandboxstore.NewSandbox(*meta, s)
|
||||
sandbox.Container = cntr
|
||||
|
||||
|
@ -27,11 +27,8 @@ import (
|
||||
)
|
||||
|
||||
func (c *Controller) Shutdown(ctx context.Context, sandboxID string) error {
|
||||
sandbox, err := c.sandboxStore.Get(sandboxID)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("an error occurred when try to find sandbox %q: %w", sandboxID, err)
|
||||
}
|
||||
sandbox := c.store.Get(sandboxID)
|
||||
if sandbox == nil {
|
||||
// Do not return error if the id doesn't exist.
|
||||
log.G(ctx).Tracef("Sandbox controller Delete called for sandbox %q that does not exist", sandboxID)
|
||||
return nil
|
||||
@ -62,6 +59,8 @@ func (c *Controller) Shutdown(ctx context.Context, sandboxID string) error {
|
||||
}
|
||||
}
|
||||
|
||||
c.store.Remove(sandboxID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"github.com/containerd/containerd/v2/errdefs"
|
||||
crilabels "github.com/containerd/containerd/v2/pkg/cri/labels"
|
||||
customopts "github.com/containerd/containerd/v2/pkg/cri/opts"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types"
|
||||
imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
|
||||
@ -62,16 +63,11 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
retErr = errors.Join(retErr, CleanupErr{cleanupErr})
|
||||
}
|
||||
}()
|
||||
|
||||
sandboxInfo, err := c.client.SandboxStore().Get(ctx, id)
|
||||
if err != nil {
|
||||
return cin, fmt.Errorf("unable to find sandbox with id %q: %w", id, err)
|
||||
}
|
||||
|
||||
var metadata sandboxstore.Metadata
|
||||
if err := sandboxInfo.GetExtension(MetadataKey, &metadata); err != nil {
|
||||
return cin, fmt.Errorf("failed to get sandbox %q metadata: %w", id, err)
|
||||
podSandbox := c.store.Get(id)
|
||||
if podSandbox == nil {
|
||||
return cin, fmt.Errorf("unable to find pod sandbox with id %q: %w", id, errdefs.ErrNotFound)
|
||||
}
|
||||
metadata := podSandbox.Metadata
|
||||
|
||||
var (
|
||||
config = metadata.Config
|
||||
@ -147,13 +143,14 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
containerd.WithSpec(spec, specOpts...),
|
||||
containerd.WithContainerLabels(sandboxLabels),
|
||||
containerd.WithContainerExtension(crilabels.SandboxMetadataExtension, &metadata),
|
||||
containerd.WithRuntime(ociRuntime.Type, sandboxInfo.Runtime.Options),
|
||||
containerd.WithRuntime(ociRuntime.Type, podSandbox.Runtime.Options),
|
||||
}
|
||||
|
||||
container, err := c.client.NewContainer(ctx, id, opts...)
|
||||
if err != nil {
|
||||
return cin, fmt.Errorf("failed to create containerd container: %w", err)
|
||||
}
|
||||
podSandbox.Container = container
|
||||
defer func() {
|
||||
if retErr != nil && cleanupErr == nil {
|
||||
deferCtx, deferCancel := ctrdutil.DeferContext()
|
||||
@ -161,6 +158,7 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
if cleanupErr = container.Delete(deferCtx, containerd.WithSnapshotCleanup); cleanupErr != nil {
|
||||
log.G(ctx).WithError(cleanupErr).Errorf("Failed to delete containerd container %q", id)
|
||||
}
|
||||
podSandbox.Container = nil
|
||||
}
|
||||
}()
|
||||
|
||||
@ -213,6 +211,7 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
if err != nil {
|
||||
return cin, fmt.Errorf("failed to get sandbox container info: %w", err)
|
||||
}
|
||||
podSandbox.CreatedAt = info.CreatedAt
|
||||
|
||||
// Create sandbox task in containerd.
|
||||
log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name)
|
||||
@ -238,13 +237,13 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
}
|
||||
}
|
||||
}()
|
||||
podSandbox.Pid = task.Pid()
|
||||
|
||||
// wait is a long running background request, no timeout needed.
|
||||
exitCh, err := task.Wait(ctrdutil.NamespacedContext())
|
||||
if err != nil {
|
||||
return cin, fmt.Errorf("failed to wait for sandbox container task: %w", err)
|
||||
}
|
||||
c.store.Save(id, exitCh)
|
||||
|
||||
nric, err := nri.New()
|
||||
if err != nil {
|
||||
@ -263,18 +262,30 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
if err := task.Start(ctx); err != nil {
|
||||
return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)
|
||||
}
|
||||
podSandbox.State = sandboxstore.StateReady
|
||||
|
||||
cin.SandboxID = id
|
||||
cin.Pid = task.Pid()
|
||||
cin.CreatedAt = info.CreatedAt
|
||||
cin.Labels = labels
|
||||
|
||||
go func() {
|
||||
code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh)
|
||||
podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err))
|
||||
}()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Controller) Create(ctx context.Context, _info sandbox.Sandbox, _ ...sandbox.CreateOpt) error {
|
||||
// Not used by pod-sandbox implementation as there is no need to split pause containers logic.
|
||||
return nil
|
||||
func (c *Controller) Create(_ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error {
|
||||
metadata := sandboxstore.Metadata{}
|
||||
if err := info.GetExtension(MetadataKey, &metadata); err != nil {
|
||||
return fmt.Errorf("failed to get sandbox %q metadata: %w", info.ID, err)
|
||||
}
|
||||
podSandbox := types.NewPodSandbox(info.ID, sandboxstore.Status{State: sandboxstore.StateUnknown})
|
||||
podSandbox.Metadata = metadata
|
||||
podSandbox.Runtime = info.Runtime
|
||||
return c.store.Save(podSandbox)
|
||||
}
|
||||
|
||||
func (c *Controller) ensureImageExists(ctx context.Context, ref string, config *runtime.PodSandboxConfig) (*imagestore.Image, error) {
|
||||
|
@ -21,57 +21,32 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/containerd/typeurl/v2"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/containers"
|
||||
"github.com/containerd/containerd/v2/errdefs"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/base"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types"
|
||||
"github.com/containerd/containerd/v2/sandbox"
|
||||
"github.com/containerd/go-cni"
|
||||
"github.com/containerd/typeurl/v2"
|
||||
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
)
|
||||
|
||||
// SandboxInfo is extra information for sandbox.
|
||||
// TODO (mikebrow): discuss predefining constants structures for some or all of these field names in CRI
|
||||
type SandboxInfo struct {
|
||||
Pid uint32 `json:"pid"`
|
||||
Status string `json:"processStatus"`
|
||||
NetNSClosed bool `json:"netNamespaceClosed"`
|
||||
Image string `json:"image"`
|
||||
SnapshotKey string `json:"snapshotKey"`
|
||||
Snapshotter string `json:"snapshotter"`
|
||||
// Note: a new field `RuntimeHandler` has been added into the CRI PodSandboxStatus struct, and
|
||||
// should be set. This `RuntimeHandler` field will be deprecated after containerd 1.3 (tracked
|
||||
// in https://github.com/containerd/cri/issues/1064).
|
||||
RuntimeHandler string `json:"runtimeHandler"` // see the Note above
|
||||
RuntimeType string `json:"runtimeType"`
|
||||
RuntimeOptions interface{} `json:"runtimeOptions"`
|
||||
Config *runtime.PodSandboxConfig `json:"config"`
|
||||
// Note: RuntimeSpec may not be populated if the sandbox has not been fully created.
|
||||
RuntimeSpec *runtimespec.Spec `json:"runtimeSpec"`
|
||||
CNIResult *cni.Result `json:"cniResult"`
|
||||
Metadata *sandboxstore.Metadata `json:"sandboxMetadata"`
|
||||
}
|
||||
|
||||
func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) {
|
||||
sb, err := c.sandboxStore.Get(sandboxID)
|
||||
if err != nil {
|
||||
return sandbox.ControllerStatus{}, fmt.Errorf("an error occurred while trying to find sandbox %q: %w",
|
||||
sandboxID, err)
|
||||
sb := c.store.Get(sandboxID)
|
||||
if sb == nil {
|
||||
return sandbox.ControllerStatus{}, fmt.Errorf("unable to find sandbox %q: %w", sandboxID, errdefs.ErrNotFound)
|
||||
}
|
||||
|
||||
status := sb.Status.Get()
|
||||
cstatus := sandbox.ControllerStatus{
|
||||
SandboxID: sandboxID,
|
||||
Pid: status.Pid,
|
||||
State: status.State.String(),
|
||||
CreatedAt: status.CreatedAt,
|
||||
Pid: sb.Pid,
|
||||
State: sb.State.String(),
|
||||
CreatedAt: sb.CreatedAt,
|
||||
Extra: nil,
|
||||
}
|
||||
|
||||
if !status.ExitedAt.IsZero() {
|
||||
cstatus.ExitedAt = status.ExitedAt
|
||||
exitStatus := sb.GetExitStatus()
|
||||
if exitStatus != nil {
|
||||
cstatus.ExitedAt = exitStatus.ExitTime()
|
||||
}
|
||||
|
||||
if verbose {
|
||||
@ -87,15 +62,16 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool)
|
||||
}
|
||||
|
||||
// toCRISandboxInfo converts internal container object information to CRI sandbox status response info map.
|
||||
func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox) (map[string]string, error) {
|
||||
si := &SandboxInfo{
|
||||
Pid: sandbox.Status.Get().Pid,
|
||||
Config: sandbox.Config,
|
||||
RuntimeHandler: sandbox.RuntimeHandler,
|
||||
CNIResult: sandbox.CNIResult,
|
||||
func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) {
|
||||
si := &base.SandboxInfo{
|
||||
Pid: sb.Pid,
|
||||
Config: sb.Metadata.Config,
|
||||
RuntimeHandler: sb.Metadata.RuntimeHandler,
|
||||
CNIResult: sb.Metadata.CNIResult,
|
||||
Metadata: &sb.Metadata,
|
||||
}
|
||||
|
||||
if container := sandbox.Container; container != nil {
|
||||
if container := sb.Container; container != nil {
|
||||
task, err := container.Task(ctx, nil)
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
return nil, fmt.Errorf("failed to get sandbox container task: %w", err)
|
||||
@ -145,18 +121,16 @@ func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox) (map[st
|
||||
// status which does not exist in containerd.
|
||||
si.Status = "deleted"
|
||||
}
|
||||
|
||||
if sandbox.NetNS != nil {
|
||||
netns := getNetNS(&sb.Metadata)
|
||||
if netns != nil {
|
||||
// Add network closed information if sandbox is not using host network.
|
||||
closed, err := sandbox.NetNS.Closed()
|
||||
closed, err := netns.Closed()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check network namespace closed: %w", err)
|
||||
}
|
||||
si.NetNSClosed = closed
|
||||
}
|
||||
|
||||
si.Metadata = &sandbox.Metadata
|
||||
|
||||
infoBytes, err := json.Marshal(si)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal info %v: %w", si, err)
|
||||
|
@ -22,52 +22,57 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/log"
|
||||
|
||||
eventtypes "github.com/containerd/containerd/v2/api/events"
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/errdefs"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
|
||||
"github.com/containerd/containerd/v2/protobuf"
|
||||
"github.com/containerd/containerd/v2/sandbox"
|
||||
"github.com/containerd/log"
|
||||
)
|
||||
|
||||
func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error {
|
||||
sandbox, err := c.sandboxStore.Get(sandboxID)
|
||||
podSandbox := c.store.Get(sandboxID)
|
||||
if podSandbox == nil {
|
||||
return errdefs.ErrNotFound
|
||||
}
|
||||
if podSandbox.Container == nil {
|
||||
return nil
|
||||
}
|
||||
meta, err := getMetadata(ctx, podSandbox.Container)
|
||||
if err != nil {
|
||||
return fmt.Errorf("an error occurred when try to find sandbox %q: %w",
|
||||
sandboxID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.cleanupSandboxFiles(sandboxID, sandbox.Config); err != nil {
|
||||
return fmt.Errorf("failed to cleanup sandbox files: %w", err)
|
||||
}
|
||||
|
||||
// TODO: The Controller maintains its own Status instead of CRI's sandboxStore.
|
||||
// Only stop sandbox container when it's running or unknown.
|
||||
state := sandbox.Status.Get().State
|
||||
if (state == sandboxstore.StateReady || state == sandboxstore.StateUnknown) && sandbox.Container != nil {
|
||||
if err := c.stopSandboxContainer(ctx, sandbox); err != nil {
|
||||
state := podSandbox.State
|
||||
if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown {
|
||||
if err := c.stopSandboxContainer(ctx, podSandbox); err != nil {
|
||||
return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, err)
|
||||
}
|
||||
}
|
||||
if err := c.cleanupSandboxFiles(sandboxID, meta.Config); err != nil {
|
||||
return fmt.Errorf("failed to cleanup sandbox files: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// stopSandboxContainer kills the sandbox container.
|
||||
// `task.Delete` is not called here because it will be called when
|
||||
// the event monitor handles the `TaskExit` event.
|
||||
func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error {
|
||||
id := sandbox.ID
|
||||
container := sandbox.Container
|
||||
state := sandbox.Status.Get().State
|
||||
func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types.PodSandbox) error {
|
||||
id := podSandbox.ID
|
||||
container := podSandbox.Container
|
||||
state := podSandbox.State
|
||||
task, err := container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to get sandbox container: %w", err)
|
||||
return fmt.Errorf("failed to get pod sandbox container: %w", err)
|
||||
}
|
||||
// Don't return for unknown state, some cleanup needs to be done.
|
||||
if state == sandboxstore.StateUnknown {
|
||||
return cleanupUnknownSandbox(ctx, id, sandbox)
|
||||
return cleanupUnknownSandbox(ctx, id, podSandbox)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -75,7 +80,7 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxst
|
||||
// Handle unknown state.
|
||||
// The cleanup logic is the same with container unknown state.
|
||||
if state == sandboxstore.StateUnknown {
|
||||
// Start an exit handler for containers in unknown state.
|
||||
// Start an exit handler for sandbox container in unknown state.
|
||||
waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext())
|
||||
defer waitCancel()
|
||||
exitCh, err := task.Wait(waitCtx)
|
||||
@ -83,23 +88,20 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxst
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to wait for task: %w", err)
|
||||
}
|
||||
return cleanupUnknownSandbox(ctx, id, sandbox)
|
||||
return cleanupUnknownSandbox(ctx, id, podSandbox)
|
||||
}
|
||||
|
||||
exitCtx, exitCancel := context.WithCancel(context.Background())
|
||||
stopCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stopCh)
|
||||
exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, id, exitCh)
|
||||
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
|
||||
e := &eventtypes.SandboxExit{
|
||||
SandboxID: id,
|
||||
ExitStatus: exitStatus,
|
||||
ExitedAt: protobuf.ToTimestamp(exitedAt),
|
||||
}
|
||||
log.G(ctx).WithError(err).Errorf("Failed to wait sandbox exit %+v", e)
|
||||
// TODO: how to backoff
|
||||
c.cri.BackOffEvent(id, e)
|
||||
exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, podSandbox, exitCh)
|
||||
if err != context.Canceled && err != context.DeadlineExceeded {
|
||||
// The error of context.Canceled or context.DeadlineExceeded indicates the task.Wait is not finished,
|
||||
// so we can not set the exit status of the pod sandbox.
|
||||
podSandbox.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, err))
|
||||
} else {
|
||||
log.G(ctx).WithError(err).Errorf("Failed to wait pod sandbox exit %+v", err)
|
||||
}
|
||||
}()
|
||||
defer func() {
|
||||
@ -111,27 +113,17 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxst
|
||||
}()
|
||||
}
|
||||
|
||||
// Kill the sandbox container.
|
||||
// Kill the pod sandbox container.
|
||||
if err = task.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to kill sandbox container: %w", err)
|
||||
return fmt.Errorf("failed to kill pod sandbox container: %w", err)
|
||||
}
|
||||
|
||||
return c.waitSandboxStop(ctx, sandbox)
|
||||
}
|
||||
|
||||
// waitSandboxStop waits for sandbox to be stopped until context is cancelled or
|
||||
// the context deadline is exceeded.
|
||||
func (c *Controller) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("wait sandbox container %q: %w", sandbox.ID, ctx.Err())
|
||||
case <-sandbox.Stopped():
|
||||
return nil
|
||||
}
|
||||
_, err = podSandbox.Wait(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// cleanupUnknownSandbox cleanup stopped sandbox in unknown state.
|
||||
func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error {
|
||||
// Reuse handleSandboxExit to do the cleanup.
|
||||
return handleSandboxExit(ctx, sandbox, &eventtypes.TaskExit{ExitStatus: unknownExitCode, ExitedAt: protobuf.ToTimestamp(time.Now())})
|
||||
func cleanupUnknownSandbox(ctx context.Context, id string, sandbox *types.PodSandbox) error {
|
||||
// Reuse handleSandboxTaskExit to do the cleanup.
|
||||
return handleSandboxTaskExit(ctx, sandbox, &eventtypes.TaskExit{ExitStatus: unknownExitCode, ExitedAt: protobuf.ToTimestamp(time.Now())})
|
||||
}
|
||||
|
@ -17,33 +17,40 @@
|
||||
package podsandbox
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types"
|
||||
)
|
||||
|
||||
type Status struct {
|
||||
Waiter <-chan containerd.ExitStatus
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
sync.Map
|
||||
m sync.Map
|
||||
}
|
||||
|
||||
func NewStore() *Store {
|
||||
return &Store{}
|
||||
}
|
||||
|
||||
func (s *Store) Save(id string, exitCh <-chan containerd.ExitStatus) {
|
||||
s.Store(id, &Status{Waiter: exitCh})
|
||||
func (s *Store) Save(p *types.PodSandbox) error {
|
||||
if p == nil {
|
||||
return fmt.Errorf("pod sandbox should not be nil")
|
||||
}
|
||||
s.m.Store(p.ID, p)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) Get(id string) *Status {
|
||||
i, ok := s.LoadAndDelete(id)
|
||||
func (s *Store) Get(id string) *types.PodSandbox {
|
||||
i, ok := s.m.Load(id)
|
||||
if !ok {
|
||||
// not exist
|
||||
return nil
|
||||
}
|
||||
// Only save *Status
|
||||
return i.(*Status)
|
||||
return i.(*types.PodSandbox)
|
||||
}
|
||||
|
||||
func (s *Store) Remove(id string) *types.PodSandbox {
|
||||
i, ok := s.m.LoadAndDelete(id)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return i.(*types.PodSandbox)
|
||||
}
|
||||
|
83
pkg/cri/server/podsandbox/types/podsandbox.go
Normal file
83
pkg/cri/server/podsandbox/types/podsandbox.go
Normal file
@ -0,0 +1,83 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/store"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
"github.com/containerd/containerd/v2/sandbox"
|
||||
)
|
||||
|
||||
type PodSandbox struct {
|
||||
mu sync.Mutex
|
||||
ID string
|
||||
Container containerd.Container
|
||||
State sandboxstore.State
|
||||
Metadata sandboxstore.Metadata
|
||||
Runtime sandbox.RuntimeOpts
|
||||
Pid uint32
|
||||
CreatedAt time.Time
|
||||
stopChan *store.StopCh
|
||||
exitStatus *containerd.ExitStatus
|
||||
}
|
||||
|
||||
func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox {
|
||||
podSandbox := &PodSandbox{
|
||||
ID: id,
|
||||
Container: nil,
|
||||
stopChan: store.NewStopCh(),
|
||||
CreatedAt: status.CreatedAt,
|
||||
State: status.State,
|
||||
Pid: status.Pid,
|
||||
}
|
||||
if status.State == sandboxstore.StateNotReady {
|
||||
podSandbox.Exit(*containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil))
|
||||
}
|
||||
return podSandbox
|
||||
}
|
||||
|
||||
func (p *PodSandbox) Exit(status containerd.ExitStatus) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.exitStatus = &status
|
||||
p.State = sandboxstore.StateNotReady
|
||||
p.stopChan.Stop()
|
||||
}
|
||||
|
||||
func (p *PodSandbox) Wait(ctx context.Context) (*containerd.ExitStatus, error) {
|
||||
s := p.GetExitStatus()
|
||||
if s != nil {
|
||||
return s, nil
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-p.stopChan.Stopped():
|
||||
return p.GetExitStatus(), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PodSandbox) GetExitStatus() *containerd.ExitStatus {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
return p.exitStatus
|
||||
}
|
@ -146,6 +146,28 @@ func (c *criService) recover(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
for _, sb := range c.sandboxStore.List() {
|
||||
sb := sb
|
||||
status := sb.Status.Get()
|
||||
if status.State == sandboxstore.StateNotReady {
|
||||
continue
|
||||
}
|
||||
controller, err := c.sandboxService.SandboxController(sb.Config, sb.RuntimeHandler)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed to get sandbox controller while waiting sandbox")
|
||||
continue
|
||||
}
|
||||
exitCh := make(chan containerd.ExitStatus, 1)
|
||||
go func() {
|
||||
exit, err := controller.Wait(ctrdutil.NamespacedContext(), sb.ID)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed to wait for sandbox exit")
|
||||
exitCh <- *containerd.NewExitStatus(containerd.UnknownExitStatus, time.Time{}, err)
|
||||
}
|
||||
exitCh <- *containerd.NewExitStatus(exit.ExitStatus, exit.ExitedAt, nil)
|
||||
}()
|
||||
c.eventMonitor.startSandboxExitMonitor(context.Background(), sb.ID, exitCh)
|
||||
}
|
||||
// Recover all containers.
|
||||
containers, err := c.client.Containers(ctx, filterLabel(crilabels.ContainerKindLabel, crilabels.ContainerKindContainer))
|
||||
if err != nil {
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/containerd/go-cni"
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/typeurl/v2"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
@ -39,7 +40,6 @@ import (
|
||||
"github.com/containerd/containerd/v2/pkg/cri/util"
|
||||
"github.com/containerd/containerd/v2/pkg/netns"
|
||||
sb "github.com/containerd/containerd/v2/sandbox"
|
||||
"github.com/containerd/log"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -255,7 +255,6 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
||||
|
||||
ctrl, err := controller.Start(ctx, id)
|
||||
if err != nil {
|
||||
sandbox.Container, _ = c.client.LoadContainer(ctx, id)
|
||||
var cerr podsandbox.CleanupErr
|
||||
if errors.As(err, &cerr) {
|
||||
cleanupErr = fmt.Errorf("failed to cleanup sandbox: %w", cerr)
|
||||
@ -422,7 +421,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
||||
//
|
||||
// TaskOOM from containerd may come before sandbox is added to store,
|
||||
// but we don't care about sandbox TaskOOM right now, so it is fine.
|
||||
c.eventMonitor.startSandboxExitMonitor(context.Background(), id, ctrl.Pid, exitCh)
|
||||
c.eventMonitor.startSandboxExitMonitor(context.Background(), id, exitCh)
|
||||
|
||||
// Send CONTAINER_STARTED event with ContainerId equal to SandboxId.
|
||||
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
|
||||
|
@ -18,14 +18,15 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/v2/errdefs"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
"github.com/containerd/go-cni"
|
||||
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd/v2/errdefs"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/server/base"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
)
|
||||
|
||||
// PodSandboxStatus returns the status of the PodSandbox.
|
||||
@ -64,6 +65,12 @@ func (c *criService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandbox
|
||||
return nil, fmt.Errorf("failed to query controller status: %w", err)
|
||||
}
|
||||
state = runtime.PodSandboxState_SANDBOX_NOTREADY.String()
|
||||
if r.GetVerbose() {
|
||||
info, err = toDeletedCRISandboxInfo(sandbox)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
state = cstatus.State
|
||||
createdAt = cstatus.CreatedAt
|
||||
@ -104,26 +111,6 @@ func (c *criService) getIPs(sandbox sandboxstore.Sandbox) (string, []string, err
|
||||
return sandbox.IP, sandbox.AdditionalIPs, nil
|
||||
}
|
||||
|
||||
// SandboxInfo is extra information for sandbox.
|
||||
// TODO (mikebrow): discuss predefining constants structures for some or all of these field names in CRI
|
||||
type SandboxInfo struct {
|
||||
Pid uint32 `json:"pid"`
|
||||
Status string `json:"processStatus"`
|
||||
NetNSClosed bool `json:"netNamespaceClosed"`
|
||||
Image string `json:"image"`
|
||||
SnapshotKey string `json:"snapshotKey"`
|
||||
Snapshotter string `json:"snapshotter"`
|
||||
// Note: a new field `RuntimeHandler` has been added into the CRI PodSandboxStatus struct, and
|
||||
// should be set. This `RuntimeHandler` field will be deprecated after containerd 1.3 (tracked
|
||||
// in https://github.com/containerd/cri/issues/1064).
|
||||
RuntimeHandler string `json:"runtimeHandler"` // see the Note above
|
||||
RuntimeType string `json:"runtimeType"`
|
||||
RuntimeOptions interface{} `json:"runtimeOptions"`
|
||||
Config *runtime.PodSandboxConfig `json:"config"`
|
||||
RuntimeSpec *runtimespec.Spec `json:"runtimeSpec"`
|
||||
CNIResult *cni.Result `json:"cniResult"`
|
||||
}
|
||||
|
||||
// toCRISandboxStatus converts sandbox metadata into CRI pod sandbox status.
|
||||
func toCRISandboxStatus(meta sandboxstore.Metadata, status string, createdAt time.Time, ip string, additionalIPs []string) *runtime.PodSandboxStatus {
|
||||
// Set sandbox state to NOTREADY by default.
|
||||
@ -159,3 +146,40 @@ func toCRISandboxStatus(meta sandboxstore.Metadata, status string, createdAt tim
|
||||
RuntimeHandler: meta.RuntimeHandler,
|
||||
}
|
||||
}
|
||||
|
||||
// toDeletedCRISandboxInfo converts cached sandbox to CRI sandbox status response info map.
|
||||
// In most cases, controller.Status() with verbose=true should have SandboxInfo in the return,
|
||||
// but if controller.Status() returns a NotFound error,
|
||||
// we should fallback to get SandboxInfo from cached sandbox itself.
|
||||
func toDeletedCRISandboxInfo(sandbox sandboxstore.Sandbox) (map[string]string, error) {
|
||||
si := &base.SandboxInfo{
|
||||
Pid: sandbox.Status.Get().Pid,
|
||||
Config: sandbox.Config,
|
||||
RuntimeHandler: sandbox.RuntimeHandler,
|
||||
CNIResult: sandbox.CNIResult,
|
||||
}
|
||||
|
||||
// If processStatus is empty, it means that the task is deleted. Apply "deleted"
|
||||
// status which does not exist in containerd.
|
||||
si.Status = "deleted"
|
||||
|
||||
if sandbox.NetNS != nil {
|
||||
// Add network closed information if sandbox is not using host network.
|
||||
closed, err := sandbox.NetNS.Closed()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check network namespace closed: %w", err)
|
||||
}
|
||||
si.NetNSClosed = closed
|
||||
}
|
||||
|
||||
si.Metadata = &sandbox.Metadata
|
||||
|
||||
infoBytes, err := json.Marshal(si)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal info %v: %w", si, err)
|
||||
}
|
||||
|
||||
return map[string]string{
|
||||
"info": string(infoBytes),
|
||||
}, nil
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"github.com/containerd/log"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd/v2/errdefs"
|
||||
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
|
||||
)
|
||||
|
||||
@ -74,9 +75,14 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
|
||||
}
|
||||
|
||||
if err := controller.Stop(ctx, id); err != nil {
|
||||
// Log and ignore the error if controller already removed the sandbox
|
||||
if errdefs.IsNotFound(err) {
|
||||
log.G(ctx).Warnf("sandbox %q is not found when stopping it", id)
|
||||
} else {
|
||||
return fmt.Errorf("failed to stop sandbox %q: %w", id, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop)
|
||||
|
||||
|
@ -183,7 +183,7 @@ func NewCRIService(criBase *base.CRIBase, imageService imageService, client *con
|
||||
}
|
||||
|
||||
podSandboxController := client.SandboxController(string(criconfig.ModePodSandbox)).(*podsandbox.Controller)
|
||||
podSandboxController.Init(c.sandboxStore, c)
|
||||
podSandboxController.Init(c)
|
||||
|
||||
c.nri = nri
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user