Merge pull request #6703 from mxpv/s

Sandbox API
This commit is contained in:
Derek McGowan
2022-04-18 20:55:06 -07:00
committed by GitHub
45 changed files with 12334 additions and 75 deletions

View File

@@ -49,6 +49,8 @@ type CreateOpts struct {
// Runtime name to use (e.g. `io.containerd.NAME.VERSION`).
// As an alternative full abs path to binary may be specified instead.
Runtime string
// SandboxID is an optional ID of sandbox this container belongs to
SandboxID string
}
// Exit information for a process

View File

@@ -25,6 +25,8 @@ import (
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
"github.com/opencontainers/runtime-spec/specs-go"
)
const configFilename = "config.json"
@@ -43,7 +45,7 @@ func LoadBundle(ctx context.Context, root, id string) (*Bundle, error) {
}
// NewBundle returns a new bundle on disk
func NewBundle(ctx context.Context, root, state, id string, spec []byte) (b *Bundle, err error) {
func NewBundle(ctx context.Context, root, state, id string, spec typeurl.Any) (b *Bundle, err error) {
if err := identifiers.Validate(id); err != nil {
return nil, fmt.Errorf("invalid task id %s: %w", id, err)
}
@@ -73,8 +75,10 @@ func NewBundle(ctx context.Context, root, state, id string, spec []byte) (b *Bun
if err := os.Mkdir(b.Path, 0700); err != nil {
return nil, err
}
if err := prepareBundleDirectoryPermissions(b.Path, spec); err != nil {
return nil, err
if typeurl.Is(spec, &specs.Spec{}) {
if err := prepareBundleDirectoryPermissions(b.Path, spec.GetValue()); err != nil {
return nil, err
}
}
paths = append(paths, b.Path)
// create working directory for the bundle
@@ -100,9 +104,14 @@ func NewBundle(ctx context.Context, root, state, id string, spec []byte) (b *Bun
if err := os.Symlink(work, filepath.Join(b.Path, "work")); err != nil {
return nil, err
}
// write the spec to the bundle
err = os.WriteFile(filepath.Join(b.Path, configFilename), spec, 0666)
return b, err
// Persist the raw spec payload, when one is present, as the bundle's config file.
if specBytes := spec.GetValue(); specBytes != nil {
	// write the spec to the bundle
	err = os.WriteFile(filepath.Join(b.Path, configFilename), specBytes, 0666)
	if err != nil {
		// Wrap the underlying error so the cause of the failed write is not lost.
		return nil, fmt.Errorf("failed to write %s: %w", configFilename, err)
	}
}
return b, nil
}
// Bundle represents an OCI bundle

View File

@@ -29,6 +29,7 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/pkg/testutil"
"github.com/containerd/typeurl"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -57,11 +58,11 @@ func TestNewBundle(t *testing.T) {
GIDMappings: []specs.LinuxIDMapping{{ContainerID: 0, HostID: usernsGID}},
}
}
specBytes, err := json.Marshal(&spec)
specAny, err := typeurl.MarshalAny(&spec)
require.NoError(t, err, "failed to marshal spec")
ctx := namespaces.WithNamespace(context.TODO(), namespaces.Default)
b, err := NewBundle(ctx, work, state, id, specBytes)
b, err := NewBundle(ctx, work, state, id, specAny)
require.NoError(t, err, "NewBundle should succeed")
require.NotNil(t, b, "bundle should not be nil")

View File

@@ -96,6 +96,24 @@ func init() {
return NewTaskManager(shimManager), nil
},
})
// The task manager depends on the shim manager to manage shim instances.
// To avoid migration steps in the 1.6 release, the shim manager is exposed
// through a separate "shim" plugin that unwraps it from the "task" plugin.
// This workaround is expected to be removed in 1.7.
plugin.Register(&plugin.Registration{
	Type: plugin.RuntimePluginV2,
	ID:   "shim",
	InitFn: func(ic *plugin.InitContext) (interface{}, error) {
		instance, err := ic.GetByID(plugin.RuntimePluginV2, "task")
		if err != nil {
			return nil, err
		}
		// Assumes the "task" plugin instance is a *TaskManager; the assertion panics otherwise.
		return instance.(*TaskManager).manager, nil
	},
})
}
type ManagerConfig struct {
@@ -158,7 +176,7 @@ func (m *ShimManager) ID() string {
// Start launches a new shim instance
func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimProcess, retErr error) {
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.GetValue())
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec)
if err != nil {
return nil, err
}
@@ -168,6 +186,40 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO
}
}()
// This container belongs to a sandbox which is supposed to be already started via the sandbox API.
// Instead of launching a new shim, reuse the sandbox's shim socket for this task.
if opts.SandboxID != "" {
	process, err := m.Get(ctx, opts.SandboxID)
	if err != nil {
		// Wrap the lookup error so callers can still inspect the underlying cause.
		return nil, fmt.Errorf("can't find sandbox %s: %w", opts.SandboxID, err)
	}

	// Write sandbox ID this task belongs to.
	if err := os.WriteFile(filepath.Join(bundle.Path, "sandbox"), []byte(opts.SandboxID), 0600); err != nil {
		return nil, err
	}

	address, err := shimbinary.ReadAddress(filepath.Join(m.state, process.Namespace(), opts.SandboxID, "address"))
	if err != nil {
		return nil, fmt.Errorf("failed to get socket address for sandbox %q: %w", opts.SandboxID, err)
	}

	// Use sandbox's socket address to handle task requests for this container.
	if err := shimbinary.WriteAddress(filepath.Join(bundle.Path, "address"), address); err != nil {
		return nil, err
	}

	// Load a shim for this bundle using the address file written above; the close callback is a no-op.
	shim, err := loadShim(ctx, bundle, func() {})
	if err != nil {
		return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
	}

	if err := m.shims.Add(ctx, shim); err != nil {
		return nil, err
	}

	return shim, nil
}
shim, err := m.startShim(ctx, bundle, id, opts)
if err != nil {
return nil, err
@@ -324,7 +376,8 @@ func (m *ShimManager) Get(ctx context.Context, id string) (ShimProcess, error) {
return nil, err
}
return proc, nil
shimTask := proc.(*shimTask)
return shimTask, nil
}
// Delete a runtime task
@@ -388,7 +441,8 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
_, errShim := shim.delete(dctx, func(context.Context, string) {})
sandboxed := opts.SandboxID != ""
_, errShim := shim.delete(dctx, sandboxed, func(context.Context, string) {})
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
@@ -422,8 +476,14 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit,
return nil, err
}
container, err := m.manager.containers.Get(ctx, taskID)
if err != nil {
return nil, err
}
sandboxed := container.SandboxID != ""
shimTask := item.(*shimTask)
exit, err := shimTask.delete(ctx, func(ctx context.Context, id string) {
exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) {
m.manager.shims.Delete(ctx, id)
})

View File

@@ -0,0 +1,104 @@
//go:build linux
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pause
import (
"context"
"github.com/containerd/containerd/pkg/shutdown"
"github.com/containerd/ttrpc"
log "github.com/sirupsen/logrus"
"github.com/containerd/containerd/plugin"
api "github.com/containerd/containerd/runtime/v2/task"
)
// init registers the "pause" TTRPC plugin. The plugin requires the internal
// plugins and builds its service on top of the "shutdown" one.
func init() {
	registration := &plugin.Registration{
		Type:     plugin.TTRPCPlugin,
		ID:       "pause",
		Requires: []plugin.Type{plugin.InternalPlugin},
	}
	registration.InitFn = func(ic *plugin.InitContext) (interface{}, error) {
		instance, err := ic.GetByID(plugin.InternalPlugin, "shutdown")
		if err != nil {
			return nil, err
		}
		// The "shutdown" plugin instance must implement shutdown.Service; the assertion panics otherwise.
		svc := &pauseService{shutdown: instance.(shutdown.Service)}
		return svc, nil
	}
	plugin.Register(registration)
}
// pauseService is an extension for task v2 runtime to support Pod "pause" containers via sandbox API.
// Most of its handlers only log the request and return an empty response.
type pauseService struct {
// shutdown is the service invoked by StopSandbox.
shutdown shutdown.Service
}
// Compile-time check that pauseService implements api.SandboxService.
var _ api.SandboxService = (*pauseService)(nil)
// RegisterTTRPC registers this service as the sandbox service on the given TTRPC server.
// It always returns nil.
func (p *pauseService) RegisterTTRPC(server *ttrpc.Server) error {
api.RegisterSandboxService(server, p)
return nil
}
// StartSandbox logs the request at debug level and replies with an empty response.
// This implementation performs no other work.
func (p *pauseService) StartSandbox(ctx context.Context, req *api.StartSandboxRequest) (*api.StartSandboxResponse, error) {
	log.Debugf("start sandbox request: %+v", req)

	resp := &api.StartSandboxResponse{}
	return resp, nil
}
// StopSandbox logs the request at debug level, triggers the shutdown service,
// and replies with an empty response. The request is only logged; none of its
// fields influence the shutdown.
func (p *pauseService) StopSandbox(ctx context.Context, req *api.StopSandboxRequest) (*api.StopSandboxResponse, error) {
	log.Debugf("stop sandbox request: %+v", req)

	p.shutdown.Shutdown()

	resp := &api.StopSandboxResponse{}
	return resp, nil
}
// WaitSandbox returns immediately with an exit status of 0 and a zero-valued
// exit time; this implementation does not block.
func (p *pauseService) WaitSandbox(ctx context.Context, req *api.WaitSandboxRequest) (*api.WaitSandboxResponse, error) {
	resp := &api.WaitSandboxResponse{}
	resp.ExitStatus = 0
	return resp, nil
}
// UpdateSandbox logs the request at debug level and replies with an empty response.
// The requested update is not applied by this implementation.
func (p *pauseService) UpdateSandbox(ctx context.Context, req *api.UpdateSandboxRequest) (*api.UpdateSandboxResponse, error) {
	log.Debugf("update sandbox request: %+v", req)

	resp := &api.UpdateSandboxResponse{}
	return resp, nil
}
// PauseSandbox logs the request at debug level and replies with an empty response.
// Nothing is suspended by this implementation.
func (p *pauseService) PauseSandbox(ctx context.Context, req *api.PauseSandboxRequest) (*api.PauseSandboxResponse, error) {
	log.Debugf("pause sandbox request: %+v", req)

	resp := &api.PauseSandboxResponse{}
	return resp, nil
}
// ResumeSandbox logs the request at debug level and replies with an empty response.
// Nothing is resumed by this implementation.
func (p *pauseService) ResumeSandbox(ctx context.Context, req *api.ResumeSandboxRequest) (*api.ResumeSandboxResponse, error) {
	log.Debugf("resume sandbox request: %+v", req)

	resp := &api.ResumeSandboxResponse{}
	return resp, nil
}
// SandboxStatus logs the request at debug level and replies with an empty
// (zero-valued) status response.
func (p *pauseService) SandboxStatus(ctx context.Context, req *api.SandboxStatusRequest) (*api.SandboxStatusResponse, error) {
	log.Debugf("sandbox status request: %+v", req)

	resp := &api.SandboxStatusResponse{}
	return resp, nil
}
// PingSandbox replies with an empty response without inspecting the request.
func (p *pauseService) PingSandbox(ctx context.Context, req *api.PingRequest) (*api.PingResponse, error) {
	resp := &api.PingResponse{}
	return resp, nil
}

View File

@@ -194,6 +194,10 @@ type ShimProcess interface {
ID() string
// Namespace of this shim.
Namespace() string
// Bundle is a file system path to shim's bundle.
Bundle() string
// Client returns the underlying TTRPC client for this shim.
Client() *ttrpc.Client
}
type shim struct {
@@ -210,6 +214,10 @@ func (s *shim) Namespace() string {
return s.bundle.Namespace
}
// Bundle returns the file system path of this shim's bundle.
func (s *shim) Bundle() string {
return s.bundle.Path
}
func (s *shim) Close() error {
return s.client.Close()
}
@@ -243,6 +251,10 @@ type shimTask struct {
task task.TaskService
}
// Client returns the TTRPC client held by this shim task.
func (s *shimTask) Client() *ttrpc.Client {
return s.client
}
func (s *shimTask) Shutdown(ctx context.Context) error {
_, err := s.task.Shutdown(ctx, &task.ShutdownRequest{
ID: s.ID(),
@@ -271,7 +283,7 @@ func (s *shimTask) PID(ctx context.Context) (uint32, error) {
return response.TaskPid, nil
}
func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
ID: s.ID(),
})
@@ -299,8 +311,12 @@ func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Conte
removeTask(ctx, s.ID())
}
if err := s.waitShutdown(ctx); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task")
// Don't shutdown sandbox as there may be other containers running.
// Let controller decide when to shutdown.
if !sandboxed {
if err := s.waitShutdown(ctx); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task")
}
}
if err := s.shim.delete(ctx); err != nil {

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,121 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";
package containerd.task.v2;
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import weak "gogoproto/gogo.proto";
import "github.com/containerd/containerd/api/types/mount.proto";
// Sandbox is an optional interface that a shim may implement to support sandbox environments.
// A typical example of a sandbox is a microVM or a pause container - an entity that groups containers and/or
// holds resources relevant for this group.
service Sandbox {
// StartSandbox will create/start a new sandbox instance.
rpc StartSandbox(StartSandboxRequest) returns (StartSandboxResponse);
// StopSandbox will stop an existing sandbox instance.
rpc StopSandbox(StopSandboxRequest) returns (StopSandboxResponse);
// WaitSandbox blocks until the sandbox exits.
rpc WaitSandbox(WaitSandboxRequest) returns (WaitSandboxResponse);
// UpdateSandbox can be used to amend the state of a currently running sandbox instance (depending on
// the implementation this can be used to resize/reacquire needed resources like RAM/CPU).
rpc UpdateSandbox(UpdateSandboxRequest) returns (UpdateSandboxResponse);
// PauseSandbox will suspend a currently running sandbox instance.
rpc PauseSandbox(PauseSandboxRequest) returns (PauseSandboxResponse);
// ResumeSandbox will resume a previously suspended sandbox instance.
rpc ResumeSandbox(ResumeSandboxRequest) returns (ResumeSandboxResponse);
// SandboxStatus will return the current status of the running sandbox instance.
rpc SandboxStatus(SandboxStatusRequest) returns (SandboxStatusResponse);
// PingSandbox is a lightweight API call to check whether the sandbox is alive.
rpc PingSandbox(PingRequest) returns (PingResponse);
}
// StartSandboxRequest is the request message for Sandbox.StartSandbox.
message StartSandboxRequest {
// Identifier of the sandbox to start.
string sandbox_id = 1;
// Presumably the file system path of the sandbox's bundle - confirm with the caller.
string bundle_path = 2;
// Mounts associated with the sandbox's rootfs.
repeated containerd.types.Mount rootfs = 3;
// Opaque, implementation-specific options.
google.protobuf.Any options = 4;
}
// StartSandboxResponse is the response message for Sandbox.StartSandbox.
message StartSandboxResponse {
// Presumably the process ID of the started sandbox - confirm with implementations.
uint32 pid = 1;
}
// StopSandboxRequest is the request message for Sandbox.StopSandbox.
message StopSandboxRequest {
// Identifier of the sandbox to stop.
string sandbox_id = 1;
// Timeout in seconds; presumably how long to wait for the sandbox to stop - confirm with implementations.
uint32 timeout_secs = 2;
}
// StopSandboxResponse is the (empty) response message for Sandbox.StopSandbox.
message StopSandboxResponse {}
// UpdateSandboxRequest is the request message for Sandbox.UpdateSandbox.
message UpdateSandboxRequest {
// Identifier of the sandbox to update.
string sandbox_id = 1;
// Opaque, implementation-specific description of the resources to apply.
google.protobuf.Any resources = 2;
// Key/value annotations supplied with the update.
map<string, string> annotations = 3;
}
// WaitSandboxRequest is the request message for Sandbox.WaitSandbox.
message WaitSandboxRequest {
// Identifier of the sandbox to wait on.
string sandbox_id = 1;
}
// WaitSandboxResponse is the response message for Sandbox.WaitSandbox.
message WaitSandboxResponse {
// Exit status reported for the sandbox.
uint32 exit_status = 1;
// Time at which the sandbox exited; generated as a non-nullable standard time value.
google.protobuf.Timestamp exited_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
// UpdateSandboxResponse is the (empty) response message for Sandbox.UpdateSandbox.
message UpdateSandboxResponse {}
// SandboxStatusRequest is the request message for Sandbox.SandboxStatus.
message SandboxStatusRequest {
// Identifier of the sandbox whose status is requested.
string sandbox_id = 1;
}
// PauseSandboxRequest is the request message for Sandbox.PauseSandbox.
message PauseSandboxRequest {
// Identifier of the sandbox to pause.
string sandbox_id = 1;
}
// PauseSandboxResponse is the (empty) response message for Sandbox.PauseSandbox.
message PauseSandboxResponse {}
// ResumeSandboxRequest is the request message for Sandbox.ResumeSandbox.
message ResumeSandboxRequest {
// Identifier of the sandbox to resume.
string sandbox_id = 1;
}
// ResumeSandboxResponse is the (empty) response message for Sandbox.ResumeSandbox.
message ResumeSandboxResponse {}
// SandboxStatusResponse is the response message for Sandbox.SandboxStatus.
message SandboxStatusResponse {
// Identifier of the sandbox.
string id = 1;
// Presumably the process ID of the sandbox - confirm with implementations.
uint32 pid = 2;
// State of the sandbox as a free-form string; the set of valid values is not defined here.
string state = 3;
// Exit status reported for the sandbox.
uint32 exit_status = 4;
// Time at which the sandbox exited; generated as a non-nullable standard time value.
google.protobuf.Timestamp exited_at = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
// Opaque, implementation-specific extra status data.
google.protobuf.Any extra = 6;
}
// PingRequest is the request message for Sandbox.PingSandbox.
message PingRequest {
// Identifier of the sandbox to ping.
string sandbox_id = 1;
}
// PingResponse is the (empty) response message for Sandbox.PingSandbox.
message PingResponse {}