
Fix a few shim issues. First, when exec processes fail, the wait block is not released. Second, you could not dump stacks if the reaper loop locks up. Third, the publisher was not waiting on the correct pid.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
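The stack-dump change itself is not visible in the file below. As a rough sketch only (the SIGUSR1 signal and the setupDumpStacks name are illustrative assumptions, not necessarily how this commit implements it), a shim can dump goroutine stacks from a dedicated signal handler so the dump keeps working even when the reaper loop is stuck:

// Illustrative sketch only: log all goroutine stacks on SIGUSR1 from a
// handler goroutine that does not depend on the reaper loop.
package main

import (
	"os"
	"os/signal"
	"runtime"
	"syscall"

	"github.com/sirupsen/logrus"
)

func setupDumpStacks() {
	c := make(chan os.Signal, 8)
	signal.Notify(c, syscall.SIGUSR1)
	go func() {
		for range c {
			// a fixed buffer may truncate very large dumps; fine for debugging
			buf := make([]byte, 32768)
			n := runtime.Stack(buf, true) // true: capture all goroutines
			logrus.Infof("=== goroutine stack dump ===\n%s", buf[:n])
		}
	}()
}

func main() {
	setupDumpStacks()
	select {} // stand-in for the shim's serve loop; `kill -USR1 <pid>` triggers a dump
}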
514 lines | 13 KiB | Go
// +build !windows

package shim

import (
	"context"
	"fmt"
	"os"
	"sync"

	"github.com/containerd/console"
	eventstypes "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types/task"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/events"
	"github.com/containerd/containerd/linux/proc"
	"github.com/containerd/containerd/linux/runctypes"
	shimapi "github.com/containerd/containerd/linux/shim/v1"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/reaper"
	"github.com/containerd/containerd/runtime"
	runc "github.com/containerd/go-runc"
	"github.com/containerd/typeurl"
	ptypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

var empty = &ptypes.Empty{}

// Config contains shim specific configuration
type Config struct {
	Path          string
	Namespace     string
	WorkDir       string
	Criu          string
	RuntimeRoot   string
	SystemdCgroup bool
}

// NewService returns a new shim service that can be used via GRPC
func NewService(config Config, publisher events.Publisher) (*Service, error) {
	if config.Namespace == "" {
		return nil, fmt.Errorf("shim namespace cannot be empty")
	}
	ctx := namespaces.WithNamespace(context.Background(), config.Namespace)
	ctx = log.WithLogger(ctx, logrus.WithFields(logrus.Fields{
		"namespace": config.Namespace,
		"path":      config.Path,
		"pid":       os.Getpid(),
	}))
	s := &Service{
		config:    config,
		context:   ctx,
		processes: make(map[string]proc.Process),
		events:    make(chan interface{}, 128),
		ec:        reaper.Default.Subscribe(),
	}
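	// processExits drains the reaper subscription taken above, matching each
	// exit against a tracked process and queuing a TaskExit event for the
	// publisher (see checkProcesses below)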
	go s.processExits()
	if err := s.initPlatform(); err != nil {
		return nil, errors.Wrap(err, "failed to initialize platform behavior")
	}
	go s.forward(publisher)
	return s, nil
}

// Service is the shim implementation of a remote shim over GRPC
type Service struct {
	mu sync.Mutex

	config    Config
	context   context.Context
	processes map[string]proc.Process
	events    chan interface{}
	platform  proc.Platform
	ec        chan runc.Exit

	// Filled by Create()
	id     string
	bundle string
}

// Create a new initial process and container with the underlying OCI runtime
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	process, err := proc.New(
		ctx,
		s.config.Path,
		s.config.WorkDir,
		s.config.RuntimeRoot,
		s.config.Namespace,
		s.config.Criu,
		s.config.SystemdCgroup,
		s.platform,
		&proc.CreateConfig{
			ID:               r.ID,
			Bundle:           r.Bundle,
			Runtime:          r.Runtime,
			Rootfs:           r.Rootfs,
			Terminal:         r.Terminal,
			Stdin:            r.Stdin,
			Stdout:           r.Stdout,
			Stderr:           r.Stderr,
			Checkpoint:       r.Checkpoint,
			ParentCheckpoint: r.ParentCheckpoint,
			Options:          r.Options,
		},
	)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// save the main task id and bundle to the shim for additional requests
	s.id = r.ID
	s.bundle = r.Bundle
	pid := process.Pid()
	s.processes[r.ID] = process
	return &shimapi.CreateTaskResponse{
		Pid: uint32(pid),
	}, nil
}

// Start a process
func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p := s.processes[r.ID]
	if p == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", r.ID)
	}
	if err := p.Start(ctx); err != nil {
		return nil, err
	}
	return &shimapi.StartResponse{
		ID:  p.ID(),
		Pid: uint32(p.Pid()),
	}, nil
}

// Delete the initial process and container
func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p := s.processes[s.id]
	if p == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
	}
	if err := p.Delete(ctx); err != nil {
		return nil, err
	}
	delete(s.processes, s.id)
	s.platform.Close()
	return &shimapi.DeleteResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
		Pid:        uint32(p.Pid()),
	}, nil
}

// DeleteProcess deletes an exec'd process
func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if r.ID == s.id {
		return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess")
	}
	p := s.processes[r.ID]
	if p == nil {
		return nil, errors.Wrapf(errdefs.ErrNotFound, "process %s", r.ID)
	}
	if err := p.Delete(ctx); err != nil {
		return nil, err
	}
	delete(s.processes, r.ID)
	return &shimapi.DeleteResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
		Pid:        uint32(p.Pid()),
	}, nil
}

// Exec an additional process inside the container
func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if p := s.processes[r.ID]; p != nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ID)
	}

	p := s.processes[s.id]
	if p == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
	}

	process, err := p.(*proc.Init).Exec(ctx, s.config.Path, &proc.ExecConfig{
		ID:       r.ID,
		Terminal: r.Terminal,
		Stdin:    r.Stdin,
		Stdout:   r.Stdout,
		Stderr:   r.Stderr,
		Spec:     r.Spec,
	})
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
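	// the exec process is only created here, not started; it is registered
	// under its exec ID so later Start, Wait, and Delete calls can find it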
	s.processes[r.ID] = process
	return empty, nil
}

// ResizePty of a process
func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if r.ID == "" {
		return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided")
	}
	ws := console.WinSize{
		Width:  uint16(r.Width),
		Height: uint16(r.Height),
	}
	p := s.processes[r.ID]
	if p == nil {
		return nil, errors.Errorf("process does not exist %s", r.ID)
	}
	if err := p.Resize(ws); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// State returns runtime state information for a process
func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p := s.processes[r.ID]
	if p == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID)
	}
	st, err := p.Status(ctx)
	if err != nil {
		return nil, err
	}
	status := task.StatusUnknown
	switch st {
	case "created":
		status = task.StatusCreated
	case "running":
		status = task.StatusRunning
	case "stopped":
		status = task.StatusStopped
	case "paused":
		status = task.StatusPaused
	case "pausing":
		status = task.StatusPausing
	}
	sio := p.Stdio()
	return &shimapi.StateResponse{
		ID:         p.ID(),
		Bundle:     s.bundle,
		Pid:        uint32(p.Pid()),
		Status:     status,
		Stdin:      sio.Stdin,
		Stdout:     sio.Stdout,
		Stderr:     sio.Stderr,
		Terminal:   sio.Terminal,
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Pause the container
func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p := s.processes[s.id]
	if p == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
	}
	if err := p.(*proc.Init).Pause(ctx); err != nil {
		return nil, err
	}
	return empty, nil
}

// Resume the container
func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p := s.processes[s.id]
	if p == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
	}
	if err := p.(*proc.Init).Resume(ctx); err != nil {
		return nil, err
	}
	return empty, nil
}

// Kill a process with the provided signal
func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
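	// an empty ID means the request targets the init process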
if r.ID == "" {
|
|
p := s.processes[s.id]
|
|
if p == nil {
|
|
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
|
}
|
|
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
p := s.processes[r.ID]
|
|
if p == nil {
|
|
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID)
|
|
}
|
|
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
// ListPids returns all pids inside the container
|
|
func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) {
|
|
pids, err := s.getContainerPids(ctx, r.ID)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
var processes []*task.ProcessInfo
|
|
for _, pid := range pids {
|
|
pInfo := task.ProcessInfo{
|
|
Pid: pid,
|
|
}
|
|
for _, p := range s.processes {
|
|
if p.Pid() == int(pid) {
|
|
d := &runctypes.ProcessDetails{
|
|
ExecID: p.ID(),
|
|
}
|
|
a, err := typeurl.MarshalAny(d)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
|
|
}
|
|
pInfo.Info = a
|
|
break
|
|
}
|
|
}
|
|
processes = append(processes, &pInfo)
|
|
}
|
|
return &shimapi.ListPidsResponse{
|
|
Processes: processes,
|
|
}, nil
|
|
}
|
|
|
|
// CloseIO of a process
|
|
func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptypes.Empty, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
p := s.processes[r.ID]
|
|
if p == nil {
|
|
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", r.ID)
|
|
}
|
|
if stdin := p.Stdin(); stdin != nil {
|
|
if err := stdin.Close(); err != nil {
|
|
return nil, errors.Wrap(err, "close stdin")
|
|
}
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
// Checkpoint the container
|
|
func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
p := s.processes[s.id]
|
|
if p == nil {
|
|
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
|
}
|
|
if err := p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
|
|
Path: r.Path,
|
|
Options: r.Options,
|
|
}); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
// ShimInfo returns shim information such as the shim's pid
|
|
func (s *Service) ShimInfo(ctx context.Context, r *ptypes.Empty) (*shimapi.ShimInfoResponse, error) {
|
|
return &shimapi.ShimInfoResponse{
|
|
ShimPid: uint32(os.Getpid()),
|
|
}, nil
|
|
}
|
|
|
|
// Update a running container
|
|
func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
p := s.processes[s.id]
|
|
if p == nil {
|
|
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
|
}
|
|
if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
// Wait for a process to exit
|
|
func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
|
|
s.mu.Lock()
|
|
p := s.processes[r.ID]
|
|
s.mu.Unlock()
|
|
if p == nil {
|
|
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
|
}
|
|
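	// block until the process has exited; the service lock was released above
	// so checkProcesses can take it and record the exit while we wait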
	p.Wait()

	return &shimapi.WaitResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

func (s *Service) processExits() {
	for e := range s.ec {
		s.checkProcesses(e)
	}
}

func (s *Service) checkProcesses(e runc.Exit) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, p := range s.processes {
		if p.Pid() == e.Pid {
			if ip, ok := p.(*proc.Init); ok {
				// Ensure all children are killed
				if err := ip.KillAll(s.context); err != nil {
					log.G(s.context).WithError(err).WithField("id", ip.ID()).
						Error("failed to kill init's children")
				}
			}
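			// record the exit on the process and queue a TaskExit event for
			// the publisher goroutine (forward)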
			p.SetExited(e.Status)
			s.events <- &eventstypes.TaskExit{
				ContainerID: s.id,
				ID:          p.ID(),
				Pid:         uint32(e.Pid),
				ExitStatus:  uint32(e.Status),
				ExitedAt:    p.ExitedAt(),
			}
			return
		}
	}
}

func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p := s.processes[s.id]
	if p == nil {
		return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
	}

	ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
	if err != nil {
		return nil, err
	}
	pids := make([]uint32, 0, len(ps))
	for _, pid := range ps {
		pids = append(pids, uint32(pid))
	}
	return pids, nil
}

func (s *Service) forward(publisher events.Publisher) {
	for e := range s.events {
		if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
			log.G(s.context).WithError(err).Error("post event")
		}
	}
}

func getTopic(ctx context.Context, e interface{}) string {
	switch e.(type) {
	case *eventstypes.TaskCreate:
		return runtime.TaskCreateEventTopic
	case *eventstypes.TaskStart:
		return runtime.TaskStartEventTopic
	case *eventstypes.TaskOOM:
		return runtime.TaskOOMEventTopic
	case *eventstypes.TaskExit:
		return runtime.TaskExitEventTopic
	case *eventstypes.TaskDelete:
		return runtime.TaskDeleteEventTopic
	case *eventstypes.TaskExecAdded:
		return runtime.TaskExecAddedEventTopic
	case *eventstypes.TaskExecStarted:
		return runtime.TaskExecStartedEventTopic
	case *eventstypes.TaskPaused:
		return runtime.TaskPausedEventTopic
	case *eventstypes.TaskResumed:
		return runtime.TaskResumedEventTopic
	case *eventstypes.TaskCheckpointed:
		return runtime.TaskCheckpointedEventTopic
	default:
		logrus.Warnf("no topic for type %#v", e)
	}
	return runtime.TaskUnknownTopic
}