From 04e57d71b2397489bd961f8f63f1e8f8b6ce59fd Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 20 Sep 2021 10:48:13 -0700 Subject: [PATCH] Seperate shim manager and task service Create new shim manager interface and deprecate older shim manager interface. Signed-off-by: Derek McGowan --- .../manager.go => manager/manager_linux.go} | 45 +- runtime/v2/runc/tasks/service.go | 619 +++++++++++++++++ runtime/v2/runc/v2/service.go | 632 +----------------- runtime/v2/shim/shim.go | 160 +++-- 4 files changed, 789 insertions(+), 667 deletions(-) rename runtime/v2/runc/{service/manager.go => manager/manager_linux.go} (87%) create mode 100644 runtime/v2/runc/tasks/service.go diff --git a/runtime/v2/runc/service/manager.go b/runtime/v2/runc/manager/manager_linux.go similarity index 87% rename from runtime/v2/runc/service/manager.go rename to runtime/v2/runc/manager/manager_linux.go index 8a82f37e4..ba574beb0 100644 --- a/runtime/v2/runc/service/manager.go +++ b/runtime/v2/runc/manager/manager_linux.go @@ -1,5 +1,3 @@ -// +build linux - /* Copyright The containerd Authors. @@ -16,7 +14,7 @@ limitations under the License. */ -package service +package manager import ( "context" @@ -30,6 +28,7 @@ import ( "github.com/containerd/cgroups" cgroupsv2 "github.com/containerd/cgroups/v2" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/process" @@ -37,22 +36,20 @@ import ( "github.com/containerd/containerd/runtime/v2/runc" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/shim" - taskAPI "github.com/containerd/containerd/runtime/v2/task" runcC "github.com/containerd/go-runc" "github.com/containerd/typeurl" "github.com/gogo/protobuf/proto" ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" - "github.com/sirupsen/logrus" exec "golang.org/x/sys/execabs" "golang.org/x/sys/unix" ) // NewShimManager returns an implementation of the shim manager // using runc -func NewShimManager(id string) shim.Shim { +func NewShimManager(name string) shim.Manager { return &manager{ - id: id, + name: name, } } @@ -69,7 +66,7 @@ type spec struct { } type manager struct { - id string + name string } func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { @@ -112,12 +109,16 @@ func readSpec() (*spec, error) { return &s, nil } -func (manager) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) { - cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) +func (m manager) Name() string { + return m.name +} + +func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ string, retErr error) { + cmd, err := newCommand(ctx, id, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) if err != nil { return "", err } - grouping := opts.ID + grouping := id spec, err := readSpec() if err != nil { return "", err @@ -236,24 +237,24 @@ func (manager) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, re return address, nil } -func (m manager) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { +func (manager) Stop(ctx context.Context, id string) (shim.StopStatus, error) { cwd, err := os.Getwd() if err != nil { - return nil, err + return shim.StopStatus{}, err } - path := filepath.Join(filepath.Dir(cwd), m.id) + path := filepath.Join(filepath.Dir(cwd), id) ns, err := namespaces.NamespaceRequired(ctx) if err != nil { - return nil, err + return shim.StopStatus{}, err } runtime, err := runc.ReadRuntime(path) if err != nil { - return nil, err + return shim.StopStatus{}, err } opts, err := runc.ReadOptions(path) if err != nil { - return nil, err + return shim.StopStatus{}, err } root := process.RuncRoot if opts != nil && opts.Root != "" { @@ -261,16 +262,16 @@ func (m manager) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { } r := process.NewRunc(root, path, ns, runtime, "", false) - if err := r.Delete(ctx, m.id, &runcC.DeleteOpts{ + if err := r.Delete(ctx, id, &runcC.DeleteOpts{ Force: true, }); err != nil { - logrus.WithError(err).Warn("failed to remove runc container") + log.G(ctx).WithError(err).Warn("failed to remove runc container") } if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { - logrus.WithError(err).Warn("failed to cleanup rootfs mount") + log.G(ctx).WithError(err).Warn("failed to cleanup rootfs mount") } - return &taskAPI.DeleteResponse{ + return shim.StopStatus{ ExitedAt: time.Now(), - ExitStatus: 128 + uint32(unix.SIGKILL), + ExitStatus: 128 + int(unix.SIGKILL), }, nil } diff --git a/runtime/v2/runc/tasks/service.go b/runtime/v2/runc/tasks/service.go new file mode 100644 index 000000000..5659affa2 --- /dev/null +++ b/runtime/v2/runc/tasks/service.go @@ -0,0 +1,619 @@ +//go:build linux +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package tasks + +import ( + "context" + "os" + "sync" + + "github.com/containerd/cgroups" + cgroupsv2 "github.com/containerd/cgroups/v2" + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/oom" + oomv1 "github.com/containerd/containerd/pkg/oom/v1" + oomv2 "github.com/containerd/containerd/pkg/oom/v2" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/shutdown" + "github.com/containerd/containerd/pkg/stdio" + "github.com/containerd/containerd/pkg/userns" + "github.com/containerd/containerd/runtime/v2/runc" + "github.com/containerd/containerd/runtime/v2/runc/options" + "github.com/containerd/containerd/runtime/v2/shim" + shimapi "github.com/containerd/containerd/runtime/v2/task" + taskAPI "github.com/containerd/containerd/runtime/v2/task" + "github.com/containerd/containerd/sys/reaper" + runcC "github.com/containerd/go-runc" + "github.com/containerd/ttrpc" + "github.com/containerd/typeurl" + ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var ( + _ = (taskAPI.TaskService)(&service{}) + empty = &ptypes.Empty{} +) + +// NewTaskService creates a new instance of a task service +func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) { + var ( + ep oom.Watcher + err error + ) + if cgroups.Mode() == cgroups.Unified { + ep, err = oomv2.New(publisher) + } else { + ep, err = oomv1.New(publisher) + } + if err != nil { + return nil, err + } + go ep.Run(ctx) + s := &service{ + context: ctx, + events: make(chan interface{}, 128), + ec: reaper.Default.Subscribe(), + ep: ep, + shutdown: sd, + containers: make(map[string]*runc.Container), + } + go s.processExits() + runcC.Monitor = reaper.Default + if err := s.initPlatform(); err != nil { + return nil, errors.Wrap(err, "failed to initialized platform behavior") + } + go s.forward(ctx, publisher) + sd.RegisterCallback(func(context.Context) error { + close(s.events) + return nil + }) + + if address, err := shim.ReadAddress("address"); err == nil { + sd.RegisterCallback(func(context.Context) error { + return shim.RemoveSocket(address) + }) + } + return s, nil +} + +// service is the shim implementation of a remote shim over GRPC +type service struct { + mu sync.Mutex + eventSendMu sync.Mutex + + context context.Context + events chan interface{} + platform stdio.Platform + ec chan runcC.Exit + ep oom.Watcher + + containers map[string]*runc.Container + + shutdown shutdown.Service +} + +// Create a new initial process and container with the underlying OCI runtime +func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + container, err := runc.NewContainer(ctx, s.platform, r) + if err != nil { + return nil, err + } + + s.containers[r.ID] = container + + s.send(&eventstypes.TaskCreate{ + ContainerID: r.ID, + Bundle: r.Bundle, + Rootfs: r.Rootfs, + IO: &eventstypes.TaskIO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + Checkpoint: r.Checkpoint, + Pid: uint32(container.Pid()), + }) + + return &taskAPI.CreateTaskResponse{ + Pid: uint32(container.Pid()), + }, nil +} + +func (s *service) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, s) + return nil +} + +// Start a process +func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + + // hold the send lock so that the start events are sent before any exit events in the error case + s.eventSendMu.Lock() + p, err := container.Start(ctx, r) + if err != nil { + s.eventSendMu.Unlock() + return nil, errdefs.ToGRPC(err) + } + + switch r.ExecID { + case "": + switch cg := container.Cgroup().(type) { + case cgroups.Cgroup: + if err := s.ep.Add(container.ID, cg); err != nil { + logrus.WithError(err).Error("add cg to OOM monitor") + } + case *cgroupsv2.Manager: + allControllers, err := cg.RootControllers() + if err != nil { + logrus.WithError(err).Error("failed to get root controllers") + } else { + if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil { + if userns.RunningInUserNS() { + logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers) + } else { + logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers) + } + } + } + if err := s.ep.Add(container.ID, cg); err != nil { + logrus.WithError(err).Error("add cg to OOM monitor") + } + } + + s.send(&eventstypes.TaskStart{ + ContainerID: container.ID, + Pid: uint32(p.Pid()), + }) + default: + s.send(&eventstypes.TaskExecStarted{ + ContainerID: container.ID, + ExecID: r.ExecID, + Pid: uint32(p.Pid()), + }) + } + s.eventSendMu.Unlock() + return &taskAPI.StartResponse{ + Pid: uint32(p.Pid()), + }, nil +} + +// Delete the initial process and container +func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Delete(ctx, r) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + // if we deleted an init task, send the task delete event + if r.ExecID == "" { + s.mu.Lock() + delete(s.containers, r.ID) + s.mu.Unlock() + s.send(&eventstypes.TaskDelete{ + ContainerID: container.ID, + Pid: uint32(p.Pid()), + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }) + } + return &taskAPI.DeleteResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + Pid: uint32(p.Pid()), + }, nil +} + +// Exec an additional process inside the container +func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + ok, cancel := container.ReserveProcess(r.ExecID) + if !ok { + return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) + } + process, err := container.Exec(ctx, r) + if err != nil { + cancel() + return nil, errdefs.ToGRPC(err) + } + + s.send(&eventstypes.TaskExecAdded{ + ContainerID: container.ID, + ExecID: process.ID(), + }) + return empty, nil +} + +// ResizePty of a process +func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.ResizePty(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// State returns runtime state information for a process +func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Process(r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + st, err := p.Status(ctx) + if err != nil { + return nil, err + } + status := task.StatusUnknown + switch st { + case "created": + status = task.StatusCreated + case "running": + status = task.StatusRunning + case "stopped": + status = task.StatusStopped + case "paused": + status = task.StatusPaused + case "pausing": + status = task.StatusPausing + } + sio := p.Stdio() + return &taskAPI.StateResponse{ + ID: p.ID(), + Bundle: container.Bundle, + Pid: uint32(p.Pid()), + Status: status, + Stdin: sio.Stdin, + Stdout: sio.Stdout, + Stderr: sio.Stderr, + Terminal: sio.Terminal, + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Pause the container +func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Pause(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } + s.send(&eventstypes.TaskPaused{ + ContainerID: container.ID, + }) + return empty, nil +} + +// Resume the container +func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Resume(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } + s.send(&eventstypes.TaskResumed{ + ContainerID: container.ID, + }) + return empty, nil +} + +// Kill a process with the provided signal +func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Kill(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Pids returns all pids inside the container +func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + pids, err := s.getContainerPids(ctx, r.ID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + var processes []*task.ProcessInfo + for _, pid := range pids { + pInfo := task.ProcessInfo{ + Pid: pid, + } + for _, p := range container.ExecdProcesses() { + if p.Pid() == int(pid) { + d := &options.ProcessDetails{ + ExecID: p.ID(), + } + a, err := typeurl.MarshalAny(d) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) + } + pInfo.Info = a + break + } + } + processes = append(processes, &pInfo) + } + return &taskAPI.PidsResponse{ + Processes: processes, + }, nil +} + +// CloseIO of a process +func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.CloseIO(ctx, r); err != nil { + return nil, err + } + return empty, nil +} + +// Checkpoint the container +func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Checkpoint(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Update a running container +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Update(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Wait for a process to exit +func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Process(r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + p.Wait() + + return &taskAPI.WaitResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Connect returns shim information such as the shim's pid +func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { + var pid int + if container, err := s.getContainer(r.ID); err == nil { + pid = container.Pid() + } + return &taskAPI.ConnectResponse{ + ShimPid: uint32(os.Getpid()), + TaskPid: uint32(pid), + }, nil +} + +func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { + s.mu.Lock() + defer s.mu.Unlock() + + // return out if the shim is still servicing containers + if len(s.containers) > 0 { + return empty, nil + } + + // please make sure that temporary resource has been cleanup or registered + // for cleanup before calling shutdown + s.shutdown.Shutdown() + + return empty, nil +} + +func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + cgx := container.Cgroup() + if cgx == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") + } + var statsx interface{} + switch cg := cgx.(type) { + case cgroups.Cgroup: + stats, err := cg.Stat(cgroups.IgnoreNotExist) + if err != nil { + return nil, err + } + statsx = stats + case *cgroupsv2.Manager: + stats, err := cg.Stat() + if err != nil { + return nil, err + } + statsx = stats + default: + return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg) + } + data, err := typeurl.MarshalAny(statsx) + if err != nil { + return nil, err + } + return &taskAPI.StatsResponse{ + Stats: data, + }, nil +} + +func (s *service) processExits() { + for e := range s.ec { + s.checkProcesses(e) + } +} + +func (s *service) send(evt interface{}) { + s.events <- evt +} + +func (s *service) sendL(evt interface{}) { + s.eventSendMu.Lock() + s.events <- evt + s.eventSendMu.Unlock() +} + +func (s *service) checkProcesses(e runcC.Exit) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, container := range s.containers { + if !container.HasPid(e.Pid) { + continue + } + + for _, p := range container.All() { + if p.Pid() != e.Pid { + continue + } + + if ip, ok := p.(*process.Init); ok { + // Ensure all children are killed + if runc.ShouldKillAllOnExit(s.context, container.Bundle) { + if err := ip.KillAll(s.context); err != nil { + logrus.WithError(err).WithField("id", ip.ID()). + Error("failed to kill init's children") + } + } + } + + p.SetExited(e.Status) + s.sendL(&eventstypes.TaskExit{ + ContainerID: container.ID, + ID: p.ID(), + Pid: uint32(e.Pid), + ExitStatus: uint32(e.Status), + ExitedAt: p.ExitedAt(), + }) + return + } + return + } +} + +func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { + container, err := s.getContainer(id) + if err != nil { + return nil, err + } + p, err := container.Process("") + if err != nil { + return nil, errdefs.ToGRPC(err) + } + ps, err := p.(*process.Init).Runtime().Ps(ctx, id) + if err != nil { + return nil, err + } + pids := make([]uint32, 0, len(ps)) + for _, pid := range ps { + pids = append(pids, uint32(pid)) + } + return pids, nil +} + +func (s *service) forward(ctx context.Context, publisher shim.Publisher) { + ns, _ := namespaces.Namespace(ctx) + ctx = namespaces.WithNamespace(context.Background(), ns) + for e := range s.events { + err := publisher.Publish(ctx, runc.GetTopic(e), e) + if err != nil { + logrus.WithError(err).Error("post event") + } + } + publisher.Close() +} + +func (s *service) getContainer(id string) (*runc.Container, error) { + s.mu.Lock() + container := s.containers[id] + s.mu.Unlock() + if container == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") + } + return container, nil +} + +// initialize a single epoll fd to manage our consoles. `initPlatform` should +// only be called once. +func (s *service) initPlatform() error { + if s.platform != nil { + return nil + } + p, err := runc.NewPlatform() + if err != nil { + return err + } + s.platform = p + s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() }) + return nil +} diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 857445471..17ab82075 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -21,627 +21,59 @@ package v2 import ( "context" - "os" - "sync" - "github.com/containerd/cgroups" - cgroupsv2 "github.com/containerd/cgroups/v2" - eventstypes "github.com/containerd/containerd/api/events" - "github.com/containerd/containerd/api/types/task" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/pkg/oom" - oomv1 "github.com/containerd/containerd/pkg/oom/v1" - oomv2 "github.com/containerd/containerd/pkg/oom/v2" - "github.com/containerd/containerd/pkg/process" "github.com/containerd/containerd/pkg/shutdown" - "github.com/containerd/containerd/pkg/stdio" - "github.com/containerd/containerd/pkg/userns" - "github.com/containerd/containerd/plugin" - "github.com/containerd/containerd/runtime/v2/runc" - "github.com/containerd/containerd/runtime/v2/runc/options" - runcservice "github.com/containerd/containerd/runtime/v2/runc/service" + "github.com/containerd/containerd/runtime/v2/runc/manager" + "github.com/containerd/containerd/runtime/v2/runc/tasks" "github.com/containerd/containerd/runtime/v2/shim" shimapi "github.com/containerd/containerd/runtime/v2/task" - taskAPI "github.com/containerd/containerd/runtime/v2/task" - "github.com/containerd/containerd/sys/reaper" - runcC "github.com/containerd/go-runc" - "github.com/containerd/ttrpc" - "github.com/containerd/typeurl" - ptypes "github.com/gogo/protobuf/types" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) -var ( - _ = (taskAPI.TaskService)(&service{}) - empty = &ptypes.Empty{} -) +// TODO(2.0): Remove this package -// New returns a new shim service that can be used via GRPC -// TODO(2.0): Remove this function, rely on plugin registration -func New(_ context.Context, id string, _ shim.Publisher, _ func()) (shim.Shim, error) { - plugin.Register(&plugin.Registration{ - Type: plugin.TTRPCPlugin, - ID: "task", - Requires: []plugin.Type{ - plugin.EventPlugin, - plugin.InternalPlugin, - }, - InitFn: func(ic *plugin.InitContext) (interface{}, error) { - pp, err := ic.GetByID(plugin.EventPlugin, "publisher") - if err != nil { - return nil, err - } - ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown") - if err != nil { - return nil, err - } - return NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service)) - }, - }) - - return runcservice.NewShimManager(id), nil +type shimTaskManager struct { + shimapi.TaskService + id string + manager shim.Manager } -// NewTaskService creates a new instance of a task service -func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) { - var ( - ep oom.Watcher - err error - ) - if cgroups.Mode() == cgroups.Unified { - ep, err = oomv2.New(publisher) - } else { - ep, err = oomv1.New(publisher) - } +func (stm *shimTaskManager) Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) { + ss, err := stm.manager.Stop(ctx, stm.id) if err != nil { return nil, err } - go ep.Run(ctx) - s := &service{ - context: ctx, - events: make(chan interface{}, 128), - ec: reaper.Default.Subscribe(), - ep: ep, - shutdown: sd, - containers: make(map[string]*runc.Container), - } - go s.processExits() - runcC.Monitor = reaper.Default - if err := s.initPlatform(); err != nil { - return nil, errors.Wrap(err, "failed to initialized platform behavior") - } - go s.forward(ctx, publisher) - sd.RegisterCallback(func(context.Context) error { - close(s.events) - return nil - }) - - if address, err := shim.ReadAddress("address"); err == nil { - sd.RegisterCallback(func(context.Context) error { - return shim.RemoveSocket(address) - }) - } - return s, nil -} - -// service is the shim implementation of a remote shim over GRPC -type service struct { - mu sync.Mutex - eventSendMu sync.Mutex - - context context.Context - events chan interface{} - platform stdio.Platform - ec chan runcC.Exit - ep oom.Watcher - - containers map[string]*runc.Container - - shutdown shutdown.Service -} - -// Create a new initial process and container with the underlying OCI runtime -func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { - s.mu.Lock() - defer s.mu.Unlock() - - container, err := runc.NewContainer(ctx, s.platform, r) - if err != nil { - return nil, err - } - - s.containers[r.ID] = container - - s.send(&eventstypes.TaskCreate{ - ContainerID: r.ID, - Bundle: r.Bundle, - Rootfs: r.Rootfs, - IO: &eventstypes.TaskIO{ - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Terminal: r.Terminal, - }, - Checkpoint: r.Checkpoint, - Pid: uint32(container.Pid()), - }) - - return &taskAPI.CreateTaskResponse{ - Pid: uint32(container.Pid()), + return &shimapi.DeleteResponse{ + Pid: uint32(ss.Pid), + ExitStatus: uint32(ss.ExitStatus), + ExitedAt: ss.ExitedAt, }, nil } -func (s *service) RegisterTTRPC(server *ttrpc.Server) error { - shimapi.RegisterTaskService(server, s) - return nil +func (stm *shimTaskManager) StartShim(ctx context.Context, opts shim.StartOpts) (string, error) { + return stm.manager.Start(ctx, opts.ID, opts) } -// Start a process -func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - - // hold the send lock so that the start events are sent before any exit events in the error case - s.eventSendMu.Lock() - p, err := container.Start(ctx, r) - if err != nil { - s.eventSendMu.Unlock() - return nil, errdefs.ToGRPC(err) - } - - switch r.ExecID { - case "": - switch cg := container.Cgroup().(type) { - case cgroups.Cgroup: - if err := s.ep.Add(container.ID, cg); err != nil { - logrus.WithError(err).Error("add cg to OOM monitor") - } - case *cgroupsv2.Manager: - allControllers, err := cg.RootControllers() - if err != nil { - logrus.WithError(err).Error("failed to get root controllers") - } else { - if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil { - if userns.RunningInUserNS() { - logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers) - } else { - logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers) - } - } - } - if err := s.ep.Add(container.ID, cg); err != nil { - logrus.WithError(err).Error("add cg to OOM monitor") - } - } - - s.send(&eventstypes.TaskStart{ - ContainerID: container.ID, - Pid: uint32(p.Pid()), - }) - default: - s.send(&eventstypes.TaskExecStarted{ - ContainerID: container.ID, - ExecID: r.ExecID, - Pid: uint32(p.Pid()), - }) - } - s.eventSendMu.Unlock() - return &taskAPI.StartResponse{ - Pid: uint32(p.Pid()), - }, nil -} - -// Delete the initial process and container -func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - p, err := container.Delete(ctx, r) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - // if we deleted an init task, send the task delete event - if r.ExecID == "" { - s.mu.Lock() - delete(s.containers, r.ID) - s.mu.Unlock() - s.send(&eventstypes.TaskDelete{ - ContainerID: container.ID, - Pid: uint32(p.Pid()), - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - }) - } - return &taskAPI.DeleteResponse{ - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - Pid: uint32(p.Pid()), - }, nil -} - -// Exec an additional process inside the container -func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - ok, cancel := container.ReserveProcess(r.ExecID) +// New returns a new shim service that can be used for +// - serving the task service over grpc/ttrpc +// - shim management +// This function is deprecated in favor direct creation +// of shim manager and registering task service via plugins. +func New(ctx context.Context, id string, publisher shim.Publisher, fn func()) (shim.Shim, error) { + sd, ok := ctx.(shutdown.Service) if !ok { - return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) + ctx, sd = shutdown.WithShutdown(ctx) + sd.RegisterCallback(func(context.Context) error { + fn() + return nil + }) } - process, err := container.Exec(ctx, r) - if err != nil { - cancel() - return nil, errdefs.ToGRPC(err) - } - - s.send(&eventstypes.TaskExecAdded{ - ContainerID: container.ID, - ExecID: process.ID(), - }) - return empty, nil -} - -// ResizePty of a process -func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) + ts, err := tasks.NewTaskService(ctx, publisher, sd) if err != nil { return nil, err } - if err := container.ResizePty(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// State returns runtime state information for a process -func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - p, err := container.Process(r.ExecID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - st, err := p.Status(ctx) - if err != nil { - return nil, err - } - status := task.StatusUnknown - switch st { - case "created": - status = task.StatusCreated - case "running": - status = task.StatusRunning - case "stopped": - status = task.StatusStopped - case "paused": - status = task.StatusPaused - case "pausing": - status = task.StatusPausing - } - sio := p.Stdio() - return &taskAPI.StateResponse{ - ID: p.ID(), - Bundle: container.Bundle, - Pid: uint32(p.Pid()), - Status: status, - Stdin: sio.Stdin, - Stdout: sio.Stdout, - Stderr: sio.Stderr, - Terminal: sio.Terminal, - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), + return &shimTaskManager{ + TaskService: ts, + id: id, + manager: manager.NewShimManager("runc"), }, nil } - -// Pause the container -func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Pause(ctx); err != nil { - return nil, errdefs.ToGRPC(err) - } - s.send(&eventstypes.TaskPaused{ - ContainerID: container.ID, - }) - return empty, nil -} - -// Resume the container -func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Resume(ctx); err != nil { - return nil, errdefs.ToGRPC(err) - } - s.send(&eventstypes.TaskResumed{ - ContainerID: container.ID, - }) - return empty, nil -} - -// Kill a process with the provided signal -func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Kill(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Pids returns all pids inside the container -func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - pids, err := s.getContainerPids(ctx, r.ID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - var processes []*task.ProcessInfo - for _, pid := range pids { - pInfo := task.ProcessInfo{ - Pid: pid, - } - for _, p := range container.ExecdProcesses() { - if p.Pid() == int(pid) { - d := &options.ProcessDetails{ - ExecID: p.ID(), - } - a, err := typeurl.MarshalAny(d) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) - } - pInfo.Info = a - break - } - } - processes = append(processes, &pInfo) - } - return &taskAPI.PidsResponse{ - Processes: processes, - }, nil -} - -// CloseIO of a process -func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.CloseIO(ctx, r); err != nil { - return nil, err - } - return empty, nil -} - -// Checkpoint the container -func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Checkpoint(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Update a running container -func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - if err := container.Update(ctx, r); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Wait for a process to exit -func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - p, err := container.Process(r.ExecID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - p.Wait() - - return &taskAPI.WaitResponse{ - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - }, nil -} - -// Connect returns shim information such as the shim's pid -func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { - var pid int - if container, err := s.getContainer(r.ID); err == nil { - pid = container.Pid() - } - return &taskAPI.ConnectResponse{ - ShimPid: uint32(os.Getpid()), - TaskPid: uint32(pid), - }, nil -} - -func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { - s.mu.Lock() - defer s.mu.Unlock() - - // return out if the shim is still servicing containers - if len(s.containers) > 0 { - return empty, nil - } - - // please make sure that temporary resource has been cleanup or registered - // for cleanup before calling shutdown - s.shutdown.Shutdown() - - return empty, nil -} - -func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { - container, err := s.getContainer(r.ID) - if err != nil { - return nil, err - } - cgx := container.Cgroup() - if cgx == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") - } - var statsx interface{} - switch cg := cgx.(type) { - case cgroups.Cgroup: - stats, err := cg.Stat(cgroups.IgnoreNotExist) - if err != nil { - return nil, err - } - statsx = stats - case *cgroupsv2.Manager: - stats, err := cg.Stat() - if err != nil { - return nil, err - } - statsx = stats - default: - return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg) - } - data, err := typeurl.MarshalAny(statsx) - if err != nil { - return nil, err - } - return &taskAPI.StatsResponse{ - Stats: data, - }, nil -} - -func (s *service) processExits() { - for e := range s.ec { - s.checkProcesses(e) - } -} - -func (s *service) send(evt interface{}) { - s.events <- evt -} - -func (s *service) sendL(evt interface{}) { - s.eventSendMu.Lock() - s.events <- evt - s.eventSendMu.Unlock() -} - -func (s *service) checkProcesses(e runcC.Exit) { - s.mu.Lock() - defer s.mu.Unlock() - - for _, container := range s.containers { - if !container.HasPid(e.Pid) { - continue - } - - for _, p := range container.All() { - if p.Pid() != e.Pid { - continue - } - - if ip, ok := p.(*process.Init); ok { - // Ensure all children are killed - if runc.ShouldKillAllOnExit(s.context, container.Bundle) { - if err := ip.KillAll(s.context); err != nil { - logrus.WithError(err).WithField("id", ip.ID()). - Error("failed to kill init's children") - } - } - } - - p.SetExited(e.Status) - s.sendL(&eventstypes.TaskExit{ - ContainerID: container.ID, - ID: p.ID(), - Pid: uint32(e.Pid), - ExitStatus: uint32(e.Status), - ExitedAt: p.ExitedAt(), - }) - return - } - return - } -} - -func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { - container, err := s.getContainer(id) - if err != nil { - return nil, err - } - p, err := container.Process("") - if err != nil { - return nil, errdefs.ToGRPC(err) - } - ps, err := p.(*process.Init).Runtime().Ps(ctx, id) - if err != nil { - return nil, err - } - pids := make([]uint32, 0, len(ps)) - for _, pid := range ps { - pids = append(pids, uint32(pid)) - } - return pids, nil -} - -func (s *service) forward(ctx context.Context, publisher shim.Publisher) { - ns, _ := namespaces.Namespace(ctx) - ctx = namespaces.WithNamespace(context.Background(), ns) - for e := range s.events { - err := publisher.Publish(ctx, runc.GetTopic(e), e) - if err != nil { - logrus.WithError(err).Error("post event") - } - } - publisher.Close() -} - -func (s *service) getContainer(id string) (*runc.Container, error) { - s.mu.Lock() - container := s.containers[id] - s.mu.Unlock() - if container == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") - } - return container, nil -} - -// initialize a single epoll fd to manage our consoles. `initPlatform` should -// only be called once. -func (s *service) initPlatform() error { - if s.platform != nil { - return nil - } - p, err := runc.NewPlatform() - if err != nil { - return err - } - s.platform = p - s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() }) - return nil -} diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 18a585082..e25626cf3 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -48,21 +48,37 @@ type Publisher interface { // StartOpts describes shim start configuration received from containerd type StartOpts struct { - ID string + ID string // TODO(2.0): Remove ID, passed directly to start for call symmetry ContainerdBinary string Address string TTRPCAddress string } +type StopStatus struct { + Pid int + ExitStatus int + ExitedAt time.Time +} + // Init func for the creation of a shim server +// TODO(2.0): Remove init function type Init func(context.Context, string, Publisher, func()) (Shim, error) // Shim server interface +// TODO(2.0): Remove unified shim interface type Shim interface { - Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) // TODO(2.0): Update interface to pass ID directly to Cleanup + shimapi.TaskService + Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) StartShim(ctx context.Context, opts StartOpts) (string, error) } +// Manager is the interface which manages the shim process +type Manager interface { + Name() string + Start(ctx context.Context, id string, opts StartOpts) (string, error) + Stop(ctx context.Context, id string) (StopStatus, error) +} + // OptsKey is the context key for the Opts value. type OptsKey struct{} @@ -90,18 +106,18 @@ type ttrpcService interface { } type taskService struct { - local shimapi.TaskService + shimapi.TaskService } -func (t *taskService) RegisterTTRPC(server *ttrpc.Server) error { - shimapi.RegisterTaskService(server, t.local) +func (t taskService) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, t.TaskService) return nil } var ( debugFlag bool versionFlag bool - idFlag string + id string namespaceFlag string socketFlag string bundlePath string @@ -118,7 +134,7 @@ func parseFlags() { flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs") flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit") flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim") - flag.StringVar(&idFlag, "id", "", "id of the task") + flag.StringVar(&id, "id", "", "id of the task") flag.StringVar(&socketFlag, "socket", "", "socket path to serve") flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir") @@ -143,37 +159,85 @@ func setRuntime() { } } -func setLogger(ctx context.Context, id string) error { - logrus.SetFormatter(&logrus.TextFormatter{ +func setLogger(ctx context.Context, id string) (context.Context, error) { + l := log.G(ctx) + l.Logger.SetFormatter(&logrus.TextFormatter{ TimestampFormat: log.RFC3339NanoFixed, FullTimestamp: true, }) if debugFlag { - logrus.SetLevel(logrus.DebugLevel) + l.Logger.SetLevel(logrus.DebugLevel) } f, err := openLog(ctx, id) if err != nil { - return err + return ctx, err } - logrus.SetOutput(f) - return nil + l.Logger.SetOutput(f) + return log.WithLogger(ctx, l), nil } // Run initializes and runs a shim server -// TODO(2.0): Remove initFunc from arguments -func Run(id string, initFunc Init, opts ...BinaryOpts) { +// TODO(2.0): Remove function +func Run(name string, initFunc Init, opts ...BinaryOpts) { var config Config for _, o := range opts { o(&config) } - if err := run(id, initFunc, config); err != nil { - fmt.Fprintf(os.Stderr, "%s: %s\n", id, err) + ctx := context.Background() + ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", name)) + + if err := run(ctx, nil, initFunc, name, config); err != nil { + fmt.Fprintf(os.Stderr, "%s: %s", name, err) os.Exit(1) } } -func run(id string, initFunc Init, config Config) error { +// TODO(2.0): Remove this type +type shimToManager struct { + shim Shim + name string +} + +func (stm shimToManager) Name() string { + return stm.name +} + +func (stm shimToManager) Start(ctx context.Context, id string, opts StartOpts) (string, error) { + opts.ID = id + return stm.shim.StartShim(ctx, opts) +} + +func (stm shimToManager) Stop(ctx context.Context, id string) (StopStatus, error) { + // shim must already have id + dr, err := stm.shim.Cleanup(ctx) + if err != nil { + return StopStatus{}, err + } + return StopStatus{ + Pid: int(dr.Pid), + ExitStatus: int(dr.ExitStatus), + ExitedAt: dr.ExitedAt, + }, nil +} + +// RunManager initialzes and runs a shim server +// TODO(2.0): Rename to Run +func RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts) { + var config Config + for _, o := range opts { + o(&config) + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", manager.Name())) + + if err := run(ctx, manager, nil, "", config); err != nil { + fmt.Fprintf(os.Stderr, "%s: %s", manager.Name(), err) + os.Exit(1) + } +} + +func run(ctx context.Context, manager Manager, initFunc Init, name string, config Config) error { parseFlags() if versionFlag { fmt.Printf("%s:\n", os.Args[0]) @@ -208,29 +272,49 @@ func run(id string, initFunc Init, config Config) error { } defer publisher.Close() - ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) + ctx = namespaces.WithNamespace(ctx, namespaceFlag) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) - ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) ctx, sd := shutdown.WithShutdown(ctx) defer sd.Shutdown() - service, err := initFunc(ctx, idFlag, publisher, sd.Shutdown) - if err != nil { - return err + + if manager == nil { + service, err := initFunc(ctx, id, publisher, sd.Shutdown) + if err != nil { + return err + } + plugin.Register(&plugin.Registration{ + Type: plugin.TTRPCPlugin, + ID: "task", + Requires: []plugin.Type{ + plugin.EventPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return taskService{service}, nil + }, + }) + manager = shimToManager{ + shim: service, + name: name, + } } // Handle explicit actions switch action { case "delete": - logger := logrus.WithFields(logrus.Fields{ + logger := log.G(ctx).WithFields(logrus.Fields{ "pid": os.Getpid(), "namespace": namespaceFlag, }) go handleSignals(ctx, logger, signals) - response, err := service.Cleanup(ctx) + ss, err := manager.Stop(ctx, id) if err != nil { return err } - data, err := proto.Marshal(response) + data, err := proto.Marshal(&shimapi.DeleteResponse{ + Pid: uint32(ss.Pid), + ExitStatus: uint32(ss.ExitStatus), + ExitedAt: ss.ExitedAt, + }) if err != nil { return err } @@ -240,13 +324,12 @@ func run(id string, initFunc Init, config Config) error { return nil case "start": opts := StartOpts{ - ID: idFlag, ContainerdBinary: containerdBinaryFlag, Address: addressFlag, TTRPCAddress: ttrpcAddress, } - address, err := service.StartShim(ctx, opts) + address, err := manager.Start(ctx, id, opts) if err != nil { return err } @@ -257,7 +340,8 @@ func run(id string, initFunc Init, config Config) error { } if !config.NoSetupLogger { - if err := setLogger(ctx, idFlag); err != nil { + ctx, err = setLogger(ctx, id) + if err != nil { return err } } @@ -279,20 +363,6 @@ func run(id string, initFunc Init, config Config) error { }, }) - // If service is an implementation of the task service, register it as a plugin - if ts, ok := service.(shimapi.TaskService); ok { - plugin.Register(&plugin.Registration{ - Type: plugin.TTRPCPlugin, - ID: "task", - Requires: []plugin.Type{ - plugin.EventPlugin, - }, - InitFn: func(ic *plugin.InitContext) (interface{}, error) { - return &taskService{ts}, nil - }, - }) - } - var ( initialized = plugin.NewPluginSet() ttrpcServices = []ttrpcService{} @@ -397,10 +467,10 @@ func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal) er defer l.Close() if err := server.Serve(ctx, l); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure") + log.G(ctx).WithError(err).Fatal("containerd-shim: ttrpc server failure") } }() - logger := logrus.WithFields(logrus.Fields{ + logger := log.G(ctx).WithFields(logrus.Fields{ "pid": os.Getpid(), "path": path, "namespace": namespaceFlag,