diff --git a/cmd/containerd-shim-runc-v2/main.go b/cmd/containerd-shim-runc-v2/main.go index d2173c86e..c94942e3a 100644 --- a/cmd/containerd-shim-runc-v2/main.go +++ b/cmd/containerd-shim-runc-v2/main.go @@ -20,10 +20,13 @@ package main import ( - v2 "github.com/containerd/containerd/runtime/v2/runc/v2" + "context" + + "github.com/containerd/containerd/runtime/v2/runc/manager" + _ "github.com/containerd/containerd/runtime/v2/runc/task/plugin" "github.com/containerd/containerd/runtime/v2/shim" ) func main() { - shim.Run("io.containerd.runc.v2", v2.New) + shim.RunManager(context.Background(), manager.NewShimManager("io.containerd.runc.v2")) } diff --git a/pkg/shutdown/shutdown.go b/pkg/shutdown/shutdown.go new file mode 100644 index 000000000..bc1af75ab --- /dev/null +++ b/pkg/shutdown/shutdown.go @@ -0,0 +1,109 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shutdown + +import ( + "context" + "errors" + "sync" + "time" + + "golang.org/x/sync/errgroup" +) + +// ErrShutdown is the error condition when a context has been fully shut down +var ErrShutdown = errors.New("shutdown") + +// Service is used to facilitate shutdown through callback +// registration and shutdown initiation +type Service interface { + // Shutdown initiates shutdown + Shutdown() + // RegisterCallback registers functions to be called on shutdown and before + // the shutdown channel is closed. A callback error will propagate to the + // context error + RegisterCallback(func(context.Context) error) } + +// WithShutdown returns a context which is similar to a cancel context, but +// with callbacks which can propagate to the context error. Unlike a cancel +// context, the shutdown context cannot be canceled from the parent context. +// However, future child contexts will be canceled upon shutdown.
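+//
+// A minimal usage sketch (illustrative only; cleanup is a hypothetical
+// callback, not part of this change):
+//
+//	ctx, sd := shutdown.WithShutdown(context.Background())
+//	sd.RegisterCallback(func(context.Context) error { return cleanup() })
+//	sd.Shutdown()    // runs all callbacks concurrently with a 30s timeout
+//	<-ctx.Done()     // closed once every callback has returned
+//	err := ctx.Err() // ErrShutdown, or the first callback error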
+func WithShutdown(ctx context.Context) (context.Context, Service) { + ss := &shutdownService{ + Context: ctx, + doneC: make(chan struct{}), + timeout: 30 * time.Second, + } + return ss, ss +} + +type shutdownService struct { + context.Context + + mu sync.Mutex + isShutdown bool + callbacks []func(context.Context) error + doneC chan struct{} + err error + timeout time.Duration +} + +func (s *shutdownService) Shutdown() { + s.mu.Lock() + defer s.mu.Unlock() + if s.isShutdown { + return + } + s.isShutdown = true + + go func(callbacks []func(context.Context) error) { + ctx, cancel := context.WithTimeout(context.Background(), s.timeout) + defer cancel() + grp, ctx := errgroup.WithContext(ctx) + for i := range callbacks { + fn := callbacks[i] + grp.Go(func() error { return fn(ctx) }) + } + err := grp.Wait() + if err == nil { + err = ErrShutdown + } + s.mu.Lock() + s.err = err + close(s.doneC) + s.mu.Unlock() + }(s.callbacks) +} + +func (s *shutdownService) Done() <-chan struct{} { + return s.doneC +} + +func (s *shutdownService) Err() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.err +} +func (s *shutdownService) RegisterCallback(fn func(context.Context) error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.callbacks == nil { + s.callbacks = []func(context.Context) error{} + } + s.callbacks = append(s.callbacks, fn) +} diff --git a/runtime/v2/runc/manager/manager_linux.go b/runtime/v2/runc/manager/manager_linux.go new file mode 100644 index 000000000..ba574beb0 --- /dev/null +++ b/runtime/v2/runc/manager/manager_linux.go @@ -0,0 +1,277 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package manager + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + goruntime "runtime" + "syscall" + "time" + + "github.com/containerd/cgroups" + cgroupsv2 "github.com/containerd/cgroups/v2" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/schedcore" + "github.com/containerd/containerd/runtime/v2/runc" + "github.com/containerd/containerd/runtime/v2/runc/options" + "github.com/containerd/containerd/runtime/v2/shim" + runcC "github.com/containerd/go-runc" + "github.com/containerd/typeurl" + "github.com/gogo/protobuf/proto" + ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + exec "golang.org/x/sys/execabs" + "golang.org/x/sys/unix" +) + +// NewShimManager returns an implementation of the shim manager +// using runc +func NewShimManager(name string) shim.Manager { + return &manager{ + name: name, + } +} + +// groupLabels specifies how the shim groups services. +// Currently it supports a runc.v2-specific .group label and the +// standard k8s pod label.
Order matters in this list. +var groupLabels = []string{ + "io.containerd.runc.v2.group", + "io.kubernetes.cri.sandbox-id", +} + +type spec struct { + Annotations map[string]string `json:"annotations,omitempty"` +} + +type manager struct { + name string +} + +func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + self, err := os.Executable() + if err != nil { + return nil, err + } + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + args := []string{ + "-namespace", ns, + "-id", id, + "-address", containerdAddress, + } + cmd := exec.Command(self, args...) + cmd.Dir = cwd + cmd.Env = append(os.Environ(), "GOMAXPROCS=4") + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + return cmd, nil +} + +func readSpec() (*spec, error) { + f, err := os.Open("config.json") + if err != nil { + return nil, err + } + defer f.Close() + var s spec + if err := json.NewDecoder(f).Decode(&s); err != nil { + return nil, err + } + return &s, nil +} + +func (m manager) Name() string { + return m.name +} + +func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ string, retErr error) { + cmd, err := newCommand(ctx, id, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) + if err != nil { + return "", err + } + grouping := id + spec, err := readSpec() + if err != nil { + return "", err + } + for _, group := range groupLabels { + if groupID, ok := spec.Annotations[group]; ok { + grouping = groupID + break + } + } + address, err := shim.SocketAddress(ctx, opts.Address, grouping) + if err != nil { + return "", err + } + + socket, err := shim.NewSocket(address) + if err != nil { + // the only time this would happen is if there is a bug and the socket + // was not cleaned up in the cleanup method of the shim, or we are using the + // grouping functionality where the new process should be run with the same + // shim as an existing container + if !shim.SocketEaddrinuse(err) { + return "", errors.Wrap(err, "create new shim socket") + } + if shim.CanConnect(address) { + if err := shim.WriteAddress("address", address); err != nil { + return "", errors.Wrap(err, "write existing socket for shim") + } + return address, nil + } + if err := shim.RemoveSocket(address); err != nil { + return "", errors.Wrap(err, "remove pre-existing socket") + } + if socket, err = shim.NewSocket(address); err != nil { + return "", errors.Wrap(err, "try create new shim socket 2x") + } + } + defer func() { + if retErr != nil { + socket.Close() + _ = shim.RemoveSocket(address) + } + }() + + // make sure that the re-exec'd shim-v2 binary uses the value if needed + if err := shim.WriteAddress("address", address); err != nil { + return "", err + } + + f, err := socket.File() + if err != nil { + return "", err + } + + cmd.ExtraFiles = append(cmd.ExtraFiles, f) + + goruntime.LockOSThread() + if os.Getenv("SCHED_CORE") != "" { + if err := schedcore.Create(schedcore.ProcessGroup); err != nil { + return "", errors.Wrap(err, "enable sched core support") + } + } + + if err := cmd.Start(); err != nil { + f.Close() + return "", err + } + + goruntime.UnlockOSThread() + + defer func() { + if retErr != nil { + cmd.Process.Kill() + } + }() + // make sure to wait after start + go cmd.Wait() + if data, err := ioutil.ReadAll(os.Stdin); err == nil { + if len(data) > 0 { + var any ptypes.Any + if err := proto.Unmarshal(data, &any); err != nil { + return "", err + } +
v, err := typeurl.UnmarshalAny(&any) + if err != nil { + return "", err + } + if opts, ok := v.(*options.Options); ok { + if opts.ShimCgroup != "" { + if cgroups.Mode() == cgroups.Unified { + cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup) + if err != nil { + return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) + } + if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { + return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) + } + } else { + cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) + if err != nil { + return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) + } + if err := cg.Add(cgroups.Process{ + Pid: cmd.Process.Pid, + }); err != nil { + return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) + } + } + } + } + } + } + if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { + return "", errors.Wrap(err, "failed to adjust OOM score for shim") + } + return address, nil +} + +func (manager) Stop(ctx context.Context, id string) (shim.StopStatus, error) { + cwd, err := os.Getwd() + if err != nil { + return shim.StopStatus{}, err + } + + path := filepath.Join(filepath.Dir(cwd), id) + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return shim.StopStatus{}, err + } + runtime, err := runc.ReadRuntime(path) + if err != nil { + return shim.StopStatus{}, err + } + opts, err := runc.ReadOptions(path) + if err != nil { + return shim.StopStatus{}, err + } + root := process.RuncRoot + if opts != nil && opts.Root != "" { + root = opts.Root + } + + r := process.NewRunc(root, path, ns, runtime, "", false) + if err := r.Delete(ctx, id, &runcC.DeleteOpts{ + Force: true, + }); err != nil { + log.G(ctx).WithError(err).Warn("failed to remove runc container") + } + if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { + log.G(ctx).WithError(err).Warn("failed to cleanup rootfs mount") + } + return shim.StopStatus{ + ExitedAt: time.Now(), + ExitStatus: 128 + int(unix.SIGKILL), + }, nil +} diff --git a/runtime/v2/runc/task/plugin/plugin_linux.go b/runtime/v2/runc/task/plugin/plugin_linux.go new file mode 100644 index 000000000..603a27459 --- /dev/null +++ b/runtime/v2/runc/task/plugin/plugin_linux.go @@ -0,0 +1,47 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package plugin + +import ( + "github.com/containerd/containerd/pkg/shutdown" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/runtime/v2/runc/task" + "github.com/containerd/containerd/runtime/v2/shim" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.TTRPCPlugin, + ID: "task", + Requires: []plugin.Type{ + plugin.EventPlugin, + plugin.InternalPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + pp, err := ic.GetByID(plugin.EventPlugin, "publisher") + if err != nil { + return nil, err + } + ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown") + if err != nil { + return nil, err + } + return task.NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service)) + }, + }) + +} diff --git a/runtime/v2/runc/task/service.go b/runtime/v2/runc/task/service.go new file mode 100644 index 000000000..a1b1c468a --- /dev/null +++ b/runtime/v2/runc/task/service.go @@ -0,0 +1,619 @@ +//go:build linux +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package task + +import ( + "context" + "os" + "sync" + + "github.com/containerd/cgroups" + cgroupsv2 "github.com/containerd/cgroups/v2" + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/oom" + oomv1 "github.com/containerd/containerd/pkg/oom/v1" + oomv2 "github.com/containerd/containerd/pkg/oom/v2" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/shutdown" + "github.com/containerd/containerd/pkg/stdio" + "github.com/containerd/containerd/pkg/userns" + "github.com/containerd/containerd/runtime/v2/runc" + "github.com/containerd/containerd/runtime/v2/runc/options" + "github.com/containerd/containerd/runtime/v2/shim" + shimapi "github.com/containerd/containerd/runtime/v2/task" + taskAPI "github.com/containerd/containerd/runtime/v2/task" + "github.com/containerd/containerd/sys/reaper" + runcC "github.com/containerd/go-runc" + "github.com/containerd/ttrpc" + "github.com/containerd/typeurl" + ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var ( + _ = (taskAPI.TaskService)(&service{}) + empty = &ptypes.Empty{} +) + +// NewTaskService creates a new instance of a task service +func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) { + var ( + ep oom.Watcher + err error + ) + if cgroups.Mode() == cgroups.Unified { + ep, err = oomv2.New(publisher) + } else { + ep, err = oomv1.New(publisher) + } + if err != nil { + return nil, err + } + go ep.Run(ctx) + s := &service{ + context: ctx, + events: make(chan interface{}, 128), + ec: reaper.Default.Subscribe(), + ep: ep, + shutdown: sd, + containers: make(map[string]*runc.Container), + } + go s.processExits() + runcC.Monitor = reaper.Default + 
if err := s.initPlatform(); err != nil { + return nil, errors.Wrap(err, "failed to initialize platform behavior") + } + go s.forward(ctx, publisher) + sd.RegisterCallback(func(context.Context) error { + close(s.events) + return nil + }) + + if address, err := shim.ReadAddress("address"); err == nil { + sd.RegisterCallback(func(context.Context) error { + return shim.RemoveSocket(address) + }) + } + return s, nil +} + +// service is the shim implementation of a remote shim over GRPC +type service struct { + mu sync.Mutex + eventSendMu sync.Mutex + + context context.Context + events chan interface{} + platform stdio.Platform + ec chan runcC.Exit + ep oom.Watcher + + containers map[string]*runc.Container + + shutdown shutdown.Service +} + +// Create a new initial process and container with the underlying OCI runtime +func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + container, err := runc.NewContainer(ctx, s.platform, r) + if err != nil { + return nil, err + } + + s.containers[r.ID] = container + + s.send(&eventstypes.TaskCreate{ + ContainerID: r.ID, + Bundle: r.Bundle, + Rootfs: r.Rootfs, + IO: &eventstypes.TaskIO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + Checkpoint: r.Checkpoint, + Pid: uint32(container.Pid()), + }) + + return &taskAPI.CreateTaskResponse{ + Pid: uint32(container.Pid()), + }, nil +} + +func (s *service) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, s) + return nil +} + +// Start a process +func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + + // hold the send lock so that the start events are sent before any exit events in the error case + s.eventSendMu.Lock() + p, err := container.Start(ctx, r) + if err != nil { + s.eventSendMu.Unlock() + return nil, errdefs.ToGRPC(err) + } + + switch r.ExecID { + case "": + switch cg := container.Cgroup().(type) { + case cgroups.Cgroup: + if err := s.ep.Add(container.ID, cg); err != nil { + logrus.WithError(err).Error("add cg to OOM monitor") + } + case *cgroupsv2.Manager: + allControllers, err := cg.RootControllers() + if err != nil { + logrus.WithError(err).Error("failed to get root controllers") + } else { + if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil { + if userns.RunningInUserNS() { + logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers) + } else { + logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers) + } + } + } + if err := s.ep.Add(container.ID, cg); err != nil { + logrus.WithError(err).Error("add cg to OOM monitor") + } + } + + s.send(&eventstypes.TaskStart{ + ContainerID: container.ID, + Pid: uint32(p.Pid()), + }) + default: + s.send(&eventstypes.TaskExecStarted{ + ContainerID: container.ID, + ExecID: r.ExecID, + Pid: uint32(p.Pid()), + }) + } + s.eventSendMu.Unlock() + return &taskAPI.StartResponse{ + Pid: uint32(p.Pid()), + }, nil +} + +// Delete the initial process and container +func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Delete(ctx, r) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + // if we deleted an init task, send the task delete event
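+ // (an exec delete, by contrast, returns only the DeleteResponse below and + // leaves the container tracked in s.containers)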
+ if r.ExecID == "" { + s.mu.Lock() + delete(s.containers, r.ID) + s.mu.Unlock() + s.send(&eventstypes.TaskDelete{ + ContainerID: container.ID, + Pid: uint32(p.Pid()), + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }) + } + return &taskAPI.DeleteResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + Pid: uint32(p.Pid()), + }, nil +} + +// Exec an additional process inside the container +func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + ok, cancel := container.ReserveProcess(r.ExecID) + if !ok { + return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) + } + process, err := container.Exec(ctx, r) + if err != nil { + cancel() + return nil, errdefs.ToGRPC(err) + } + + s.send(&eventstypes.TaskExecAdded{ + ContainerID: container.ID, + ExecID: process.ID(), + }) + return empty, nil +} + +// ResizePty of a process +func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.ResizePty(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// State returns runtime state information for a process +func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Process(r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + st, err := p.Status(ctx) + if err != nil { + return nil, err + } + status := task.StatusUnknown + switch st { + case "created": + status = task.StatusCreated + case "running": + status = task.StatusRunning + case "stopped": + status = task.StatusStopped + case "paused": + status = task.StatusPaused + case "pausing": + status = task.StatusPausing + } + sio := p.Stdio() + return &taskAPI.StateResponse{ + ID: p.ID(), + Bundle: container.Bundle, + Pid: uint32(p.Pid()), + Status: status, + Stdin: sio.Stdin, + Stdout: sio.Stdout, + Stderr: sio.Stderr, + Terminal: sio.Terminal, + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Pause the container +func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Pause(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } + s.send(&eventstypes.TaskPaused{ + ContainerID: container.ID, + }) + return empty, nil +} + +// Resume the container +func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Resume(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } + s.send(&eventstypes.TaskResumed{ + ContainerID: container.ID, + }) + return empty, nil +} + +// Kill a process with the provided signal +func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Kill(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Pids returns all pids inside the container +func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { + 
container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + pids, err := s.getContainerPids(ctx, r.ID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + var processes []*task.ProcessInfo + for _, pid := range pids { + pInfo := task.ProcessInfo{ + Pid: pid, + } + for _, p := range container.ExecdProcesses() { + if p.Pid() == int(pid) { + d := &options.ProcessDetails{ + ExecID: p.ID(), + } + a, err := typeurl.MarshalAny(d) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) + } + pInfo.Info = a + break + } + } + processes = append(processes, &pInfo) + } + return &taskAPI.PidsResponse{ + Processes: processes, + }, nil +} + +// CloseIO of a process +func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.CloseIO(ctx, r); err != nil { + return nil, err + } + return empty, nil +} + +// Checkpoint the container +func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Checkpoint(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Update a running container +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Update(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Wait for a process to exit +func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Process(r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + p.Wait() + + return &taskAPI.WaitResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Connect returns shim information such as the shim's pid +func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { + var pid int + if container, err := s.getContainer(r.ID); err == nil { + pid = container.Pid() + } + return &taskAPI.ConnectResponse{ + ShimPid: uint32(os.Getpid()), + TaskPid: uint32(pid), + }, nil +} + +func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { + s.mu.Lock() + defer s.mu.Unlock() + + // return early if the shim is still servicing containers + if len(s.containers) > 0 { + return empty, nil + } + + // please make sure that temporary resources have been cleaned up or registered + // for cleanup before calling shutdown + s.shutdown.Shutdown() + + return empty, nil +} + +func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + cgx := container.Cgroup() + if cgx == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") + } + var statsx interface{} + switch cg := cgx.(type) { + case cgroups.Cgroup: + stats, err := cg.Stat(cgroups.IgnoreNotExist) + if err != nil { + return nil, err + } + statsx = stats + case *cgroupsv2.Manager: + stats, err := cg.Stat() + if err != nil { + return nil, err + } + statsx = stats + default: + return nil,
errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg) + } + data, err := typeurl.MarshalAny(statsx) + if err != nil { + return nil, err + } + return &taskAPI.StatsResponse{ + Stats: data, + }, nil +} + +func (s *service) processExits() { + for e := range s.ec { + s.checkProcesses(e) + } +} + +func (s *service) send(evt interface{}) { + s.events <- evt +} + +func (s *service) sendL(evt interface{}) { + s.eventSendMu.Lock() + s.events <- evt + s.eventSendMu.Unlock() +} + +func (s *service) checkProcesses(e runcC.Exit) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, container := range s.containers { + if !container.HasPid(e.Pid) { + continue + } + + for _, p := range container.All() { + if p.Pid() != e.Pid { + continue + } + + if ip, ok := p.(*process.Init); ok { + // Ensure all children are killed + if runc.ShouldKillAllOnExit(s.context, container.Bundle) { + if err := ip.KillAll(s.context); err != nil { + logrus.WithError(err).WithField("id", ip.ID()). + Error("failed to kill init's children") + } + } + } + + p.SetExited(e.Status) + s.sendL(&eventstypes.TaskExit{ + ContainerID: container.ID, + ID: p.ID(), + Pid: uint32(e.Pid), + ExitStatus: uint32(e.Status), + ExitedAt: p.ExitedAt(), + }) + return + } + return + } +} + +func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { + container, err := s.getContainer(id) + if err != nil { + return nil, err + } + p, err := container.Process("") + if err != nil { + return nil, errdefs.ToGRPC(err) + } + ps, err := p.(*process.Init).Runtime().Ps(ctx, id) + if err != nil { + return nil, err + } + pids := make([]uint32, 0, len(ps)) + for _, pid := range ps { + pids = append(pids, uint32(pid)) + } + return pids, nil +} + +func (s *service) forward(ctx context.Context, publisher shim.Publisher) { + ns, _ := namespaces.Namespace(ctx) + ctx = namespaces.WithNamespace(context.Background(), ns) + for e := range s.events { + err := publisher.Publish(ctx, runc.GetTopic(e), e) + if err != nil { + logrus.WithError(err).Error("post event") + } + } + publisher.Close() +} + +func (s *service) getContainer(id string) (*runc.Container, error) { + s.mu.Lock() + container := s.containers[id] + s.mu.Unlock() + if container == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") + } + return container, nil +} + +// initialize a single epoll fd to manage our consoles. `initPlatform` should +// only be called once. 
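+// The platform is released through the shutdown callback registered below, +// so the service's Shutdown path does not need to close it explicitly.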
+func (s *service) initPlatform() error { + if s.platform != nil { + return nil + } + p, err := runc.NewPlatform() + if err != nil { + return err + } + s.platform = p + s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() }) + return nil +} diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 8f81bd9c9..6dd8d6d14 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -21,824 +21,59 @@ package v2 import ( "context" - "encoding/json" - "io" - "os" - "path/filepath" - goruntime "runtime" - "sync" - "syscall" - "time" - "github.com/containerd/cgroups" - cgroupsv2 "github.com/containerd/cgroups/v2" - eventstypes "github.com/containerd/containerd/api/events" - "github.com/containerd/containerd/api/types/task" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/mount" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/pkg/oom" - oomv1 "github.com/containerd/containerd/pkg/oom/v1" - oomv2 "github.com/containerd/containerd/pkg/oom/v2" - "github.com/containerd/containerd/pkg/process" - "github.com/containerd/containerd/pkg/schedcore" - "github.com/containerd/containerd/pkg/stdio" - "github.com/containerd/containerd/pkg/userns" - "github.com/containerd/containerd/runtime/v2/runc" - "github.com/containerd/containerd/runtime/v2/runc/options" + "github.com/containerd/containerd/pkg/shutdown" + "github.com/containerd/containerd/runtime/v2/runc/manager" + "github.com/containerd/containerd/runtime/v2/runc/task" "github.com/containerd/containerd/runtime/v2/shim" - taskAPI "github.com/containerd/containerd/runtime/v2/task" - "github.com/containerd/containerd/sys/reaper" - runcC "github.com/containerd/go-runc" - "github.com/containerd/typeurl" - "github.com/gogo/protobuf/proto" - ptypes "github.com/gogo/protobuf/types" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - exec "golang.org/x/sys/execabs" - "golang.org/x/sys/unix" + shimapi "github.com/containerd/containerd/runtime/v2/task" ) -var ( - _ = (taskAPI.TaskService)(&service{}) - empty = &ptypes.Empty{} -) +// TODO(2.0): Remove this package -// group labels specifies how the shim groups services. -// currently supports a runc.v2 specific .group label and the -// standard k8s pod label. 
Order matters in this list -var groupLabels = []string{ - "io.containerd.runc.v2.group", - "io.kubernetes.cri.sandbox-id", +type shimTaskManager struct { + shimapi.TaskService + id string + manager shim.Manager } -type spec struct { - Annotations map[string]string `json:"annotations,omitempty"` -} - -// New returns a new shim service that can be used via GRPC -func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { - var ( - ep oom.Watcher - err error - ) - if cgroups.Mode() == cgroups.Unified { - ep, err = oomv2.New(publisher) - } else { - ep, err = oomv1.New(publisher) - } +func (stm *shimTaskManager) Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) { + ss, err := stm.manager.Stop(ctx, stm.id) if err != nil { return nil, err } - go ep.Run(ctx) - s := &service{ - id: id, - context: ctx, - events: make(chan interface{}, 128), - ec: reaper.Default.Subscribe(), - ep: ep, - cancel: shutdown, - containers: make(map[string]*runc.Container), - } - go s.processExits() - runcC.Monitor = reaper.Default - if err := s.initPlatform(); err != nil { - shutdown() - return nil, errors.Wrap(err, "failed to initialized platform behavior") - } - go s.forward(ctx, publisher) - - if address, err := shim.ReadAddress("address"); err == nil { - s.shimAddress = address - } - return s, nil -} - -// service is the shim implementation of a remote shim over GRPC -type service struct { - mu sync.Mutex - eventSendMu sync.Mutex - - context context.Context - events chan interface{} - platform stdio.Platform - ec chan runcC.Exit - ep oom.Watcher - - // id only used in cleanup case - id string - - containers map[string]*runc.Container - - shimAddress string - cancel func() -} - -func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - self, err := os.Executable() - if err != nil { - return nil, err - } - cwd, err := os.Getwd() - if err != nil { - return nil, err - } - args := []string{ - "-namespace", ns, - "-id", id, - "-address", containerdAddress, - } - cmd := exec.Command(self, args...) 
- cmd.Dir = cwd - cmd.Env = append(os.Environ(), "GOMAXPROCS=4") - cmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - } - return cmd, nil -} - -func readSpec() (*spec, error) { - f, err := os.Open("config.json") - if err != nil { - return nil, err - } - defer f.Close() - var s spec - if err := json.NewDecoder(f).Decode(&s); err != nil { - return nil, err - } - return &s, nil -} - -func (s *service) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) { - cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) - if err != nil { - return "", err - } - grouping := opts.ID - spec, err := readSpec() - if err != nil { - return "", err - } - for _, group := range groupLabels { - if groupID, ok := spec.Annotations[group]; ok { - grouping = groupID - break - } - } - address, err := shim.SocketAddress(ctx, opts.Address, grouping) - if err != nil { - return "", err - } - - socket, err := shim.NewSocket(address) - if err != nil { - // the only time where this would happen is if there is a bug and the socket - // was not cleaned up in the cleanup method of the shim or we are using the - // grouping functionality where the new process should be run with the same - // shim as an existing container - if !shim.SocketEaddrinuse(err) { - return "", errors.Wrap(err, "create new shim socket") - } - if shim.CanConnect(address) { - if err := shim.WriteAddress("address", address); err != nil { - return "", errors.Wrap(err, "write existing socket for shim") - } - return address, nil - } - if err := shim.RemoveSocket(address); err != nil { - return "", errors.Wrap(err, "remove pre-existing socket") - } - if socket, err = shim.NewSocket(address); err != nil { - return "", errors.Wrap(err, "try create new shim socket 2x") - } - } - defer func() { - if retErr != nil { - socket.Close() - _ = shim.RemoveSocket(address) - } - }() - - // make sure that reexec shim-v2 binary use the value if need - if err := shim.WriteAddress("address", address); err != nil { - return "", err - } - - f, err := socket.File() - if err != nil { - return "", err - } - - cmd.ExtraFiles = append(cmd.ExtraFiles, f) - - goruntime.LockOSThread() - if os.Getenv("SCHED_CORE") != "" { - if err := schedcore.Create(schedcore.ProcessGroup); err != nil { - return "", errors.Wrap(err, "enable sched core support") - } - } - - if err := cmd.Start(); err != nil { - f.Close() - return "", err - } - - goruntime.UnlockOSThread() - - defer func() { - if retErr != nil { - cmd.Process.Kill() - } - }() - // make sure to wait after start - go cmd.Wait() - if data, err := io.ReadAll(os.Stdin); err == nil { - if len(data) > 0 { - var any ptypes.Any - if err := proto.Unmarshal(data, &any); err != nil { - return "", err - } - v, err := typeurl.UnmarshalAny(&any) - if err != nil { - return "", err - } - if opts, ok := v.(*options.Options); ok { - if opts.ShimCgroup != "" { - if cgroups.Mode() == cgroups.Unified { - cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup) - if err != nil { - return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) - } - if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { - return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) - } - } else { - cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) - if err != nil { - return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) - } - if err := cg.Add(cgroups.Process{ - Pid: cmd.Process.Pid, - }); err != nil { - return "", 
errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) - } - } - } - } - } - } - if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { - return "", errors.Wrap(err, "failed to adjust OOM score for shim") - } - return address, nil -} - -func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { - cwd, err := os.Getwd() - if err != nil { - return nil, err - } - - path := filepath.Join(filepath.Dir(cwd), s.id) - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - runtime, err := runc.ReadRuntime(path) - if err != nil { - return nil, err - } - opts, err := runc.ReadOptions(path) - if err != nil { - return nil, err - } - root := process.RuncRoot - if opts != nil && opts.Root != "" { - root = opts.Root - } - - r := process.NewRunc(root, path, ns, runtime, "", false) - if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ - Force: true, - }); err != nil { - logrus.WithError(err).Warn("failed to remove runc container") - } - if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { - logrus.WithError(err).Warn("failed to cleanup rootfs mount") - } - return &taskAPI.DeleteResponse{ - ExitedAt: time.Now(), - ExitStatus: 128 + uint32(unix.SIGKILL), + return &shimapi.DeleteResponse{ + Pid: uint32(ss.Pid), + ExitStatus: uint32(ss.ExitStatus), + ExitedAt: ss.ExitedAt, }, nil } -// Create a new initial process and container with the underlying OCI runtime -func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { - s.mu.Lock() - defer s.mu.Unlock() - - container, err := runc.NewContainer(ctx, s.platform, r) - if err != nil { - return nil, err - } - - s.containers[r.ID] = container - - s.send(&eventstypes.TaskCreate{ - ContainerID: r.ID, - Bundle: r.Bundle, - Rootfs: r.Rootfs, - IO: &eventstypes.TaskIO{ - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Terminal: r.Terminal, - }, - Checkpoint: r.Checkpoint, - Pid: uint32(container.Pid()), - }) - - return &taskAPI.CreateTaskResponse{ - Pid: uint32(container.Pid()), - }, nil +func (stm *shimTaskManager) StartShim(ctx context.Context, opts shim.StartOpts) (string, error) { + return stm.manager.Start(ctx, opts.ID, opts) } -// Start a process -func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - - // hold the send lock so that the start events are sent before any exit events in the error case - s.eventSendMu.Lock() - p, err := container.Start(ctx, r) - if err != nil { - s.eventSendMu.Unlock() - return nil, errdefs.ToGRPC(err) - } - - switch r.ExecID { - case "": - switch cg := container.Cgroup().(type) { - case cgroups.Cgroup: - if err := s.ep.Add(container.ID, cg); err != nil { - logrus.WithError(err).Error("add cg to OOM monitor") - } - case *cgroupsv2.Manager: - allControllers, err := cg.RootControllers() - if err != nil { - logrus.WithError(err).Error("failed to get root controllers") - } else { - if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil { - if userns.RunningInUserNS() { - logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers) - } else { - logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers) - } - } - } - if err := s.ep.Add(container.ID, cg); err != nil { - logrus.WithError(err).Error("add cg to OOM monitor") - } - } - - s.send(&eventstypes.TaskStart{ - ContainerID: container.ID, 
- Pid: uint32(p.Pid()), - }) - default: - s.send(&eventstypes.TaskExecStarted{ - ContainerID: container.ID, - ExecID: r.ExecID, - Pid: uint32(p.Pid()), - }) - } - s.eventSendMu.Unlock() - return &taskAPI.StartResponse{ - Pid: uint32(p.Pid()), - }, nil -} - -// Delete the initial process and container -func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - p, err := container.Delete(ctx, r) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - // if we deleted an init task, send the task delete event - if r.ExecID == "" { - s.mu.Lock() - delete(s.containers, r.ID) - s.mu.Unlock() - s.send(&eventstypes.TaskDelete{ - ContainerID: container.ID, - Pid: uint32(p.Pid()), - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - }) - } - return &taskAPI.DeleteResponse{ - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - Pid: uint32(p.Pid()), - }, nil -} - -// Exec an additional process inside the container -func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - ok, cancel := container.ReserveProcess(r.ExecID) +// New returns a new shim service that can be used for +// - serving the task service over grpc/ttrpc +// - shim management +// This function is deprecated in favor of directly creating +// the shim manager and registering the task service via plugins. +func New(ctx context.Context, id string, publisher shim.Publisher, fn func()) (shim.Shim, error) { + sd, ok := ctx.(shutdown.Service) if !ok { - return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) + ctx, sd = shutdown.WithShutdown(ctx) + sd.RegisterCallback(func(context.Context) error { + fn() + return nil + }) } - process, err := container.Exec(ctx, r) - if err != nil { - cancel() - return nil, errdefs.ToGRPC(err) - } - - s.send(&eventstypes.TaskExecAdded{ - ContainerID: container.ID, - ExecID: process.ID(), - }) - return empty, nil -} - -// ResizePty of a process -func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) + ts, err := task.NewTaskService(ctx, publisher, sd) if err != nil { return nil, err } - if err := container.ResizePty(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// State returns runtime state information for a process -func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - p, err := container.Process(r.ExecID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - st, err := p.Status(ctx) - if err != nil { - return nil, err - } - status := task.StatusUnknown - switch st { - case "created": - status = task.StatusCreated - case "running": - status = task.StatusRunning - case "stopped": - status = task.StatusStopped - case "paused": - status = task.StatusPaused - case "pausing": - status = task.StatusPausing - } - sio := p.Stdio() - return &taskAPI.StateResponse{ - ID: p.ID(), - Bundle: container.Bundle, - Pid: uint32(p.Pid()), - Status: status, - Stdin: sio.Stdin, - Stdout: sio.Stdout, - Stderr: sio.Stderr, - Terminal: sio.Terminal, - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), + return &shimTaskManager{ + TaskService: ts, + id: id, + manager:
manager.NewShimManager("runc"), }, nil } - -// Pause the container -func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Pause(ctx); err != nil { - return nil, errdefs.ToGRPC(err) - } - s.send(&eventstypes.TaskPaused{ - ContainerID: container.ID, - }) - return empty, nil -} - -// Resume the container -func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Resume(ctx); err != nil { - return nil, errdefs.ToGRPC(err) - } - s.send(&eventstypes.TaskResumed{ - ContainerID: container.ID, - }) - return empty, nil -} - -// Kill a process with the provided signal -func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Kill(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Pids returns all pids inside the container -func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - pids, err := s.getContainerPids(ctx, r.ID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - var processes []*task.ProcessInfo - for _, pid := range pids { - pInfo := task.ProcessInfo{ - Pid: pid, - } - for _, p := range container.ExecdProcesses() { - if p.Pid() == int(pid) { - d := &options.ProcessDetails{ - ExecID: p.ID(), - } - a, err := typeurl.MarshalAny(d) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) - } - pInfo.Info = a - break - } - } - processes = append(processes, &pInfo) - } - return &taskAPI.PidsResponse{ - Processes: processes, - }, nil -} - -// CloseIO of a process -func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.CloseIO(ctx, r); err != nil { - return nil, err - } - return empty, nil -} - -// Checkpoint the container -func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Checkpoint(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Update a running container -func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Update(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Wait for a process to exit -func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - p, err := container.Process(r.ExecID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - p.Wait() - - return &taskAPI.WaitResponse{ - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - }, nil -} - -// Connect returns shim information such as the shim's pid -func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, 
error) { - var pid int - if container, err := s.getContainer(r.ID); err == nil { - pid = container.Pid() - } - return &taskAPI.ConnectResponse{ - ShimPid: uint32(os.Getpid()), - TaskPid: uint32(pid), - }, nil -} - -func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { - s.mu.Lock() - defer s.mu.Unlock() - - // return out if the shim is still servicing containers - if len(s.containers) > 0 { - return empty, nil - } - - if s.platform != nil { - s.platform.Close() - } - - if s.shimAddress != "" { - _ = shim.RemoveSocket(s.shimAddress) - } - - // please make sure that temporary resource has been cleanup - // before shutdown service. - s.cancel() - close(s.events) - return empty, nil -} - -func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - cgx := container.Cgroup() - if cgx == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") - } - var statsx interface{} - switch cg := cgx.(type) { - case cgroups.Cgroup: - stats, err := cg.Stat(cgroups.IgnoreNotExist) - if err != nil { - return nil, err - } - statsx = stats - case *cgroupsv2.Manager: - stats, err := cg.Stat() - if err != nil { - return nil, err - } - statsx = stats - default: - return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg) - } - data, err := typeurl.MarshalAny(statsx) - if err != nil { - return nil, err - } - return &taskAPI.StatsResponse{ - Stats: data, - }, nil -} - -func (s *service) processExits() { - for e := range s.ec { - s.checkProcesses(e) - } -} - -func (s *service) send(evt interface{}) { - s.events <- evt -} - -func (s *service) sendL(evt interface{}) { - s.eventSendMu.Lock() - s.events <- evt - s.eventSendMu.Unlock() -} - -func (s *service) checkProcesses(e runcC.Exit) { - s.mu.Lock() - defer s.mu.Unlock() - - for _, container := range s.containers { - if !container.HasPid(e.Pid) { - continue - } - - for _, p := range container.All() { - if p.Pid() != e.Pid { - continue - } - - if ip, ok := p.(*process.Init); ok { - // Ensure all children are killed - if runc.ShouldKillAllOnExit(s.context, container.Bundle) { - if err := ip.KillAll(s.context); err != nil { - logrus.WithError(err).WithField("id", ip.ID()). 
- Error("failed to kill init's children") - } - } - } - - p.SetExited(e.Status) - s.sendL(&eventstypes.TaskExit{ - ContainerID: container.ID, - ID: p.ID(), - Pid: uint32(e.Pid), - ExitStatus: uint32(e.Status), - ExitedAt: p.ExitedAt(), - }) - return - } - return - } -} - -func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { - container, err := s.getContainer(id) - if err != nil { - return nil, err - } - p, err := container.Process("") - if err != nil { - return nil, errdefs.ToGRPC(err) - } - ps, err := p.(*process.Init).Runtime().Ps(ctx, id) - if err != nil { - return nil, err - } - pids := make([]uint32, 0, len(ps)) - for _, pid := range ps { - pids = append(pids, uint32(pid)) - } - return pids, nil -} - -func (s *service) forward(ctx context.Context, publisher shim.Publisher) { - ns, _ := namespaces.Namespace(ctx) - ctx = namespaces.WithNamespace(context.Background(), ns) - for e := range s.events { - err := publisher.Publish(ctx, runc.GetTopic(e), e) - if err != nil { - logrus.WithError(err).Error("post event") - } - } - publisher.Close() -} - -func (s *service) getContainer(id string) (*runc.Container, error) { - s.mu.Lock() - container := s.containers[id] - s.mu.Unlock() - if container == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") - } - return container, nil -} - -// initialize a single epoll fd to manage our consoles. `initPlatform` should -// only be called once. -func (s *service) initPlatform() error { - if s.platform != nil { - return nil - } - p, err := runc.NewPlatform() - if err != nil { - return err - } - s.platform = p - return nil -} diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 691040bc7..e25626cf3 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -30,6 +30,7 @@ import ( "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/shutdown" "github.com/containerd/containerd/plugin" shimapi "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/containerd/version" @@ -47,21 +48,37 @@ type Publisher interface { // StartOpts describes shim start configuration received from containerd type StartOpts struct { - ID string + ID string // TODO(2.0): Remove ID, passed directly to start for call symmetry ContainerdBinary string Address string TTRPCAddress string } +type StopStatus struct { + Pid int + ExitStatus int + ExitedAt time.Time +} + // Init func for the creation of a shim server +// TODO(2.0): Remove init function type Init func(context.Context, string, Publisher, func()) (Shim, error) // Shim server interface +// TODO(2.0): Remove unified shim interface type Shim interface { + shimapi.TaskService Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) StartShim(ctx context.Context, opts StartOpts) (string, error) } +// Manager is the interface which manages the shim process +type Manager interface { + Name() string + Start(ctx context.Context, id string, opts StartOpts) (string, error) + Stop(ctx context.Context, id string) (StopStatus, error) +} + // OptsKey is the context key for the Opts value. 
type OptsKey struct{} @@ -89,18 +106,18 @@ type ttrpcService interface { } type taskService struct { - local shimapi.TaskService + shimapi.TaskService } -func (t *taskService) RegisterTTRPC(server *ttrpc.Server) error { - shimapi.RegisterTaskService(server, t.local) +func (t taskService) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, t.TaskService) return nil } var ( debugFlag bool versionFlag bool - idFlag string + id string namespaceFlag string socketFlag string bundlePath string @@ -117,7 +134,7 @@ func parseFlags() { flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs") flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit") flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim") - flag.StringVar(&idFlag, "id", "", "id of the task") + flag.StringVar(&id, "id", "", "id of the task") flag.StringVar(&socketFlag, "socket", "", "socket path to serve") flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir") @@ -142,36 +159,85 @@ func setRuntime() { } } -func setLogger(ctx context.Context, id string) error { - logrus.SetFormatter(&logrus.TextFormatter{ +func setLogger(ctx context.Context, id string) (context.Context, error) { + l := log.G(ctx) + l.Logger.SetFormatter(&logrus.TextFormatter{ TimestampFormat: log.RFC3339NanoFixed, FullTimestamp: true, }) if debugFlag { - logrus.SetLevel(logrus.DebugLevel) + l.Logger.SetLevel(logrus.DebugLevel) } f, err := openLog(ctx, id) if err != nil { - return err + return ctx, err } - logrus.SetOutput(f) - return nil + l.Logger.SetOutput(f) + return log.WithLogger(ctx, l), nil } // Run initializes and runs a shim server -func Run(id string, initFunc Init, opts ...BinaryOpts) { +// TODO(2.0): Remove function +func Run(name string, initFunc Init, opts ...BinaryOpts) { var config Config for _, o := range opts { o(&config) } - if err := run(id, initFunc, config); err != nil { - fmt.Fprintf(os.Stderr, "%s: %s\n", id, err) + ctx := context.Background() + ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", name)) + + if err := run(ctx, nil, initFunc, name, config); err != nil { + fmt.Fprintf(os.Stderr, "%s: %s\n", name, err) os.Exit(1) } } -func run(id string, initFunc Init, config Config) error { +// TODO(2.0): Remove this type +type shimToManager struct { + shim Shim + name string +} + +func (stm shimToManager) Name() string { + return stm.name +} + +func (stm shimToManager) Start(ctx context.Context, id string, opts StartOpts) (string, error) { + opts.ID = id + return stm.shim.StartShim(ctx, opts) +} + +func (stm shimToManager) Stop(ctx context.Context, id string) (StopStatus, error) { + // shim must already have id + dr, err := stm.shim.Cleanup(ctx) + if err != nil { + return StopStatus{}, err + } + return StopStatus{ + Pid: int(dr.Pid), + ExitStatus: int(dr.ExitStatus), + ExitedAt: dr.ExitedAt, + }, nil +} + +// RunManager initializes and runs a shim server +// TODO(2.0): Rename to Run +func RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts) { + var config Config + for _, o := range opts { + o(&config) + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", manager.Name())) + + if err := run(ctx, manager, nil, "", config); err != nil { + fmt.Fprintf(os.Stderr, "%s: %s\n", manager.Name(), err) + os.Exit(1) + } +} + +func run(ctx context.Context, manager Manager, initFunc Init, name string, config Config) error { parseFlags() if versionFlag { fmt.Printf("%s:\n", os.Args[0]) @@ -206,28 +272,49 @@ func run(id string,
initFunc Init, config Config) error { } defer publisher.Close() - ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) + ctx = namespaces.WithNamespace(ctx, namespaceFlag) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) - ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) - ctx, cancel := context.WithCancel(ctx) - service, err := initFunc(ctx, idFlag, publisher, cancel) - if err != nil { - return err + ctx, sd := shutdown.WithShutdown(ctx) + defer sd.Shutdown() + + if manager == nil { + service, err := initFunc(ctx, id, publisher, sd.Shutdown) + if err != nil { + return err + } + plugin.Register(&plugin.Registration{ + Type: plugin.TTRPCPlugin, + ID: "task", + Requires: []plugin.Type{ + plugin.EventPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return taskService{service}, nil + }, + }) + manager = shimToManager{ + shim: service, + name: name, + } } // Handle explicit actions switch action { case "delete": - logger := logrus.WithFields(logrus.Fields{ + logger := log.G(ctx).WithFields(logrus.Fields{ "pid": os.Getpid(), "namespace": namespaceFlag, }) go handleSignals(ctx, logger, signals) - response, err := service.Cleanup(ctx) + ss, err := manager.Stop(ctx, id) if err != nil { return err } - data, err := proto.Marshal(response) + data, err := proto.Marshal(&shimapi.DeleteResponse{ + Pid: uint32(ss.Pid), + ExitStatus: uint32(ss.ExitStatus), + ExitedAt: ss.ExitedAt, + }) if err != nil { return err } @@ -237,13 +324,12 @@ func run(id string, initFunc Init, config Config) error { return nil case "start": opts := StartOpts{ - ID: idFlag, ContainerdBinary: containerdBinaryFlag, Address: addressFlag, TTRPCAddress: ttrpcAddress, } - address, err := service.StartShim(ctx, opts) + address, err := manager.Start(ctx, id, opts) if err != nil { return err } @@ -254,11 +340,20 @@ func run(id string, initFunc Init, config Config) error { } if !config.NoSetupLogger { - if err := setLogger(ctx, idFlag); err != nil { + ctx, err = setLogger(ctx, id) + if err != nil { return err } } + plugin.Register(&plugin.Registration{ + Type: plugin.InternalPlugin, + ID: "shutdown", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return sd, nil + }, + }) + // Register event plugin plugin.Register(&plugin.Registration{ Type: plugin.EventPlugin, @@ -268,17 +363,6 @@ func run(id string, initFunc Init, config Config) error { }, }) - // If service is an implementation of the task service, register it as a plugin - if ts, ok := service.(shimapi.TaskService); ok { - plugin.Register(&plugin.Registration{ - Type: plugin.TTRPCPlugin, - ID: "task", - InitFn: func(ic *plugin.InitContext) (interface{}, error) { - return &taskService{ts}, nil - }, - }) - } - var ( initialized = plugin.NewPluginSet() ttrpcServices = []ttrpcService{} @@ -345,7 +429,7 @@ func run(id string, initFunc Init, config Config) error { } if err := serve(ctx, server, signals); err != nil { - if err != context.Canceled { + if err != shutdown.ErrShutdown { return err } } @@ -383,10 +467,10 @@ func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal) er defer l.Close() if err := server.Serve(ctx, l); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure") + log.G(ctx).WithError(err).Fatal("containerd-shim: ttrpc server failure") } }() - logger := logrus.WithFields(logrus.Fields{ + logger := log.G(ctx).WithFields(logrus.Fields{ "pid": 
os.Getpid(), "path": path, "namespace": namespaceFlag,
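
Taken together, the refactor splits a shim binary into two composable pieces: a shim.Manager that owns the shim process lifecycle (start/stop) and TTRPC services such as the task service, registered through the plugin system. A minimal sketch of what a third-party shim could look like under the new API (the myManager type, the runtime name, and the plugin import path are illustrative assumptions, not part of this change):

	package main

	import (
		"context"
		"errors"
		"time"

		// hypothetical side-effect import that registers a task service
		// TTRPC plugin, mirroring runtime/v2/runc/task/plugin above
		_ "example.com/myshim/task/plugin"

		"github.com/containerd/containerd/runtime/v2/shim"
	)

	// myManager implements shim.Manager: Start launches the shim process
	// and returns its socket address, Stop cleans up and reports status.
	type myManager struct{ name string }

	func (m myManager) Name() string { return m.name }

	func (m myManager) Start(ctx context.Context, id string, opts shim.StartOpts) (string, error) {
		// fork the shim and return its socket address, as the runc
		// manager's Start does above
		return "", errors.New("not implemented")
	}

	func (m myManager) Stop(ctx context.Context, id string) (shim.StopStatus, error) {
		return shim.StopStatus{ExitedAt: time.Now(), ExitStatus: 128 + 9}, nil
	}

	func main() {
		// RunManager parses the standard shim flags, registers the shutdown
		// and event publisher plugins, and serves all registered TTRPC
		// services (see the run() changes above).
		shim.RunManager(context.Background(), myManager{name: "io.containerd.myshim.v1"})
	}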