Seperate shim manager and task service

Create new shim manager interface and deprecate older
shim manager interface.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2021-09-20 10:48:13 -07:00
parent 6835a94707
commit 04e57d71b2
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
4 changed files with 789 additions and 667 deletions

View File

@ -1,5 +1,3 @@
// +build linux
/*
Copyright The containerd Authors.
@ -16,7 +14,7 @@
limitations under the License.
*/
package service
package manager
import (
"context"
@ -30,6 +28,7 @@ import (
"github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/process"
@ -37,22 +36,20 @@ import (
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
runcC "github.com/containerd/go-runc"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/proto"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
exec "golang.org/x/sys/execabs"
"golang.org/x/sys/unix"
)
// NewShimManager returns an implementation of the shim manager
// using runc
func NewShimManager(id string) shim.Shim {
func NewShimManager(name string) shim.Manager {
return &manager{
id: id,
name: name,
}
}
@ -69,7 +66,7 @@ type spec struct {
}
type manager struct {
id string
name string
}
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
@ -112,12 +109,16 @@ func readSpec() (*spec, error) {
return &s, nil
}
func (manager) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) {
cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress)
func (m manager) Name() string {
return m.name
}
func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ string, retErr error) {
cmd, err := newCommand(ctx, id, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress)
if err != nil {
return "", err
}
grouping := opts.ID
grouping := id
spec, err := readSpec()
if err != nil {
return "", err
@ -236,24 +237,24 @@ func (manager) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, re
return address, nil
}
func (m manager) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
func (manager) Stop(ctx context.Context, id string) (shim.StopStatus, error) {
cwd, err := os.Getwd()
if err != nil {
return nil, err
return shim.StopStatus{}, err
}
path := filepath.Join(filepath.Dir(cwd), m.id)
path := filepath.Join(filepath.Dir(cwd), id)
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
return shim.StopStatus{}, err
}
runtime, err := runc.ReadRuntime(path)
if err != nil {
return nil, err
return shim.StopStatus{}, err
}
opts, err := runc.ReadOptions(path)
if err != nil {
return nil, err
return shim.StopStatus{}, err
}
root := process.RuncRoot
if opts != nil && opts.Root != "" {
@ -261,16 +262,16 @@ func (m manager) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
}
r := process.NewRunc(root, path, ns, runtime, "", false)
if err := r.Delete(ctx, m.id, &runcC.DeleteOpts{
if err := r.Delete(ctx, id, &runcC.DeleteOpts{
Force: true,
}); err != nil {
logrus.WithError(err).Warn("failed to remove runc container")
log.G(ctx).WithError(err).Warn("failed to remove runc container")
}
if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil {
logrus.WithError(err).Warn("failed to cleanup rootfs mount")
log.G(ctx).WithError(err).Warn("failed to cleanup rootfs mount")
}
return &taskAPI.DeleteResponse{
return shim.StopStatus{
ExitedAt: time.Now(),
ExitStatus: 128 + uint32(unix.SIGKILL),
ExitStatus: 128 + int(unix.SIGKILL),
}, nil
}

View File

@ -0,0 +1,619 @@
//go:build linux
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tasks
import (
"context"
"os"
"sync"
"github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/oom"
oomv1 "github.com/containerd/containerd/pkg/oom/v1"
oomv2 "github.com/containerd/containerd/pkg/oom/v2"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/shutdown"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/pkg/userns"
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim"
shimapi "github.com/containerd/containerd/runtime/v2/task"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/containerd/sys/reaper"
runcC "github.com/containerd/go-runc"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var (
_ = (taskAPI.TaskService)(&service{})
empty = &ptypes.Empty{}
)
// NewTaskService creates a new instance of a task service
func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) {
var (
ep oom.Watcher
err error
)
if cgroups.Mode() == cgroups.Unified {
ep, err = oomv2.New(publisher)
} else {
ep, err = oomv1.New(publisher)
}
if err != nil {
return nil, err
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
}
go s.processExits()
runcC.Monitor = reaper.Default
if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
go s.forward(ctx, publisher)
sd.RegisterCallback(func(context.Context) error {
close(s.events)
return nil
})
if address, err := shim.ReadAddress("address"); err == nil {
sd.RegisterCallback(func(context.Context) error {
return shim.RemoveSocket(address)
})
}
return s, nil
}
// service is the shim implementation of a remote shim over GRPC
type service struct {
mu sync.Mutex
eventSendMu sync.Mutex
context context.Context
events chan interface{}
platform stdio.Platform
ec chan runcC.Exit
ep oom.Watcher
containers map[string]*runc.Container
shutdown shutdown.Service
}
// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()
container, err := runc.NewContainer(ctx, s.platform, r)
if err != nil {
return nil, err
}
s.containers[r.ID] = container
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(container.Pid()),
})
return &taskAPI.CreateTaskResponse{
Pid: uint32(container.Pid()),
}, nil
}
func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
shimapi.RegisterTaskService(server, s)
return nil
}
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
// hold the send lock so that the start events are sent before any exit events in the error case
s.eventSendMu.Lock()
p, err := container.Start(ctx, r)
if err != nil {
s.eventSendMu.Unlock()
return nil, errdefs.ToGRPC(err)
}
switch r.ExecID {
case "":
switch cg := container.Cgroup().(type) {
case cgroups.Cgroup:
if err := s.ep.Add(container.ID, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}
case *cgroupsv2.Manager:
allControllers, err := cg.RootControllers()
if err != nil {
logrus.WithError(err).Error("failed to get root controllers")
} else {
if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
if userns.RunningInUserNS() {
logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
} else {
logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
}
}
}
if err := s.ep.Add(container.ID, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}
}
s.send(&eventstypes.TaskStart{
ContainerID: container.ID,
Pid: uint32(p.Pid()),
})
default:
s.send(&eventstypes.TaskExecStarted{
ContainerID: container.ID,
ExecID: r.ExecID,
Pid: uint32(p.Pid()),
})
}
s.eventSendMu.Unlock()
return &taskAPI.StartResponse{
Pid: uint32(p.Pid()),
}, nil
}
// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Delete(ctx, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// if we deleted an init task, send the task delete event
if r.ExecID == "" {
s.mu.Lock()
delete(s.containers, r.ID)
s.mu.Unlock()
s.send(&eventstypes.TaskDelete{
ContainerID: container.ID,
Pid: uint32(p.Pid()),
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
})
}
return &taskAPI.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
}, nil
}
// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
ok, cancel := container.ReserveProcess(r.ExecID)
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}
process, err := container.Exec(ctx, r)
if err != nil {
cancel()
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskExecAdded{
ContainerID: container.ID,
ExecID: process.ID(),
})
return empty, nil
}
// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.ResizePty(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Process(r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
st, err := p.Status(ctx)
if err != nil {
return nil, err
}
status := task.StatusUnknown
switch st {
case "created":
status = task.StatusCreated
case "running":
status = task.StatusRunning
case "stopped":
status = task.StatusStopped
case "paused":
status = task.StatusPaused
case "pausing":
status = task.StatusPausing
}
sio := p.Stdio()
return &taskAPI.StateResponse{
ID: p.ID(),
Bundle: container.Bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.Stdin,
Stdout: sio.Stdout,
Stderr: sio.Stderr,
Terminal: sio.Terminal,
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
}
// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Pause(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskPaused{
ContainerID: container.ID,
})
return empty, nil
}
// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Resume(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskResumed{
ContainerID: container.ID,
})
return empty, nil
}
// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Kill(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
pids, err := s.getContainerPids(ctx, r.ID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
var processes []*task.ProcessInfo
for _, pid := range pids {
pInfo := task.ProcessInfo{
Pid: pid,
}
for _, p := range container.ExecdProcesses() {
if p.Pid() == int(pid) {
d := &options.ProcessDetails{
ExecID: p.ID(),
}
a, err := typeurl.MarshalAny(d)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
}
pInfo.Info = a
break
}
}
processes = append(processes, &pInfo)
}
return &taskAPI.PidsResponse{
Processes: processes,
}, nil
}
// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.CloseIO(ctx, r); err != nil {
return nil, err
}
return empty, nil
}
// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Checkpoint(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Update(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Process(r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
p.Wait()
return &taskAPI.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
}
// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
var pid int
if container, err := s.getContainer(r.ID); err == nil {
pid = container.Pid()
}
return &taskAPI.ConnectResponse{
ShimPid: uint32(os.Getpid()),
TaskPid: uint32(pid),
}, nil
}
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
// return out if the shim is still servicing containers
if len(s.containers) > 0 {
return empty, nil
}
// please make sure that temporary resource has been cleanup or registered
// for cleanup before calling shutdown
s.shutdown.Shutdown()
return empty, nil
}
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
cgx := container.Cgroup()
if cgx == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
var statsx interface{}
switch cg := cgx.(type) {
case cgroups.Cgroup:
stats, err := cg.Stat(cgroups.IgnoreNotExist)
if err != nil {
return nil, err
}
statsx = stats
case *cgroupsv2.Manager:
stats, err := cg.Stat()
if err != nil {
return nil, err
}
statsx = stats
default:
return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg)
}
data, err := typeurl.MarshalAny(statsx)
if err != nil {
return nil, err
}
return &taskAPI.StatsResponse{
Stats: data,
}, nil
}
func (s *service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
}
}
func (s *service) send(evt interface{}) {
s.events <- evt
}
func (s *service) sendL(evt interface{}) {
s.eventSendMu.Lock()
s.events <- evt
s.eventSendMu.Unlock()
}
func (s *service) checkProcesses(e runcC.Exit) {
s.mu.Lock()
defer s.mu.Unlock()
for _, container := range s.containers {
if !container.HasPid(e.Pid) {
continue
}
for _, p := range container.All() {
if p.Pid() != e.Pid {
continue
}
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
if err := ip.KillAll(s.context); err != nil {
logrus.WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
}
p.SetExited(e.Status)
s.sendL(&eventstypes.TaskExit{
ContainerID: container.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(),
})
return
}
return
}
}
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
container, err := s.getContainer(id)
if err != nil {
return nil, err
}
p, err := container.Process("")
if err != nil {
return nil, errdefs.ToGRPC(err)
}
ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
if err != nil {
return nil, err
}
pids := make([]uint32, 0, len(ps))
for _, pid := range ps {
pids = append(pids, uint32(pid))
}
return pids, nil
}
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
ns, _ := namespaces.Namespace(ctx)
ctx = namespaces.WithNamespace(context.Background(), ns)
for e := range s.events {
err := publisher.Publish(ctx, runc.GetTopic(e), e)
if err != nil {
logrus.WithError(err).Error("post event")
}
}
publisher.Close()
}
func (s *service) getContainer(id string) (*runc.Container, error) {
s.mu.Lock()
container := s.containers[id]
s.mu.Unlock()
if container == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
}
return container, nil
}
// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {
if s.platform != nil {
return nil
}
p, err := runc.NewPlatform()
if err != nil {
return err
}
s.platform = p
s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() })
return nil
}

View File

@ -21,627 +21,59 @@ package v2
import (
"context"
"os"
"sync"
"github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/oom"
oomv1 "github.com/containerd/containerd/pkg/oom/v1"
oomv2 "github.com/containerd/containerd/pkg/oom/v2"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/shutdown"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/pkg/userns"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/options"
runcservice "github.com/containerd/containerd/runtime/v2/runc/service"
"github.com/containerd/containerd/runtime/v2/runc/manager"
"github.com/containerd/containerd/runtime/v2/runc/tasks"
"github.com/containerd/containerd/runtime/v2/shim"
shimapi "github.com/containerd/containerd/runtime/v2/task"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/containerd/sys/reaper"
runcC "github.com/containerd/go-runc"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var (
_ = (taskAPI.TaskService)(&service{})
empty = &ptypes.Empty{}
)
// TODO(2.0): Remove this package
// New returns a new shim service that can be used via GRPC
// TODO(2.0): Remove this function, rely on plugin registration
func New(_ context.Context, id string, _ shim.Publisher, _ func()) (shim.Shim, error) {
plugin.Register(&plugin.Registration{
Type: plugin.TTRPCPlugin,
ID: "task",
Requires: []plugin.Type{
plugin.EventPlugin,
plugin.InternalPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
pp, err := ic.GetByID(plugin.EventPlugin, "publisher")
type shimTaskManager struct {
shimapi.TaskService
id string
manager shim.Manager
}
func (stm *shimTaskManager) Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) {
ss, err := stm.manager.Stop(ctx, stm.id)
if err != nil {
return nil, err
}
ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown")
if err != nil {
return nil, err
}
return NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service))
},
})
return runcservice.NewShimManager(id), nil
}
// NewTaskService creates a new instance of a task service
func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) {
var (
ep oom.Watcher
err error
)
if cgroups.Mode() == cgroups.Unified {
ep, err = oomv2.New(publisher)
} else {
ep, err = oomv1.New(publisher)
}
if err != nil {
return nil, err
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
}
go s.processExits()
runcC.Monitor = reaper.Default
if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
go s.forward(ctx, publisher)
sd.RegisterCallback(func(context.Context) error {
close(s.events)
return nil
})
if address, err := shim.ReadAddress("address"); err == nil {
sd.RegisterCallback(func(context.Context) error {
return shim.RemoveSocket(address)
})
}
return s, nil
}
// service is the shim implementation of a remote shim over GRPC
type service struct {
mu sync.Mutex
eventSendMu sync.Mutex
context context.Context
events chan interface{}
platform stdio.Platform
ec chan runcC.Exit
ep oom.Watcher
containers map[string]*runc.Container
shutdown shutdown.Service
}
// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()
container, err := runc.NewContainer(ctx, s.platform, r)
if err != nil {
return nil, err
}
s.containers[r.ID] = container
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(container.Pid()),
})
return &taskAPI.CreateTaskResponse{
Pid: uint32(container.Pid()),
return &shimapi.DeleteResponse{
Pid: uint32(ss.Pid),
ExitStatus: uint32(ss.ExitStatus),
ExitedAt: ss.ExitedAt,
}, nil
}
func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
shimapi.RegisterTaskService(server, s)
return nil
func (stm *shimTaskManager) StartShim(ctx context.Context, opts shim.StartOpts) (string, error) {
return stm.manager.Start(ctx, opts.ID, opts)
}
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
// hold the send lock so that the start events are sent before any exit events in the error case
s.eventSendMu.Lock()
p, err := container.Start(ctx, r)
if err != nil {
s.eventSendMu.Unlock()
return nil, errdefs.ToGRPC(err)
}
switch r.ExecID {
case "":
switch cg := container.Cgroup().(type) {
case cgroups.Cgroup:
if err := s.ep.Add(container.ID, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}
case *cgroupsv2.Manager:
allControllers, err := cg.RootControllers()
if err != nil {
logrus.WithError(err).Error("failed to get root controllers")
} else {
if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
if userns.RunningInUserNS() {
logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
} else {
logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
}
}
}
if err := s.ep.Add(container.ID, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}
}
s.send(&eventstypes.TaskStart{
ContainerID: container.ID,
Pid: uint32(p.Pid()),
})
default:
s.send(&eventstypes.TaskExecStarted{
ContainerID: container.ID,
ExecID: r.ExecID,
Pid: uint32(p.Pid()),
})
}
s.eventSendMu.Unlock()
return &taskAPI.StartResponse{
Pid: uint32(p.Pid()),
}, nil
}
// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Delete(ctx, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// if we deleted an init task, send the task delete event
if r.ExecID == "" {
s.mu.Lock()
delete(s.containers, r.ID)
s.mu.Unlock()
s.send(&eventstypes.TaskDelete{
ContainerID: container.ID,
Pid: uint32(p.Pid()),
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
})
}
return &taskAPI.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
}, nil
}
// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
ok, cancel := container.ReserveProcess(r.ExecID)
// New returns a new shim service that can be used for
// - serving the task service over grpc/ttrpc
// - shim management
// This function is deprecated in favor direct creation
// of shim manager and registering task service via plugins.
func New(ctx context.Context, id string, publisher shim.Publisher, fn func()) (shim.Shim, error) {
sd, ok := ctx.(shutdown.Service)
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}
process, err := container.Exec(ctx, r)
if err != nil {
cancel()
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskExecAdded{
ContainerID: container.ID,
ExecID: process.ID(),
})
return empty, nil
}
// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.ResizePty(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Process(r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
st, err := p.Status(ctx)
if err != nil {
return nil, err
}
status := task.StatusUnknown
switch st {
case "created":
status = task.StatusCreated
case "running":
status = task.StatusRunning
case "stopped":
status = task.StatusStopped
case "paused":
status = task.StatusPaused
case "pausing":
status = task.StatusPausing
}
sio := p.Stdio()
return &taskAPI.StateResponse{
ID: p.ID(),
Bundle: container.Bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.Stdin,
Stdout: sio.Stdout,
Stderr: sio.Stderr,
Terminal: sio.Terminal,
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
}
// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Pause(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskPaused{
ContainerID: container.ID,
})
return empty, nil
}
// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Resume(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskResumed{
ContainerID: container.ID,
})
return empty, nil
}
// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Kill(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
pids, err := s.getContainerPids(ctx, r.ID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
var processes []*task.ProcessInfo
for _, pid := range pids {
pInfo := task.ProcessInfo{
Pid: pid,
}
for _, p := range container.ExecdProcesses() {
if p.Pid() == int(pid) {
d := &options.ProcessDetails{
ExecID: p.ID(),
}
a, err := typeurl.MarshalAny(d)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
}
pInfo.Info = a
break
}
}
processes = append(processes, &pInfo)
}
return &taskAPI.PidsResponse{
Processes: processes,
}, nil
}
// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.CloseIO(ctx, r); err != nil {
return nil, err
}
return empty, nil
}
// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Checkpoint(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Update(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Process(r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
p.Wait()
return &taskAPI.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
}
// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
var pid int
if container, err := s.getContainer(r.ID); err == nil {
pid = container.Pid()
}
return &taskAPI.ConnectResponse{
ShimPid: uint32(os.Getpid()),
TaskPid: uint32(pid),
}, nil
}
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
// return out if the shim is still servicing containers
if len(s.containers) > 0 {
return empty, nil
}
// please make sure that temporary resource has been cleanup or registered
// for cleanup before calling shutdown
s.shutdown.Shutdown()
return empty, nil
}
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
cgx := container.Cgroup()
if cgx == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
var statsx interface{}
switch cg := cgx.(type) {
case cgroups.Cgroup:
stats, err := cg.Stat(cgroups.IgnoreNotExist)
if err != nil {
return nil, err
}
statsx = stats
case *cgroupsv2.Manager:
stats, err := cg.Stat()
if err != nil {
return nil, err
}
statsx = stats
default:
return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg)
}
data, err := typeurl.MarshalAny(statsx)
if err != nil {
return nil, err
}
return &taskAPI.StatsResponse{
Stats: data,
}, nil
}
func (s *service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
}
}
func (s *service) send(evt interface{}) {
s.events <- evt
}
func (s *service) sendL(evt interface{}) {
s.eventSendMu.Lock()
s.events <- evt
s.eventSendMu.Unlock()
}
func (s *service) checkProcesses(e runcC.Exit) {
s.mu.Lock()
defer s.mu.Unlock()
for _, container := range s.containers {
if !container.HasPid(e.Pid) {
continue
}
for _, p := range container.All() {
if p.Pid() != e.Pid {
continue
}
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
if err := ip.KillAll(s.context); err != nil {
logrus.WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
}
p.SetExited(e.Status)
s.sendL(&eventstypes.TaskExit{
ContainerID: container.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(),
})
return
}
return
}
}
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
container, err := s.getContainer(id)
if err != nil {
return nil, err
}
p, err := container.Process("")
if err != nil {
return nil, errdefs.ToGRPC(err)
}
ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
if err != nil {
return nil, err
}
pids := make([]uint32, 0, len(ps))
for _, pid := range ps {
pids = append(pids, uint32(pid))
}
return pids, nil
}
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
ns, _ := namespaces.Namespace(ctx)
ctx = namespaces.WithNamespace(context.Background(), ns)
for e := range s.events {
err := publisher.Publish(ctx, runc.GetTopic(e), e)
if err != nil {
logrus.WithError(err).Error("post event")
}
}
publisher.Close()
}
func (s *service) getContainer(id string) (*runc.Container, error) {
s.mu.Lock()
container := s.containers[id]
s.mu.Unlock()
if container == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
}
return container, nil
}
// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {
if s.platform != nil {
ctx, sd = shutdown.WithShutdown(ctx)
sd.RegisterCallback(func(context.Context) error {
fn()
return nil
})
}
p, err := runc.NewPlatform()
ts, err := tasks.NewTaskService(ctx, publisher, sd)
if err != nil {
return err
return nil, err
}
s.platform = p
s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() })
return nil
return &shimTaskManager{
TaskService: ts,
id: id,
manager: manager.NewShimManager("runc"),
}, nil
}

View File

@ -48,21 +48,37 @@ type Publisher interface {
// StartOpts describes shim start configuration received from containerd
type StartOpts struct {
ID string
ID string // TODO(2.0): Remove ID, passed directly to start for call symmetry
ContainerdBinary string
Address string
TTRPCAddress string
}
type StopStatus struct {
Pid int
ExitStatus int
ExitedAt time.Time
}
// Init func for the creation of a shim server
// TODO(2.0): Remove init function
type Init func(context.Context, string, Publisher, func()) (Shim, error)
// Shim server interface
// TODO(2.0): Remove unified shim interface
type Shim interface {
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) // TODO(2.0): Update interface to pass ID directly to Cleanup
shimapi.TaskService
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
StartShim(ctx context.Context, opts StartOpts) (string, error)
}
// Manager is the interface which manages the shim process
type Manager interface {
Name() string
Start(ctx context.Context, id string, opts StartOpts) (string, error)
Stop(ctx context.Context, id string) (StopStatus, error)
}
// OptsKey is the context key for the Opts value.
type OptsKey struct{}
@ -90,18 +106,18 @@ type ttrpcService interface {
}
type taskService struct {
local shimapi.TaskService
shimapi.TaskService
}
func (t *taskService) RegisterTTRPC(server *ttrpc.Server) error {
shimapi.RegisterTaskService(server, t.local)
func (t taskService) RegisterTTRPC(server *ttrpc.Server) error {
shimapi.RegisterTaskService(server, t.TaskService)
return nil
}
var (
debugFlag bool
versionFlag bool
idFlag string
id string
namespaceFlag string
socketFlag string
bundlePath string
@ -118,7 +134,7 @@ func parseFlags() {
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit")
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&idFlag, "id", "", "id of the task")
flag.StringVar(&id, "id", "", "id of the task")
flag.StringVar(&socketFlag, "socket", "", "socket path to serve")
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
@ -143,37 +159,85 @@ func setRuntime() {
}
}
func setLogger(ctx context.Context, id string) error {
logrus.SetFormatter(&logrus.TextFormatter{
func setLogger(ctx context.Context, id string) (context.Context, error) {
l := log.G(ctx)
l.Logger.SetFormatter(&logrus.TextFormatter{
TimestampFormat: log.RFC3339NanoFixed,
FullTimestamp: true,
})
if debugFlag {
logrus.SetLevel(logrus.DebugLevel)
l.Logger.SetLevel(logrus.DebugLevel)
}
f, err := openLog(ctx, id)
if err != nil {
return err
return ctx, err
}
logrus.SetOutput(f)
return nil
l.Logger.SetOutput(f)
return log.WithLogger(ctx, l), nil
}
// Run initializes and runs a shim server
// TODO(2.0): Remove initFunc from arguments
func Run(id string, initFunc Init, opts ...BinaryOpts) {
// TODO(2.0): Remove function
func Run(name string, initFunc Init, opts ...BinaryOpts) {
var config Config
for _, o := range opts {
o(&config)
}
if err := run(id, initFunc, config); err != nil {
fmt.Fprintf(os.Stderr, "%s: %s\n", id, err)
ctx := context.Background()
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", name))
if err := run(ctx, nil, initFunc, name, config); err != nil {
fmt.Fprintf(os.Stderr, "%s: %s", name, err)
os.Exit(1)
}
}
func run(id string, initFunc Init, config Config) error {
// TODO(2.0): Remove this type
type shimToManager struct {
shim Shim
name string
}
func (stm shimToManager) Name() string {
return stm.name
}
func (stm shimToManager) Start(ctx context.Context, id string, opts StartOpts) (string, error) {
opts.ID = id
return stm.shim.StartShim(ctx, opts)
}
func (stm shimToManager) Stop(ctx context.Context, id string) (StopStatus, error) {
// shim must already have id
dr, err := stm.shim.Cleanup(ctx)
if err != nil {
return StopStatus{}, err
}
return StopStatus{
Pid: int(dr.Pid),
ExitStatus: int(dr.ExitStatus),
ExitedAt: dr.ExitedAt,
}, nil
}
// RunManager initialzes and runs a shim server
// TODO(2.0): Rename to Run
func RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts) {
var config Config
for _, o := range opts {
o(&config)
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", manager.Name()))
if err := run(ctx, manager, nil, "", config); err != nil {
fmt.Fprintf(os.Stderr, "%s: %s", manager.Name(), err)
os.Exit(1)
}
}
func run(ctx context.Context, manager Manager, initFunc Init, name string, config Config) error {
parseFlags()
if versionFlag {
fmt.Printf("%s:\n", os.Args[0])
@ -208,29 +272,49 @@ func run(id string, initFunc Init, config Config) error {
}
defer publisher.Close()
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
ctx = namespaces.WithNamespace(ctx, namespaceFlag)
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
ctx, sd := shutdown.WithShutdown(ctx)
defer sd.Shutdown()
service, err := initFunc(ctx, idFlag, publisher, sd.Shutdown)
if manager == nil {
service, err := initFunc(ctx, id, publisher, sd.Shutdown)
if err != nil {
return err
}
plugin.Register(&plugin.Registration{
Type: plugin.TTRPCPlugin,
ID: "task",
Requires: []plugin.Type{
plugin.EventPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return taskService{service}, nil
},
})
manager = shimToManager{
shim: service,
name: name,
}
}
// Handle explicit actions
switch action {
case "delete":
logger := logrus.WithFields(logrus.Fields{
logger := log.G(ctx).WithFields(logrus.Fields{
"pid": os.Getpid(),
"namespace": namespaceFlag,
})
go handleSignals(ctx, logger, signals)
response, err := service.Cleanup(ctx)
ss, err := manager.Stop(ctx, id)
if err != nil {
return err
}
data, err := proto.Marshal(response)
data, err := proto.Marshal(&shimapi.DeleteResponse{
Pid: uint32(ss.Pid),
ExitStatus: uint32(ss.ExitStatus),
ExitedAt: ss.ExitedAt,
})
if err != nil {
return err
}
@ -240,13 +324,12 @@ func run(id string, initFunc Init, config Config) error {
return nil
case "start":
opts := StartOpts{
ID: idFlag,
ContainerdBinary: containerdBinaryFlag,
Address: addressFlag,
TTRPCAddress: ttrpcAddress,
}
address, err := service.StartShim(ctx, opts)
address, err := manager.Start(ctx, id, opts)
if err != nil {
return err
}
@ -257,7 +340,8 @@ func run(id string, initFunc Init, config Config) error {
}
if !config.NoSetupLogger {
if err := setLogger(ctx, idFlag); err != nil {
ctx, err = setLogger(ctx, id)
if err != nil {
return err
}
}
@ -279,20 +363,6 @@ func run(id string, initFunc Init, config Config) error {
},
})
// If service is an implementation of the task service, register it as a plugin
if ts, ok := service.(shimapi.TaskService); ok {
plugin.Register(&plugin.Registration{
Type: plugin.TTRPCPlugin,
ID: "task",
Requires: []plugin.Type{
plugin.EventPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return &taskService{ts}, nil
},
})
}
var (
initialized = plugin.NewPluginSet()
ttrpcServices = []ttrpcService{}
@ -397,10 +467,10 @@ func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal) er
defer l.Close()
if err := server.Serve(ctx, l); err != nil &&
!strings.Contains(err.Error(), "use of closed network connection") {
logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure")
log.G(ctx).WithError(err).Fatal("containerd-shim: ttrpc server failure")
}
}()
logger := logrus.WithFields(logrus.Fields{
logger := log.G(ctx).WithFields(logrus.Fields{
"pid": os.Getpid(),
"path": path,
"namespace": namespaceFlag,