From 6835a94707c3e9da641a65d31dfc3bed2daf6be8 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 16 Sep 2021 17:56:14 -0700 Subject: [PATCH] Split runc shim into plugin components Signed-off-by: Derek McGowan --- pkg/shutdown/shutdown.go | 3 +- runtime/v2/runc/service/manager.go | 276 +++++++++++++++++++++++++++ runtime/v2/runc/v2/service.go | 297 +++++------------------------ runtime/v2/shim/shim.go | 22 ++- 4 files changed, 346 insertions(+), 252 deletions(-) create mode 100644 runtime/v2/runc/service/manager.go diff --git a/pkg/shutdown/shutdown.go b/pkg/shutdown/shutdown.go index 982a95099..bc1af75ab 100644 --- a/pkg/shutdown/shutdown.go +++ b/pkg/shutdown/shutdown.go @@ -75,7 +75,8 @@ func (s *shutdownService) Shutdown() { ctx, cancel := context.WithTimeout(context.Background(), s.timeout) defer cancel() grp, ctx := errgroup.WithContext(ctx) - for _, fn := range callbacks { + for i := range callbacks { + fn := callbacks[i] grp.Go(func() error { return fn(ctx) }) } err := grp.Wait() diff --git a/runtime/v2/runc/service/manager.go b/runtime/v2/runc/service/manager.go new file mode 100644 index 000000000..8a82f37e4 --- /dev/null +++ b/runtime/v2/runc/service/manager.go @@ -0,0 +1,276 @@ +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package service + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + goruntime "runtime" + "syscall" + "time" + + "github.com/containerd/cgroups" + cgroupsv2 "github.com/containerd/cgroups/v2" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/schedcore" + "github.com/containerd/containerd/runtime/v2/runc" + "github.com/containerd/containerd/runtime/v2/runc/options" + "github.com/containerd/containerd/runtime/v2/shim" + taskAPI "github.com/containerd/containerd/runtime/v2/task" + runcC "github.com/containerd/go-runc" + "github.com/containerd/typeurl" + "github.com/gogo/protobuf/proto" + ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + exec "golang.org/x/sys/execabs" + "golang.org/x/sys/unix" +) + +// NewShimManager returns an implementation of the shim manager +// using runc +func NewShimManager(id string) shim.Shim { + return &manager{ + id: id, + } +} + +// group labels specifies how the shim groups services. +// currently supports a runc.v2 specific .group label and the +// standard k8s pod label. Order matters in this list +var groupLabels = []string{ + "io.containerd.runc.v2.group", + "io.kubernetes.cri.sandbox-id", +} + +type spec struct { + Annotations map[string]string `json:"annotations,omitempty"` +} + +type manager struct { + id string +} + +func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + self, err := os.Executable() + if err != nil { + return nil, err + } + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + args := []string{ + "-namespace", ns, + "-id", id, + "-address", containerdAddress, + } + cmd := exec.Command(self, args...) + cmd.Dir = cwd + cmd.Env = append(os.Environ(), "GOMAXPROCS=4") + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + return cmd, nil +} + +func readSpec() (*spec, error) { + f, err := os.Open("config.json") + if err != nil { + return nil, err + } + defer f.Close() + var s spec + if err := json.NewDecoder(f).Decode(&s); err != nil { + return nil, err + } + return &s, nil +} + +func (manager) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) { + cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) + if err != nil { + return "", err + } + grouping := opts.ID + spec, err := readSpec() + if err != nil { + return "", err + } + for _, group := range groupLabels { + if groupID, ok := spec.Annotations[group]; ok { + grouping = groupID + break + } + } + address, err := shim.SocketAddress(ctx, opts.Address, grouping) + if err != nil { + return "", err + } + + socket, err := shim.NewSocket(address) + if err != nil { + // the only time where this would happen is if there is a bug and the socket + // was not cleaned up in the cleanup method of the shim or we are using the + // grouping functionality where the new process should be run with the same + // shim as an existing container + if !shim.SocketEaddrinuse(err) { + return "", errors.Wrap(err, "create new shim socket") + } + if shim.CanConnect(address) { + if err := shim.WriteAddress("address", address); err != nil { + return "", errors.Wrap(err, "write existing socket for shim") + } + return address, nil + } + if err := shim.RemoveSocket(address); err != nil { + return "", errors.Wrap(err, "remove pre-existing socket") + } + if socket, err = shim.NewSocket(address); err != nil { + return "", errors.Wrap(err, "try create new shim socket 2x") + } + } + defer func() { + if retErr != nil { + socket.Close() + _ = shim.RemoveSocket(address) + } + }() + + // make sure that reexec shim-v2 binary use the value if need + if err := shim.WriteAddress("address", address); err != nil { + return "", err + } + + f, err := socket.File() + if err != nil { + return "", err + } + + cmd.ExtraFiles = append(cmd.ExtraFiles, f) + + goruntime.LockOSThread() + if os.Getenv("SCHED_CORE") != "" { + if err := schedcore.Create(schedcore.ProcessGroup); err != nil { + return "", errors.Wrap(err, "enable sched core support") + } + } + + if err := cmd.Start(); err != nil { + f.Close() + return "", err + } + + goruntime.UnlockOSThread() + + defer func() { + if retErr != nil { + cmd.Process.Kill() + } + }() + // make sure to wait after start + go cmd.Wait() + if data, err := ioutil.ReadAll(os.Stdin); err == nil { + if len(data) > 0 { + var any ptypes.Any + if err := proto.Unmarshal(data, &any); err != nil { + return "", err + } + v, err := typeurl.UnmarshalAny(&any) + if err != nil { + return "", err + } + if opts, ok := v.(*options.Options); ok { + if opts.ShimCgroup != "" { + if cgroups.Mode() == cgroups.Unified { + cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup) + if err != nil { + return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) + } + if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { + return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) + } + } else { + cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) + if err != nil { + return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) + } + if err := cg.Add(cgroups.Process{ + Pid: cmd.Process.Pid, + }); err != nil { + return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) + } + } + } + } + } + } + if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { + return "", errors.Wrap(err, "failed to adjust OOM score for shim") + } + return address, nil +} + +func (m manager) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + + path := filepath.Join(filepath.Dir(cwd), m.id) + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + runtime, err := runc.ReadRuntime(path) + if err != nil { + return nil, err + } + opts, err := runc.ReadOptions(path) + if err != nil { + return nil, err + } + root := process.RuncRoot + if opts != nil && opts.Root != "" { + root = opts.Root + } + + r := process.NewRunc(root, path, ns, runtime, "", false) + if err := r.Delete(ctx, m.id, &runcC.DeleteOpts{ + Force: true, + }); err != nil { + logrus.WithError(err).Warn("failed to remove runc container") + } + if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { + logrus.WithError(err).Warn("failed to cleanup rootfs mount") + } + return &taskAPI.DeleteResponse{ + ExitedAt: time.Now(), + ExitStatus: 128 + uint32(unix.SIGKILL), + }, nil +} diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 8f81bd9c9..857445471 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -21,42 +21,36 @@ package v2 import ( "context" - "encoding/json" - "io" "os" - "path/filepath" - goruntime "runtime" "sync" - "syscall" - "time" "github.com/containerd/cgroups" cgroupsv2 "github.com/containerd/cgroups/v2" eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/oom" oomv1 "github.com/containerd/containerd/pkg/oom/v1" oomv2 "github.com/containerd/containerd/pkg/oom/v2" "github.com/containerd/containerd/pkg/process" - "github.com/containerd/containerd/pkg/schedcore" + "github.com/containerd/containerd/pkg/shutdown" "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/pkg/userns" + "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime/v2/runc" "github.com/containerd/containerd/runtime/v2/runc/options" + runcservice "github.com/containerd/containerd/runtime/v2/runc/service" "github.com/containerd/containerd/runtime/v2/shim" + shimapi "github.com/containerd/containerd/runtime/v2/task" taskAPI "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/containerd/sys/reaper" runcC "github.com/containerd/go-runc" + "github.com/containerd/ttrpc" "github.com/containerd/typeurl" - "github.com/gogo/protobuf/proto" ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/sirupsen/logrus" - exec "golang.org/x/sys/execabs" - "golang.org/x/sys/unix" ) var ( @@ -64,20 +58,34 @@ var ( empty = &ptypes.Empty{} ) -// group labels specifies how the shim groups services. -// currently supports a runc.v2 specific .group label and the -// standard k8s pod label. Order matters in this list -var groupLabels = []string{ - "io.containerd.runc.v2.group", - "io.kubernetes.cri.sandbox-id", -} - -type spec struct { - Annotations map[string]string `json:"annotations,omitempty"` -} - // New returns a new shim service that can be used via GRPC -func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { +// TODO(2.0): Remove this function, rely on plugin registration +func New(_ context.Context, id string, _ shim.Publisher, _ func()) (shim.Shim, error) { + plugin.Register(&plugin.Registration{ + Type: plugin.TTRPCPlugin, + ID: "task", + Requires: []plugin.Type{ + plugin.EventPlugin, + plugin.InternalPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + pp, err := ic.GetByID(plugin.EventPlugin, "publisher") + if err != nil { + return nil, err + } + ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown") + if err != nil { + return nil, err + } + return NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service)) + }, + }) + + return runcservice.NewShimManager(id), nil +} + +// NewTaskService creates a new instance of a task service +func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) { var ( ep oom.Watcher err error @@ -92,24 +100,28 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func } go ep.Run(ctx) s := &service{ - id: id, context: ctx, events: make(chan interface{}, 128), ec: reaper.Default.Subscribe(), ep: ep, - cancel: shutdown, + shutdown: sd, containers: make(map[string]*runc.Container), } go s.processExits() runcC.Monitor = reaper.Default if err := s.initPlatform(); err != nil { - shutdown() return nil, errors.Wrap(err, "failed to initialized platform behavior") } go s.forward(ctx, publisher) + sd.RegisterCallback(func(context.Context) error { + close(s.events) + return nil + }) if address, err := shim.ReadAddress("address"); err == nil { - s.shimAddress = address + sd.RegisterCallback(func(context.Context) error { + return shim.RemoveSocket(address) + }) } return s, nil } @@ -125,216 +137,9 @@ type service struct { ec chan runcC.Exit ep oom.Watcher - // id only used in cleanup case - id string - containers map[string]*runc.Container - shimAddress string - cancel func() -} - -func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - self, err := os.Executable() - if err != nil { - return nil, err - } - cwd, err := os.Getwd() - if err != nil { - return nil, err - } - args := []string{ - "-namespace", ns, - "-id", id, - "-address", containerdAddress, - } - cmd := exec.Command(self, args...) - cmd.Dir = cwd - cmd.Env = append(os.Environ(), "GOMAXPROCS=4") - cmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - } - return cmd, nil -} - -func readSpec() (*spec, error) { - f, err := os.Open("config.json") - if err != nil { - return nil, err - } - defer f.Close() - var s spec - if err := json.NewDecoder(f).Decode(&s); err != nil { - return nil, err - } - return &s, nil -} - -func (s *service) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) { - cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) - if err != nil { - return "", err - } - grouping := opts.ID - spec, err := readSpec() - if err != nil { - return "", err - } - for _, group := range groupLabels { - if groupID, ok := spec.Annotations[group]; ok { - grouping = groupID - break - } - } - address, err := shim.SocketAddress(ctx, opts.Address, grouping) - if err != nil { - return "", err - } - - socket, err := shim.NewSocket(address) - if err != nil { - // the only time where this would happen is if there is a bug and the socket - // was not cleaned up in the cleanup method of the shim or we are using the - // grouping functionality where the new process should be run with the same - // shim as an existing container - if !shim.SocketEaddrinuse(err) { - return "", errors.Wrap(err, "create new shim socket") - } - if shim.CanConnect(address) { - if err := shim.WriteAddress("address", address); err != nil { - return "", errors.Wrap(err, "write existing socket for shim") - } - return address, nil - } - if err := shim.RemoveSocket(address); err != nil { - return "", errors.Wrap(err, "remove pre-existing socket") - } - if socket, err = shim.NewSocket(address); err != nil { - return "", errors.Wrap(err, "try create new shim socket 2x") - } - } - defer func() { - if retErr != nil { - socket.Close() - _ = shim.RemoveSocket(address) - } - }() - - // make sure that reexec shim-v2 binary use the value if need - if err := shim.WriteAddress("address", address); err != nil { - return "", err - } - - f, err := socket.File() - if err != nil { - return "", err - } - - cmd.ExtraFiles = append(cmd.ExtraFiles, f) - - goruntime.LockOSThread() - if os.Getenv("SCHED_CORE") != "" { - if err := schedcore.Create(schedcore.ProcessGroup); err != nil { - return "", errors.Wrap(err, "enable sched core support") - } - } - - if err := cmd.Start(); err != nil { - f.Close() - return "", err - } - - goruntime.UnlockOSThread() - - defer func() { - if retErr != nil { - cmd.Process.Kill() - } - }() - // make sure to wait after start - go cmd.Wait() - if data, err := io.ReadAll(os.Stdin); err == nil { - if len(data) > 0 { - var any ptypes.Any - if err := proto.Unmarshal(data, &any); err != nil { - return "", err - } - v, err := typeurl.UnmarshalAny(&any) - if err != nil { - return "", err - } - if opts, ok := v.(*options.Options); ok { - if opts.ShimCgroup != "" { - if cgroups.Mode() == cgroups.Unified { - cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup) - if err != nil { - return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) - } - if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { - return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) - } - } else { - cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) - if err != nil { - return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) - } - if err := cg.Add(cgroups.Process{ - Pid: cmd.Process.Pid, - }); err != nil { - return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) - } - } - } - } - } - } - if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { - return "", errors.Wrap(err, "failed to adjust OOM score for shim") - } - return address, nil -} - -func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { - cwd, err := os.Getwd() - if err != nil { - return nil, err - } - - path := filepath.Join(filepath.Dir(cwd), s.id) - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - runtime, err := runc.ReadRuntime(path) - if err != nil { - return nil, err - } - opts, err := runc.ReadOptions(path) - if err != nil { - return nil, err - } - root := process.RuncRoot - if opts != nil && opts.Root != "" { - root = opts.Root - } - - r := process.NewRunc(root, path, ns, runtime, "", false) - if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ - Force: true, - }); err != nil { - logrus.WithError(err).Warn("failed to remove runc container") - } - if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { - logrus.WithError(err).Warn("failed to cleanup rootfs mount") - } - return &taskAPI.DeleteResponse{ - ExitedAt: time.Now(), - ExitStatus: 128 + uint32(unix.SIGKILL), - }, nil + shutdown shutdown.Service } // Create a new initial process and container with the underlying OCI runtime @@ -368,6 +173,11 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * }, nil } +func (s *service) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, s) + return nil +} + // Start a process func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { container, err := s.getContainer(r.ID) @@ -683,18 +493,10 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt return empty, nil } - if s.platform != nil { - s.platform.Close() - } + // please make sure that temporary resource has been cleanup or registered + // for cleanup before calling shutdown + s.shutdown.Shutdown() - if s.shimAddress != "" { - _ = shim.RemoveSocket(s.shimAddress) - } - - // please make sure that temporary resource has been cleanup - // before shutdown service. - s.cancel() - close(s.events) return empty, nil } @@ -840,5 +642,6 @@ func (s *service) initPlatform() error { return err } s.platform = p + s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() }) return nil } diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 691040bc7..18a585082 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -30,6 +30,7 @@ import ( "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/shutdown" "github.com/containerd/containerd/plugin" shimapi "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/containerd/version" @@ -58,7 +59,7 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error) // Shim server interface type Shim interface { - Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) + Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) // TODO(2.0): Update interface to pass ID directly to Cleanup StartShim(ctx context.Context, opts StartOpts) (string, error) } @@ -159,6 +160,7 @@ func setLogger(ctx context.Context, id string) error { } // Run initializes and runs a shim server +// TODO(2.0): Remove initFunc from arguments func Run(id string, initFunc Init, opts ...BinaryOpts) { var config Config for _, o := range opts { @@ -209,8 +211,9 @@ func run(id string, initFunc Init, config Config) error { ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) - ctx, cancel := context.WithCancel(ctx) - service, err := initFunc(ctx, idFlag, publisher, cancel) + ctx, sd := shutdown.WithShutdown(ctx) + defer sd.Shutdown() + service, err := initFunc(ctx, idFlag, publisher, sd.Shutdown) if err != nil { return err } @@ -259,6 +262,14 @@ func run(id string, initFunc Init, config Config) error { } } + plugin.Register(&plugin.Registration{ + Type: plugin.InternalPlugin, + ID: "shutdown", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return sd, nil + }, + }) + // Register event plugin plugin.Register(&plugin.Registration{ Type: plugin.EventPlugin, @@ -273,6 +284,9 @@ func run(id string, initFunc Init, config Config) error { plugin.Register(&plugin.Registration{ Type: plugin.TTRPCPlugin, ID: "task", + Requires: []plugin.Type{ + plugin.EventPlugin, + }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { return &taskService{ts}, nil }, @@ -345,7 +359,7 @@ func run(id string, initFunc Init, config Config) error { } if err := serve(ctx, server, signals); err != nil { - if err != context.Canceled { + if err != shutdown.ErrShutdown { return err } }