Restore sandboxes on daemon restart

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2022-01-27 14:00:42 -08:00
parent 0c5e5c3579
commit 0d165e6544
12 changed files with 165 additions and 53 deletions

View File

@ -928,6 +928,13 @@ file {
}
json_name: "extensions"
}
field {
name: "sandbox"
number: 11
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "sandbox"
}
nested_type {
name: "LabelsEntry"
field {

View File

@ -81,10 +81,12 @@ type Container struct {
// that should be unique against other extensions. When updating extension
// data, one should only update the specified extension using field paths
// to select a specific map key.
Extensions map[string]types.Any `protobuf:"bytes,10,rep,name=extensions,proto3" json:"extensions" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Extensions map[string]types.Any `protobuf:"bytes,10,rep,name=extensions,proto3" json:"extensions" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// Sandbox ID this container belongs to.
Sandbox string `protobuf:"bytes,11,opt,name=sandbox,proto3" json:"sandbox,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Container) Reset() { *m = Container{} }
@ -960,6 +962,13 @@ func (m *Container) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Sandbox) > 0 {
i -= len(m.Sandbox)
copy(dAtA[i:], m.Sandbox)
i = encodeVarintContainers(dAtA, i, uint64(len(m.Sandbox)))
i--
dAtA[i] = 0x5a
}
if len(m.Extensions) > 0 {
for k := range m.Extensions {
v := m.Extensions[k]
@ -1563,6 +1572,10 @@ func (m *Container) Size() (n int) {
n += mapEntrySize + 1 + sovContainers(uint64(mapEntrySize))
}
}
l = len(m.Sandbox)
if l > 0 {
n += 1 + l + sovContainers(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -1788,6 +1801,7 @@ func (this *Container) String() string {
`CreatedAt:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.CreatedAt), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`UpdatedAt:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.UpdatedAt), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`Extensions:` + mapStringForExtensions + `,`,
`Sandbox:` + fmt.Sprintf("%v", this.Sandbox) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
`}`,
}, "")
@ -2480,6 +2494,38 @@ func (m *Container) Unmarshal(dAtA []byte) error {
}
m.Extensions[mapkey] = *mapvalue
iNdEx = postIndex
case 11:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Sandbox", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowContainers
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthContainers
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthContainers
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Sandbox = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipContainers(dAtA[iNdEx:])

View File

@ -114,6 +114,9 @@ message Container {
// data, one should only update the specified extension using field paths
// to select a specific map key.
map<string, google.protobuf.Any> extensions = 10 [(gogoproto.nullable) = false];
// Sandbox ID this container belongs to.
string sandbox = 11;
}
message GetContainerRequest {

View File

@ -22,8 +22,8 @@ package main
import (
"context"
_ "github.com/containerd/containerd/runtime/v2/pause"
"github.com/containerd/containerd/runtime/v2/runc/manager"
_ "github.com/containerd/containerd/runtime/v2/runc/pause"
_ "github.com/containerd/containerd/runtime/v2/runc/task/plugin"
"github.com/containerd/containerd/runtime/v2/shim"
)

View File

@ -148,6 +148,8 @@ var removeCommand = cli.Command{
}
defer cancel()
force := context.Bool("force")
for _, id := range context.Args() {
sandbox, err := client.LoadSandbox(ctx, id)
if err != nil {
@ -155,7 +157,15 @@ var removeCommand = cli.Command{
continue
}
err = sandbox.Shutdown(ctx, context.Bool("force"))
err = sandbox.Stop(ctx)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to stop sandbox %s", id)
if !force {
continue
}
}
err = sandbox.Delete(ctx)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to shutdown sandbox %s", id)
continue

View File

@ -166,6 +166,7 @@ func containerToProto(container *containers.Container) containersapi.Container {
Snapshotter: container.Snapshotter,
SnapshotKey: container.SnapshotKey,
Extensions: extensions,
Sandbox: container.SandboxID,
}
}
@ -193,6 +194,7 @@ func containerFromProto(containerpb *containersapi.Container) containers.Contain
CreatedAt: containerpb.CreatedAt,
UpdatedAt: containerpb.UpdatedAt,
Extensions: extensions,
SandboxID: containerpb.Sandbox,
}
}

View File

@ -186,6 +186,40 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO
}
}()
// This container belongs to a sandbox, which is supposed to have already been started via the sandbox API.
if opts.SandboxID != "" {
process, err := m.Get(ctx, opts.SandboxID)
if err != nil {
return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID)
}
// Write sandbox ID this task belongs to.
if err := os.WriteFile(filepath.Join(bundle.Path, "sandbox"), []byte(opts.SandboxID), 0600); err != nil {
return nil, err
}
address, err := shimbinary.ReadAddress(filepath.Join(m.state, process.Namespace(), opts.SandboxID, "address"))
if err != nil {
return nil, fmt.Errorf("failed to get socket address for sandbox %q: %w", opts.SandboxID, err)
}
// Use sandbox's socket address to handle task requests for this container.
if err := shimbinary.WriteAddress(filepath.Join(bundle.Path, "address"), address); err != nil {
return nil, err
}
shim, err := loadShim(ctx, bundle, func() {})
if err != nil {
return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
}
if err := m.shims.Add(ctx, shim); err != nil {
return nil, err
}
return shim, nil
}
shim, err := m.startShim(ctx, bundle, id, opts)
if err != nil {
return nil, err
@ -391,22 +425,9 @@ func (m *TaskManager) ID() string {
// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
var (
process ShimProcess
err error
)
if opts.SandboxID != "" {
// This container belongs to sandbox which supposed to be already started via sandbox API.
process, err = m.manager.Get(ctx, opts.SandboxID)
if err != nil {
return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID)
}
} else {
process, err = m.manager.Start(ctx, taskID, opts)
if err != nil {
return nil, fmt.Errorf("failed to start shim: %w", err)
}
process, err := m.manager.Start(ctx, taskID, opts)
if err != nil {
return nil, fmt.Errorf("failed to start shim: %w", err)
}
// Cast to shim task and call task service to create a new container task instance.
@ -420,7 +441,8 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
_, errShim := shim.delete(dctx, func(context.Context, string) {})
sandboxed := opts.SandboxID != ""
_, errShim := shim.delete(dctx, sandboxed, func(context.Context, string) {})
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
@ -454,8 +476,14 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit,
return nil, err
}
container, err := m.manager.containers.Get(ctx, taskID)
if err != nil {
return nil, err
}
sandboxed := container.SandboxID != ""
shimTask := item.(*shimTask)
exit, err := shimTask.delete(ctx, func(ctx context.Context, id string) {
exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) {
m.manager.shims.Delete(ctx, id)
})

View File

@ -22,6 +22,7 @@ package pause
import (
"context"
"github.com/containerd/containerd/pkg/shutdown"
"github.com/containerd/ttrpc"
log "github.com/sirupsen/logrus"
@ -33,14 +34,26 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.TTRPCPlugin,
ID: "pause",
Requires: []plugin.Type{
plugin.InternalPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return &pauseService{}, nil
ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown")
if err != nil {
return nil, err
}
return &pauseService{
shutdown: ss.(shutdown.Service),
}, nil
},
})
}
// pauseService is an extension for task v2 runtime to support Pod "pause" containers via sandbox API.
type pauseService struct{}
type pauseService struct {
shutdown shutdown.Service
}
var _ api.SandboxService = (*pauseService)(nil)
@ -56,6 +69,7 @@ func (p *pauseService) StartSandbox(ctx context.Context, req *api.StartSandboxRe
func (p *pauseService) StopSandbox(ctx context.Context, req *api.StopSandboxRequest) (*api.StopSandboxResponse, error) {
log.Debugf("stop sandbox request: %+v", req)
p.shutdown.Shutdown()
return &api.StopSandboxResponse{}, nil
}

View File

@ -277,7 +277,7 @@ func (s *shimTask) PID(ctx context.Context) (uint32, error) {
return response.TaskPid, nil
}
func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
ID: s.ID(),
})
@ -305,8 +305,12 @@ func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Conte
removeTask(ctx, s.ID())
}
if err := s.waitShutdown(ctx); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task")
// Don't shut down the sandbox, as other containers may still be running in it.
// Let the controller decide when to shut it down.
if !sandboxed {
if err := s.waitShutdown(ctx); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task")
}
}
if err := s.shim.delete(ctx); err != nil {

View File

@ -18,9 +18,10 @@ package containerd
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/oci"
api "github.com/containerd/containerd/sandbox"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
@ -37,9 +38,10 @@ type Sandbox interface {
Labels(ctx context.Context) (map[string]string, error)
// Start starts new sandbox instance
Start(ctx context.Context) error
// Shutdown will turn down existing sandbox instance.
// If using force, the client will ignore shutdown errors.
Shutdown(ctx context.Context, force bool) error
// Stop sends a stop request to the shim instance.
Stop(ctx context.Context) error
// Delete removes the sandbox from the metadata store.
Delete(ctx context.Context) error
// Pause will freeze running sandbox instance
Pause(ctx context.Context) error
// Resume will unfreeze previously paused sandbox instance
@ -76,23 +78,12 @@ func (s *sandboxClient) Start(ctx context.Context) error {
return s.client.SandboxController().Start(ctx, s.ID())
}
func (s *sandboxClient) Shutdown(ctx context.Context, force bool) error {
var (
controller = s.client.SandboxController()
store = s.client.SandboxStore()
)
func (s *sandboxClient) Stop(ctx context.Context) error {
return s.client.SandboxController().Shutdown(ctx, s.ID())
}
err := controller.Shutdown(ctx, s.ID())
if err != nil && !force {
return fmt.Errorf("failed to shutdown sandbox: %w", err)
}
err = store.Delete(ctx, s.ID())
if err != nil {
return fmt.Errorf("failed to delete sandbox from metadata store: %w", err)
}
return nil
func (s *sandboxClient) Delete(ctx context.Context) error {
return s.client.SandboxStore().Delete(ctx, s.ID())
}
func (s *sandboxClient) Pause(ctx context.Context) error {
@ -187,9 +178,15 @@ func WithSandboxRuntime(name string, options interface{}) NewSandboxOpts {
}
// WithSandboxSpec will provide the sandbox runtime spec
func WithSandboxSpec(spec interface{}) NewSandboxOpts {
func WithSandboxSpec(s *oci.Spec, opts ...oci.SpecOpts) NewSandboxOpts {
return func(ctx context.Context, client *Client, sandbox *api.Sandbox) error {
spec, err := typeurl.MarshalAny(spec)
c := &containers.Container{ID: sandbox.ID}
if err := oci.ApplyOpts(ctx, client, c, s, opts...); err != nil {
return err
}
spec, err := typeurl.MarshalAny(s)
if err != nil {
return errors.Wrap(err, "failed to marshal spec")
}

View File

@ -54,6 +54,7 @@ func containerToProto(container *containers.Container) api.Container {
CreatedAt: container.CreatedAt,
UpdatedAt: container.UpdatedAt,
Extensions: extensions,
Sandbox: container.SandboxID,
}
}
@ -79,5 +80,6 @@ func containerFromProto(containerpb *api.Container) containers.Container {
Snapshotter: containerpb.Snapshotter,
SnapshotKey: containerpb.SnapshotKey,
Extensions: extensions,
SandboxID: containerpb.Sandbox,
}
}

View File

@ -109,8 +109,7 @@ func (c *controllerLocal) Start(ctx context.Context, in *api.ControllerStartRequ
svc := task.NewSandboxClient(shim.Client())
_, err = svc.StartSandbox(ctx, &proto.StartSandboxRequest{
SandboxID: in.SandboxID,
BundlePath: "",
SandboxID: in.SandboxID,
})
if err != nil {