Split runc shim into plugin components

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2021-09-16 17:56:14 -07:00
parent 6eea8f3f62
commit 6835a94707
4 changed files with 346 additions and 252 deletions

View File

@@ -21,42 +21,36 @@ package v2
import (
"context"
"encoding/json"
"io"
"os"
"path/filepath"
goruntime "runtime"
"sync"
"syscall"
"time"
"github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/oom"
oomv1 "github.com/containerd/containerd/pkg/oom/v1"
oomv2 "github.com/containerd/containerd/pkg/oom/v2"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/schedcore"
"github.com/containerd/containerd/pkg/shutdown"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/pkg/userns"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/options"
runcservice "github.com/containerd/containerd/runtime/v2/runc/service"
"github.com/containerd/containerd/runtime/v2/shim"
shimapi "github.com/containerd/containerd/runtime/v2/task"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/containerd/sys/reaper"
runcC "github.com/containerd/go-runc"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/proto"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
exec "golang.org/x/sys/execabs"
"golang.org/x/sys/unix"
)
var (
@@ -64,20 +58,34 @@ var (
empty = &ptypes.Empty{}
)
// group labels specifies how the shim groups services.
// currently supports a runc.v2 specific .group label and the
// standard k8s pod label. Order matters in this list
var groupLabels = []string{
"io.containerd.runc.v2.group",
"io.kubernetes.cri.sandbox-id",
}
type spec struct {
Annotations map[string]string `json:"annotations,omitempty"`
}
// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
// TODO(2.0): Remove this function, rely on plugin registration
func New(_ context.Context, id string, _ shim.Publisher, _ func()) (shim.Shim, error) {
plugin.Register(&plugin.Registration{
Type: plugin.TTRPCPlugin,
ID: "task",
Requires: []plugin.Type{
plugin.EventPlugin,
plugin.InternalPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
pp, err := ic.GetByID(plugin.EventPlugin, "publisher")
if err != nil {
return nil, err
}
ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown")
if err != nil {
return nil, err
}
return NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service))
},
})
return runcservice.NewShimManager(id), nil
}
// NewTaskService creates a new instance of a task service
func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) {
var (
ep oom.Watcher
err error
@@ -92,24 +100,28 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func
}
go ep.Run(ctx)
s := &service{
id: id,
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
cancel: shutdown,
shutdown: sd,
containers: make(map[string]*runc.Container),
}
go s.processExits()
runcC.Monitor = reaper.Default
if err := s.initPlatform(); err != nil {
shutdown()
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
go s.forward(ctx, publisher)
sd.RegisterCallback(func(context.Context) error {
close(s.events)
return nil
})
if address, err := shim.ReadAddress("address"); err == nil {
s.shimAddress = address
sd.RegisterCallback(func(context.Context) error {
return shim.RemoveSocket(address)
})
}
return s, nil
}
@@ -125,216 +137,9 @@ type service struct {
ec chan runcC.Exit
ep oom.Watcher
// id only used in cleanup case
id string
containers map[string]*runc.Container
shimAddress string
cancel func()
}
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
self, err := os.Executable()
if err != nil {
return nil, err
}
cwd, err := os.Getwd()
if err != nil {
return nil, err
}
args := []string{
"-namespace", ns,
"-id", id,
"-address", containerdAddress,
}
cmd := exec.Command(self, args...)
cmd.Dir = cwd
cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
return cmd, nil
}
func readSpec() (*spec, error) {
f, err := os.Open("config.json")
if err != nil {
return nil, err
}
defer f.Close()
var s spec
if err := json.NewDecoder(f).Decode(&s); err != nil {
return nil, err
}
return &s, nil
}
func (s *service) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) {
cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress)
if err != nil {
return "", err
}
grouping := opts.ID
spec, err := readSpec()
if err != nil {
return "", err
}
for _, group := range groupLabels {
if groupID, ok := spec.Annotations[group]; ok {
grouping = groupID
break
}
}
address, err := shim.SocketAddress(ctx, opts.Address, grouping)
if err != nil {
return "", err
}
socket, err := shim.NewSocket(address)
if err != nil {
// the only time where this would happen is if there is a bug and the socket
// was not cleaned up in the cleanup method of the shim or we are using the
// grouping functionality where the new process should be run with the same
// shim as an existing container
if !shim.SocketEaddrinuse(err) {
return "", errors.Wrap(err, "create new shim socket")
}
if shim.CanConnect(address) {
if err := shim.WriteAddress("address", address); err != nil {
return "", errors.Wrap(err, "write existing socket for shim")
}
return address, nil
}
if err := shim.RemoveSocket(address); err != nil {
return "", errors.Wrap(err, "remove pre-existing socket")
}
if socket, err = shim.NewSocket(address); err != nil {
return "", errors.Wrap(err, "try create new shim socket 2x")
}
}
defer func() {
if retErr != nil {
socket.Close()
_ = shim.RemoveSocket(address)
}
}()
// make sure that reexec shim-v2 binary use the value if need
if err := shim.WriteAddress("address", address); err != nil {
return "", err
}
f, err := socket.File()
if err != nil {
return "", err
}
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
goruntime.LockOSThread()
if os.Getenv("SCHED_CORE") != "" {
if err := schedcore.Create(schedcore.ProcessGroup); err != nil {
return "", errors.Wrap(err, "enable sched core support")
}
}
if err := cmd.Start(); err != nil {
f.Close()
return "", err
}
goruntime.UnlockOSThread()
defer func() {
if retErr != nil {
cmd.Process.Kill()
}
}()
// make sure to wait after start
go cmd.Wait()
if data, err := io.ReadAll(os.Stdin); err == nil {
if len(data) > 0 {
var any ptypes.Any
if err := proto.Unmarshal(data, &any); err != nil {
return "", err
}
v, err := typeurl.UnmarshalAny(&any)
if err != nil {
return "", err
}
if opts, ok := v.(*options.Options); ok {
if opts.ShimCgroup != "" {
if cgroups.Mode() == cgroups.Unified {
cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup)
if err != nil {
return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
}
if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
}
} else {
cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup))
if err != nil {
return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
}
if err := cg.Add(cgroups.Process{
Pid: cmd.Process.Pid,
}); err != nil {
return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
}
}
}
}
}
}
if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil {
return "", errors.Wrap(err, "failed to adjust OOM score for shim")
}
return address, nil
}
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
cwd, err := os.Getwd()
if err != nil {
return nil, err
}
path := filepath.Join(filepath.Dir(cwd), s.id)
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
runtime, err := runc.ReadRuntime(path)
if err != nil {
return nil, err
}
opts, err := runc.ReadOptions(path)
if err != nil {
return nil, err
}
root := process.RuncRoot
if opts != nil && opts.Root != "" {
root = opts.Root
}
r := process.NewRunc(root, path, ns, runtime, "", false)
if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
Force: true,
}); err != nil {
logrus.WithError(err).Warn("failed to remove runc container")
}
if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil {
logrus.WithError(err).Warn("failed to cleanup rootfs mount")
}
return &taskAPI.DeleteResponse{
ExitedAt: time.Now(),
ExitStatus: 128 + uint32(unix.SIGKILL),
}, nil
shutdown shutdown.Service
}
// Create a new initial process and container with the underlying OCI runtime
@@ -368,6 +173,11 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
}, nil
}
func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
shimapi.RegisterTaskService(server, s)
return nil
}
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
container, err := s.getContainer(r.ID)
@@ -683,18 +493,10 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
return empty, nil
}
if s.platform != nil {
s.platform.Close()
}
// please make sure that temporary resource has been cleanup or registered
// for cleanup before calling shutdown
s.shutdown.Shutdown()
if s.shimAddress != "" {
_ = shim.RemoveSocket(s.shimAddress)
}
// please make sure that temporary resource has been cleanup
// before shutdown service.
s.cancel()
close(s.events)
return empty, nil
}
@@ -840,5 +642,6 @@ func (s *service) initPlatform() error {
return err
}
s.platform = p
s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() })
return nil
}