620 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			620 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
//go:build linux
 | 
						|
// +build linux
 | 
						|
 | 
						|
/*
 | 
						|
   Copyright The containerd Authors.
 | 
						|
 | 
						|
   Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
   you may not use this file except in compliance with the License.
 | 
						|
   You may obtain a copy of the License at
 | 
						|
 | 
						|
       http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
   Unless required by applicable law or agreed to in writing, software
 | 
						|
   distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
   See the License for the specific language governing permissions and
 | 
						|
   limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package task
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"os"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/containerd/cgroups"
 | 
						|
	cgroupsv2 "github.com/containerd/cgroups/v2"
 | 
						|
	eventstypes "github.com/containerd/containerd/api/events"
 | 
						|
	"github.com/containerd/containerd/api/types/task"
 | 
						|
	"github.com/containerd/containerd/errdefs"
 | 
						|
	"github.com/containerd/containerd/namespaces"
 | 
						|
	"github.com/containerd/containerd/pkg/oom"
 | 
						|
	oomv1 "github.com/containerd/containerd/pkg/oom/v1"
 | 
						|
	oomv2 "github.com/containerd/containerd/pkg/oom/v2"
 | 
						|
	"github.com/containerd/containerd/pkg/process"
 | 
						|
	"github.com/containerd/containerd/pkg/shutdown"
 | 
						|
	"github.com/containerd/containerd/pkg/stdio"
 | 
						|
	"github.com/containerd/containerd/pkg/userns"
 | 
						|
	"github.com/containerd/containerd/runtime/v2/runc"
 | 
						|
	"github.com/containerd/containerd/runtime/v2/runc/options"
 | 
						|
	"github.com/containerd/containerd/runtime/v2/shim"
 | 
						|
	shimapi "github.com/containerd/containerd/runtime/v2/task"
 | 
						|
	taskAPI "github.com/containerd/containerd/runtime/v2/task"
 | 
						|
	"github.com/containerd/containerd/sys/reaper"
 | 
						|
	runcC "github.com/containerd/go-runc"
 | 
						|
	"github.com/containerd/ttrpc"
 | 
						|
	"github.com/containerd/typeurl"
 | 
						|
	ptypes "github.com/gogo/protobuf/types"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	"github.com/sirupsen/logrus"
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	_     = (taskAPI.TaskService)(&service{})
 | 
						|
	empty = &ptypes.Empty{}
 | 
						|
)
 | 
						|
 | 
						|
// NewTaskService creates a new instance of a task service
 | 
						|
func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) {
 | 
						|
	var (
 | 
						|
		ep  oom.Watcher
 | 
						|
		err error
 | 
						|
	)
 | 
						|
	if cgroups.Mode() == cgroups.Unified {
 | 
						|
		ep, err = oomv2.New(publisher)
 | 
						|
	} else {
 | 
						|
		ep, err = oomv1.New(publisher)
 | 
						|
	}
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	go ep.Run(ctx)
 | 
						|
	s := &service{
 | 
						|
		context:    ctx,
 | 
						|
		events:     make(chan interface{}, 128),
 | 
						|
		ec:         reaper.Default.Subscribe(),
 | 
						|
		ep:         ep,
 | 
						|
		shutdown:   sd,
 | 
						|
		containers: make(map[string]*runc.Container),
 | 
						|
	}
 | 
						|
	go s.processExits()
 | 
						|
	runcC.Monitor = reaper.Default
 | 
						|
	if err := s.initPlatform(); err != nil {
 | 
						|
		return nil, errors.Wrap(err, "failed to initialized platform behavior")
 | 
						|
	}
 | 
						|
	go s.forward(ctx, publisher)
 | 
						|
	sd.RegisterCallback(func(context.Context) error {
 | 
						|
		close(s.events)
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
 | 
						|
	if address, err := shim.ReadAddress("address"); err == nil {
 | 
						|
		sd.RegisterCallback(func(context.Context) error {
 | 
						|
			return shim.RemoveSocket(address)
 | 
						|
		})
 | 
						|
	}
 | 
						|
	return s, nil
 | 
						|
}
 | 
						|
 | 
						|
// service is the shim implementation of a remote shim over GRPC
 | 
						|
type service struct {
 | 
						|
	mu          sync.Mutex
 | 
						|
	eventSendMu sync.Mutex
 | 
						|
 | 
						|
	context  context.Context
 | 
						|
	events   chan interface{}
 | 
						|
	platform stdio.Platform
 | 
						|
	ec       chan runcC.Exit
 | 
						|
	ep       oom.Watcher
 | 
						|
 | 
						|
	containers map[string]*runc.Container
 | 
						|
 | 
						|
	shutdown shutdown.Service
 | 
						|
}
 | 
						|
 | 
						|
// Create a new initial process and container with the underlying OCI runtime
 | 
						|
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
 | 
						|
	container, err := runc.NewContainer(ctx, s.platform, r)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	s.containers[r.ID] = container
 | 
						|
 | 
						|
	s.send(&eventstypes.TaskCreate{
 | 
						|
		ContainerID: r.ID,
 | 
						|
		Bundle:      r.Bundle,
 | 
						|
		Rootfs:      r.Rootfs,
 | 
						|
		IO: &eventstypes.TaskIO{
 | 
						|
			Stdin:    r.Stdin,
 | 
						|
			Stdout:   r.Stdout,
 | 
						|
			Stderr:   r.Stderr,
 | 
						|
			Terminal: r.Terminal,
 | 
						|
		},
 | 
						|
		Checkpoint: r.Checkpoint,
 | 
						|
		Pid:        uint32(container.Pid()),
 | 
						|
	})
 | 
						|
 | 
						|
	return &taskAPI.CreateTaskResponse{
 | 
						|
		Pid: uint32(container.Pid()),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
 | 
						|
	shimapi.RegisterTaskService(server, s)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Start a process
 | 
						|
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// hold the send lock so that the start events are sent before any exit events in the error case
 | 
						|
	s.eventSendMu.Lock()
 | 
						|
	p, err := container.Start(ctx, r)
 | 
						|
	if err != nil {
 | 
						|
		s.eventSendMu.Unlock()
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
 | 
						|
	switch r.ExecID {
 | 
						|
	case "":
 | 
						|
		switch cg := container.Cgroup().(type) {
 | 
						|
		case cgroups.Cgroup:
 | 
						|
			if err := s.ep.Add(container.ID, cg); err != nil {
 | 
						|
				logrus.WithError(err).Error("add cg to OOM monitor")
 | 
						|
			}
 | 
						|
		case *cgroupsv2.Manager:
 | 
						|
			allControllers, err := cg.RootControllers()
 | 
						|
			if err != nil {
 | 
						|
				logrus.WithError(err).Error("failed to get root controllers")
 | 
						|
			} else {
 | 
						|
				if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
 | 
						|
					if userns.RunningInUserNS() {
 | 
						|
						logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
 | 
						|
					} else {
 | 
						|
						logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
			if err := s.ep.Add(container.ID, cg); err != nil {
 | 
						|
				logrus.WithError(err).Error("add cg to OOM monitor")
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		s.send(&eventstypes.TaskStart{
 | 
						|
			ContainerID: container.ID,
 | 
						|
			Pid:         uint32(p.Pid()),
 | 
						|
		})
 | 
						|
	default:
 | 
						|
		s.send(&eventstypes.TaskExecStarted{
 | 
						|
			ContainerID: container.ID,
 | 
						|
			ExecID:      r.ExecID,
 | 
						|
			Pid:         uint32(p.Pid()),
 | 
						|
		})
 | 
						|
	}
 | 
						|
	s.eventSendMu.Unlock()
 | 
						|
	return &taskAPI.StartResponse{
 | 
						|
		Pid: uint32(p.Pid()),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// Delete the initial process and container
 | 
						|
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	p, err := container.Delete(ctx, r)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	// if we deleted an init task, send the task delete event
 | 
						|
	if r.ExecID == "" {
 | 
						|
		s.mu.Lock()
 | 
						|
		delete(s.containers, r.ID)
 | 
						|
		s.mu.Unlock()
 | 
						|
		s.send(&eventstypes.TaskDelete{
 | 
						|
			ContainerID: container.ID,
 | 
						|
			Pid:         uint32(p.Pid()),
 | 
						|
			ExitStatus:  uint32(p.ExitStatus()),
 | 
						|
			ExitedAt:    p.ExitedAt(),
 | 
						|
		})
 | 
						|
	}
 | 
						|
	return &taskAPI.DeleteResponse{
 | 
						|
		ExitStatus: uint32(p.ExitStatus()),
 | 
						|
		ExitedAt:   p.ExitedAt(),
 | 
						|
		Pid:        uint32(p.Pid()),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// Exec an additional process inside the container
 | 
						|
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	ok, cancel := container.ReserveProcess(r.ExecID)
 | 
						|
	if !ok {
 | 
						|
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
 | 
						|
	}
 | 
						|
	process, err := container.Exec(ctx, r)
 | 
						|
	if err != nil {
 | 
						|
		cancel()
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
 | 
						|
	s.send(&eventstypes.TaskExecAdded{
 | 
						|
		ContainerID: container.ID,
 | 
						|
		ExecID:      process.ID(),
 | 
						|
	})
 | 
						|
	return empty, nil
 | 
						|
}
 | 
						|
 | 
						|
// ResizePty of a process
 | 
						|
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if err := container.ResizePty(ctx, r); err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	return empty, nil
 | 
						|
}
 | 
						|
 | 
						|
// State returns runtime state information for a process
 | 
						|
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	p, err := container.Process(r.ExecID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	st, err := p.Status(ctx)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	status := task.StatusUnknown
 | 
						|
	switch st {
 | 
						|
	case "created":
 | 
						|
		status = task.StatusCreated
 | 
						|
	case "running":
 | 
						|
		status = task.StatusRunning
 | 
						|
	case "stopped":
 | 
						|
		status = task.StatusStopped
 | 
						|
	case "paused":
 | 
						|
		status = task.StatusPaused
 | 
						|
	case "pausing":
 | 
						|
		status = task.StatusPausing
 | 
						|
	}
 | 
						|
	sio := p.Stdio()
 | 
						|
	return &taskAPI.StateResponse{
 | 
						|
		ID:         p.ID(),
 | 
						|
		Bundle:     container.Bundle,
 | 
						|
		Pid:        uint32(p.Pid()),
 | 
						|
		Status:     status,
 | 
						|
		Stdin:      sio.Stdin,
 | 
						|
		Stdout:     sio.Stdout,
 | 
						|
		Stderr:     sio.Stderr,
 | 
						|
		Terminal:   sio.Terminal,
 | 
						|
		ExitStatus: uint32(p.ExitStatus()),
 | 
						|
		ExitedAt:   p.ExitedAt(),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// Pause the container
 | 
						|
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if err := container.Pause(ctx); err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	s.send(&eventstypes.TaskPaused{
 | 
						|
		ContainerID: container.ID,
 | 
						|
	})
 | 
						|
	return empty, nil
 | 
						|
}
 | 
						|
 | 
						|
// Resume the container
 | 
						|
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if err := container.Resume(ctx); err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	s.send(&eventstypes.TaskResumed{
 | 
						|
		ContainerID: container.ID,
 | 
						|
	})
 | 
						|
	return empty, nil
 | 
						|
}
 | 
						|
 | 
						|
// Kill a process with the provided signal
 | 
						|
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if err := container.Kill(ctx, r); err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	return empty, nil
 | 
						|
}
 | 
						|
 | 
						|
// Pids returns all pids inside the container
 | 
						|
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	pids, err := s.getContainerPids(ctx, r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	var processes []*task.ProcessInfo
 | 
						|
	for _, pid := range pids {
 | 
						|
		pInfo := task.ProcessInfo{
 | 
						|
			Pid: pid,
 | 
						|
		}
 | 
						|
		for _, p := range container.ExecdProcesses() {
 | 
						|
			if p.Pid() == int(pid) {
 | 
						|
				d := &options.ProcessDetails{
 | 
						|
					ExecID: p.ID(),
 | 
						|
				}
 | 
						|
				a, err := typeurl.MarshalAny(d)
 | 
						|
				if err != nil {
 | 
						|
					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
 | 
						|
				}
 | 
						|
				pInfo.Info = a
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
		processes = append(processes, &pInfo)
 | 
						|
	}
 | 
						|
	return &taskAPI.PidsResponse{
 | 
						|
		Processes: processes,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// CloseIO of a process
 | 
						|
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if err := container.CloseIO(ctx, r); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return empty, nil
 | 
						|
}
 | 
						|
 | 
						|
// Checkpoint the container
 | 
						|
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if err := container.Checkpoint(ctx, r); err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	return empty, nil
 | 
						|
}
 | 
						|
 | 
						|
// Update a running container
 | 
						|
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if err := container.Update(ctx, r); err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	return empty, nil
 | 
						|
}
 | 
						|
 | 
						|
// Wait for a process to exit
 | 
						|
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	p, err := container.Process(r.ExecID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	p.Wait()
 | 
						|
 | 
						|
	return &taskAPI.WaitResponse{
 | 
						|
		ExitStatus: uint32(p.ExitStatus()),
 | 
						|
		ExitedAt:   p.ExitedAt(),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// Connect returns shim information such as the shim's pid
 | 
						|
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
 | 
						|
	var pid int
 | 
						|
	if container, err := s.getContainer(r.ID); err == nil {
 | 
						|
		pid = container.Pid()
 | 
						|
	}
 | 
						|
	return &taskAPI.ConnectResponse{
 | 
						|
		ShimPid: uint32(os.Getpid()),
 | 
						|
		TaskPid: uint32(pid),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
 | 
						|
	// return out if the shim is still servicing containers
 | 
						|
	if len(s.containers) > 0 {
 | 
						|
		return empty, nil
 | 
						|
	}
 | 
						|
 | 
						|
	// please make sure that temporary resource has been cleanup or registered
 | 
						|
	// for cleanup before calling shutdown
 | 
						|
	s.shutdown.Shutdown()
 | 
						|
 | 
						|
	return empty, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
 | 
						|
	container, err := s.getContainer(r.ID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	cgx := container.Cgroup()
 | 
						|
	if cgx == nil {
 | 
						|
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
 | 
						|
	}
 | 
						|
	var statsx interface{}
 | 
						|
	switch cg := cgx.(type) {
 | 
						|
	case cgroups.Cgroup:
 | 
						|
		stats, err := cg.Stat(cgroups.IgnoreNotExist)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		statsx = stats
 | 
						|
	case *cgroupsv2.Manager:
 | 
						|
		stats, err := cg.Stat()
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		statsx = stats
 | 
						|
	default:
 | 
						|
		return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg)
 | 
						|
	}
 | 
						|
	data, err := typeurl.MarshalAny(statsx)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &taskAPI.StatsResponse{
 | 
						|
		Stats: data,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) processExits() {
 | 
						|
	for e := range s.ec {
 | 
						|
		s.checkProcesses(e)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) send(evt interface{}) {
 | 
						|
	s.events <- evt
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) sendL(evt interface{}) {
 | 
						|
	s.eventSendMu.Lock()
 | 
						|
	s.events <- evt
 | 
						|
	s.eventSendMu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) checkProcesses(e runcC.Exit) {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
 | 
						|
	for _, container := range s.containers {
 | 
						|
		if !container.HasPid(e.Pid) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		for _, p := range container.All() {
 | 
						|
			if p.Pid() != e.Pid {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			if ip, ok := p.(*process.Init); ok {
 | 
						|
				// Ensure all children are killed
 | 
						|
				if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
 | 
						|
					if err := ip.KillAll(s.context); err != nil {
 | 
						|
						logrus.WithError(err).WithField("id", ip.ID()).
 | 
						|
							Error("failed to kill init's children")
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			p.SetExited(e.Status)
 | 
						|
			s.sendL(&eventstypes.TaskExit{
 | 
						|
				ContainerID: container.ID,
 | 
						|
				ID:          p.ID(),
 | 
						|
				Pid:         uint32(e.Pid),
 | 
						|
				ExitStatus:  uint32(e.Status),
 | 
						|
				ExitedAt:    p.ExitedAt(),
 | 
						|
			})
 | 
						|
			return
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
 | 
						|
	container, err := s.getContainer(id)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	p, err := container.Process("")
 | 
						|
	if err != nil {
 | 
						|
		return nil, errdefs.ToGRPC(err)
 | 
						|
	}
 | 
						|
	ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	pids := make([]uint32, 0, len(ps))
 | 
						|
	for _, pid := range ps {
 | 
						|
		pids = append(pids, uint32(pid))
 | 
						|
	}
 | 
						|
	return pids, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
 | 
						|
	ns, _ := namespaces.Namespace(ctx)
 | 
						|
	ctx = namespaces.WithNamespace(context.Background(), ns)
 | 
						|
	for e := range s.events {
 | 
						|
		err := publisher.Publish(ctx, runc.GetTopic(e), e)
 | 
						|
		if err != nil {
 | 
						|
			logrus.WithError(err).Error("post event")
 | 
						|
		}
 | 
						|
	}
 | 
						|
	publisher.Close()
 | 
						|
}
 | 
						|
 | 
						|
func (s *service) getContainer(id string) (*runc.Container, error) {
 | 
						|
	s.mu.Lock()
 | 
						|
	container := s.containers[id]
 | 
						|
	s.mu.Unlock()
 | 
						|
	if container == nil {
 | 
						|
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
 | 
						|
	}
 | 
						|
	return container, nil
 | 
						|
}
 | 
						|
 | 
						|
// initialize a single epoll fd to manage our consoles. `initPlatform` should
 | 
						|
// only be called once.
 | 
						|
func (s *service) initPlatform() error {
 | 
						|
	if s.platform != nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	p, err := runc.NewPlatform()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	s.platform = p
 | 
						|
	s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() })
 | 
						|
	return nil
 | 
						|
}
 |