From 6eea8f3f627672e05afc340902b3bd2931d596a2 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 16 Sep 2021 17:56:28 -0700 Subject: [PATCH 1/4] Add shutdown package Allows shutdown to handle callbacks with similar behavior as context cancel Signed-off-by: Derek McGowan --- pkg/shutdown/shutdown.go | 108 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 pkg/shutdown/shutdown.go diff --git a/pkg/shutdown/shutdown.go b/pkg/shutdown/shutdown.go new file mode 100644 index 000000000..982a95099 --- /dev/null +++ b/pkg/shutdown/shutdown.go @@ -0,0 +1,108 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shutdown + +import ( + "context" + "errors" + "sync" + "time" + + "golang.org/x/sync/errgroup" +) + +// ErrShutdown is the error condition when a context has been fully shutdown +var ErrShutdown = errors.New("shutdown") + +// Service is used to facilitate shutdown by through callback +// registration and shutdown initiation +type Service interface { + // Shutdown initiates shutdown + Shutdown() + // RegisterCallback registers functions to be called on shutdown and before + // the shutdown channel is closed. A callback error will propagate to the + // context error + RegisterCallback(func(context.Context) error) +} + +// WithShutdown returns a context which is similar to a cancel context, but +// with callbacks which can propagate to the context error. Unlike a cancel +// context, the shutdown context cannot be canceled from the parent context. +// However, future child contexes will be canceled upon shutdown. +func WithShutdown(ctx context.Context) (context.Context, Service) { + ss := &shutdownService{ + Context: ctx, + doneC: make(chan struct{}), + timeout: 30 * time.Second, + } + return ss, ss +} + +type shutdownService struct { + context.Context + + mu sync.Mutex + isShutdown bool + callbacks []func(context.Context) error + doneC chan struct{} + err error + timeout time.Duration +} + +func (s *shutdownService) Shutdown() { + s.mu.Lock() + defer s.mu.Unlock() + if s.isShutdown { + return + } + s.isShutdown = true + + go func(callbacks []func(context.Context) error) { + ctx, cancel := context.WithTimeout(context.Background(), s.timeout) + defer cancel() + grp, ctx := errgroup.WithContext(ctx) + for _, fn := range callbacks { + grp.Go(func() error { return fn(ctx) }) + } + err := grp.Wait() + if err == nil { + err = ErrShutdown + } + s.mu.Lock() + s.err = err + close(s.doneC) + s.mu.Unlock() + }(s.callbacks) +} + +func (s *shutdownService) Done() <-chan struct{} { + return s.doneC +} + +func (s *shutdownService) Err() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.err +} +func (s *shutdownService) RegisterCallback(fn func(context.Context) error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.callbacks == nil { + s.callbacks = []func(context.Context) error{} + } + s.callbacks = append(s.callbacks, fn) +} From 6835a94707c3e9da641a65d31dfc3bed2daf6be8 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 16 Sep 2021 17:56:14 -0700 Subject: [PATCH 2/4] Split runc shim into plugin components Signed-off-by: Derek McGowan --- pkg/shutdown/shutdown.go | 3 +- runtime/v2/runc/service/manager.go | 276 +++++++++++++++++++++++++++ runtime/v2/runc/v2/service.go | 297 +++++------------------------ runtime/v2/shim/shim.go | 22 ++- 4 files changed, 346 insertions(+), 252 deletions(-) create mode 100644 runtime/v2/runc/service/manager.go diff --git a/pkg/shutdown/shutdown.go b/pkg/shutdown/shutdown.go index 982a95099..bc1af75ab 100644 --- a/pkg/shutdown/shutdown.go +++ b/pkg/shutdown/shutdown.go @@ -75,7 +75,8 @@ func (s *shutdownService) Shutdown() { ctx, cancel := context.WithTimeout(context.Background(), s.timeout) defer cancel() grp, ctx := errgroup.WithContext(ctx) - for _, fn := range callbacks { + for i := range callbacks { + fn := callbacks[i] grp.Go(func() error { return fn(ctx) }) } err := grp.Wait() diff --git a/runtime/v2/runc/service/manager.go b/runtime/v2/runc/service/manager.go new file mode 100644 index 000000000..8a82f37e4 --- /dev/null +++ b/runtime/v2/runc/service/manager.go @@ -0,0 +1,276 @@ +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package service + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + goruntime "runtime" + "syscall" + "time" + + "github.com/containerd/cgroups" + cgroupsv2 "github.com/containerd/cgroups/v2" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/schedcore" + "github.com/containerd/containerd/runtime/v2/runc" + "github.com/containerd/containerd/runtime/v2/runc/options" + "github.com/containerd/containerd/runtime/v2/shim" + taskAPI "github.com/containerd/containerd/runtime/v2/task" + runcC "github.com/containerd/go-runc" + "github.com/containerd/typeurl" + "github.com/gogo/protobuf/proto" + ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + exec "golang.org/x/sys/execabs" + "golang.org/x/sys/unix" +) + +// NewShimManager returns an implementation of the shim manager +// using runc +func NewShimManager(id string) shim.Shim { + return &manager{ + id: id, + } +} + +// group labels specifies how the shim groups services. +// currently supports a runc.v2 specific .group label and the +// standard k8s pod label. Order matters in this list +var groupLabels = []string{ + "io.containerd.runc.v2.group", + "io.kubernetes.cri.sandbox-id", +} + +type spec struct { + Annotations map[string]string `json:"annotations,omitempty"` +} + +type manager struct { + id string +} + +func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + self, err := os.Executable() + if err != nil { + return nil, err + } + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + args := []string{ + "-namespace", ns, + "-id", id, + "-address", containerdAddress, + } + cmd := exec.Command(self, args...) + cmd.Dir = cwd + cmd.Env = append(os.Environ(), "GOMAXPROCS=4") + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + return cmd, nil +} + +func readSpec() (*spec, error) { + f, err := os.Open("config.json") + if err != nil { + return nil, err + } + defer f.Close() + var s spec + if err := json.NewDecoder(f).Decode(&s); err != nil { + return nil, err + } + return &s, nil +} + +func (manager) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) { + cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) + if err != nil { + return "", err + } + grouping := opts.ID + spec, err := readSpec() + if err != nil { + return "", err + } + for _, group := range groupLabels { + if groupID, ok := spec.Annotations[group]; ok { + grouping = groupID + break + } + } + address, err := shim.SocketAddress(ctx, opts.Address, grouping) + if err != nil { + return "", err + } + + socket, err := shim.NewSocket(address) + if err != nil { + // the only time where this would happen is if there is a bug and the socket + // was not cleaned up in the cleanup method of the shim or we are using the + // grouping functionality where the new process should be run with the same + // shim as an existing container + if !shim.SocketEaddrinuse(err) { + return "", errors.Wrap(err, "create new shim socket") + } + if shim.CanConnect(address) { + if err := shim.WriteAddress("address", address); err != nil { + return "", errors.Wrap(err, "write existing socket for shim") + } + return address, nil + } + if err := shim.RemoveSocket(address); err != nil { + return "", errors.Wrap(err, "remove pre-existing socket") + } + if socket, err = shim.NewSocket(address); err != nil { + return "", errors.Wrap(err, "try create new shim socket 2x") + } + } + defer func() { + if retErr != nil { + socket.Close() + _ = shim.RemoveSocket(address) + } + }() + + // make sure that reexec shim-v2 binary use the value if need + if err := shim.WriteAddress("address", address); err != nil { + return "", err + } + + f, err := socket.File() + if err != nil { + return "", err + } + + cmd.ExtraFiles = append(cmd.ExtraFiles, f) + + goruntime.LockOSThread() + if os.Getenv("SCHED_CORE") != "" { + if err := schedcore.Create(schedcore.ProcessGroup); err != nil { + return "", errors.Wrap(err, "enable sched core support") + } + } + + if err := cmd.Start(); err != nil { + f.Close() + return "", err + } + + goruntime.UnlockOSThread() + + defer func() { + if retErr != nil { + cmd.Process.Kill() + } + }() + // make sure to wait after start + go cmd.Wait() + if data, err := ioutil.ReadAll(os.Stdin); err == nil { + if len(data) > 0 { + var any ptypes.Any + if err := proto.Unmarshal(data, &any); err != nil { + return "", err + } + v, err := typeurl.UnmarshalAny(&any) + if err != nil { + return "", err + } + if opts, ok := v.(*options.Options); ok { + if opts.ShimCgroup != "" { + if cgroups.Mode() == cgroups.Unified { + cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup) + if err != nil { + return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) + } + if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { + return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) + } + } else { + cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) + if err != nil { + return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) + } + if err := cg.Add(cgroups.Process{ + Pid: cmd.Process.Pid, + }); err != nil { + return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) + } + } + } + } + } + } + if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { + return "", errors.Wrap(err, "failed to adjust OOM score for shim") + } + return address, nil +} + +func (m manager) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + + path := filepath.Join(filepath.Dir(cwd), m.id) + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + runtime, err := runc.ReadRuntime(path) + if err != nil { + return nil, err + } + opts, err := runc.ReadOptions(path) + if err != nil { + return nil, err + } + root := process.RuncRoot + if opts != nil && opts.Root != "" { + root = opts.Root + } + + r := process.NewRunc(root, path, ns, runtime, "", false) + if err := r.Delete(ctx, m.id, &runcC.DeleteOpts{ + Force: true, + }); err != nil { + logrus.WithError(err).Warn("failed to remove runc container") + } + if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { + logrus.WithError(err).Warn("failed to cleanup rootfs mount") + } + return &taskAPI.DeleteResponse{ + ExitedAt: time.Now(), + ExitStatus: 128 + uint32(unix.SIGKILL), + }, nil +} diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 8f81bd9c9..857445471 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -21,42 +21,36 @@ package v2 import ( "context" - "encoding/json" - "io" "os" - "path/filepath" - goruntime "runtime" "sync" - "syscall" - "time" "github.com/containerd/cgroups" cgroupsv2 "github.com/containerd/cgroups/v2" eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/oom" oomv1 "github.com/containerd/containerd/pkg/oom/v1" oomv2 "github.com/containerd/containerd/pkg/oom/v2" "github.com/containerd/containerd/pkg/process" - "github.com/containerd/containerd/pkg/schedcore" + "github.com/containerd/containerd/pkg/shutdown" "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/pkg/userns" + "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime/v2/runc" "github.com/containerd/containerd/runtime/v2/runc/options" + runcservice "github.com/containerd/containerd/runtime/v2/runc/service" "github.com/containerd/containerd/runtime/v2/shim" + shimapi "github.com/containerd/containerd/runtime/v2/task" taskAPI "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/containerd/sys/reaper" runcC "github.com/containerd/go-runc" + "github.com/containerd/ttrpc" "github.com/containerd/typeurl" - "github.com/gogo/protobuf/proto" ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/sirupsen/logrus" - exec "golang.org/x/sys/execabs" - "golang.org/x/sys/unix" ) var ( @@ -64,20 +58,34 @@ var ( empty = &ptypes.Empty{} ) -// group labels specifies how the shim groups services. -// currently supports a runc.v2 specific .group label and the -// standard k8s pod label. Order matters in this list -var groupLabels = []string{ - "io.containerd.runc.v2.group", - "io.kubernetes.cri.sandbox-id", -} - -type spec struct { - Annotations map[string]string `json:"annotations,omitempty"` -} - // New returns a new shim service that can be used via GRPC -func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { +// TODO(2.0): Remove this function, rely on plugin registration +func New(_ context.Context, id string, _ shim.Publisher, _ func()) (shim.Shim, error) { + plugin.Register(&plugin.Registration{ + Type: plugin.TTRPCPlugin, + ID: "task", + Requires: []plugin.Type{ + plugin.EventPlugin, + plugin.InternalPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + pp, err := ic.GetByID(plugin.EventPlugin, "publisher") + if err != nil { + return nil, err + } + ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown") + if err != nil { + return nil, err + } + return NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service)) + }, + }) + + return runcservice.NewShimManager(id), nil +} + +// NewTaskService creates a new instance of a task service +func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) { var ( ep oom.Watcher err error @@ -92,24 +100,28 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func } go ep.Run(ctx) s := &service{ - id: id, context: ctx, events: make(chan interface{}, 128), ec: reaper.Default.Subscribe(), ep: ep, - cancel: shutdown, + shutdown: sd, containers: make(map[string]*runc.Container), } go s.processExits() runcC.Monitor = reaper.Default if err := s.initPlatform(); err != nil { - shutdown() return nil, errors.Wrap(err, "failed to initialized platform behavior") } go s.forward(ctx, publisher) + sd.RegisterCallback(func(context.Context) error { + close(s.events) + return nil + }) if address, err := shim.ReadAddress("address"); err == nil { - s.shimAddress = address + sd.RegisterCallback(func(context.Context) error { + return shim.RemoveSocket(address) + }) } return s, nil } @@ -125,216 +137,9 @@ type service struct { ec chan runcC.Exit ep oom.Watcher - // id only used in cleanup case - id string - containers map[string]*runc.Container - shimAddress string - cancel func() -} - -func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - self, err := os.Executable() - if err != nil { - return nil, err - } - cwd, err := os.Getwd() - if err != nil { - return nil, err - } - args := []string{ - "-namespace", ns, - "-id", id, - "-address", containerdAddress, - } - cmd := exec.Command(self, args...) - cmd.Dir = cwd - cmd.Env = append(os.Environ(), "GOMAXPROCS=4") - cmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - } - return cmd, nil -} - -func readSpec() (*spec, error) { - f, err := os.Open("config.json") - if err != nil { - return nil, err - } - defer f.Close() - var s spec - if err := json.NewDecoder(f).Decode(&s); err != nil { - return nil, err - } - return &s, nil -} - -func (s *service) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) { - cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) - if err != nil { - return "", err - } - grouping := opts.ID - spec, err := readSpec() - if err != nil { - return "", err - } - for _, group := range groupLabels { - if groupID, ok := spec.Annotations[group]; ok { - grouping = groupID - break - } - } - address, err := shim.SocketAddress(ctx, opts.Address, grouping) - if err != nil { - return "", err - } - - socket, err := shim.NewSocket(address) - if err != nil { - // the only time where this would happen is if there is a bug and the socket - // was not cleaned up in the cleanup method of the shim or we are using the - // grouping functionality where the new process should be run with the same - // shim as an existing container - if !shim.SocketEaddrinuse(err) { - return "", errors.Wrap(err, "create new shim socket") - } - if shim.CanConnect(address) { - if err := shim.WriteAddress("address", address); err != nil { - return "", errors.Wrap(err, "write existing socket for shim") - } - return address, nil - } - if err := shim.RemoveSocket(address); err != nil { - return "", errors.Wrap(err, "remove pre-existing socket") - } - if socket, err = shim.NewSocket(address); err != nil { - return "", errors.Wrap(err, "try create new shim socket 2x") - } - } - defer func() { - if retErr != nil { - socket.Close() - _ = shim.RemoveSocket(address) - } - }() - - // make sure that reexec shim-v2 binary use the value if need - if err := shim.WriteAddress("address", address); err != nil { - return "", err - } - - f, err := socket.File() - if err != nil { - return "", err - } - - cmd.ExtraFiles = append(cmd.ExtraFiles, f) - - goruntime.LockOSThread() - if os.Getenv("SCHED_CORE") != "" { - if err := schedcore.Create(schedcore.ProcessGroup); err != nil { - return "", errors.Wrap(err, "enable sched core support") - } - } - - if err := cmd.Start(); err != nil { - f.Close() - return "", err - } - - goruntime.UnlockOSThread() - - defer func() { - if retErr != nil { - cmd.Process.Kill() - } - }() - // make sure to wait after start - go cmd.Wait() - if data, err := io.ReadAll(os.Stdin); err == nil { - if len(data) > 0 { - var any ptypes.Any - if err := proto.Unmarshal(data, &any); err != nil { - return "", err - } - v, err := typeurl.UnmarshalAny(&any) - if err != nil { - return "", err - } - if opts, ok := v.(*options.Options); ok { - if opts.ShimCgroup != "" { - if cgroups.Mode() == cgroups.Unified { - cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup) - if err != nil { - return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) - } - if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { - return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) - } - } else { - cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) - if err != nil { - return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) - } - if err := cg.Add(cgroups.Process{ - Pid: cmd.Process.Pid, - }); err != nil { - return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) - } - } - } - } - } - } - if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { - return "", errors.Wrap(err, "failed to adjust OOM score for shim") - } - return address, nil -} - -func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { - cwd, err := os.Getwd() - if err != nil { - return nil, err - } - - path := filepath.Join(filepath.Dir(cwd), s.id) - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - runtime, err := runc.ReadRuntime(path) - if err != nil { - return nil, err - } - opts, err := runc.ReadOptions(path) - if err != nil { - return nil, err - } - root := process.RuncRoot - if opts != nil && opts.Root != "" { - root = opts.Root - } - - r := process.NewRunc(root, path, ns, runtime, "", false) - if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ - Force: true, - }); err != nil { - logrus.WithError(err).Warn("failed to remove runc container") - } - if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { - logrus.WithError(err).Warn("failed to cleanup rootfs mount") - } - return &taskAPI.DeleteResponse{ - ExitedAt: time.Now(), - ExitStatus: 128 + uint32(unix.SIGKILL), - }, nil + shutdown shutdown.Service } // Create a new initial process and container with the underlying OCI runtime @@ -368,6 +173,11 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * }, nil } +func (s *service) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, s) + return nil +} + // Start a process func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { container, err := s.getContainer(r.ID) @@ -683,18 +493,10 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt return empty, nil } - if s.platform != nil { - s.platform.Close() - } + // please make sure that temporary resource has been cleanup or registered + // for cleanup before calling shutdown + s.shutdown.Shutdown() - if s.shimAddress != "" { - _ = shim.RemoveSocket(s.shimAddress) - } - - // please make sure that temporary resource has been cleanup - // before shutdown service. - s.cancel() - close(s.events) return empty, nil } @@ -840,5 +642,6 @@ func (s *service) initPlatform() error { return err } s.platform = p + s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() }) return nil } diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 691040bc7..18a585082 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -30,6 +30,7 @@ import ( "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/shutdown" "github.com/containerd/containerd/plugin" shimapi "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/containerd/version" @@ -58,7 +59,7 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error) // Shim server interface type Shim interface { - Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) + Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) // TODO(2.0): Update interface to pass ID directly to Cleanup StartShim(ctx context.Context, opts StartOpts) (string, error) } @@ -159,6 +160,7 @@ func setLogger(ctx context.Context, id string) error { } // Run initializes and runs a shim server +// TODO(2.0): Remove initFunc from arguments func Run(id string, initFunc Init, opts ...BinaryOpts) { var config Config for _, o := range opts { @@ -209,8 +211,9 @@ func run(id string, initFunc Init, config Config) error { ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) - ctx, cancel := context.WithCancel(ctx) - service, err := initFunc(ctx, idFlag, publisher, cancel) + ctx, sd := shutdown.WithShutdown(ctx) + defer sd.Shutdown() + service, err := initFunc(ctx, idFlag, publisher, sd.Shutdown) if err != nil { return err } @@ -259,6 +262,14 @@ func run(id string, initFunc Init, config Config) error { } } + plugin.Register(&plugin.Registration{ + Type: plugin.InternalPlugin, + ID: "shutdown", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return sd, nil + }, + }) + // Register event plugin plugin.Register(&plugin.Registration{ Type: plugin.EventPlugin, @@ -273,6 +284,9 @@ func run(id string, initFunc Init, config Config) error { plugin.Register(&plugin.Registration{ Type: plugin.TTRPCPlugin, ID: "task", + Requires: []plugin.Type{ + plugin.EventPlugin, + }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { return &taskService{ts}, nil }, @@ -345,7 +359,7 @@ func run(id string, initFunc Init, config Config) error { } if err := serve(ctx, server, signals); err != nil { - if err != context.Canceled { + if err != shutdown.ErrShutdown { return err } } From 04e57d71b2397489bd961f8f63f1e8f8b6ce59fd Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 20 Sep 2021 10:48:13 -0700 Subject: [PATCH 3/4] Seperate shim manager and task service Create new shim manager interface and deprecate older shim manager interface. Signed-off-by: Derek McGowan --- .../manager.go => manager/manager_linux.go} | 45 +- runtime/v2/runc/tasks/service.go | 619 +++++++++++++++++ runtime/v2/runc/v2/service.go | 632 +----------------- runtime/v2/shim/shim.go | 160 +++-- 4 files changed, 789 insertions(+), 667 deletions(-) rename runtime/v2/runc/{service/manager.go => manager/manager_linux.go} (87%) create mode 100644 runtime/v2/runc/tasks/service.go diff --git a/runtime/v2/runc/service/manager.go b/runtime/v2/runc/manager/manager_linux.go similarity index 87% rename from runtime/v2/runc/service/manager.go rename to runtime/v2/runc/manager/manager_linux.go index 8a82f37e4..ba574beb0 100644 --- a/runtime/v2/runc/service/manager.go +++ b/runtime/v2/runc/manager/manager_linux.go @@ -1,5 +1,3 @@ -// +build linux - /* Copyright The containerd Authors. @@ -16,7 +14,7 @@ limitations under the License. */ -package service +package manager import ( "context" @@ -30,6 +28,7 @@ import ( "github.com/containerd/cgroups" cgroupsv2 "github.com/containerd/cgroups/v2" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/process" @@ -37,22 +36,20 @@ import ( "github.com/containerd/containerd/runtime/v2/runc" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/shim" - taskAPI "github.com/containerd/containerd/runtime/v2/task" runcC "github.com/containerd/go-runc" "github.com/containerd/typeurl" "github.com/gogo/protobuf/proto" ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" - "github.com/sirupsen/logrus" exec "golang.org/x/sys/execabs" "golang.org/x/sys/unix" ) // NewShimManager returns an implementation of the shim manager // using runc -func NewShimManager(id string) shim.Shim { +func NewShimManager(name string) shim.Manager { return &manager{ - id: id, + name: name, } } @@ -69,7 +66,7 @@ type spec struct { } type manager struct { - id string + name string } func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { @@ -112,12 +109,16 @@ func readSpec() (*spec, error) { return &s, nil } -func (manager) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) { - cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) +func (m manager) Name() string { + return m.name +} + +func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ string, retErr error) { + cmd, err := newCommand(ctx, id, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) if err != nil { return "", err } - grouping := opts.ID + grouping := id spec, err := readSpec() if err != nil { return "", err @@ -236,24 +237,24 @@ func (manager) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, re return address, nil } -func (m manager) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { +func (manager) Stop(ctx context.Context, id string) (shim.StopStatus, error) { cwd, err := os.Getwd() if err != nil { - return nil, err + return shim.StopStatus{}, err } - path := filepath.Join(filepath.Dir(cwd), m.id) + path := filepath.Join(filepath.Dir(cwd), id) ns, err := namespaces.NamespaceRequired(ctx) if err != nil { - return nil, err + return shim.StopStatus{}, err } runtime, err := runc.ReadRuntime(path) if err != nil { - return nil, err + return shim.StopStatus{}, err } opts, err := runc.ReadOptions(path) if err != nil { - return nil, err + return shim.StopStatus{}, err } root := process.RuncRoot if opts != nil && opts.Root != "" { @@ -261,16 +262,16 @@ func (m manager) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { } r := process.NewRunc(root, path, ns, runtime, "", false) - if err := r.Delete(ctx, m.id, &runcC.DeleteOpts{ + if err := r.Delete(ctx, id, &runcC.DeleteOpts{ Force: true, }); err != nil { - logrus.WithError(err).Warn("failed to remove runc container") + log.G(ctx).WithError(err).Warn("failed to remove runc container") } if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { - logrus.WithError(err).Warn("failed to cleanup rootfs mount") + log.G(ctx).WithError(err).Warn("failed to cleanup rootfs mount") } - return &taskAPI.DeleteResponse{ + return shim.StopStatus{ ExitedAt: time.Now(), - ExitStatus: 128 + uint32(unix.SIGKILL), + ExitStatus: 128 + int(unix.SIGKILL), }, nil } diff --git a/runtime/v2/runc/tasks/service.go b/runtime/v2/runc/tasks/service.go new file mode 100644 index 000000000..5659affa2 --- /dev/null +++ b/runtime/v2/runc/tasks/service.go @@ -0,0 +1,619 @@ +//go:build linux +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package tasks + +import ( + "context" + "os" + "sync" + + "github.com/containerd/cgroups" + cgroupsv2 "github.com/containerd/cgroups/v2" + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/oom" + oomv1 "github.com/containerd/containerd/pkg/oom/v1" + oomv2 "github.com/containerd/containerd/pkg/oom/v2" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/shutdown" + "github.com/containerd/containerd/pkg/stdio" + "github.com/containerd/containerd/pkg/userns" + "github.com/containerd/containerd/runtime/v2/runc" + "github.com/containerd/containerd/runtime/v2/runc/options" + "github.com/containerd/containerd/runtime/v2/shim" + shimapi "github.com/containerd/containerd/runtime/v2/task" + taskAPI "github.com/containerd/containerd/runtime/v2/task" + "github.com/containerd/containerd/sys/reaper" + runcC "github.com/containerd/go-runc" + "github.com/containerd/ttrpc" + "github.com/containerd/typeurl" + ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var ( + _ = (taskAPI.TaskService)(&service{}) + empty = &ptypes.Empty{} +) + +// NewTaskService creates a new instance of a task service +func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) { + var ( + ep oom.Watcher + err error + ) + if cgroups.Mode() == cgroups.Unified { + ep, err = oomv2.New(publisher) + } else { + ep, err = oomv1.New(publisher) + } + if err != nil { + return nil, err + } + go ep.Run(ctx) + s := &service{ + context: ctx, + events: make(chan interface{}, 128), + ec: reaper.Default.Subscribe(), + ep: ep, + shutdown: sd, + containers: make(map[string]*runc.Container), + } + go s.processExits() + runcC.Monitor = reaper.Default + if err := s.initPlatform(); err != nil { + return nil, errors.Wrap(err, "failed to initialized platform behavior") + } + go s.forward(ctx, publisher) + sd.RegisterCallback(func(context.Context) error { + close(s.events) + return nil + }) + + if address, err := shim.ReadAddress("address"); err == nil { + sd.RegisterCallback(func(context.Context) error { + return shim.RemoveSocket(address) + }) + } + return s, nil +} + +// service is the shim implementation of a remote shim over GRPC +type service struct { + mu sync.Mutex + eventSendMu sync.Mutex + + context context.Context + events chan interface{} + platform stdio.Platform + ec chan runcC.Exit + ep oom.Watcher + + containers map[string]*runc.Container + + shutdown shutdown.Service +} + +// Create a new initial process and container with the underlying OCI runtime +func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + container, err := runc.NewContainer(ctx, s.platform, r) + if err != nil { + return nil, err + } + + s.containers[r.ID] = container + + s.send(&eventstypes.TaskCreate{ + ContainerID: r.ID, + Bundle: r.Bundle, + Rootfs: r.Rootfs, + IO: &eventstypes.TaskIO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + Checkpoint: r.Checkpoint, + Pid: uint32(container.Pid()), + }) + + return &taskAPI.CreateTaskResponse{ + Pid: uint32(container.Pid()), + }, nil +} + +func (s *service) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, s) + return nil +} + +// Start a process +func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + + // hold the send lock so that the start events are sent before any exit events in the error case + s.eventSendMu.Lock() + p, err := container.Start(ctx, r) + if err != nil { + s.eventSendMu.Unlock() + return nil, errdefs.ToGRPC(err) + } + + switch r.ExecID { + case "": + switch cg := container.Cgroup().(type) { + case cgroups.Cgroup: + if err := s.ep.Add(container.ID, cg); err != nil { + logrus.WithError(err).Error("add cg to OOM monitor") + } + case *cgroupsv2.Manager: + allControllers, err := cg.RootControllers() + if err != nil { + logrus.WithError(err).Error("failed to get root controllers") + } else { + if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil { + if userns.RunningInUserNS() { + logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers) + } else { + logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers) + } + } + } + if err := s.ep.Add(container.ID, cg); err != nil { + logrus.WithError(err).Error("add cg to OOM monitor") + } + } + + s.send(&eventstypes.TaskStart{ + ContainerID: container.ID, + Pid: uint32(p.Pid()), + }) + default: + s.send(&eventstypes.TaskExecStarted{ + ContainerID: container.ID, + ExecID: r.ExecID, + Pid: uint32(p.Pid()), + }) + } + s.eventSendMu.Unlock() + return &taskAPI.StartResponse{ + Pid: uint32(p.Pid()), + }, nil +} + +// Delete the initial process and container +func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Delete(ctx, r) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + // if we deleted an init task, send the task delete event + if r.ExecID == "" { + s.mu.Lock() + delete(s.containers, r.ID) + s.mu.Unlock() + s.send(&eventstypes.TaskDelete{ + ContainerID: container.ID, + Pid: uint32(p.Pid()), + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }) + } + return &taskAPI.DeleteResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + Pid: uint32(p.Pid()), + }, nil +} + +// Exec an additional process inside the container +func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + ok, cancel := container.ReserveProcess(r.ExecID) + if !ok { + return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) + } + process, err := container.Exec(ctx, r) + if err != nil { + cancel() + return nil, errdefs.ToGRPC(err) + } + + s.send(&eventstypes.TaskExecAdded{ + ContainerID: container.ID, + ExecID: process.ID(), + }) + return empty, nil +} + +// ResizePty of a process +func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.ResizePty(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// State returns runtime state information for a process +func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Process(r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + st, err := p.Status(ctx) + if err != nil { + return nil, err + } + status := task.StatusUnknown + switch st { + case "created": + status = task.StatusCreated + case "running": + status = task.StatusRunning + case "stopped": + status = task.StatusStopped + case "paused": + status = task.StatusPaused + case "pausing": + status = task.StatusPausing + } + sio := p.Stdio() + return &taskAPI.StateResponse{ + ID: p.ID(), + Bundle: container.Bundle, + Pid: uint32(p.Pid()), + Status: status, + Stdin: sio.Stdin, + Stdout: sio.Stdout, + Stderr: sio.Stderr, + Terminal: sio.Terminal, + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Pause the container +func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Pause(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } + s.send(&eventstypes.TaskPaused{ + ContainerID: container.ID, + }) + return empty, nil +} + +// Resume the container +func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Resume(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } + s.send(&eventstypes.TaskResumed{ + ContainerID: container.ID, + }) + return empty, nil +} + +// Kill a process with the provided signal +func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Kill(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Pids returns all pids inside the container +func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + pids, err := s.getContainerPids(ctx, r.ID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + var processes []*task.ProcessInfo + for _, pid := range pids { + pInfo := task.ProcessInfo{ + Pid: pid, + } + for _, p := range container.ExecdProcesses() { + if p.Pid() == int(pid) { + d := &options.ProcessDetails{ + ExecID: p.ID(), + } + a, err := typeurl.MarshalAny(d) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) + } + pInfo.Info = a + break + } + } + processes = append(processes, &pInfo) + } + return &taskAPI.PidsResponse{ + Processes: processes, + }, nil +} + +// CloseIO of a process +func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.CloseIO(ctx, r); err != nil { + return nil, err + } + return empty, nil +} + +// Checkpoint the container +func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Checkpoint(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Update a running container +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Update(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Wait for a process to exit +func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Process(r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + p.Wait() + + return &taskAPI.WaitResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Connect returns shim information such as the shim's pid +func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { + var pid int + if container, err := s.getContainer(r.ID); err == nil { + pid = container.Pid() + } + return &taskAPI.ConnectResponse{ + ShimPid: uint32(os.Getpid()), + TaskPid: uint32(pid), + }, nil +} + +func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { + s.mu.Lock() + defer s.mu.Unlock() + + // return out if the shim is still servicing containers + if len(s.containers) > 0 { + return empty, nil + } + + // please make sure that temporary resource has been cleanup or registered + // for cleanup before calling shutdown + s.shutdown.Shutdown() + + return empty, nil +} + +func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + cgx := container.Cgroup() + if cgx == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") + } + var statsx interface{} + switch cg := cgx.(type) { + case cgroups.Cgroup: + stats, err := cg.Stat(cgroups.IgnoreNotExist) + if err != nil { + return nil, err + } + statsx = stats + case *cgroupsv2.Manager: + stats, err := cg.Stat() + if err != nil { + return nil, err + } + statsx = stats + default: + return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg) + } + data, err := typeurl.MarshalAny(statsx) + if err != nil { + return nil, err + } + return &taskAPI.StatsResponse{ + Stats: data, + }, nil +} + +func (s *service) processExits() { + for e := range s.ec { + s.checkProcesses(e) + } +} + +func (s *service) send(evt interface{}) { + s.events <- evt +} + +func (s *service) sendL(evt interface{}) { + s.eventSendMu.Lock() + s.events <- evt + s.eventSendMu.Unlock() +} + +func (s *service) checkProcesses(e runcC.Exit) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, container := range s.containers { + if !container.HasPid(e.Pid) { + continue + } + + for _, p := range container.All() { + if p.Pid() != e.Pid { + continue + } + + if ip, ok := p.(*process.Init); ok { + // Ensure all children are killed + if runc.ShouldKillAllOnExit(s.context, container.Bundle) { + if err := ip.KillAll(s.context); err != nil { + logrus.WithError(err).WithField("id", ip.ID()). + Error("failed to kill init's children") + } + } + } + + p.SetExited(e.Status) + s.sendL(&eventstypes.TaskExit{ + ContainerID: container.ID, + ID: p.ID(), + Pid: uint32(e.Pid), + ExitStatus: uint32(e.Status), + ExitedAt: p.ExitedAt(), + }) + return + } + return + } +} + +func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { + container, err := s.getContainer(id) + if err != nil { + return nil, err + } + p, err := container.Process("") + if err != nil { + return nil, errdefs.ToGRPC(err) + } + ps, err := p.(*process.Init).Runtime().Ps(ctx, id) + if err != nil { + return nil, err + } + pids := make([]uint32, 0, len(ps)) + for _, pid := range ps { + pids = append(pids, uint32(pid)) + } + return pids, nil +} + +func (s *service) forward(ctx context.Context, publisher shim.Publisher) { + ns, _ := namespaces.Namespace(ctx) + ctx = namespaces.WithNamespace(context.Background(), ns) + for e := range s.events { + err := publisher.Publish(ctx, runc.GetTopic(e), e) + if err != nil { + logrus.WithError(err).Error("post event") + } + } + publisher.Close() +} + +func (s *service) getContainer(id string) (*runc.Container, error) { + s.mu.Lock() + container := s.containers[id] + s.mu.Unlock() + if container == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") + } + return container, nil +} + +// initialize a single epoll fd to manage our consoles. `initPlatform` should +// only be called once. +func (s *service) initPlatform() error { + if s.platform != nil { + return nil + } + p, err := runc.NewPlatform() + if err != nil { + return err + } + s.platform = p + s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() }) + return nil +} diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 857445471..17ab82075 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -21,627 +21,59 @@ package v2 import ( "context" - "os" - "sync" - "github.com/containerd/cgroups" - cgroupsv2 "github.com/containerd/cgroups/v2" - eventstypes "github.com/containerd/containerd/api/events" - "github.com/containerd/containerd/api/types/task" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/pkg/oom" - oomv1 "github.com/containerd/containerd/pkg/oom/v1" - oomv2 "github.com/containerd/containerd/pkg/oom/v2" - "github.com/containerd/containerd/pkg/process" "github.com/containerd/containerd/pkg/shutdown" - "github.com/containerd/containerd/pkg/stdio" - "github.com/containerd/containerd/pkg/userns" - "github.com/containerd/containerd/plugin" - "github.com/containerd/containerd/runtime/v2/runc" - "github.com/containerd/containerd/runtime/v2/runc/options" - runcservice "github.com/containerd/containerd/runtime/v2/runc/service" + "github.com/containerd/containerd/runtime/v2/runc/manager" + "github.com/containerd/containerd/runtime/v2/runc/tasks" "github.com/containerd/containerd/runtime/v2/shim" shimapi "github.com/containerd/containerd/runtime/v2/task" - taskAPI "github.com/containerd/containerd/runtime/v2/task" - "github.com/containerd/containerd/sys/reaper" - runcC "github.com/containerd/go-runc" - "github.com/containerd/ttrpc" - "github.com/containerd/typeurl" - ptypes "github.com/gogo/protobuf/types" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) -var ( - _ = (taskAPI.TaskService)(&service{}) - empty = &ptypes.Empty{} -) +// TODO(2.0): Remove this package -// New returns a new shim service that can be used via GRPC -// TODO(2.0): Remove this function, rely on plugin registration -func New(_ context.Context, id string, _ shim.Publisher, _ func()) (shim.Shim, error) { - plugin.Register(&plugin.Registration{ - Type: plugin.TTRPCPlugin, - ID: "task", - Requires: []plugin.Type{ - plugin.EventPlugin, - plugin.InternalPlugin, - }, - InitFn: func(ic *plugin.InitContext) (interface{}, error) { - pp, err := ic.GetByID(plugin.EventPlugin, "publisher") - if err != nil { - return nil, err - } - ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown") - if err != nil { - return nil, err - } - return NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service)) - }, - }) - - return runcservice.NewShimManager(id), nil +type shimTaskManager struct { + shimapi.TaskService + id string + manager shim.Manager } -// NewTaskService creates a new instance of a task service -func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) { - var ( - ep oom.Watcher - err error - ) - if cgroups.Mode() == cgroups.Unified { - ep, err = oomv2.New(publisher) - } else { - ep, err = oomv1.New(publisher) - } +func (stm *shimTaskManager) Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) { + ss, err := stm.manager.Stop(ctx, stm.id) if err != nil { return nil, err } - go ep.Run(ctx) - s := &service{ - context: ctx, - events: make(chan interface{}, 128), - ec: reaper.Default.Subscribe(), - ep: ep, - shutdown: sd, - containers: make(map[string]*runc.Container), - } - go s.processExits() - runcC.Monitor = reaper.Default - if err := s.initPlatform(); err != nil { - return nil, errors.Wrap(err, "failed to initialized platform behavior") - } - go s.forward(ctx, publisher) - sd.RegisterCallback(func(context.Context) error { - close(s.events) - return nil - }) - - if address, err := shim.ReadAddress("address"); err == nil { - sd.RegisterCallback(func(context.Context) error { - return shim.RemoveSocket(address) - }) - } - return s, nil -} - -// service is the shim implementation of a remote shim over GRPC -type service struct { - mu sync.Mutex - eventSendMu sync.Mutex - - context context.Context - events chan interface{} - platform stdio.Platform - ec chan runcC.Exit - ep oom.Watcher - - containers map[string]*runc.Container - - shutdown shutdown.Service -} - -// Create a new initial process and container with the underlying OCI runtime -func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { - s.mu.Lock() - defer s.mu.Unlock() - - container, err := runc.NewContainer(ctx, s.platform, r) - if err != nil { - return nil, err - } - - s.containers[r.ID] = container - - s.send(&eventstypes.TaskCreate{ - ContainerID: r.ID, - Bundle: r.Bundle, - Rootfs: r.Rootfs, - IO: &eventstypes.TaskIO{ - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Terminal: r.Terminal, - }, - Checkpoint: r.Checkpoint, - Pid: uint32(container.Pid()), - }) - - return &taskAPI.CreateTaskResponse{ - Pid: uint32(container.Pid()), + return &shimapi.DeleteResponse{ + Pid: uint32(ss.Pid), + ExitStatus: uint32(ss.ExitStatus), + ExitedAt: ss.ExitedAt, }, nil } -func (s *service) RegisterTTRPC(server *ttrpc.Server) error { - shimapi.RegisterTaskService(server, s) - return nil +func (stm *shimTaskManager) StartShim(ctx context.Context, opts shim.StartOpts) (string, error) { + return stm.manager.Start(ctx, opts.ID, opts) } -// Start a process -func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - - // hold the send lock so that the start events are sent before any exit events in the error case - s.eventSendMu.Lock() - p, err := container.Start(ctx, r) - if err != nil { - s.eventSendMu.Unlock() - return nil, errdefs.ToGRPC(err) - } - - switch r.ExecID { - case "": - switch cg := container.Cgroup().(type) { - case cgroups.Cgroup: - if err := s.ep.Add(container.ID, cg); err != nil { - logrus.WithError(err).Error("add cg to OOM monitor") - } - case *cgroupsv2.Manager: - allControllers, err := cg.RootControllers() - if err != nil { - logrus.WithError(err).Error("failed to get root controllers") - } else { - if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil { - if userns.RunningInUserNS() { - logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers) - } else { - logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers) - } - } - } - if err := s.ep.Add(container.ID, cg); err != nil { - logrus.WithError(err).Error("add cg to OOM monitor") - } - } - - s.send(&eventstypes.TaskStart{ - ContainerID: container.ID, - Pid: uint32(p.Pid()), - }) - default: - s.send(&eventstypes.TaskExecStarted{ - ContainerID: container.ID, - ExecID: r.ExecID, - Pid: uint32(p.Pid()), - }) - } - s.eventSendMu.Unlock() - return &taskAPI.StartResponse{ - Pid: uint32(p.Pid()), - }, nil -} - -// Delete the initial process and container -func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - p, err := container.Delete(ctx, r) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - // if we deleted an init task, send the task delete event - if r.ExecID == "" { - s.mu.Lock() - delete(s.containers, r.ID) - s.mu.Unlock() - s.send(&eventstypes.TaskDelete{ - ContainerID: container.ID, - Pid: uint32(p.Pid()), - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - }) - } - return &taskAPI.DeleteResponse{ - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - Pid: uint32(p.Pid()), - }, nil -} - -// Exec an additional process inside the container -func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - ok, cancel := container.ReserveProcess(r.ExecID) +// New returns a new shim service that can be used for +// - serving the task service over grpc/ttrpc +// - shim management +// This function is deprecated in favor direct creation +// of shim manager and registering task service via plugins. +func New(ctx context.Context, id string, publisher shim.Publisher, fn func()) (shim.Shim, error) { + sd, ok := ctx.(shutdown.Service) if !ok { - return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) + ctx, sd = shutdown.WithShutdown(ctx) + sd.RegisterCallback(func(context.Context) error { + fn() + return nil + }) } - process, err := container.Exec(ctx, r) - if err != nil { - cancel() - return nil, errdefs.ToGRPC(err) - } - - s.send(&eventstypes.TaskExecAdded{ - ContainerID: container.ID, - ExecID: process.ID(), - }) - return empty, nil -} - -// ResizePty of a process -func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) + ts, err := tasks.NewTaskService(ctx, publisher, sd) if err != nil { return nil, err } - if err := container.ResizePty(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// State returns runtime state information for a process -func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - p, err := container.Process(r.ExecID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - st, err := p.Status(ctx) - if err != nil { - return nil, err - } - status := task.StatusUnknown - switch st { - case "created": - status = task.StatusCreated - case "running": - status = task.StatusRunning - case "stopped": - status = task.StatusStopped - case "paused": - status = task.StatusPaused - case "pausing": - status = task.StatusPausing - } - sio := p.Stdio() - return &taskAPI.StateResponse{ - ID: p.ID(), - Bundle: container.Bundle, - Pid: uint32(p.Pid()), - Status: status, - Stdin: sio.Stdin, - Stdout: sio.Stdout, - Stderr: sio.Stderr, - Terminal: sio.Terminal, - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), + return &shimTaskManager{ + TaskService: ts, + id: id, + manager: manager.NewShimManager("runc"), }, nil } - -// Pause the container -func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Pause(ctx); err != nil { - return nil, errdefs.ToGRPC(err) - } - s.send(&eventstypes.TaskPaused{ - ContainerID: container.ID, - }) - return empty, nil -} - -// Resume the container -func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Resume(ctx); err != nil { - return nil, errdefs.ToGRPC(err) - } - s.send(&eventstypes.TaskResumed{ - ContainerID: container.ID, - }) - return empty, nil -} - -// Kill a process with the provided signal -func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Kill(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Pids returns all pids inside the container -func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - pids, err := s.getContainerPids(ctx, r.ID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - var processes []*task.ProcessInfo - for _, pid := range pids { - pInfo := task.ProcessInfo{ - Pid: pid, - } - for _, p := range container.ExecdProcesses() { - if p.Pid() == int(pid) { - d := &options.ProcessDetails{ - ExecID: p.ID(), - } - a, err := typeurl.MarshalAny(d) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) - } - pInfo.Info = a - break - } - } - processes = append(processes, &pInfo) - } - return &taskAPI.PidsResponse{ - Processes: processes, - }, nil -} - -// CloseIO of a process -func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.CloseIO(ctx, r); err != nil { - return nil, err - } - return empty, nil -} - -// Checkpoint the container -func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Checkpoint(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Update a running container -func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Update(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Wait for a process to exit -func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - p, err := container.Process(r.ExecID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - p.Wait() - - return &taskAPI.WaitResponse{ - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - }, nil -} - -// Connect returns shim information such as the shim's pid -func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { - var pid int - if container, err := s.getContainer(r.ID); err == nil { - pid = container.Pid() - } - return &taskAPI.ConnectResponse{ - ShimPid: uint32(os.Getpid()), - TaskPid: uint32(pid), - }, nil -} - -func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { - s.mu.Lock() - defer s.mu.Unlock() - - // return out if the shim is still servicing containers - if len(s.containers) > 0 { - return empty, nil - } - - // please make sure that temporary resource has been cleanup or registered - // for cleanup before calling shutdown - s.shutdown.Shutdown() - - return empty, nil -} - -func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - cgx := container.Cgroup() - if cgx == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") - } - var statsx interface{} - switch cg := cgx.(type) { - case cgroups.Cgroup: - stats, err := cg.Stat(cgroups.IgnoreNotExist) - if err != nil { - return nil, err - } - statsx = stats - case *cgroupsv2.Manager: - stats, err := cg.Stat() - if err != nil { - return nil, err - } - statsx = stats - default: - return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg) - } - data, err := typeurl.MarshalAny(statsx) - if err != nil { - return nil, err - } - return &taskAPI.StatsResponse{ - Stats: data, - }, nil -} - -func (s *service) processExits() { - for e := range s.ec { - s.checkProcesses(e) - } -} - -func (s *service) send(evt interface{}) { - s.events <- evt -} - -func (s *service) sendL(evt interface{}) { - s.eventSendMu.Lock() - s.events <- evt - s.eventSendMu.Unlock() -} - -func (s *service) checkProcesses(e runcC.Exit) { - s.mu.Lock() - defer s.mu.Unlock() - - for _, container := range s.containers { - if !container.HasPid(e.Pid) { - continue - } - - for _, p := range container.All() { - if p.Pid() != e.Pid { - continue - } - - if ip, ok := p.(*process.Init); ok { - // Ensure all children are killed - if runc.ShouldKillAllOnExit(s.context, container.Bundle) { - if err := ip.KillAll(s.context); err != nil { - logrus.WithError(err).WithField("id", ip.ID()). - Error("failed to kill init's children") - } - } - } - - p.SetExited(e.Status) - s.sendL(&eventstypes.TaskExit{ - ContainerID: container.ID, - ID: p.ID(), - Pid: uint32(e.Pid), - ExitStatus: uint32(e.Status), - ExitedAt: p.ExitedAt(), - }) - return - } - return - } -} - -func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { - container, err := s.getContainer(id) - if err != nil { - return nil, err - } - p, err := container.Process("") - if err != nil { - return nil, errdefs.ToGRPC(err) - } - ps, err := p.(*process.Init).Runtime().Ps(ctx, id) - if err != nil { - return nil, err - } - pids := make([]uint32, 0, len(ps)) - for _, pid := range ps { - pids = append(pids, uint32(pid)) - } - return pids, nil -} - -func (s *service) forward(ctx context.Context, publisher shim.Publisher) { - ns, _ := namespaces.Namespace(ctx) - ctx = namespaces.WithNamespace(context.Background(), ns) - for e := range s.events { - err := publisher.Publish(ctx, runc.GetTopic(e), e) - if err != nil { - logrus.WithError(err).Error("post event") - } - } - publisher.Close() -} - -func (s *service) getContainer(id string) (*runc.Container, error) { - s.mu.Lock() - container := s.containers[id] - s.mu.Unlock() - if container == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") - } - return container, nil -} - -// initialize a single epoll fd to manage our consoles. `initPlatform` should -// only be called once. -func (s *service) initPlatform() error { - if s.platform != nil { - return nil - } - p, err := runc.NewPlatform() - if err != nil { - return err - } - s.platform = p - s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() }) - return nil -} diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 18a585082..e25626cf3 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -48,21 +48,37 @@ type Publisher interface { // StartOpts describes shim start configuration received from containerd type StartOpts struct { - ID string + ID string // TODO(2.0): Remove ID, passed directly to start for call symmetry ContainerdBinary string Address string TTRPCAddress string } +type StopStatus struct { + Pid int + ExitStatus int + ExitedAt time.Time +} + // Init func for the creation of a shim server +// TODO(2.0): Remove init function type Init func(context.Context, string, Publisher, func()) (Shim, error) // Shim server interface +// TODO(2.0): Remove unified shim interface type Shim interface { - Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) // TODO(2.0): Update interface to pass ID directly to Cleanup + shimapi.TaskService + Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) StartShim(ctx context.Context, opts StartOpts) (string, error) } +// Manager is the interface which manages the shim process +type Manager interface { + Name() string + Start(ctx context.Context, id string, opts StartOpts) (string, error) + Stop(ctx context.Context, id string) (StopStatus, error) +} + // OptsKey is the context key for the Opts value. type OptsKey struct{} @@ -90,18 +106,18 @@ type ttrpcService interface { } type taskService struct { - local shimapi.TaskService + shimapi.TaskService } -func (t *taskService) RegisterTTRPC(server *ttrpc.Server) error { - shimapi.RegisterTaskService(server, t.local) +func (t taskService) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, t.TaskService) return nil } var ( debugFlag bool versionFlag bool - idFlag string + id string namespaceFlag string socketFlag string bundlePath string @@ -118,7 +134,7 @@ func parseFlags() { flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs") flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit") flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim") - flag.StringVar(&idFlag, "id", "", "id of the task") + flag.StringVar(&id, "id", "", "id of the task") flag.StringVar(&socketFlag, "socket", "", "socket path to serve") flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir") @@ -143,37 +159,85 @@ func setRuntime() { } } -func setLogger(ctx context.Context, id string) error { - logrus.SetFormatter(&logrus.TextFormatter{ +func setLogger(ctx context.Context, id string) (context.Context, error) { + l := log.G(ctx) + l.Logger.SetFormatter(&logrus.TextFormatter{ TimestampFormat: log.RFC3339NanoFixed, FullTimestamp: true, }) if debugFlag { - logrus.SetLevel(logrus.DebugLevel) + l.Logger.SetLevel(logrus.DebugLevel) } f, err := openLog(ctx, id) if err != nil { - return err + return ctx, err } - logrus.SetOutput(f) - return nil + l.Logger.SetOutput(f) + return log.WithLogger(ctx, l), nil } // Run initializes and runs a shim server -// TODO(2.0): Remove initFunc from arguments -func Run(id string, initFunc Init, opts ...BinaryOpts) { +// TODO(2.0): Remove function +func Run(name string, initFunc Init, opts ...BinaryOpts) { var config Config for _, o := range opts { o(&config) } - if err := run(id, initFunc, config); err != nil { - fmt.Fprintf(os.Stderr, "%s: %s\n", id, err) + ctx := context.Background() + ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", name)) + + if err := run(ctx, nil, initFunc, name, config); err != nil { + fmt.Fprintf(os.Stderr, "%s: %s", name, err) os.Exit(1) } } -func run(id string, initFunc Init, config Config) error { +// TODO(2.0): Remove this type +type shimToManager struct { + shim Shim + name string +} + +func (stm shimToManager) Name() string { + return stm.name +} + +func (stm shimToManager) Start(ctx context.Context, id string, opts StartOpts) (string, error) { + opts.ID = id + return stm.shim.StartShim(ctx, opts) +} + +func (stm shimToManager) Stop(ctx context.Context, id string) (StopStatus, error) { + // shim must already have id + dr, err := stm.shim.Cleanup(ctx) + if err != nil { + return StopStatus{}, err + } + return StopStatus{ + Pid: int(dr.Pid), + ExitStatus: int(dr.ExitStatus), + ExitedAt: dr.ExitedAt, + }, nil +} + +// RunManager initialzes and runs a shim server +// TODO(2.0): Rename to Run +func RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts) { + var config Config + for _, o := range opts { + o(&config) + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", manager.Name())) + + if err := run(ctx, manager, nil, "", config); err != nil { + fmt.Fprintf(os.Stderr, "%s: %s", manager.Name(), err) + os.Exit(1) + } +} + +func run(ctx context.Context, manager Manager, initFunc Init, name string, config Config) error { parseFlags() if versionFlag { fmt.Printf("%s:\n", os.Args[0]) @@ -208,29 +272,49 @@ func run(id string, initFunc Init, config Config) error { } defer publisher.Close() - ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) + ctx = namespaces.WithNamespace(ctx, namespaceFlag) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) - ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) ctx, sd := shutdown.WithShutdown(ctx) defer sd.Shutdown() - service, err := initFunc(ctx, idFlag, publisher, sd.Shutdown) - if err != nil { - return err + + if manager == nil { + service, err := initFunc(ctx, id, publisher, sd.Shutdown) + if err != nil { + return err + } + plugin.Register(&plugin.Registration{ + Type: plugin.TTRPCPlugin, + ID: "task", + Requires: []plugin.Type{ + plugin.EventPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return taskService{service}, nil + }, + }) + manager = shimToManager{ + shim: service, + name: name, + } } // Handle explicit actions switch action { case "delete": - logger := logrus.WithFields(logrus.Fields{ + logger := log.G(ctx).WithFields(logrus.Fields{ "pid": os.Getpid(), "namespace": namespaceFlag, }) go handleSignals(ctx, logger, signals) - response, err := service.Cleanup(ctx) + ss, err := manager.Stop(ctx, id) if err != nil { return err } - data, err := proto.Marshal(response) + data, err := proto.Marshal(&shimapi.DeleteResponse{ + Pid: uint32(ss.Pid), + ExitStatus: uint32(ss.ExitStatus), + ExitedAt: ss.ExitedAt, + }) if err != nil { return err } @@ -240,13 +324,12 @@ func run(id string, initFunc Init, config Config) error { return nil case "start": opts := StartOpts{ - ID: idFlag, ContainerdBinary: containerdBinaryFlag, Address: addressFlag, TTRPCAddress: ttrpcAddress, } - address, err := service.StartShim(ctx, opts) + address, err := manager.Start(ctx, id, opts) if err != nil { return err } @@ -257,7 +340,8 @@ func run(id string, initFunc Init, config Config) error { } if !config.NoSetupLogger { - if err := setLogger(ctx, idFlag); err != nil { + ctx, err = setLogger(ctx, id) + if err != nil { return err } } @@ -279,20 +363,6 @@ func run(id string, initFunc Init, config Config) error { }, }) - // If service is an implementation of the task service, register it as a plugin - if ts, ok := service.(shimapi.TaskService); ok { - plugin.Register(&plugin.Registration{ - Type: plugin.TTRPCPlugin, - ID: "task", - Requires: []plugin.Type{ - plugin.EventPlugin, - }, - InitFn: func(ic *plugin.InitContext) (interface{}, error) { - return &taskService{ts}, nil - }, - }) - } - var ( initialized = plugin.NewPluginSet() ttrpcServices = []ttrpcService{} @@ -397,10 +467,10 @@ func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal) er defer l.Close() if err := server.Serve(ctx, l); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure") + log.G(ctx).WithError(err).Fatal("containerd-shim: ttrpc server failure") } }() - logger := logrus.WithFields(logrus.Fields{ + logger := log.G(ctx).WithFields(logrus.Fields{ "pid": os.Getpid(), "path": path, "namespace": namespaceFlag, From f83ab813d24ff83f755e35b838cf7d6df2ff358b Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 16 Nov 2021 12:13:44 -0800 Subject: [PATCH 4/4] Use task plugin for runc shim Signed-off-by: Derek McGowan --- cmd/containerd-shim-runc-v2/main.go | 7 ++- runtime/v2/runc/task/plugin/plugin_linux.go | 47 +++++++++++++++++++++ runtime/v2/runc/{tasks => task}/service.go | 2 +- runtime/v2/runc/v2/service.go | 4 +- 4 files changed, 55 insertions(+), 5 deletions(-) create mode 100644 runtime/v2/runc/task/plugin/plugin_linux.go rename runtime/v2/runc/{tasks => task}/service.go (99%) diff --git a/cmd/containerd-shim-runc-v2/main.go b/cmd/containerd-shim-runc-v2/main.go index d2173c86e..c94942e3a 100644 --- a/cmd/containerd-shim-runc-v2/main.go +++ b/cmd/containerd-shim-runc-v2/main.go @@ -20,10 +20,13 @@ package main import ( - v2 "github.com/containerd/containerd/runtime/v2/runc/v2" + "context" + + "github.com/containerd/containerd/runtime/v2/runc/manager" + _ "github.com/containerd/containerd/runtime/v2/runc/task/plugin" "github.com/containerd/containerd/runtime/v2/shim" ) func main() { - shim.Run("io.containerd.runc.v2", v2.New) + shim.RunManager(context.Background(), manager.NewShimManager("io.containerd.runc.v2")) } diff --git a/runtime/v2/runc/task/plugin/plugin_linux.go b/runtime/v2/runc/task/plugin/plugin_linux.go new file mode 100644 index 000000000..603a27459 --- /dev/null +++ b/runtime/v2/runc/task/plugin/plugin_linux.go @@ -0,0 +1,47 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package plugin + +import ( + "github.com/containerd/containerd/pkg/shutdown" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/runtime/v2/runc/task" + "github.com/containerd/containerd/runtime/v2/shim" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.TTRPCPlugin, + ID: "task", + Requires: []plugin.Type{ + plugin.EventPlugin, + plugin.InternalPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + pp, err := ic.GetByID(plugin.EventPlugin, "publisher") + if err != nil { + return nil, err + } + ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown") + if err != nil { + return nil, err + } + return task.NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service)) + }, + }) + +} diff --git a/runtime/v2/runc/tasks/service.go b/runtime/v2/runc/task/service.go similarity index 99% rename from runtime/v2/runc/tasks/service.go rename to runtime/v2/runc/task/service.go index 5659affa2..a1b1c468a 100644 --- a/runtime/v2/runc/tasks/service.go +++ b/runtime/v2/runc/task/service.go @@ -17,7 +17,7 @@ limitations under the License. */ -package tasks +package task import ( "context" diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 17ab82075..6dd8d6d14 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -24,7 +24,7 @@ import ( "github.com/containerd/containerd/pkg/shutdown" "github.com/containerd/containerd/runtime/v2/runc/manager" - "github.com/containerd/containerd/runtime/v2/runc/tasks" + "github.com/containerd/containerd/runtime/v2/runc/task" "github.com/containerd/containerd/runtime/v2/shim" shimapi "github.com/containerd/containerd/runtime/v2/task" ) @@ -67,7 +67,7 @@ func New(ctx context.Context, id string, publisher shim.Publisher, fn func()) (s return nil }) } - ts, err := tasks.NewTaskService(ctx, publisher, sd) + ts, err := task.NewTaskService(ctx, publisher, sd) if err != nil { return nil, err }