diff --git a/api/next.pb.txt b/api/next.pb.txt index a670015bd..efaeb9930 100644 --- a/api/next.pb.txt +++ b/api/next.pb.txt @@ -928,6 +928,13 @@ file { } json_name: "extensions" } + field { + name: "sandbox" + number: 11 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "sandbox" + } nested_type { name: "LabelsEntry" field { diff --git a/api/services/containers/v1/containers.pb.go b/api/services/containers/v1/containers.pb.go index e2319a66a..47af85b4a 100644 --- a/api/services/containers/v1/containers.pb.go +++ b/api/services/containers/v1/containers.pb.go @@ -81,10 +81,12 @@ type Container struct { // that should be unique against other extensions. When updating extension // data, one should only update the specified extension using field paths // to select a specific map key. - Extensions map[string]types.Any `protobuf:"bytes,10,rep,name=extensions,proto3" json:"extensions" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Extensions map[string]types.Any `protobuf:"bytes,10,rep,name=extensions,proto3" json:"extensions" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Sandbox ID this container belongs to. + Sandbox string `protobuf:"bytes,11,opt,name=sandbox,proto3" json:"sandbox,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Container) Reset() { *m = Container{} } @@ -960,6 +962,13 @@ func (m *Container) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if len(m.Sandbox) > 0 { + i -= len(m.Sandbox) + copy(dAtA[i:], m.Sandbox) + i = encodeVarintContainers(dAtA, i, uint64(len(m.Sandbox))) + i-- + dAtA[i] = 0x5a + } if len(m.Extensions) > 0 { for k := range m.Extensions { v := m.Extensions[k] @@ -1563,6 +1572,10 @@ func (m *Container) Size() (n int) { n += mapEntrySize + 1 + sovContainers(uint64(mapEntrySize)) } } + l = len(m.Sandbox) + if l > 0 { + n += 1 + l + sovContainers(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1788,6 +1801,7 @@ func (this *Container) String() string { `CreatedAt:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.CreatedAt), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `UpdatedAt:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.UpdatedAt), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `Extensions:` + mapStringForExtensions + `,`, + `Sandbox:` + fmt.Sprintf("%v", this.Sandbox) + `,`, `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, `}`, }, "") @@ -2480,6 +2494,38 @@ func (m *Container) Unmarshal(dAtA []byte) error { } m.Extensions[mapkey] = *mapvalue iNdEx = postIndex + case 11: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Sandbox", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContainers + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthContainers + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthContainers + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Sandbox = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipContainers(dAtA[iNdEx:]) diff --git a/api/services/containers/v1/containers.proto b/api/services/containers/v1/containers.proto index 8b51e3393..28bc5628d 100644 --- a/api/services/containers/v1/containers.proto +++ b/api/services/containers/v1/containers.proto @@ -114,6 +114,9 @@ message Container { // data, one should only update the specified extension using field paths // to select a specific map key. map extensions = 10 [(gogoproto.nullable) = false]; + + // Sandbox ID this container belongs to. + string sandbox = 11; } message GetContainerRequest { diff --git a/cmd/containerd-shim-runc-v2/main.go b/cmd/containerd-shim-runc-v2/main.go index 24f84e835..c5454bba3 100644 --- a/cmd/containerd-shim-runc-v2/main.go +++ b/cmd/containerd-shim-runc-v2/main.go @@ -22,8 +22,8 @@ package main import ( "context" - _ "github.com/containerd/containerd/runtime/v2/pause" "github.com/containerd/containerd/runtime/v2/runc/manager" + _ "github.com/containerd/containerd/runtime/v2/runc/pause" _ "github.com/containerd/containerd/runtime/v2/runc/task/plugin" "github.com/containerd/containerd/runtime/v2/shim" ) diff --git a/cmd/ctr/commands/sandboxes/sandboxes.go b/cmd/ctr/commands/sandboxes/sandboxes.go index 73156f660..9d1935bf3 100644 --- a/cmd/ctr/commands/sandboxes/sandboxes.go +++ b/cmd/ctr/commands/sandboxes/sandboxes.go @@ -148,6 +148,8 @@ var removeCommand = cli.Command{ } defer cancel() + force := context.Bool("force") + for _, id := range context.Args() { sandbox, err := client.LoadSandbox(ctx, id) if err != nil { @@ -155,7 +157,15 @@ var removeCommand = cli.Command{ continue } - err = sandbox.Shutdown(ctx, context.Bool("force")) + err = sandbox.Stop(ctx) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to stop sandbox %s", id) + if !force { + continue + } + } + + err = sandbox.Delete(ctx) if err != nil { log.G(ctx).WithError(err).Errorf("failed to shutdown sandbox %s", id) continue diff --git a/containerstore.go b/containerstore.go index 2a154e265..a9a82a2f5 100644 --- a/containerstore.go +++ b/containerstore.go @@ -166,6 +166,7 @@ func containerToProto(container *containers.Container) containersapi.Container { Snapshotter: container.Snapshotter, SnapshotKey: container.SnapshotKey, Extensions: extensions, + Sandbox: container.SandboxID, } } @@ -193,6 +194,7 @@ func containerFromProto(containerpb *containersapi.Container) containers.Contain CreatedAt: containerpb.CreatedAt, UpdatedAt: containerpb.UpdatedAt, Extensions: extensions, + SandboxID: containerpb.Sandbox, } } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 708310aeb..3ff177512 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -186,6 +186,40 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO } }() + // This container belongs to sandbox which supposed to be already started via sandbox API. + if opts.SandboxID != "" { + process, err := m.Get(ctx, opts.SandboxID) + if err != nil { + return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID) + } + + // Write sandbox ID this task belongs to. + if err := os.WriteFile(filepath.Join(bundle.Path, "sandbox"), []byte(opts.SandboxID), 0600); err != nil { + return nil, err + } + + address, err := shimbinary.ReadAddress(filepath.Join(m.state, process.Namespace(), opts.SandboxID, "address")) + if err != nil { + return nil, fmt.Errorf("failed to get socket address for sandbox %q: %w", opts.SandboxID, err) + } + + // Use sandbox's socket address to handle task requests for this container. + if err := shimbinary.WriteAddress(filepath.Join(bundle.Path, "address"), address); err != nil { + return nil, err + } + + shim, err := loadShim(ctx, bundle, func() {}) + if err != nil { + return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err) + } + + if err := m.shims.Add(ctx, shim); err != nil { + return nil, err + } + + return shim, nil + } + shim, err := m.startShim(ctx, bundle, id, opts) if err != nil { return nil, err @@ -391,22 +425,9 @@ func (m *TaskManager) ID() string { // Create launches new shim instance and creates new task func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { - var ( - process ShimProcess - err error - ) - - if opts.SandboxID != "" { - // This container belongs to sandbox which supposed to be already started via sandbox API. - process, err = m.manager.Get(ctx, opts.SandboxID) - if err != nil { - return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID) - } - } else { - process, err = m.manager.Start(ctx, taskID, opts) - if err != nil { - return nil, fmt.Errorf("failed to start shim: %w", err) - } + process, err := m.manager.Start(ctx, taskID, opts) + if err != nil { + return nil, fmt.Errorf("failed to start shim: %w", err) } // Cast to shim task and call task service to create a new container task instance. @@ -420,7 +441,8 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() - _, errShim := shim.delete(dctx, func(context.Context, string) {}) + sandboxed := opts.SandboxID != "" + _, errShim := shim.delete(dctx, sandboxed, func(context.Context, string) {}) if errShim != nil { if errdefs.IsDeadlineExceeded(errShim) { dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) @@ -454,8 +476,14 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, return nil, err } + container, err := m.manager.containers.Get(ctx, taskID) + if err != nil { + return nil, err + } + + sandboxed := container.SandboxID != "" shimTask := item.(*shimTask) - exit, err := shimTask.delete(ctx, func(ctx context.Context, id string) { + exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) { m.manager.shims.Delete(ctx, id) }) diff --git a/runtime/v2/pause/sandbox.go b/runtime/v2/runc/pause/sandbox.go similarity index 88% rename from runtime/v2/pause/sandbox.go rename to runtime/v2/runc/pause/sandbox.go index c94ff44a5..dc08c33bb 100644 --- a/runtime/v2/pause/sandbox.go +++ b/runtime/v2/runc/pause/sandbox.go @@ -22,6 +22,7 @@ package pause import ( "context" + "github.com/containerd/containerd/pkg/shutdown" "github.com/containerd/ttrpc" log "github.com/sirupsen/logrus" @@ -33,14 +34,26 @@ func init() { plugin.Register(&plugin.Registration{ Type: plugin.TTRPCPlugin, ID: "pause", + Requires: []plugin.Type{ + plugin.InternalPlugin, + }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - return &pauseService{}, nil + ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown") + if err != nil { + return nil, err + } + + return &pauseService{ + shutdown: ss.(shutdown.Service), + }, nil }, }) } // pauseService is an extension for task v2 runtime to support Pod "pause" containers via sandbox API. -type pauseService struct{} +type pauseService struct { + shutdown shutdown.Service +} var _ api.SandboxService = (*pauseService)(nil) @@ -56,6 +69,7 @@ func (p *pauseService) StartSandbox(ctx context.Context, req *api.StartSandboxRe func (p *pauseService) StopSandbox(ctx context.Context, req *api.StopSandboxRequest) (*api.StopSandboxResponse, error) { log.Debugf("stop sandbox request: %+v", req) + p.shutdown.Shutdown() return &api.StopSandboxResponse{}, nil } diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 037d64c9d..3c8e2b9a2 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -277,7 +277,7 @@ func (s *shimTask) PID(ctx context.Context) (uint32, error) { return response.TaskPid, nil } -func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) { +func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) { response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{ ID: s.ID(), }) @@ -305,8 +305,12 @@ func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Conte removeTask(ctx, s.ID()) } - if err := s.waitShutdown(ctx); err != nil { - log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task") + // Don't shutdown sandbox as there may be other containers running. + // Let controller decide when to shutdown. + if !sandboxed { + if err := s.waitShutdown(ctx); err != nil { + log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task") + } } if err := s.shim.delete(ctx); err != nil { diff --git a/sandbox.go b/sandbox.go index 805002aff..c3795649f 100644 --- a/sandbox.go +++ b/sandbox.go @@ -18,9 +18,10 @@ package containerd import ( "context" - "fmt" "time" + "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/oci" api "github.com/containerd/containerd/sandbox" "github.com/containerd/typeurl" "github.com/gogo/protobuf/types" @@ -37,9 +38,10 @@ type Sandbox interface { Labels(ctx context.Context) (map[string]string, error) // Start starts new sandbox instance Start(ctx context.Context) error - // Shutdown will turn down existing sandbox instance. - // If using force, the client will ignore shutdown errors. - Shutdown(ctx context.Context, force bool) error + // Stop sends stop request to the shim instance. + Stop(ctx context.Context) error + // Delete removes sandbox from the metadata store. + Delete(ctx context.Context) error // Pause will freeze running sandbox instance Pause(ctx context.Context) error // Resume will unfreeze previously paused sandbox instance @@ -76,23 +78,12 @@ func (s *sandboxClient) Start(ctx context.Context) error { return s.client.SandboxController().Start(ctx, s.ID()) } -func (s *sandboxClient) Shutdown(ctx context.Context, force bool) error { - var ( - controller = s.client.SandboxController() - store = s.client.SandboxStore() - ) +func (s *sandboxClient) Stop(ctx context.Context) error { + return s.client.SandboxController().Shutdown(ctx, s.ID()) +} - err := controller.Shutdown(ctx, s.ID()) - if err != nil && !force { - return fmt.Errorf("failed to shutdown sandbox: %w", err) - } - - err = store.Delete(ctx, s.ID()) - if err != nil { - return fmt.Errorf("failed to delete sandbox from metadata store: %w", err) - } - - return nil +func (s *sandboxClient) Delete(ctx context.Context) error { + return s.client.SandboxStore().Delete(ctx, s.ID()) } func (s *sandboxClient) Pause(ctx context.Context) error { @@ -187,9 +178,15 @@ func WithSandboxRuntime(name string, options interface{}) NewSandboxOpts { } // WithSandboxSpec will provide the sandbox runtime spec -func WithSandboxSpec(spec interface{}) NewSandboxOpts { +func WithSandboxSpec(s *oci.Spec, opts ...oci.SpecOpts) NewSandboxOpts { return func(ctx context.Context, client *Client, sandbox *api.Sandbox) error { - spec, err := typeurl.MarshalAny(spec) + c := &containers.Container{ID: sandbox.ID} + + if err := oci.ApplyOpts(ctx, client, c, s, opts...); err != nil { + return err + } + + spec, err := typeurl.MarshalAny(s) if err != nil { return errors.Wrap(err, "failed to marshal spec") } diff --git a/services/containers/helpers.go b/services/containers/helpers.go index a75d5d62b..ceb2ca5dd 100644 --- a/services/containers/helpers.go +++ b/services/containers/helpers.go @@ -54,6 +54,7 @@ func containerToProto(container *containers.Container) api.Container { CreatedAt: container.CreatedAt, UpdatedAt: container.UpdatedAt, Extensions: extensions, + Sandbox: container.SandboxID, } } @@ -79,5 +80,6 @@ func containerFromProto(containerpb *api.Container) containers.Container { Snapshotter: containerpb.Snapshotter, SnapshotKey: containerpb.SnapshotKey, Extensions: extensions, + SandboxID: containerpb.Sandbox, } } diff --git a/services/sandbox/controller_local.go b/services/sandbox/controller_local.go index e01124bee..0fd448fac 100644 --- a/services/sandbox/controller_local.go +++ b/services/sandbox/controller_local.go @@ -109,8 +109,7 @@ func (c *controllerLocal) Start(ctx context.Context, in *api.ControllerStartRequ svc := task.NewSandboxClient(shim.Client()) _, err = svc.StartSandbox(ctx, &proto.StartSandboxRequest{ - SandboxID: in.SandboxID, - BundlePath: "", + SandboxID: in.SandboxID, }) if err != nil {