sandbox: remove sandboxStore from podsandbox
Signed-off-by: Abel Feng <fshb1988@gmail.com>
This commit is contained in:
		| @@ -34,16 +34,6 @@ import ( | |||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	containerd "github.com/containerd/containerd/v2/client" |  | ||||||
| 	"github.com/containerd/containerd/v2/containers" |  | ||||||
| 	cri "github.com/containerd/containerd/v2/integration/cri-api/pkg/apis" |  | ||||||
| 	_ "github.com/containerd/containerd/v2/integration/images" // Keep this around to parse `imageListFile` command line var |  | ||||||
| 	"github.com/containerd/containerd/v2/integration/remote" |  | ||||||
| 	dialer "github.com/containerd/containerd/v2/integration/remote/util" |  | ||||||
| 	criconfig "github.com/containerd/containerd/v2/pkg/cri/config" |  | ||||||
| 	"github.com/containerd/containerd/v2/pkg/cri/constants" |  | ||||||
| 	"github.com/containerd/containerd/v2/pkg/cri/server" |  | ||||||
| 	"github.com/containerd/containerd/v2/pkg/cri/util" |  | ||||||
| 	"github.com/containerd/log" | 	"github.com/containerd/log" | ||||||
| 	"github.com/opencontainers/selinux/go-selinux" | 	"github.com/opencontainers/selinux/go-selinux" | ||||||
| 	"github.com/stretchr/testify/assert" | 	"github.com/stretchr/testify/assert" | ||||||
| @@ -52,6 +42,17 @@ import ( | |||||||
| 	"google.golang.org/grpc/credentials/insecure" | 	"google.golang.org/grpc/credentials/insecure" | ||||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||||
| 	"k8s.io/klog/v2" | 	"k8s.io/klog/v2" | ||||||
|  |  | ||||||
|  | 	containerd "github.com/containerd/containerd/v2/client" | ||||||
|  | 	"github.com/containerd/containerd/v2/containers" | ||||||
|  | 	cri "github.com/containerd/containerd/v2/integration/cri-api/pkg/apis" | ||||||
|  | 	_ "github.com/containerd/containerd/v2/integration/images" // Keep this around to parse `imageListFile` command line var | ||||||
|  | 	"github.com/containerd/containerd/v2/integration/remote" | ||||||
|  | 	dialer "github.com/containerd/containerd/v2/integration/remote/util" | ||||||
|  | 	criconfig "github.com/containerd/containerd/v2/pkg/cri/config" | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/constants" | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/server/base" | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/util" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -676,7 +677,7 @@ func CRIConfig() (*criconfig.Config, error) { | |||||||
| } | } | ||||||
|  |  | ||||||
| // SandboxInfo gets sandbox info. | // SandboxInfo gets sandbox info. | ||||||
| func SandboxInfo(id string) (*runtime.PodSandboxStatus, *server.SandboxInfo, error) { | func SandboxInfo(id string) (*runtime.PodSandboxStatus, *base.SandboxInfo, error) { | ||||||
| 	client, err := RawRuntimeClient() | 	client, err := RawRuntimeClient() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err) | 		return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err) | ||||||
| @@ -689,7 +690,7 @@ func SandboxInfo(id string) (*runtime.PodSandboxStatus, *server.SandboxInfo, err | |||||||
| 		return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err) | 		return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err) | ||||||
| 	} | 	} | ||||||
| 	status := resp.GetStatus() | 	status := resp.GetStatus() | ||||||
| 	var info server.SandboxInfo | 	var info base.SandboxInfo | ||||||
| 	if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil { | 	if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil { | ||||||
| 		return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err) | 		return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err) | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -35,7 +35,7 @@ import ( | |||||||
| 	"github.com/stretchr/testify/require" | 	"github.com/stretchr/testify/require" | ||||||
| 	criapiv1 "k8s.io/cri-api/pkg/apis/runtime/v1" | 	criapiv1 "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||||
|  |  | ||||||
| 	"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox" | 	"github.com/containerd/containerd/v2/pkg/cri/server/base" | ||||||
| 	"github.com/containerd/containerd/v2/pkg/failpoint" | 	"github.com/containerd/containerd/v2/pkg/failpoint" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -299,7 +299,7 @@ func TestRunPodSandboxAndTeardownCNISlow(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| // sbserverSandboxInfo gets sandbox info. | // sbserverSandboxInfo gets sandbox info. | ||||||
| func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *podsandbox.SandboxInfo, error) { | func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *base.SandboxInfo, error) { | ||||||
| 	client, err := RawRuntimeClient() | 	client, err := RawRuntimeClient() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err) | 		return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err) | ||||||
| @@ -312,7 +312,7 @@ func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *podsandbox.San | |||||||
| 		return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err) | 		return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err) | ||||||
| 	} | 	} | ||||||
| 	status := resp.GetStatus() | 	status := resp.GetStatus() | ||||||
| 	var info podsandbox.SandboxInfo | 	var info base.SandboxInfo | ||||||
| 	if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil { | 	if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil { | ||||||
| 		return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err) | 		return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err) | ||||||
| 	} | 	} | ||||||
|   | |||||||
							
								
								
									
										47
									
								
								pkg/cri/server/base/sandbox_info.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								pkg/cri/server/base/sandbox_info.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,47 @@ | |||||||
|  | /* | ||||||
|  |    Copyright The containerd Authors. | ||||||
|  |  | ||||||
|  |    Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  |    you may not use this file except in compliance with the License. | ||||||
|  |    You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |        http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  |    Unless required by applicable law or agreed to in writing, software | ||||||
|  |    distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  |    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  |    See the License for the specific language governing permissions and | ||||||
|  |    limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package base | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"github.com/containerd/go-cni" | ||||||
|  | 	"github.com/opencontainers/runtime-spec/specs-go" | ||||||
|  | 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // SandboxInfo is extra information for sandbox. | ||||||
|  | // TODO (mikebrow): discuss predefining constants structures for some or all of these field names in CRI | ||||||
|  | type SandboxInfo struct { | ||||||
|  | 	Pid         uint32 `json:"pid"` | ||||||
|  | 	Status      string `json:"processStatus"` | ||||||
|  | 	NetNSClosed bool   `json:"netNamespaceClosed"` | ||||||
|  | 	Image       string `json:"image"` | ||||||
|  | 	SnapshotKey string `json:"snapshotKey"` | ||||||
|  | 	Snapshotter string `json:"snapshotter"` | ||||||
|  | 	// Note: a new field `RuntimeHandler` has been added into the CRI PodSandboxStatus struct, and | ||||||
|  | 	// should be set. This `RuntimeHandler` field will be deprecated after containerd 1.3 (tracked | ||||||
|  | 	// in https://github.com/containerd/cri/issues/1064). | ||||||
|  | 	RuntimeHandler string                    `json:"runtimeHandler"` // see the Note above | ||||||
|  | 	RuntimeType    string                    `json:"runtimeType"` | ||||||
|  | 	RuntimeOptions interface{}               `json:"runtimeOptions"` | ||||||
|  | 	Config         *runtime.PodSandboxConfig `json:"config"` | ||||||
|  | 	// Note: RuntimeSpec may not be populated if the sandbox has not been fully created. | ||||||
|  | 	RuntimeSpec *specs.Spec       `json:"runtimeSpec"` | ||||||
|  | 	CNIResult   *cni.Result       `json:"cniResult"` | ||||||
|  | 	Metadata    *sandbox.Metadata `json:"sandboxMetadata"` | ||||||
|  | } | ||||||
| @@ -23,6 +23,11 @@ import ( | |||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/log" | ||||||
|  | 	"github.com/containerd/typeurl/v2" | ||||||
|  | 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||||
|  | 	"k8s.io/utils/clock" | ||||||
|  |  | ||||||
| 	eventtypes "github.com/containerd/containerd/v2/api/events" | 	eventtypes "github.com/containerd/containerd/v2/api/events" | ||||||
| 	apitasks "github.com/containerd/containerd/v2/api/services/tasks/v1" | 	apitasks "github.com/containerd/containerd/v2/api/services/tasks/v1" | ||||||
| 	containerdio "github.com/containerd/containerd/v2/cio" | 	containerdio "github.com/containerd/containerd/v2/cio" | ||||||
| @@ -34,10 +39,6 @@ import ( | |||||||
| 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
| 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | ||||||
| 	"github.com/containerd/containerd/v2/protobuf" | 	"github.com/containerd/containerd/v2/protobuf" | ||||||
| 	"github.com/containerd/log" |  | ||||||
| 	"github.com/containerd/typeurl/v2" |  | ||||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" |  | ||||||
| 	"k8s.io/utils/clock" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -108,7 +109,7 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) { | |||||||
| } | } | ||||||
|  |  | ||||||
| // startSandboxExitMonitor starts an exit monitor for a given sandbox. | // startSandboxExitMonitor starts an exit monitor for a given sandbox. | ||||||
| func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} { | func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) <-chan struct{} { | ||||||
| 	stopCh := make(chan struct{}) | 	stopCh := make(chan struct{}) | ||||||
| 	go func() { | 	go func() { | ||||||
| 		defer close(stopCh) | 		defer close(stopCh) | ||||||
|   | |||||||
| @@ -34,8 +34,8 @@ import ( | |||||||
| 	"github.com/containerd/containerd/v2/pkg/cri/constants" | 	"github.com/containerd/containerd/v2/pkg/cri/constants" | ||||||
| 	"github.com/containerd/containerd/v2/pkg/cri/server/base" | 	"github.com/containerd/containerd/v2/pkg/cri/server/base" | ||||||
| 	"github.com/containerd/containerd/v2/pkg/cri/server/images" | 	"github.com/containerd/containerd/v2/pkg/cri/server/images" | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" | ||||||
| 	imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" | 	imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" | ||||||
| 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" |  | ||||||
| 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | ||||||
| 	osinterface "github.com/containerd/containerd/v2/pkg/os" | 	osinterface "github.com/containerd/containerd/v2/pkg/os" | ||||||
| 	"github.com/containerd/containerd/v2/platforms" | 	"github.com/containerd/containerd/v2/platforms" | ||||||
| @@ -116,8 +116,6 @@ type Controller struct { | |||||||
| 	client *containerd.Client | 	client *containerd.Client | ||||||
| 	// imageService is a dependency to CRI image service. | 	// imageService is a dependency to CRI image service. | ||||||
| 	imageService ImageService | 	imageService ImageService | ||||||
| 	// sandboxStore stores all resources associated with sandboxes. |  | ||||||
| 	sandboxStore *sandboxstore.Store |  | ||||||
| 	// os is an interface for all required os operations. | 	// os is an interface for all required os operations. | ||||||
| 	os osinterface.OS | 	os osinterface.OS | ||||||
| 	// cri is CRI service that provides missing gaps needed by controller. | 	// cri is CRI service that provides missing gaps needed by controller. | ||||||
| @@ -129,11 +127,9 @@ type Controller struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (c *Controller) Init( | func (c *Controller) Init( | ||||||
| 	sandboxStore *sandboxstore.Store, |  | ||||||
| 	cri CRIService, | 	cri CRIService, | ||||||
| ) { | ) { | ||||||
| 	c.cri = cri | 	c.cri = cri | ||||||
| 	c.sandboxStore = sandboxStore |  | ||||||
| } | } | ||||||
|  |  | ||||||
| var _ sandbox.Controller = (*Controller)(nil) | var _ sandbox.Controller = (*Controller)(nil) | ||||||
| @@ -143,63 +139,46 @@ func (c *Controller) Platform(_ctx context.Context, _sandboxID string) (platform | |||||||
| } | } | ||||||
|  |  | ||||||
| func (c *Controller) Wait(ctx context.Context, sandboxID string) (sandbox.ExitStatus, error) { | func (c *Controller) Wait(ctx context.Context, sandboxID string) (sandbox.ExitStatus, error) { | ||||||
| 	status := c.store.Get(sandboxID) | 	podSandbox := c.store.Get(sandboxID) | ||||||
| 	if status == nil { | 	if podSandbox == nil { | ||||||
| 		return sandbox.ExitStatus{}, fmt.Errorf("failed to get exit channel. %q", sandboxID) | 		return sandbox.ExitStatus{}, fmt.Errorf("failed to get exit channel. %q", sandboxID) | ||||||
|  |  | ||||||
|  | 	} | ||||||
|  | 	exit, err := podSandbox.Wait(ctx) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return sandbox.ExitStatus{}, fmt.Errorf("failed to wait pod sandbox, %w", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	exitStatus, exitedAt, err := c.waitSandboxExit(ctx, sandboxID, status.Waiter) |  | ||||||
|  |  | ||||||
| 	return sandbox.ExitStatus{ | 	return sandbox.ExitStatus{ | ||||||
| 		ExitStatus: exitStatus, | 		ExitStatus: exit.ExitCode(), | ||||||
| 		ExitedAt:   exitedAt, | 		ExitedAt:   exit.ExitTime(), | ||||||
| 	}, err | 	}, err | ||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *Controller) waitSandboxExit(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) { | func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) { | ||||||
| 	exitStatus = unknownExitCode |  | ||||||
| 	exitedAt = time.Now() |  | ||||||
| 	select { | 	select { | ||||||
| 	case exitRes := <-exitCh: | 	case e := <-exitCh: | ||||||
| 		log.G(ctx).Debugf("received sandbox exit %+v", exitRes) | 		exitStatus, exitedAt, err = e.Result() | ||||||
|  |  | ||||||
| 		exitStatus, exitedAt, err = exitRes.Result() |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			log.G(ctx).WithError(err).Errorf("failed to get task exit status for %q", id) | 			log.G(ctx).WithError(err).Errorf("failed to get task exit status for %q", p.ID) | ||||||
| 			exitStatus = unknownExitCode | 			exitStatus = unknownExitCode | ||||||
| 			exitedAt = time.Now() | 			exitedAt = time.Now() | ||||||
| 		} | 		} | ||||||
|  | 		dctx := ctrdutil.NamespacedContext() | ||||||
| 		err = func() error { | 		dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout) | ||||||
| 			dctx := ctrdutil.NamespacedContext() | 		defer dcancel() | ||||||
| 			dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout) | 		event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)} | ||||||
| 			defer dcancel() | 		if cleanErr := handleSandboxTaskExit(dctx, p, event); cleanErr != nil { | ||||||
|  | 			c.cri.BackOffEvent(p.ID, e) | ||||||
| 			sb, err := c.sandboxStore.Get(id) |  | ||||||
| 			if err == nil { |  | ||||||
| 				if err := handleSandboxExit(dctx, sb, &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)}); err != nil { |  | ||||||
| 					return err |  | ||||||
| 				} |  | ||||||
| 				return nil |  | ||||||
| 			} else if !errdefs.IsNotFound(err) { |  | ||||||
| 				return fmt.Errorf("failed to get sandbox %s: %w", id, err) |  | ||||||
| 			} |  | ||||||
| 			return nil |  | ||||||
| 		}() |  | ||||||
| 		if err != nil { |  | ||||||
| 			log.G(ctx).WithError(err).Errorf("failed to handle sandbox TaskExit %s", id) |  | ||||||
| 			// Don't backoff, the caller is responsible for. |  | ||||||
| 			return |  | ||||||
| 		} | 		} | ||||||
|  | 		return | ||||||
| 	case <-ctx.Done(): | 	case <-ctx.Done(): | ||||||
| 		return exitStatus, exitedAt, ctx.Err() | 		return unknownExitCode, time.Now(), ctx.Err() | ||||||
| 	} | 	} | ||||||
| 	return |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // handleSandboxExit handles TaskExit event for sandbox. | // handleSandboxTaskExit handles TaskExit event for sandbox. | ||||||
| // TODO https://github.com/containerd/containerd/issues/7548 | func handleSandboxTaskExit(ctx context.Context, sb *types.PodSandbox, e *eventtypes.TaskExit) error { | ||||||
| func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, e *eventtypes.TaskExit) error { |  | ||||||
| 	// No stream attached to sandbox container. | 	// No stream attached to sandbox container. | ||||||
| 	task, err := sb.Container.Task(ctx, nil) | 	task, err := sb.Container.Task(ctx, nil) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -212,17 +191,7 @@ func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, e *eventtyp | |||||||
| 			if !errdefs.IsNotFound(err) { | 			if !errdefs.IsNotFound(err) { | ||||||
| 				return fmt.Errorf("failed to stop sandbox: %w", err) | 				return fmt.Errorf("failed to stop sandbox: %w", err) | ||||||
| 			} | 			} | ||||||
| 			// Move on to make sure container status is updated. |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { |  | ||||||
| 		status.State = sandboxstore.StateNotReady |  | ||||||
| 		status.Pid = 0 |  | ||||||
| 		status.ExitStatus = e.ExitStatus |  | ||||||
| 		status.ExitedAt = e.ExitedAt.AsTime() |  | ||||||
| 		return status, nil |  | ||||||
| 	}) |  | ||||||
| 	// Using channel to propagate the information of sandbox stop |  | ||||||
| 	sb.Stop() |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -21,11 +21,13 @@ import ( | |||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
|  |  | ||||||
|  | 	containerd "github.com/containerd/containerd/v2/client" | ||||||
| 	criconfig "github.com/containerd/containerd/v2/pkg/cri/config" | 	criconfig "github.com/containerd/containerd/v2/pkg/cri/config" | ||||||
| 	"github.com/containerd/containerd/v2/pkg/cri/store/label" | 	"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" | ||||||
| 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
| 	ostesting "github.com/containerd/containerd/v2/pkg/os/testing" | 	ostesting "github.com/containerd/containerd/v2/pkg/os/testing" | ||||||
| 	"github.com/stretchr/testify/assert" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -48,11 +50,10 @@ var testConfig = criconfig.Config{ | |||||||
|  |  | ||||||
| // newControllerService creates a fake criService for test. | // newControllerService creates a fake criService for test. | ||||||
| func newControllerService() *Controller { | func newControllerService() *Controller { | ||||||
| 	labels := label.NewStore() |  | ||||||
| 	return &Controller{ | 	return &Controller{ | ||||||
| 		config:       testConfig, | 		config: testConfig, | ||||||
| 		os:           ostesting.NewFakeOS(), | 		os:     ostesting.NewFakeOS(), | ||||||
| 		sandboxStore: sandboxstore.NewStore(labels), | 		store:  NewStore(), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -60,20 +61,14 @@ func Test_Status(t *testing.T) { | |||||||
| 	sandboxID, pid, exitStatus := "1", uint32(1), uint32(0) | 	sandboxID, pid, exitStatus := "1", uint32(1), uint32(0) | ||||||
| 	createdAt, exitedAt := time.Now(), time.Now() | 	createdAt, exitedAt := time.Now(), time.Now() | ||||||
| 	controller := newControllerService() | 	controller := newControllerService() | ||||||
| 	status := sandboxstore.Status{ |  | ||||||
| 		Pid:        pid, | 	sb := types.NewPodSandbox(sandboxID, sandboxstore.Status{ | ||||||
| 		CreatedAt:  createdAt, | 		State:     sandboxstore.StateReady, | ||||||
| 		ExitStatus: exitStatus, | 		Pid:       pid, | ||||||
| 		ExitedAt:   exitedAt, | 		CreatedAt: createdAt, | ||||||
| 		State:      sandboxstore.StateReady, | 	}) | ||||||
| 	} | 	sb.Metadata = sandboxstore.Metadata{ID: sandboxID} | ||||||
| 	sb := sandboxstore.Sandbox{ | 	err := controller.store.Save(sb) | ||||||
| 		Metadata: sandboxstore.Metadata{ |  | ||||||
| 			ID: sandboxID, |  | ||||||
| 		}, |  | ||||||
| 		Status: sandboxstore.StoreStatus(status), |  | ||||||
| 	} |  | ||||||
| 	err := controller.sandboxStore.Add(sb) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| @@ -82,6 +77,20 @@ func Test_Status(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 	assert.Equal(t, s.Pid, pid) | 	assert.Equal(t, s.Pid, pid) | ||||||
| 	assert.Equal(t, s.ExitedAt, exitedAt) | 	assert.Equal(t, s.CreatedAt, createdAt) | ||||||
| 	assert.Equal(t, s.State, sandboxstore.StateReady.String()) | 	assert.Equal(t, s.State, sandboxstore.StateReady.String()) | ||||||
|  |  | ||||||
|  | 	sb.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, nil)) | ||||||
|  | 	exit, err := controller.Wait(context.Background(), sandboxID) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  | 	assert.Equal(t, exit.ExitStatus, exitStatus) | ||||||
|  | 	assert.Equal(t, exit.ExitedAt, exitedAt) | ||||||
|  |  | ||||||
|  | 	s, err = controller.Status(context.Background(), sandboxID, false) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  | 	assert.Equal(t, s.State, sandboxstore.StateNotReady.String()) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -23,6 +23,12 @@ import ( | |||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/log" | ||||||
|  | 	"github.com/containerd/typeurl/v2" | ||||||
|  | 	docker "github.com/distribution/reference" | ||||||
|  | 	imagedigest "github.com/opencontainers/go-digest" | ||||||
|  | 	runtimespec "github.com/opencontainers/runtime-spec/specs-go" | ||||||
|  |  | ||||||
| 	containerd "github.com/containerd/containerd/v2/client" | 	containerd "github.com/containerd/containerd/v2/client" | ||||||
| 	"github.com/containerd/containerd/v2/containers" | 	"github.com/containerd/containerd/v2/containers" | ||||||
| 	clabels "github.com/containerd/containerd/v2/labels" | 	clabels "github.com/containerd/containerd/v2/labels" | ||||||
| @@ -30,12 +36,8 @@ import ( | |||||||
| 	criconfig "github.com/containerd/containerd/v2/pkg/cri/config" | 	criconfig "github.com/containerd/containerd/v2/pkg/cri/config" | ||||||
| 	crilabels "github.com/containerd/containerd/v2/pkg/cri/labels" | 	crilabels "github.com/containerd/containerd/v2/pkg/cri/labels" | ||||||
| 	imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" | 	imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" | ||||||
|  | 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
| 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | ||||||
| 	"github.com/containerd/log" |  | ||||||
| 	docker "github.com/distribution/reference" |  | ||||||
| 	runtimespec "github.com/opencontainers/runtime-spec/specs-go" |  | ||||||
|  |  | ||||||
| 	imagedigest "github.com/opencontainers/go-digest" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -196,3 +198,24 @@ func (c *Controller) runtimeSnapshotter(ctx context.Context, ociRuntime criconfi | |||||||
| 	log.G(ctx).Debugf("Set snapshotter for runtime %s to %s", ociRuntime.Type, ociRuntime.Snapshotter) | 	log.G(ctx).Debugf("Set snapshotter for runtime %s to %s", ociRuntime.Type, ociRuntime.Snapshotter) | ||||||
| 	return ociRuntime.Snapshotter | 	return ociRuntime.Snapshotter | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func getMetadata(ctx context.Context, container containerd.Container) (*sandboxstore.Metadata, error) { | ||||||
|  | 	// Load sandbox metadata. | ||||||
|  | 	exts, err := container.Extensions(ctx) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("failed to get sandbox container extensions: %w", err) | ||||||
|  | 	} | ||||||
|  | 	ext, ok := exts[crilabels.SandboxMetadataExtension] | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil, fmt.Errorf("metadata extension %q not found", crilabels.SandboxMetadataExtension) | ||||||
|  | 	} | ||||||
|  | 	data, err := typeurl.UnmarshalAny(ext) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err) | ||||||
|  | 	} | ||||||
|  | 	meta, ok := data.(*sandboxstore.Metadata) | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil, fmt.Errorf("failed to convert the extension to sandbox metadata") | ||||||
|  | 	} | ||||||
|  | 	return meta, nil | ||||||
|  | } | ||||||
|   | |||||||
| @@ -22,16 +22,16 @@ import ( | |||||||
| 	goruntime "runtime" | 	goruntime "runtime" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/containerd/containerd/v2/pkg/netns" | 	"github.com/containerd/log" | ||||||
| 	"github.com/containerd/typeurl/v2" |  | ||||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||||
|  |  | ||||||
| 	containerd "github.com/containerd/containerd/v2/client" | 	containerd "github.com/containerd/containerd/v2/client" | ||||||
| 	"github.com/containerd/containerd/v2/errdefs" | 	"github.com/containerd/containerd/v2/errdefs" | ||||||
| 	crilabels "github.com/containerd/containerd/v2/pkg/cri/labels" | 	"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" | ||||||
| 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
| 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | ||||||
| 	"github.com/containerd/log" | 	"github.com/containerd/containerd/v2/pkg/netns" | ||||||
|  | 	sandbox2 "github.com/containerd/containerd/v2/sandbox" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // loadContainerTimeout is the default timeout for loading a container/sandbox. | // loadContainerTimeout is the default timeout for loading a container/sandbox. | ||||||
| @@ -51,36 +51,29 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta | |||||||
| 	ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout) | 	ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout) | ||||||
| 	defer cancel() | 	defer cancel() | ||||||
| 	var sandbox sandboxstore.Sandbox | 	var sandbox sandboxstore.Sandbox | ||||||
| 	// Load sandbox metadata. | 	meta, err := getMetadata(ctx, cntr) | ||||||
| 	exts, err := cntr.Extensions(ctx) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err) | 		return sandbox, err | ||||||
| 	} | 	} | ||||||
| 	ext, ok := exts[crilabels.SandboxMetadataExtension] |  | ||||||
| 	if !ok { |  | ||||||
| 		return sandbox, fmt.Errorf("metadata extension %q not found", crilabels.SandboxMetadataExtension) |  | ||||||
| 	} |  | ||||||
| 	data, err := typeurl.UnmarshalAny(ext) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err) |  | ||||||
| 	} |  | ||||||
| 	meta := data.(*sandboxstore.Metadata) |  | ||||||
|  |  | ||||||
| 	s, err := func() (sandboxstore.Status, error) { | 	// Load sandbox created timestamp. | ||||||
|  | 	info, err := cntr.Info(ctx) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return sandbox, fmt.Errorf("failed to get sandbox container info: %w", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	s, ch, err := func() (sandboxstore.Status, <-chan containerd.ExitStatus, error) { | ||||||
| 		status := sandboxstore.Status{ | 		status := sandboxstore.Status{ | ||||||
| 			State: sandboxstore.StateUnknown, | 			State: sandboxstore.StateUnknown, | ||||||
| 		} | 		} | ||||||
| 		// Load sandbox created timestamp. | 		var channel <-chan containerd.ExitStatus | ||||||
| 		info, err := cntr.Info(ctx) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return status, fmt.Errorf("failed to get sandbox container info: %w", err) |  | ||||||
| 		} |  | ||||||
| 		status.CreatedAt = info.CreatedAt | 		status.CreatedAt = info.CreatedAt | ||||||
|  |  | ||||||
| 		// Load sandbox state. | 		// Load sandbox state. | ||||||
| 		t, err := cntr.Task(ctx, nil) | 		t, err := cntr.Task(ctx, nil) | ||||||
| 		if err != nil && !errdefs.IsNotFound(err) { | 		if err != nil && !errdefs.IsNotFound(err) { | ||||||
| 			return status, fmt.Errorf("failed to load task: %w", err) | 			return status, channel, fmt.Errorf("failed to load task: %w", err) | ||||||
| 		} | 		} | ||||||
| 		var taskStatus containerd.Status | 		var taskStatus containerd.Status | ||||||
| 		var notFound bool | 		var notFound bool | ||||||
| @@ -93,7 +86,7 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta | |||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				// It's still possible that task is deleted during this window. | 				// It's still possible that task is deleted during this window. | ||||||
| 				if !errdefs.IsNotFound(err) { | 				if !errdefs.IsNotFound(err) { | ||||||
| 					return status, fmt.Errorf("failed to get task status: %w", err) | 					return status, channel, fmt.Errorf("failed to get task status: %w", err) | ||||||
| 				} | 				} | ||||||
| 				notFound = true | 				notFound = true | ||||||
| 			} | 			} | ||||||
| @@ -103,37 +96,48 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta | |||||||
| 			status.State = sandboxstore.StateNotReady | 			status.State = sandboxstore.StateNotReady | ||||||
| 		} else { | 		} else { | ||||||
| 			if taskStatus.Status == containerd.Running { | 			if taskStatus.Status == containerd.Running { | ||||||
| 				// Wait for the task for sandbox monitor. | 				status.State = sandboxstore.StateReady | ||||||
| 				// wait is a long running background request, no timeout needed. | 				status.Pid = t.Pid() | ||||||
| 				exitCh, err := t.Wait(ctrdutil.NamespacedContext()) | 				exitCh, err := t.Wait(ctrdutil.NamespacedContext()) | ||||||
| 				if err != nil { | 				if err != nil { | ||||||
| 					if !errdefs.IsNotFound(err) { | 					return status, channel, fmt.Errorf("failed to wait for sandbox container task: %w", err) | ||||||
| 						return status, fmt.Errorf("failed to wait for task: %w", err) |  | ||||||
| 					} |  | ||||||
| 					status.State = sandboxstore.StateNotReady |  | ||||||
| 				} else { |  | ||||||
| 					// Task is running, set sandbox state as READY. |  | ||||||
| 					status.State = sandboxstore.StateReady |  | ||||||
| 					status.Pid = t.Pid() |  | ||||||
|  |  | ||||||
| 					go func() { |  | ||||||
| 						c.waitSandboxExit(context.Background(), meta.ID, exitCh) |  | ||||||
| 					}() |  | ||||||
| 				} | 				} | ||||||
|  | 				channel = exitCh | ||||||
| 			} else { | 			} else { | ||||||
| 				// Task is not running. Delete the task and set sandbox state as NOTREADY. | 				// Task is not running. Delete the task and set sandbox state as NOTREADY. | ||||||
| 				if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { | 				if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { | ||||||
| 					return status, fmt.Errorf("failed to delete task: %w", err) | 					return status, channel, fmt.Errorf("failed to delete task: %w", err) | ||||||
| 				} | 				} | ||||||
| 				status.State = sandboxstore.StateNotReady | 				status.State = sandboxstore.StateNotReady | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		return status, nil | 		return status, channel, nil | ||||||
| 	}() | 	}() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID()) | 		log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// save it to cache in the podsandbox controller | ||||||
|  | 	podSandbox := types.NewPodSandbox(cntr.ID(), s) | ||||||
|  | 	podSandbox.Container = cntr | ||||||
|  | 	if meta != nil { | ||||||
|  | 		podSandbox.Metadata = *meta | ||||||
|  | 	} | ||||||
|  | 	podSandbox.Runtime = sandbox2.RuntimeOpts{ | ||||||
|  | 		Name:    info.Runtime.Name, | ||||||
|  | 		Options: info.Runtime.Options, | ||||||
|  | 	} | ||||||
|  | 	if ch != nil { | ||||||
|  | 		go func() { | ||||||
|  | 			code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch) | ||||||
|  | 			podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err)) | ||||||
|  | 		}() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := c.store.Save(podSandbox); err != nil { | ||||||
|  | 		return sandbox, fmt.Errorf("failed to save pod sandbox container in mem store: %w", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	sandbox = sandboxstore.NewSandbox(*meta, s) | 	sandbox = sandboxstore.NewSandbox(*meta, s) | ||||||
| 	sandbox.Container = cntr | 	sandbox.Container = cntr | ||||||
|  |  | ||||||
|   | |||||||
| @@ -27,11 +27,8 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| func (c *Controller) Shutdown(ctx context.Context, sandboxID string) error { | func (c *Controller) Shutdown(ctx context.Context, sandboxID string) error { | ||||||
| 	sandbox, err := c.sandboxStore.Get(sandboxID) | 	sandbox := c.store.Get(sandboxID) | ||||||
| 	if err != nil { | 	if sandbox == nil { | ||||||
| 		if !errdefs.IsNotFound(err) { |  | ||||||
| 			return fmt.Errorf("an error occurred when try to find sandbox %q: %w", sandboxID, err) |  | ||||||
| 		} |  | ||||||
| 		// Do not return error if the id doesn't exist. | 		// Do not return error if the id doesn't exist. | ||||||
| 		log.G(ctx).Tracef("Sandbox controller Delete called for sandbox %q that does not exist", sandboxID) | 		log.G(ctx).Tracef("Sandbox controller Delete called for sandbox %q that does not exist", sandboxID) | ||||||
| 		return nil | 		return nil | ||||||
| @@ -62,6 +59,8 @@ func (c *Controller) Shutdown(ctx context.Context, sandboxID string) error { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	c.store.Remove(sandboxID) | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -34,6 +34,7 @@ import ( | |||||||
| 	"github.com/containerd/containerd/v2/errdefs" | 	"github.com/containerd/containerd/v2/errdefs" | ||||||
| 	crilabels "github.com/containerd/containerd/v2/pkg/cri/labels" | 	crilabels "github.com/containerd/containerd/v2/pkg/cri/labels" | ||||||
| 	customopts "github.com/containerd/containerd/v2/pkg/cri/opts" | 	customopts "github.com/containerd/containerd/v2/pkg/cri/opts" | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" | ||||||
| 	imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" | 	imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" | ||||||
| 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
| 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | ||||||
| @@ -62,16 +63,11 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll | |||||||
| 			retErr = errors.Join(retErr, CleanupErr{cleanupErr}) | 			retErr = errors.Join(retErr, CleanupErr{cleanupErr}) | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  | 	podSandbox := c.store.Get(id) | ||||||
| 	sandboxInfo, err := c.client.SandboxStore().Get(ctx, id) | 	if podSandbox == nil { | ||||||
| 	if err != nil { | 		return cin, fmt.Errorf("unable to find pod sandbox with id %q: %w", id, errdefs.ErrNotFound) | ||||||
| 		return cin, fmt.Errorf("unable to find sandbox with id %q: %w", id, err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	var metadata sandboxstore.Metadata |  | ||||||
| 	if err := sandboxInfo.GetExtension(MetadataKey, &metadata); err != nil { |  | ||||||
| 		return cin, fmt.Errorf("failed to get sandbox %q metadata: %w", id, err) |  | ||||||
| 	} | 	} | ||||||
|  | 	metadata := podSandbox.Metadata | ||||||
|  |  | ||||||
| 	var ( | 	var ( | ||||||
| 		config = metadata.Config | 		config = metadata.Config | ||||||
| @@ -147,13 +143,14 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll | |||||||
| 		containerd.WithSpec(spec, specOpts...), | 		containerd.WithSpec(spec, specOpts...), | ||||||
| 		containerd.WithContainerLabels(sandboxLabels), | 		containerd.WithContainerLabels(sandboxLabels), | ||||||
| 		containerd.WithContainerExtension(crilabels.SandboxMetadataExtension, &metadata), | 		containerd.WithContainerExtension(crilabels.SandboxMetadataExtension, &metadata), | ||||||
| 		containerd.WithRuntime(ociRuntime.Type, sandboxInfo.Runtime.Options), | 		containerd.WithRuntime(ociRuntime.Type, podSandbox.Runtime.Options), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	container, err := c.client.NewContainer(ctx, id, opts...) | 	container, err := c.client.NewContainer(ctx, id, opts...) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return cin, fmt.Errorf("failed to create containerd container: %w", err) | 		return cin, fmt.Errorf("failed to create containerd container: %w", err) | ||||||
| 	} | 	} | ||||||
|  | 	podSandbox.Container = container | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		if retErr != nil && cleanupErr == nil { | 		if retErr != nil && cleanupErr == nil { | ||||||
| 			deferCtx, deferCancel := ctrdutil.DeferContext() | 			deferCtx, deferCancel := ctrdutil.DeferContext() | ||||||
| @@ -161,6 +158,7 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll | |||||||
| 			if cleanupErr = container.Delete(deferCtx, containerd.WithSnapshotCleanup); cleanupErr != nil { | 			if cleanupErr = container.Delete(deferCtx, containerd.WithSnapshotCleanup); cleanupErr != nil { | ||||||
| 				log.G(ctx).WithError(cleanupErr).Errorf("Failed to delete containerd container %q", id) | 				log.G(ctx).WithError(cleanupErr).Errorf("Failed to delete containerd container %q", id) | ||||||
| 			} | 			} | ||||||
|  | 			podSandbox.Container = nil | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| @@ -213,6 +211,7 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return cin, fmt.Errorf("failed to get sandbox container info: %w", err) | 		return cin, fmt.Errorf("failed to get sandbox container info: %w", err) | ||||||
| 	} | 	} | ||||||
|  | 	podSandbox.CreatedAt = info.CreatedAt | ||||||
|  |  | ||||||
| 	// Create sandbox task in containerd. | 	// Create sandbox task in containerd. | ||||||
| 	log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name) | 	log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name) | ||||||
| @@ -238,13 +237,13 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  | 	podSandbox.Pid = task.Pid() | ||||||
|  |  | ||||||
| 	// wait is a long running background request, no timeout needed. | 	// wait is a long running background request, no timeout needed. | ||||||
| 	exitCh, err := task.Wait(ctrdutil.NamespacedContext()) | 	exitCh, err := task.Wait(ctrdutil.NamespacedContext()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return cin, fmt.Errorf("failed to wait for sandbox container task: %w", err) | 		return cin, fmt.Errorf("failed to wait for sandbox container task: %w", err) | ||||||
| 	} | 	} | ||||||
| 	c.store.Save(id, exitCh) |  | ||||||
|  |  | ||||||
| 	nric, err := nri.New() | 	nric, err := nri.New() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -263,18 +262,30 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll | |||||||
| 	if err := task.Start(ctx); err != nil { | 	if err := task.Start(ctx); err != nil { | ||||||
| 		return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err) | 		return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err) | ||||||
| 	} | 	} | ||||||
|  | 	podSandbox.State = sandboxstore.StateReady | ||||||
|  |  | ||||||
| 	cin.SandboxID = id | 	cin.SandboxID = id | ||||||
| 	cin.Pid = task.Pid() | 	cin.Pid = task.Pid() | ||||||
| 	cin.CreatedAt = info.CreatedAt | 	cin.CreatedAt = info.CreatedAt | ||||||
| 	cin.Labels = labels | 	cin.Labels = labels | ||||||
|  |  | ||||||
|  | 	go func() { | ||||||
|  | 		code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh) | ||||||
|  | 		podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err)) | ||||||
|  | 	}() | ||||||
|  |  | ||||||
| 	return | 	return | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *Controller) Create(ctx context.Context, _info sandbox.Sandbox, _ ...sandbox.CreateOpt) error { | func (c *Controller) Create(_ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error { | ||||||
| 	// Not used by pod-sandbox implementation as there is no need to split pause containers logic. | 	metadata := sandboxstore.Metadata{} | ||||||
| 	return nil | 	if err := info.GetExtension(MetadataKey, &metadata); err != nil { | ||||||
|  | 		return fmt.Errorf("failed to get sandbox %q metadata: %w", info.ID, err) | ||||||
|  | 	} | ||||||
|  | 	podSandbox := types.NewPodSandbox(info.ID, sandboxstore.Status{State: sandboxstore.StateUnknown}) | ||||||
|  | 	podSandbox.Metadata = metadata | ||||||
|  | 	podSandbox.Runtime = info.Runtime | ||||||
|  | 	return c.store.Save(podSandbox) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *Controller) ensureImageExists(ctx context.Context, ref string, config *runtime.PodSandboxConfig) (*imagestore.Image, error) { | func (c *Controller) ensureImageExists(ctx context.Context, ref string, config *runtime.PodSandboxConfig) (*imagestore.Image, error) { | ||||||
|   | |||||||
| @@ -21,57 +21,32 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/typeurl/v2" | ||||||
|  |  | ||||||
| 	containerd "github.com/containerd/containerd/v2/client" | 	containerd "github.com/containerd/containerd/v2/client" | ||||||
| 	"github.com/containerd/containerd/v2/containers" | 	"github.com/containerd/containerd/v2/containers" | ||||||
| 	"github.com/containerd/containerd/v2/errdefs" | 	"github.com/containerd/containerd/v2/errdefs" | ||||||
| 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | 	"github.com/containerd/containerd/v2/pkg/cri/server/base" | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" | ||||||
| 	"github.com/containerd/containerd/v2/sandbox" | 	"github.com/containerd/containerd/v2/sandbox" | ||||||
| 	"github.com/containerd/go-cni" |  | ||||||
| 	"github.com/containerd/typeurl/v2" |  | ||||||
| 	runtimespec "github.com/opencontainers/runtime-spec/specs-go" |  | ||||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // SandboxInfo is extra information for sandbox. |  | ||||||
| // TODO (mikebrow): discuss predefining constants structures for some or all of these field names in CRI |  | ||||||
| type SandboxInfo struct { |  | ||||||
| 	Pid         uint32 `json:"pid"` |  | ||||||
| 	Status      string `json:"processStatus"` |  | ||||||
| 	NetNSClosed bool   `json:"netNamespaceClosed"` |  | ||||||
| 	Image       string `json:"image"` |  | ||||||
| 	SnapshotKey string `json:"snapshotKey"` |  | ||||||
| 	Snapshotter string `json:"snapshotter"` |  | ||||||
| 	// Note: a new field `RuntimeHandler` has been added into the CRI PodSandboxStatus struct, and |  | ||||||
| 	// should be set. This `RuntimeHandler` field will be deprecated after containerd 1.3 (tracked |  | ||||||
| 	// in https://github.com/containerd/cri/issues/1064). |  | ||||||
| 	RuntimeHandler string                    `json:"runtimeHandler"` // see the Note above |  | ||||||
| 	RuntimeType    string                    `json:"runtimeType"` |  | ||||||
| 	RuntimeOptions interface{}               `json:"runtimeOptions"` |  | ||||||
| 	Config         *runtime.PodSandboxConfig `json:"config"` |  | ||||||
| 	// Note: RuntimeSpec may not be populated if the sandbox has not been fully created. |  | ||||||
| 	RuntimeSpec *runtimespec.Spec      `json:"runtimeSpec"` |  | ||||||
| 	CNIResult   *cni.Result            `json:"cniResult"` |  | ||||||
| 	Metadata    *sandboxstore.Metadata `json:"sandboxMetadata"` |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) { | func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) { | ||||||
| 	sb, err := c.sandboxStore.Get(sandboxID) | 	sb := c.store.Get(sandboxID) | ||||||
| 	if err != nil { | 	if sb == nil { | ||||||
| 		return sandbox.ControllerStatus{}, fmt.Errorf("an error occurred while trying to find sandbox %q: %w", | 		return sandbox.ControllerStatus{}, fmt.Errorf("unable to find sandbox %q: %w", sandboxID, errdefs.ErrNotFound) | ||||||
| 			sandboxID, err) |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	status := sb.Status.Get() |  | ||||||
| 	cstatus := sandbox.ControllerStatus{ | 	cstatus := sandbox.ControllerStatus{ | ||||||
| 		SandboxID: sandboxID, | 		SandboxID: sandboxID, | ||||||
| 		Pid:       status.Pid, | 		Pid:       sb.Pid, | ||||||
| 		State:     status.State.String(), | 		State:     sb.State.String(), | ||||||
| 		CreatedAt: status.CreatedAt, | 		CreatedAt: sb.CreatedAt, | ||||||
| 		Extra:     nil, | 		Extra:     nil, | ||||||
| 	} | 	} | ||||||
|  | 	exitStatus := sb.GetExitStatus() | ||||||
| 	if !status.ExitedAt.IsZero() { | 	if exitStatus != nil { | ||||||
| 		cstatus.ExitedAt = status.ExitedAt | 		cstatus.ExitedAt = exitStatus.ExitTime() | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if verbose { | 	if verbose { | ||||||
| @@ -87,15 +62,16 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool) | |||||||
| } | } | ||||||
|  |  | ||||||
| // toCRISandboxInfo converts internal container object information to CRI sandbox status response info map. | // toCRISandboxInfo converts internal container object information to CRI sandbox status response info map. | ||||||
| func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox) (map[string]string, error) { | func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) { | ||||||
| 	si := &SandboxInfo{ | 	si := &base.SandboxInfo{ | ||||||
| 		Pid:            sandbox.Status.Get().Pid, | 		Pid:            sb.Pid, | ||||||
| 		Config:         sandbox.Config, | 		Config:         sb.Metadata.Config, | ||||||
| 		RuntimeHandler: sandbox.RuntimeHandler, | 		RuntimeHandler: sb.Metadata.RuntimeHandler, | ||||||
| 		CNIResult:      sandbox.CNIResult, | 		CNIResult:      sb.Metadata.CNIResult, | ||||||
|  | 		Metadata:       &sb.Metadata, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if container := sandbox.Container; container != nil { | 	if container := sb.Container; container != nil { | ||||||
| 		task, err := container.Task(ctx, nil) | 		task, err := container.Task(ctx, nil) | ||||||
| 		if err != nil && !errdefs.IsNotFound(err) { | 		if err != nil && !errdefs.IsNotFound(err) { | ||||||
| 			return nil, fmt.Errorf("failed to get sandbox container task: %w", err) | 			return nil, fmt.Errorf("failed to get sandbox container task: %w", err) | ||||||
| @@ -145,18 +121,16 @@ func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox) (map[st | |||||||
| 		// status which does not exist in containerd. | 		// status which does not exist in containerd. | ||||||
| 		si.Status = "deleted" | 		si.Status = "deleted" | ||||||
| 	} | 	} | ||||||
|  | 	netns := getNetNS(&sb.Metadata) | ||||||
| 	if sandbox.NetNS != nil { | 	if netns != nil { | ||||||
| 		// Add network closed information if sandbox is not using host network. | 		// Add network closed information if sandbox is not using host network. | ||||||
| 		closed, err := sandbox.NetNS.Closed() | 		closed, err := netns.Closed() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, fmt.Errorf("failed to check network namespace closed: %w", err) | 			return nil, fmt.Errorf("failed to check network namespace closed: %w", err) | ||||||
| 		} | 		} | ||||||
| 		si.NetNSClosed = closed | 		si.NetNSClosed = closed | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	si.Metadata = &sandbox.Metadata |  | ||||||
|  |  | ||||||
| 	infoBytes, err := json.Marshal(si) | 	infoBytes, err := json.Marshal(si) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, fmt.Errorf("failed to marshal info %v: %w", si, err) | 		return nil, fmt.Errorf("failed to marshal info %v: %w", si, err) | ||||||
|   | |||||||
| @@ -22,52 +22,57 @@ import ( | |||||||
| 	"syscall" | 	"syscall" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/log" | ||||||
|  |  | ||||||
| 	eventtypes "github.com/containerd/containerd/v2/api/events" | 	eventtypes "github.com/containerd/containerd/v2/api/events" | ||||||
|  | 	containerd "github.com/containerd/containerd/v2/client" | ||||||
| 	"github.com/containerd/containerd/v2/errdefs" | 	"github.com/containerd/containerd/v2/errdefs" | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" | ||||||
| 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
| 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | 	ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" | ||||||
| 	"github.com/containerd/containerd/v2/protobuf" | 	"github.com/containerd/containerd/v2/protobuf" | ||||||
| 	"github.com/containerd/containerd/v2/sandbox" | 	"github.com/containerd/containerd/v2/sandbox" | ||||||
| 	"github.com/containerd/log" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error { | func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error { | ||||||
| 	sandbox, err := c.sandboxStore.Get(sandboxID) | 	podSandbox := c.store.Get(sandboxID) | ||||||
|  | 	if podSandbox == nil { | ||||||
|  | 		return errdefs.ErrNotFound | ||||||
|  | 	} | ||||||
|  | 	if podSandbox.Container == nil { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	meta, err := getMetadata(ctx, podSandbox.Container) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("an error occurred when try to find sandbox %q: %w", | 		return err | ||||||
| 			sandboxID, err) |  | ||||||
| 	} | 	} | ||||||
|  | 	state := podSandbox.State | ||||||
| 	if err := c.cleanupSandboxFiles(sandboxID, sandbox.Config); err != nil { | 	if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown { | ||||||
| 		return fmt.Errorf("failed to cleanup sandbox files: %w", err) | 		if err := c.stopSandboxContainer(ctx, podSandbox); err != nil { | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// TODO: The Controller maintains its own Status instead of CRI's sandboxStore. |  | ||||||
| 	// Only stop sandbox container when it's running or unknown. |  | ||||||
| 	state := sandbox.Status.Get().State |  | ||||||
| 	if (state == sandboxstore.StateReady || state == sandboxstore.StateUnknown) && sandbox.Container != nil { |  | ||||||
| 		if err := c.stopSandboxContainer(ctx, sandbox); err != nil { |  | ||||||
| 			return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, err) | 			return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, err) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	if err := c.cleanupSandboxFiles(sandboxID, meta.Config); err != nil { | ||||||
|  | 		return fmt.Errorf("failed to cleanup sandbox files: %w", err) | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // stopSandboxContainer kills the sandbox container. | // stopSandboxContainer kills the sandbox container. | ||||||
| // `task.Delete` is not called here because it will be called when | // `task.Delete` is not called here because it will be called when | ||||||
| // the event monitor handles the `TaskExit` event. | // the event monitor handles the `TaskExit` event. | ||||||
| func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error { | func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types.PodSandbox) error { | ||||||
| 	id := sandbox.ID | 	id := podSandbox.ID | ||||||
| 	container := sandbox.Container | 	container := podSandbox.Container | ||||||
| 	state := sandbox.Status.Get().State | 	state := podSandbox.State | ||||||
| 	task, err := container.Task(ctx, nil) | 	task, err := container.Task(ctx, nil) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		if !errdefs.IsNotFound(err) { | 		if !errdefs.IsNotFound(err) { | ||||||
| 			return fmt.Errorf("failed to get sandbox container: %w", err) | 			return fmt.Errorf("failed to get pod sandbox container: %w", err) | ||||||
| 		} | 		} | ||||||
| 		// Don't return for unknown state, some cleanup needs to be done. | 		// Don't return for unknown state, some cleanup needs to be done. | ||||||
| 		if state == sandboxstore.StateUnknown { | 		if state == sandboxstore.StateUnknown { | ||||||
| 			return cleanupUnknownSandbox(ctx, id, sandbox) | 			return cleanupUnknownSandbox(ctx, id, podSandbox) | ||||||
| 		} | 		} | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| @@ -75,7 +80,7 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxst | |||||||
| 	// Handle unknown state. | 	// Handle unknown state. | ||||||
| 	// The cleanup logic is the same with container unknown state. | 	// The cleanup logic is the same with container unknown state. | ||||||
| 	if state == sandboxstore.StateUnknown { | 	if state == sandboxstore.StateUnknown { | ||||||
| 		// Start an exit handler for containers in unknown state. | 		// Start an exit handler for sandbox container in unknown state. | ||||||
| 		waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext()) | 		waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext()) | ||||||
| 		defer waitCancel() | 		defer waitCancel() | ||||||
| 		exitCh, err := task.Wait(waitCtx) | 		exitCh, err := task.Wait(waitCtx) | ||||||
| @@ -83,23 +88,20 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxst | |||||||
| 			if !errdefs.IsNotFound(err) { | 			if !errdefs.IsNotFound(err) { | ||||||
| 				return fmt.Errorf("failed to wait for task: %w", err) | 				return fmt.Errorf("failed to wait for task: %w", err) | ||||||
| 			} | 			} | ||||||
| 			return cleanupUnknownSandbox(ctx, id, sandbox) | 			return cleanupUnknownSandbox(ctx, id, podSandbox) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		exitCtx, exitCancel := context.WithCancel(context.Background()) | 		exitCtx, exitCancel := context.WithCancel(context.Background()) | ||||||
| 		stopCh := make(chan struct{}) | 		stopCh := make(chan struct{}) | ||||||
| 		go func() { | 		go func() { | ||||||
| 			defer close(stopCh) | 			defer close(stopCh) | ||||||
| 			exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, id, exitCh) | 			exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, podSandbox, exitCh) | ||||||
| 			if err != nil && err != context.Canceled && err != context.DeadlineExceeded { | 			if err != context.Canceled && err != context.DeadlineExceeded { | ||||||
| 				e := &eventtypes.SandboxExit{ | 				// The error of context.Canceled or context.DeadlineExceeded indicates the task.Wait is not finished, | ||||||
| 					SandboxID:  id, | 				// so we can not set the exit status of the pod sandbox. | ||||||
| 					ExitStatus: exitStatus, | 				podSandbox.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, err)) | ||||||
| 					ExitedAt:   protobuf.ToTimestamp(exitedAt), | 			} else { | ||||||
| 				} | 				log.G(ctx).WithError(err).Errorf("Failed to wait pod sandbox exit %+v", err) | ||||||
| 				log.G(ctx).WithError(err).Errorf("Failed to wait sandbox exit %+v", e) |  | ||||||
| 				// TODO: how to backoff |  | ||||||
| 				c.cri.BackOffEvent(id, e) |  | ||||||
| 			} | 			} | ||||||
| 		}() | 		}() | ||||||
| 		defer func() { | 		defer func() { | ||||||
| @@ -111,27 +113,17 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxst | |||||||
| 		}() | 		}() | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Kill the sandbox container. | 	// Kill the pod sandbox container. | ||||||
| 	if err = task.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) { | 	if err = task.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) { | ||||||
| 		return fmt.Errorf("failed to kill sandbox container: %w", err) | 		return fmt.Errorf("failed to kill pod sandbox container: %w", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return c.waitSandboxStop(ctx, sandbox) | 	_, err = podSandbox.Wait(ctx) | ||||||
| } | 	return err | ||||||
|  |  | ||||||
| // waitSandboxStop waits for sandbox to be stopped until context is cancelled or |  | ||||||
| // the context deadline is exceeded. |  | ||||||
| func (c *Controller) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox) error { |  | ||||||
| 	select { |  | ||||||
| 	case <-ctx.Done(): |  | ||||||
| 		return fmt.Errorf("wait sandbox container %q: %w", sandbox.ID, ctx.Err()) |  | ||||||
| 	case <-sandbox.Stopped(): |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // cleanupUnknownSandbox cleanup stopped sandbox in unknown state. | // cleanupUnknownSandbox cleanup stopped sandbox in unknown state. | ||||||
| func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error { | func cleanupUnknownSandbox(ctx context.Context, id string, sandbox *types.PodSandbox) error { | ||||||
| 	// Reuse handleSandboxExit to do the cleanup. | 	// Reuse handleSandboxTaskExit to do the cleanup. | ||||||
| 	return handleSandboxExit(ctx, sandbox, &eventtypes.TaskExit{ExitStatus: unknownExitCode, ExitedAt: protobuf.ToTimestamp(time.Now())}) | 	return handleSandboxTaskExit(ctx, sandbox, &eventtypes.TaskExit{ExitStatus: unknownExitCode, ExitedAt: protobuf.ToTimestamp(time.Now())}) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -17,33 +17,40 @@ | |||||||
| package podsandbox | package podsandbox | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"fmt" | ||||||
| 	"sync" | 	"sync" | ||||||
|  |  | ||||||
| 	containerd "github.com/containerd/containerd/v2/client" | 	"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type Status struct { |  | ||||||
| 	Waiter <-chan containerd.ExitStatus |  | ||||||
| } |  | ||||||
|  |  | ||||||
| type Store struct { | type Store struct { | ||||||
| 	sync.Map | 	m sync.Map | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewStore() *Store { | func NewStore() *Store { | ||||||
| 	return &Store{} | 	return &Store{} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *Store) Save(id string, exitCh <-chan containerd.ExitStatus) { | func (s *Store) Save(p *types.PodSandbox) error { | ||||||
| 	s.Store(id, &Status{Waiter: exitCh}) | 	if p == nil { | ||||||
|  | 		return fmt.Errorf("pod sandbox should not be nil") | ||||||
|  | 	} | ||||||
|  | 	s.m.Store(p.ID, p) | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *Store) Get(id string) *Status { | func (s *Store) Get(id string) *types.PodSandbox { | ||||||
| 	i, ok := s.LoadAndDelete(id) | 	i, ok := s.m.Load(id) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		// not exist |  | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	// Only save *Status | 	return i.(*types.PodSandbox) | ||||||
| 	return i.(*Status) | } | ||||||
|  |  | ||||||
|  | func (s *Store) Remove(id string) *types.PodSandbox { | ||||||
|  | 	i, ok := s.m.LoadAndDelete(id) | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	return i.(*types.PodSandbox) | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										83
									
								
								pkg/cri/server/podsandbox/types/podsandbox.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										83
									
								
								pkg/cri/server/podsandbox/types/podsandbox.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,83 @@ | |||||||
|  | /* | ||||||
|  |    Copyright The containerd Authors. | ||||||
|  |  | ||||||
|  |    Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  |    you may not use this file except in compliance with the License. | ||||||
|  |    You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |        http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  |    Unless required by applicable law or agreed to in writing, software | ||||||
|  |    distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  |    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  |    See the License for the specific language governing permissions and | ||||||
|  |    limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package types | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	containerd "github.com/containerd/containerd/v2/client" | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/store" | ||||||
|  | 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
|  | 	"github.com/containerd/containerd/v2/sandbox" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type PodSandbox struct { | ||||||
|  | 	mu         sync.Mutex | ||||||
|  | 	ID         string | ||||||
|  | 	Container  containerd.Container | ||||||
|  | 	State      sandboxstore.State | ||||||
|  | 	Metadata   sandboxstore.Metadata | ||||||
|  | 	Runtime    sandbox.RuntimeOpts | ||||||
|  | 	Pid        uint32 | ||||||
|  | 	CreatedAt  time.Time | ||||||
|  | 	stopChan   *store.StopCh | ||||||
|  | 	exitStatus *containerd.ExitStatus | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox { | ||||||
|  | 	podSandbox := &PodSandbox{ | ||||||
|  | 		ID:        id, | ||||||
|  | 		Container: nil, | ||||||
|  | 		stopChan:  store.NewStopCh(), | ||||||
|  | 		CreatedAt: status.CreatedAt, | ||||||
|  | 		State:     status.State, | ||||||
|  | 		Pid:       status.Pid, | ||||||
|  | 	} | ||||||
|  | 	if status.State == sandboxstore.StateNotReady { | ||||||
|  | 		podSandbox.Exit(*containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil)) | ||||||
|  | 	} | ||||||
|  | 	return podSandbox | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p *PodSandbox) Exit(status containerd.ExitStatus) { | ||||||
|  | 	p.mu.Lock() | ||||||
|  | 	defer p.mu.Unlock() | ||||||
|  | 	p.exitStatus = &status | ||||||
|  | 	p.State = sandboxstore.StateNotReady | ||||||
|  | 	p.stopChan.Stop() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p *PodSandbox) Wait(ctx context.Context) (*containerd.ExitStatus, error) { | ||||||
|  | 	s := p.GetExitStatus() | ||||||
|  | 	if s != nil { | ||||||
|  | 		return s, nil | ||||||
|  | 	} | ||||||
|  | 	select { | ||||||
|  | 	case <-ctx.Done(): | ||||||
|  | 		return nil, ctx.Err() | ||||||
|  | 	case <-p.stopChan.Stopped(): | ||||||
|  | 		return p.GetExitStatus(), nil | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p *PodSandbox) GetExitStatus() *containerd.ExitStatus { | ||||||
|  | 	p.mu.Lock() | ||||||
|  | 	defer p.mu.Unlock() | ||||||
|  | 	return p.exitStatus | ||||||
|  | } | ||||||
| @@ -146,6 +146,28 @@ func (c *criService) recover(ctx context.Context) error { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	for _, sb := range c.sandboxStore.List() { | ||||||
|  | 		sb := sb | ||||||
|  | 		status := sb.Status.Get() | ||||||
|  | 		if status.State == sandboxstore.StateNotReady { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		controller, err := c.sandboxService.SandboxController(sb.Config, sb.RuntimeHandler) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.G(ctx).WithError(err).Error("failed to get sandbox controller while waiting sandbox") | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		exitCh := make(chan containerd.ExitStatus, 1) | ||||||
|  | 		go func() { | ||||||
|  | 			exit, err := controller.Wait(ctrdutil.NamespacedContext(), sb.ID) | ||||||
|  | 			if err != nil { | ||||||
|  | 				log.G(ctx).WithError(err).Error("failed to wait for sandbox exit") | ||||||
|  | 				exitCh <- *containerd.NewExitStatus(containerd.UnknownExitStatus, time.Time{}, err) | ||||||
|  | 			} | ||||||
|  | 			exitCh <- *containerd.NewExitStatus(exit.ExitStatus, exit.ExitedAt, nil) | ||||||
|  | 		}() | ||||||
|  | 		c.eventMonitor.startSandboxExitMonitor(context.Background(), sb.ID, exitCh) | ||||||
|  | 	} | ||||||
| 	// Recover all containers. | 	// Recover all containers. | ||||||
| 	containers, err := c.client.Containers(ctx, filterLabel(crilabels.ContainerKindLabel, crilabels.ContainerKindContainer)) | 	containers, err := c.client.Containers(ctx, filterLabel(crilabels.ContainerKindLabel, crilabels.ContainerKindContainer)) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|   | |||||||
| @@ -27,6 +27,7 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/containerd/go-cni" | 	"github.com/containerd/go-cni" | ||||||
|  | 	"github.com/containerd/log" | ||||||
| 	"github.com/containerd/typeurl/v2" | 	"github.com/containerd/typeurl/v2" | ||||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||||
|  |  | ||||||
| @@ -39,7 +40,6 @@ import ( | |||||||
| 	"github.com/containerd/containerd/v2/pkg/cri/util" | 	"github.com/containerd/containerd/v2/pkg/cri/util" | ||||||
| 	"github.com/containerd/containerd/v2/pkg/netns" | 	"github.com/containerd/containerd/v2/pkg/netns" | ||||||
| 	sb "github.com/containerd/containerd/v2/sandbox" | 	sb "github.com/containerd/containerd/v2/sandbox" | ||||||
| 	"github.com/containerd/log" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func init() { | func init() { | ||||||
| @@ -255,7 +255,6 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox | |||||||
|  |  | ||||||
| 	ctrl, err := controller.Start(ctx, id) | 	ctrl, err := controller.Start(ctx, id) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		sandbox.Container, _ = c.client.LoadContainer(ctx, id) |  | ||||||
| 		var cerr podsandbox.CleanupErr | 		var cerr podsandbox.CleanupErr | ||||||
| 		if errors.As(err, &cerr) { | 		if errors.As(err, &cerr) { | ||||||
| 			cleanupErr = fmt.Errorf("failed to cleanup sandbox: %w", cerr) | 			cleanupErr = fmt.Errorf("failed to cleanup sandbox: %w", cerr) | ||||||
| @@ -422,7 +421,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox | |||||||
| 	// | 	// | ||||||
| 	// TaskOOM from containerd may come before sandbox is added to store, | 	// TaskOOM from containerd may come before sandbox is added to store, | ||||||
| 	// but we don't care about sandbox TaskOOM right now, so it is fine. | 	// but we don't care about sandbox TaskOOM right now, so it is fine. | ||||||
| 	c.eventMonitor.startSandboxExitMonitor(context.Background(), id, ctrl.Pid, exitCh) | 	c.eventMonitor.startSandboxExitMonitor(context.Background(), id, exitCh) | ||||||
|  |  | ||||||
| 	// Send CONTAINER_STARTED event with ContainerId equal to SandboxId. | 	// Send CONTAINER_STARTED event with ContainerId equal to SandboxId. | ||||||
| 	c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) | 	c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) | ||||||
|   | |||||||
| @@ -18,14 +18,15 @@ package server | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/containerd/containerd/v2/errdefs" |  | ||||||
| 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" |  | ||||||
| 	"github.com/containerd/go-cni" |  | ||||||
| 	runtimespec "github.com/opencontainers/runtime-spec/specs-go" |  | ||||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/containerd/v2/errdefs" | ||||||
|  | 	"github.com/containerd/containerd/v2/pkg/cri/server/base" | ||||||
|  | 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // PodSandboxStatus returns the status of the PodSandbox. | // PodSandboxStatus returns the status of the PodSandbox. | ||||||
| @@ -64,6 +65,12 @@ func (c *criService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandbox | |||||||
| 			return nil, fmt.Errorf("failed to query controller status: %w", err) | 			return nil, fmt.Errorf("failed to query controller status: %w", err) | ||||||
| 		} | 		} | ||||||
| 		state = runtime.PodSandboxState_SANDBOX_NOTREADY.String() | 		state = runtime.PodSandboxState_SANDBOX_NOTREADY.String() | ||||||
|  | 		if r.GetVerbose() { | ||||||
|  | 			info, err = toDeletedCRISandboxInfo(sandbox) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return nil, err | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
| 	} else { | 	} else { | ||||||
| 		state = cstatus.State | 		state = cstatus.State | ||||||
| 		createdAt = cstatus.CreatedAt | 		createdAt = cstatus.CreatedAt | ||||||
| @@ -104,26 +111,6 @@ func (c *criService) getIPs(sandbox sandboxstore.Sandbox) (string, []string, err | |||||||
| 	return sandbox.IP, sandbox.AdditionalIPs, nil | 	return sandbox.IP, sandbox.AdditionalIPs, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // SandboxInfo is extra information for sandbox. |  | ||||||
| // TODO (mikebrow): discuss predefining constants structures for some or all of these field names in CRI |  | ||||||
| type SandboxInfo struct { |  | ||||||
| 	Pid         uint32 `json:"pid"` |  | ||||||
| 	Status      string `json:"processStatus"` |  | ||||||
| 	NetNSClosed bool   `json:"netNamespaceClosed"` |  | ||||||
| 	Image       string `json:"image"` |  | ||||||
| 	SnapshotKey string `json:"snapshotKey"` |  | ||||||
| 	Snapshotter string `json:"snapshotter"` |  | ||||||
| 	// Note: a new field `RuntimeHandler` has been added into the CRI PodSandboxStatus struct, and |  | ||||||
| 	// should be set. This `RuntimeHandler` field will be deprecated after containerd 1.3 (tracked |  | ||||||
| 	// in https://github.com/containerd/cri/issues/1064). |  | ||||||
| 	RuntimeHandler string                    `json:"runtimeHandler"` // see the Note above |  | ||||||
| 	RuntimeType    string                    `json:"runtimeType"` |  | ||||||
| 	RuntimeOptions interface{}               `json:"runtimeOptions"` |  | ||||||
| 	Config         *runtime.PodSandboxConfig `json:"config"` |  | ||||||
| 	RuntimeSpec    *runtimespec.Spec         `json:"runtimeSpec"` |  | ||||||
| 	CNIResult      *cni.Result               `json:"cniResult"` |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // toCRISandboxStatus converts sandbox metadata into CRI pod sandbox status. | // toCRISandboxStatus converts sandbox metadata into CRI pod sandbox status. | ||||||
| func toCRISandboxStatus(meta sandboxstore.Metadata, status string, createdAt time.Time, ip string, additionalIPs []string) *runtime.PodSandboxStatus { | func toCRISandboxStatus(meta sandboxstore.Metadata, status string, createdAt time.Time, ip string, additionalIPs []string) *runtime.PodSandboxStatus { | ||||||
| 	// Set sandbox state to NOTREADY by default. | 	// Set sandbox state to NOTREADY by default. | ||||||
| @@ -159,3 +146,40 @@ func toCRISandboxStatus(meta sandboxstore.Metadata, status string, createdAt tim | |||||||
| 		RuntimeHandler: meta.RuntimeHandler, | 		RuntimeHandler: meta.RuntimeHandler, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // toDeletedCRISandboxInfo converts cached sandbox to CRI sandbox status response info map. | ||||||
|  | // In most cases, controller.Status() with verbose=true should have SandboxInfo in the return, | ||||||
|  | // but if controller.Status() returns a NotFound error, | ||||||
|  | // we should fallback to get SandboxInfo from cached sandbox itself. | ||||||
|  | func toDeletedCRISandboxInfo(sandbox sandboxstore.Sandbox) (map[string]string, error) { | ||||||
|  | 	si := &base.SandboxInfo{ | ||||||
|  | 		Pid:            sandbox.Status.Get().Pid, | ||||||
|  | 		Config:         sandbox.Config, | ||||||
|  | 		RuntimeHandler: sandbox.RuntimeHandler, | ||||||
|  | 		CNIResult:      sandbox.CNIResult, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// If processStatus is empty, it means that the task is deleted. Apply "deleted" | ||||||
|  | 	// status which does not exist in containerd. | ||||||
|  | 	si.Status = "deleted" | ||||||
|  |  | ||||||
|  | 	if sandbox.NetNS != nil { | ||||||
|  | 		// Add network closed information if sandbox is not using host network. | ||||||
|  | 		closed, err := sandbox.NetNS.Closed() | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, fmt.Errorf("failed to check network namespace closed: %w", err) | ||||||
|  | 		} | ||||||
|  | 		si.NetNSClosed = closed | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	si.Metadata = &sandbox.Metadata | ||||||
|  |  | ||||||
|  | 	infoBytes, err := json.Marshal(si) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("failed to marshal info %v: %w", si, err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return map[string]string{ | ||||||
|  | 		"info": string(infoBytes), | ||||||
|  | 	}, nil | ||||||
|  | } | ||||||
|   | |||||||
| @@ -25,6 +25,7 @@ import ( | |||||||
| 	"github.com/containerd/log" | 	"github.com/containerd/log" | ||||||
| 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/containerd/v2/errdefs" | ||||||
| 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | 	sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -74,7 +75,12 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if err := controller.Stop(ctx, id); err != nil { | 		if err := controller.Stop(ctx, id); err != nil { | ||||||
| 			return fmt.Errorf("failed to stop sandbox %q: %w", id, err) | 			// Log and ignore the error if controller already removed the sandbox | ||||||
|  | 			if errdefs.IsNotFound(err) { | ||||||
|  | 				log.G(ctx).Warnf("sandbox %q is not found when stopping it", id) | ||||||
|  | 			} else { | ||||||
|  | 				return fmt.Errorf("failed to stop sandbox %q: %w", id, err) | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -183,7 +183,7 @@ func NewCRIService(criBase *base.CRIBase, imageService imageService, client *con | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	podSandboxController := client.SandboxController(string(criconfig.ModePodSandbox)).(*podsandbox.Controller) | 	podSandboxController := client.SandboxController(string(criconfig.ModePodSandbox)).(*podsandbox.Controller) | ||||||
| 	podSandboxController.Init(c.sandboxStore, c) | 	podSandboxController.Init(c) | ||||||
|  |  | ||||||
| 	c.nri = nri | 	c.nri = nri | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Abel Feng
					Abel Feng