[Sandbox] Add Wait and PID

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2022-02-09 12:52:37 -08:00
parent 0d165e6544
commit b7a36950f6
12 changed files with 1996 additions and 220 deletions

View File

@ -3085,9 +3085,11 @@ file {
file {
name: "github.com/containerd/containerd/api/services/sandbox/v1/sandbox.proto"
package: "containerd.services.sandbox.v1"
dependency: "gogoproto/gogo.proto"
dependency: "google/protobuf/any.proto"
dependency: "google/protobuf/timestamp.proto"
dependency: "gogoproto/gogo.proto"
dependency: "github.com/containerd/containerd/api/types/sandbox.proto"
dependency: "github.com/containerd/containerd/api/types/mount.proto"
message_type {
name: "StoreCreateRequest"
field {
@ -3219,16 +3221,38 @@ file {
json_name: "sandboxId"
}
field {
name: "spec"
number: 4
name: "rootfs"
number: 2
label: LABEL_REPEATED
type: TYPE_MESSAGE
type_name: ".containerd.types.Mount"
json_name: "rootfs"
}
field {
name: "options"
number: 3
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Any"
json_name: "spec"
json_name: "options"
}
}
message_type {
name: "ControllerStartResponse"
field {
name: "sandbox_id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "sandboxId"
}
field {
name: "pid"
number: 2
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "pid"
}
}
message_type {
name: "ControllerShutdownRequest"
@ -3239,10 +3263,49 @@ file {
type: TYPE_STRING
json_name: "sandboxId"
}
field {
name: "timeout_secs"
number: 2
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "timeoutSecs"
}
}
message_type {
name: "ControllerShutdownResponse"
}
message_type {
name: "ControllerWaitRequest"
field {
name: "sandbox_id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "sandboxId"
}
}
message_type {
name: "ControllerWaitResponse"
field {
name: "exit_status"
number: 1
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "exitStatus"
}
field {
name: "exited_at"
number: 2
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Timestamp"
options {
65001: 0
65010: 1
}
json_name: "exitedAt"
}
}
message_type {
name: "ControllerPauseRequest"
field {
@ -3295,12 +3358,52 @@ file {
message_type {
name: "ControllerStatusResponse"
field {
name: "status"
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
field {
name: "pid"
number: 2
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "pid"
}
field {
name: "state"
number: 3
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "state"
}
field {
name: "exit_status"
number: 4
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "exitStatus"
}
field {
name: "exited_at"
number: 5
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Timestamp"
options {
65001: 0
65010: 1
}
json_name: "exitedAt"
}
field {
name: "extra"
number: 6
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Any"
json_name: "status"
json_name: "extra"
}
}
service {
@ -3343,6 +3446,11 @@ file {
input_type: ".containerd.services.sandbox.v1.ControllerShutdownRequest"
output_type: ".containerd.services.sandbox.v1.ControllerShutdownResponse"
}
method {
name: "Wait"
input_type: ".containerd.services.sandbox.v1.ControllerWaitRequest"
output_type: ".containerd.services.sandbox.v1.ControllerWaitResponse"
}
method {
name: "Pause"
input_type: ".containerd.services.sandbox.v1.ControllerPauseRequest"
@ -3367,7 +3475,7 @@ file {
options {
go_package: "github.com/containerd/containerd/api/services/sandbox/v1;sandbox"
}
weak_dependency: 0
weak_dependency: 2
syntax: "proto3"
}
file {

File diff suppressed because it is too large Load Diff

View File

@ -25,9 +25,12 @@ syntax = "proto3";
// See proposal and discussion here: https://github.com/containerd/containerd/issues/4131
package containerd.services.sandbox.v1;
import weak "gogoproto/gogo.proto";
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import weak "gogoproto/gogo.proto";
import "github.com/containerd/containerd/api/types/sandbox.proto";
import "github.com/containerd/containerd/api/types/mount.proto";
option go_package = "github.com/containerd/containerd/api/services/sandbox/v1;sandbox";
@ -85,6 +88,7 @@ message StoreGetResponse {
service Controller {
rpc Start(ControllerStartRequest) returns (ControllerStartResponse);
rpc Shutdown(ControllerShutdownRequest) returns (ControllerShutdownResponse);
rpc Wait(ControllerWaitRequest) returns (ControllerWaitResponse);
rpc Pause(ControllerPauseRequest) returns (ControllerPauseResponse);
rpc Resume(ControllerResumeRequest) returns (ControllerResumeResponse);
rpc Ping(ControllerPingRequest) returns (ControllerPingResponse);
@ -93,18 +97,31 @@ service Controller {
message ControllerStartRequest {
string sandbox_id = 1;
google.protobuf.Any spec = 4;
repeated containerd.types.Mount rootfs = 2;
google.protobuf.Any options = 3;
}
message ControllerStartResponse {
string sandbox_id = 1;
uint32 pid = 2;
}
message ControllerShutdownRequest {
string sandbox_id = 1;
uint32 timeout_secs = 2;
}
message ControllerShutdownResponse {}
message ControllerWaitRequest {
string sandbox_id = 1;
}
message ControllerWaitResponse {
uint32 exit_status = 1;
google.protobuf.Timestamp exited_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message ControllerPauseRequest {
string sandbox_id = 1;
}
@ -128,5 +145,10 @@ message ControllerStatusRequest {
}
message ControllerStatusResponse {
google.protobuf.Any status = 1;
string id = 1;
uint32 pid = 2;
string state = 3;
uint32 exit_status = 4;
google.protobuf.Timestamp exited_at = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Any extra = 6;
}

View File

@ -73,6 +73,12 @@ func (p *pauseService) StopSandbox(ctx context.Context, req *api.StopSandboxRequ
return &api.StopSandboxResponse{}, nil
}
func (p *pauseService) WaitSandbox(ctx context.Context, req *api.WaitSandboxRequest) (*api.WaitSandboxResponse, error) {
return &api.WaitSandboxResponse{
ExitStatus: 0,
}, nil
}
func (p *pauseService) UpdateSandbox(ctx context.Context, req *api.UpdateSandboxRequest) (*api.UpdateSandboxResponse, error) {
log.Debugf("update sandbox request: %+v", req)
return &api.UpdateSandboxResponse{}, nil

View File

@ -194,6 +194,8 @@ type ShimProcess interface {
ID() string
// Namespace of this shim.
Namespace() string
// Bundle is a file system path to shim's bundle.
Bundle() string
// Client returns the underlying TTRPC client for this shim.
Client() *ttrpc.Client
}
@ -212,6 +214,10 @@ func (s *shim) Namespace() string {
return s.bundle.Namespace
}
func (s *shim) Bundle() string {
return s.bundle.Path
}
func (s *shim) Close() error {
return s.client.Close()
}

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,10 @@ syntax = "proto3";
package containerd.task.v2;
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import weak "gogoproto/gogo.proto";
import "github.com/containerd/containerd/api/types/mount.proto";
// Sandbox is an optional interface that shim may implement to support sandboxes environments.
// A typical example of sandbox is microVM or pause container - an entity that groups containers and/or
@ -30,6 +34,9 @@ service Sandbox {
// StopSandbox will stop existing sandbox instance
rpc StopSandbox(StopSandboxRequest) returns (StopSandboxResponse);
// WaitSandbox blocks until sanbox exits.
rpc WaitSandbox(WaitSandboxRequest) returns (WaitSandboxResponse);
// Update can be used to amend the state of currently running sandbox instance (depending on
// implementation this can be used to resize/reacquire needed resources like RAM/CPU).
rpc UpdateSandbox(UpdateSandboxRequest) returns (UpdateSandboxResponse);
@ -50,10 +57,12 @@ service Sandbox {
message StartSandboxRequest {
string sandbox_id = 1;
string bundle_path = 2;
repeated containerd.types.Mount rootfs = 3;
google.protobuf.Any options = 4;
}
message StartSandboxResponse {
string pid = 1;
uint32 pid = 1;
}
message StopSandboxRequest {
@ -69,6 +78,15 @@ message UpdateSandboxRequest {
map<string, string> annotations = 3;
}
message WaitSandboxRequest {
string sandbox_id = 1;
}
message WaitSandboxResponse {
uint32 exit_status = 1;
google.protobuf.Timestamp exited_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message UpdateSandboxResponse {}
message SandboxStatusRequest {
@ -88,7 +106,12 @@ message ResumeSandboxRequest {
message ResumeSandboxResponse {}
message SandboxStatusResponse {
google.protobuf.Any status = 1;
string id = 1;
uint32 pid = 2;
string state = 3;
uint32 exit_status = 4;
google.protobuf.Timestamp exited_at = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Any extra = 6;
}
message PingRequest {

View File

@ -18,6 +18,7 @@ package containerd
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd/containers"
@ -32,6 +33,8 @@ import (
type Sandbox interface {
// ID is a sandbox identifier
ID() string
// PID returns sandbox's process PID or error if its not yet started.
PID() (uint32, error)
// NewContainer creates new container that will belong to this sandbox
NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error)
// Labels returns the labels set on the sandbox
@ -40,19 +43,20 @@ type Sandbox interface {
Start(ctx context.Context) error
// Stop sends stop request to the shim instance.
Stop(ctx context.Context) error
// Wait blocks until sandbox process exits.
Wait(ctx context.Context) (<-chan ExitStatus, error)
// Delete removes sandbox from the metadata store.
Delete(ctx context.Context) error
// Pause will freeze running sandbox instance
Pause(ctx context.Context) error
// Resume will unfreeze previously paused sandbox instance
Resume(ctx context.Context) error
// Status will return current sandbox status (provided by shim runtime)
Status(ctx context.Context, status interface{}) error
// Ping will check whether existing sandbox instance alive
Ping(ctx context.Context) error
}
type sandboxClient struct {
pid *uint32
client *Client
metadata api.Sandbox
}
@ -61,6 +65,14 @@ func (s *sandboxClient) ID() string {
return s.metadata.ID
}
func (s *sandboxClient) PID() (uint32, error) {
if s.pid == nil {
return 0, fmt.Errorf("sandbox not started")
}
return *s.pid, nil
}
func (s *sandboxClient) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
return s.client.NewContainer(ctx, id, append(opts, WithSandbox(s.ID()))...)
}
@ -75,7 +87,36 @@ func (s *sandboxClient) Labels(ctx context.Context) (map[string]string, error) {
}
func (s *sandboxClient) Start(ctx context.Context) error {
return s.client.SandboxController().Start(ctx, s.ID())
pid, err := s.client.SandboxController().Start(ctx, s.ID())
if err != nil {
return err
}
s.pid = &pid
return nil
}
func (s *sandboxClient) Wait(ctx context.Context) (<-chan ExitStatus, error) {
c := make(chan ExitStatus, 1)
go func() {
defer close(c)
resp, err := s.client.SandboxController().Wait(ctx, s.ID())
if err != nil {
c <- ExitStatus{
code: UnknownExitStatus,
err: err,
}
return
}
c <- ExitStatus{
code: resp.ExitStatus,
exitedAt: resp.ExitedAt,
}
}()
return c, nil
}
func (s *sandboxClient) Stop(ctx context.Context) error {
@ -98,19 +139,6 @@ func (s *sandboxClient) Ping(ctx context.Context) error {
return s.client.SandboxController().Ping(ctx, s.ID())
}
func (s *sandboxClient) Status(ctx context.Context, status interface{}) error {
any, err := s.client.SandboxController().Status(ctx, s.ID())
if err != nil {
return err
}
if err := typeurl.UnmarshalTo(any, status); err != nil {
return errors.Wrap(err, "failed to unmarshal sandbox status")
}
return nil
}
// NewSandbox creates new sandbox client
func (c *Client) NewSandbox(ctx context.Context, sandboxID string, opts ...NewSandboxOpts) (Sandbox, error) {
if sandboxID == "" {
@ -135,6 +163,7 @@ func (c *Client) NewSandbox(ctx context.Context, sandboxID string, opts ...NewSa
}
return &sandboxClient{
pid: nil, // Not yet started
client: c,
metadata: metadata,
}, nil
@ -147,7 +176,13 @@ func (c *Client) LoadSandbox(ctx context.Context, id string) (Sandbox, error) {
return nil, err
}
status, err := c.SandboxController().Status(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to load sandbox %s, status request failed: %w", id, err)
}
return &sandboxClient{
pid: &status.Pid,
client: c,
metadata: sandbox,
}, nil

View File

@ -19,7 +19,7 @@ package sandbox
import (
"context"
"github.com/gogo/protobuf/types"
"github.com/containerd/containerd/api/services/sandbox/v1"
)
// Controller is an interface to manage sandboxes at runtime.
@ -44,9 +44,11 @@ type Controller interface {
// containerd will run new shim runtime instance and will invoke Start to create a sandbox process.
// This routine must be invoked before scheduling containers on this instance.
// Once started clients may run containers via Task service (additionally specifying sandbox id the container will belong to).
Start(ctx context.Context, sandboxID string) error
Start(ctx context.Context, sandboxID string) (uint32, error)
// Shutdown deletes and cleans all tasks and sandbox instance.
Shutdown(ctx context.Context, sandboxID string) error
// Wait blocks until sandbox process exits.
Wait(ctx context.Context, sandboxID string) (*sandbox.ControllerWaitResponse, error)
// Pause will freeze running sandbox instance.
// Shim implementations may return ErrNotImplemented if this is out of scope of a given sandbox.
Pause(ctx context.Context, sandboxID string) error
@ -57,5 +59,5 @@ type Controller interface {
Ping(ctx context.Context, sandboxID string) error
// Status will query sandbox process status. It is heavier than Ping call and must be used whenever you need to
// gather metadata about current sandbox state (status, uptime, resource use, etc).
Status(ctx context.Context, sandboxID string) (*types.Any, error)
Status(ctx context.Context, sandboxID string) (*sandbox.ControllerStatusResponse, error)
}

View File

@ -22,7 +22,6 @@ import (
api "github.com/containerd/containerd/api/services/sandbox/v1"
"github.com/containerd/containerd/errdefs"
sb "github.com/containerd/containerd/sandbox"
"github.com/gogo/protobuf/types"
)
// sandboxRemoteController is a low level GRPC client for containerd's sandbox controller service
@ -37,14 +36,13 @@ func NewSandboxRemoteController(client api.ControllerClient) sb.Controller {
return &sandboxRemoteController{client: client}
}
func (s *sandboxRemoteController) Start(ctx context.Context, sandboxID string) error {
if _, err := s.client.Start(ctx, &api.ControllerStartRequest{
SandboxID: sandboxID,
}); err != nil {
return errdefs.FromGRPC(err)
func (s *sandboxRemoteController) Start(ctx context.Context, sandboxID string) (uint32, error) {
resp, err := s.client.Start(ctx, &api.ControllerStartRequest{SandboxID: sandboxID})
if err != nil {
return 0, errdefs.FromGRPC(err)
}
return nil
return resp.Pid, nil
}
func (s *sandboxRemoteController) Shutdown(ctx context.Context, sandboxID string) error {
@ -56,6 +54,15 @@ func (s *sandboxRemoteController) Shutdown(ctx context.Context, sandboxID string
return nil
}
func (s *sandboxRemoteController) Wait(ctx context.Context, sandboxID string) (*api.ControllerWaitResponse, error) {
resp, err := s.client.Wait(ctx, &api.ControllerWaitRequest{SandboxID: sandboxID})
if err != nil {
return nil, errdefs.FromGRPC(err)
}
return resp, nil
}
func (s *sandboxRemoteController) Pause(ctx context.Context, sandboxID string) error {
_, err := s.client.Pause(ctx, &api.ControllerPauseRequest{SandboxID: sandboxID})
if err != nil {
@ -82,11 +89,11 @@ func (s *sandboxRemoteController) Ping(ctx context.Context, sandboxID string) er
return nil
}
func (s *sandboxRemoteController) Status(ctx context.Context, sandboxID string) (*types.Any, error) {
func (s *sandboxRemoteController) Status(ctx context.Context, sandboxID string) (*api.ControllerStatusResponse, error) {
resp, err := s.client.Status(ctx, &api.ControllerStatusRequest{SandboxID: sandboxID})
if err != nil {
return nil, errdefs.FromGRPC(err)
}
return resp.Status, nil
return resp, nil
}

View File

@ -108,15 +108,21 @@ func (c *controllerLocal) Start(ctx context.Context, in *api.ControllerStartRequ
svc := task.NewSandboxClient(shim.Client())
_, err = svc.StartSandbox(ctx, &proto.StartSandboxRequest{
SandboxID: in.SandboxID,
resp, err := svc.StartSandbox(ctx, &proto.StartSandboxRequest{
SandboxID: in.SandboxID,
BundlePath: shim.Bundle(),
Rootfs: in.Rootfs,
Options: in.Options,
})
if err != nil {
return nil, fmt.Errorf("failed to start sandbox %s: %w", in.SandboxID, err)
}
return &api.ControllerStartResponse{}, nil
return &api.ControllerStartResponse{
SandboxID: in.SandboxID,
Pid: resp.Pid,
}, nil
}
func (c *controllerLocal) Shutdown(ctx context.Context, in *api.ControllerShutdownRequest, opts ...grpc.CallOption) (*api.ControllerShutdownResponse, error) {
@ -127,7 +133,7 @@ func (c *controllerLocal) Shutdown(ctx context.Context, in *api.ControllerShutdo
if _, err := svc.StopSandbox(ctx, &proto.StopSandboxRequest{
SandboxID: in.SandboxID,
TimeoutSecs: 0,
TimeoutSecs: in.TimeoutSecs,
}); err != nil {
return nil, fmt.Errorf("failed to stop sandbox: %w", err)
}
@ -139,6 +145,26 @@ func (c *controllerLocal) Shutdown(ctx context.Context, in *api.ControllerShutdo
return &api.ControllerShutdownResponse{}, nil
}
func (c *controllerLocal) Wait(ctx context.Context, in *api.ControllerWaitRequest, opts ...grpc.CallOption) (*api.ControllerWaitResponse, error) {
svc, err := c.getSandbox(ctx, in.SandboxID)
if err != nil {
return nil, err
}
resp, err := svc.WaitSandbox(ctx, &proto.WaitSandboxRequest{
SandboxID: in.SandboxID,
})
if err != nil {
return nil, fmt.Errorf("failed to wait sandbox %s: %w", in.SandboxID, err)
}
return &api.ControllerWaitResponse{
ExitStatus: resp.ExitStatus,
ExitedAt: resp.ExitedAt,
}, nil
}
func (c *controllerLocal) Pause(ctx context.Context, in *api.ControllerPauseRequest, opts ...grpc.CallOption) (*api.ControllerPauseResponse, error) {
svc, err := c.getSandbox(ctx, in.SandboxID)
if err != nil {
@ -195,7 +221,14 @@ func (c *controllerLocal) Status(ctx context.Context, in *api.ControllerStatusRe
return nil, fmt.Errorf("failed to query sandbox %s status: %w", in.SandboxID, err)
}
return &api.ControllerStatusResponse{Status: resp.Status}, nil
return &api.ControllerStatusResponse{
ID: resp.ID,
Pid: resp.Pid,
State: resp.State,
ExitStatus: resp.ExitStatus,
ExitedAt: resp.ExitedAt,
Extra: resp.Extra,
}, nil
}
func (c *controllerLocal) getSandbox(ctx context.Context, id string) (task.SandboxService, error) {

View File

@ -78,6 +78,11 @@ func (s *controllerService) Shutdown(ctx context.Context, req *api.ControllerShu
return s.local.Shutdown(ctx, req)
}
func (s *controllerService) Wait(ctx context.Context, req *api.ControllerWaitRequest) (*api.ControllerWaitResponse, error) {
log.G(ctx).WithField("req", req).Debug("wait sandbox")
return s.local.Wait(ctx, req)
}
func (s *controllerService) Pause(ctx context.Context, req *api.ControllerPauseRequest) (*api.ControllerPauseResponse, error) {
log.G(ctx).WithField("req", req).Debug("pause sandbox")
return s.local.Pause(ctx, req)