containerd/services/tasks/local.go
Maksym Pavlenko 07c2ae12e1 Remove v1 runctypes
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
2023-03-15 09:18:16 -07:00

742 lines
19 KiB
Go

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tasks
import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
api "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/pkg/blockio"
"github.com/containerd/containerd/pkg/rdt"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/protobuf/proto"
ptypes "github.com/containerd/containerd/protobuf/types"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/services"
"github.com/containerd/typeurl/v2"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
// Compile-time assertion that *local implements api.TasksClient.
_ = (api.TasksClient)(&local{})
// empty is the shared response returned by the methods in this file
// whose result type is *ptypes.Empty.
empty = &ptypes.Empty{}
)
const (
// stateTimeout is the key under which the process-state timeout is
// registered (see init) and looked up (see getProcessState).
stateTimeout = "io.containerd.timeout.task.state"
)
// Config holds the configuration of the tasks service plugin.
// Both paths are handed to the corresponding SetConfig call in initFunc.
type Config struct {
// BlockIOConfigFile specifies the path to the blockio configuration file.
BlockIOConfigFile string `toml:"blockio_config_file" json:"blockioConfigFile"`
// RdtConfigFile specifies the path to the RDT configuration file.
RdtConfigFile string `toml:"rdt_config_file" json:"rdtConfigFile"`
}
// init registers the tasks service plugin and sets the default timeout
// (two seconds) stored under the stateTimeout key.
func init() {
	registration := &plugin.Registration{
		Type:     plugin.ServicePlugin,
		ID:       services.TasksService,
		Requires: tasksServiceRequires,
		Config:   &Config{},
		InitFn:   initFunc,
	}
	plugin.Register(registration)

	timeout.Set(stateTimeout, 2*time.Second)
}
// initFunc builds the local tasks service from the plugins it depends on.
//
// The v2 task runtime, the metadata plugin and the event plugin are
// required; any failure to obtain them aborts initialization. The task
// monitor is optional: when that plugin is reported as not found, a no-op
// monitor is used instead. Tasks returned by the runtime are handed to the
// monitor, and the blockio and RDT configurations are loaded. Failures in
// those last steps are logged and do not abort initialization.
func initFunc(ic *plugin.InitContext) (interface{}, error) {
	config := ic.Config.(*Config)

	v2r, err := ic.GetByID(plugin.RuntimePluginV2, "task")
	if err != nil {
		return nil, err
	}

	m, err := ic.Get(plugin.MetadataPlugin)
	if err != nil {
		return nil, err
	}

	ep, err := ic.Get(plugin.EventPlugin)
	if err != nil {
		return nil, err
	}

	monitor, err := ic.Get(plugin.TaskMonitorPlugin)
	if err != nil {
		if !errdefs.IsNotFound(err) {
			return nil, err
		}
		// No monitor plugin is available: fall back to a monitor that does nothing.
		monitor = runtime.NewNoopMonitor()
	}

	db := m.(*metadata.DB)
	l := &local{
		containers: metadata.NewContainerStore(db),
		store:      db.ContentStore(),
		publisher:  ep.(events.Publisher),
		monitor:    monitor.(runtime.TaskMonitor),
		v2Runtime:  v2r.(runtime.PlatformRuntime),
	}

	v2Tasks, err := l.v2Runtime.Tasks(ic.Context, true)
	if err != nil {
		return nil, err
	}
	for _, t := range v2Tasks {
		// Monitoring existing tasks is best effort: a failure must not stop
		// the service from starting, but it is logged rather than dropped.
		if err := l.monitor.Monitor(t, nil); err != nil {
			log.G(ic.Context).WithError(err).WithField("id", t.ID()).Warn("failed to monitor existing task")
		}
	}

	if err := blockio.SetConfig(config.BlockIOConfigFile); err != nil {
		log.G(ic.Context).WithError(err).Errorf("blockio initialization failed")
	}
	if err := rdt.SetConfig(config.RdtConfigFile); err != nil {
		log.G(ic.Context).WithError(err).Errorf("RDT initialization failed")
	}

	return l, nil
}
// local is the in-process implementation of api.TasksClient.
type local struct {
// containers is the container store built from the metadata database.
containers containers.Store
// store is the content store; checkpoint data is read from and written to it.
store content.Store
// publisher is the events publisher obtained from the event plugin.
publisher events.Publisher
// monitor is told about tasks when they are created and when they are deleted.
monitor runtime.TaskMonitor
// v2Runtime is the runtime used to look up, create and delete tasks.
v2Runtime runtime.PlatformRuntime
}
// Create creates a task for the container identified by r.ContainerID and
// returns the container ID together with the pid of the new task.
//
// If r.Options carries no restore path but r.Checkpoint is set, the
// checkpoint archive is read from the content store and unpacked into a
// newly created temporary directory, which is then passed to the runtime as
// the checkpoint path. The checkpoint media type is validated before the
// directory is created, and the directory is removed again whenever the
// task does not end up being created. Once the task exists the directory is
// left in place, because its path has been handed to the runtime.
//
// An AlreadyExists error is returned if the runtime already has a task for
// the container.
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
	container, err := l.getContainer(ctx, r.ContainerID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	checkpointPath, err := getRestorePath(container.Runtime.Name, r.Options)
	if err != nil {
		return nil, err
	}

	// tmpCheckpoint is non-empty only when this call created a directory of
	// its own for the checkpoint; a caller-supplied path is never removed.
	var (
		tmpCheckpoint string
		taskCreated   bool
	)
	defer func() {
		if tmpCheckpoint == "" || taskCreated {
			return
		}
		if rmErr := os.RemoveAll(tmpCheckpoint); rmErr != nil {
			log.G(ctx).WithError(rmErr).WithField("path", tmpCheckpoint).Warn("failed to remove temporary checkpoint directory")
		}
	}()

	// No explicit restore path: obtain the checkpoint from the checkpoint image.
	if checkpointPath == "" && r.Checkpoint != nil {
		// Reject unsupported input before touching the filesystem.
		if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
			return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
		}
		tmpCheckpoint, err = os.MkdirTemp(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint")
		if err != nil {
			return nil, err
		}
		checkpointPath = tmpCheckpoint

		reader, err := l.store.ReaderAt(ctx, ocispec.Descriptor{
			MediaType:   r.Checkpoint.MediaType,
			Digest:      digest.Digest(r.Checkpoint.Digest),
			Size:        r.Checkpoint.Size,
			Annotations: r.Checkpoint.Annotations,
		})
		if err != nil {
			return nil, err
		}
		_, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader))
		reader.Close()
		if err != nil {
			return nil, err
		}
	}

	opts := runtime.CreateOpts{
		Spec: container.Spec,
		IO: runtime.IO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint:     checkpointPath,
		Runtime:        container.Runtime.Name,
		RuntimeOptions: container.Runtime.Options,
		TaskOptions:    r.Options,
		SandboxID:      container.SandboxID,
	}
	// An explicit runtime path in the request overrides the container's runtime name.
	if r.RuntimePath != "" {
		opts.Runtime = r.RuntimePath
	}
	for _, m := range r.Rootfs {
		opts.Rootfs = append(opts.Rootfs, mount.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Target:  m.Target,
			Options: m.Options,
		})
	}

	rtime := l.v2Runtime
	// A successful lookup means a task already exists for this container.
	_, err = rtime.Get(ctx, r.ContainerID)
	if err != nil && !errdefs.IsNotFound(err) {
		return nil, errdefs.ToGRPC(err)
	}
	if err == nil {
		return nil, errdefs.ToGRPC(fmt.Errorf("task %s: %w", r.ContainerID, errdefs.ErrAlreadyExists))
	}

	c, err := rtime.Create(ctx, r.ContainerID, opts)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// From here on the task exists, so the checkpoint directory must stay.
	taskCreated = true

	labels := map[string]string{"runtime": container.Runtime.Name}
	if err := l.monitor.Monitor(c, labels); err != nil {
		return nil, fmt.Errorf("monitor task: %w", err)
	}
	pid, err := c.PID(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get task pid: %w", err)
	}
	return &api.CreateTaskResponse{
		ContainerID: r.ContainerID,
		Pid:         pid,
	}, nil
}
// Start starts the task of container r.ContainerID, or the exec process
// named by r.ExecID when that field is set, and returns the pid reported by
// the process state afterwards.
func (l *local) Start(ctx context.Context, r *api.StartRequest, _ ...grpc.CallOption) (*api.StartResponse, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	// Operate on the task itself unless an exec ID selects another process.
	var proc runtime.Process = tsk
	if r.ExecID != "" {
		proc, err = tsk.Process(ctx, r.ExecID)
		if err != nil {
			return nil, errdefs.ToGRPC(err)
		}
	}

	if startErr := proc.Start(ctx); startErr != nil {
		return nil, errdefs.ToGRPC(startErr)
	}
	st, err := proc.State(ctx)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return &api.StartResponse{Pid: st.Pid}, nil
}
// Delete stops monitoring the task of container r.ContainerID, deletes the
// task from the runtime and returns its exit information.
func (l *local) Delete(ctx context.Context, r *api.DeleteTaskRequest, _ ...grpc.CallOption) (*api.DeleteResponse, error) {
	container, err := l.getContainer(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	// Look the task up; a failed lookup is reported as NotFound.
	tsk, err := l.getTaskFromContainer(ctx, container)
	if err != nil {
		return nil, err
	}

	if stopErr := l.monitor.Stop(tsk); stopErr != nil {
		return nil, stopErr
	}

	exit, err := l.v2Runtime.Delete(ctx, r.ContainerID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	resp := &api.DeleteResponse{
		ExitStatus: exit.Status,
		ExitedAt:   protobuf.ToTimestamp(exit.Timestamp),
		Pid:        exit.Pid,
	}
	return resp, nil
}
// DeleteProcess deletes the exec process r.ExecID of the task belonging to
// container r.ContainerID and returns the process's exit information.
func (l *local) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest, _ ...grpc.CallOption) (*api.DeleteResponse, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	proc, err := tsk.Process(ctx, r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	exit, err := proc.Delete(ctx)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	resp := &api.DeleteResponse{
		ID:         r.ExecID,
		ExitStatus: exit.Status,
		ExitedAt:   protobuf.ToTimestamp(exit.Timestamp),
		Pid:        exit.Pid,
	}
	return resp, nil
}
// getProcessState queries the state of p, bounded by the stateTimeout
// timeout, and converts it to its protobuf representation.
//
// A NotFound error from the state query is returned to the caller. Any
// other error is only logged; the conversion then proceeds with whatever
// state value was returned alongside the error.
func getProcessState(ctx context.Context, p runtime.Process) (*task.Process, error) {
	ctx, cancel := timeout.WithContext(ctx, stateTimeout)
	defer cancel()

	state, err := p.State(ctx)
	switch {
	case err == nil:
		// state is valid, nothing to report
	case errdefs.IsNotFound(err):
		return nil, err
	default:
		log.G(ctx).WithError(err).Errorf("get state for %s", p.ID())
	}

	// Map the runtime status onto the API enum; anything unrecognized stays UNKNOWN.
	pbStatus := task.Status_UNKNOWN
	switch state.Status {
	case runtime.CreatedStatus:
		pbStatus = task.Status_CREATED
	case runtime.RunningStatus:
		pbStatus = task.Status_RUNNING
	case runtime.StoppedStatus:
		pbStatus = task.Status_STOPPED
	case runtime.PausedStatus:
		pbStatus = task.Status_PAUSED
	case runtime.PausingStatus:
		pbStatus = task.Status_PAUSING
	default:
		log.G(ctx).WithField("status", state.Status).Warn("unknown status")
	}

	result := &task.Process{
		ID:         p.ID(),
		Pid:        state.Pid,
		Status:     pbStatus,
		Stdin:      state.Stdin,
		Stdout:     state.Stdout,
		Stderr:     state.Stderr,
		Terminal:   state.Terminal,
		ExitStatus: state.ExitStatus,
		ExitedAt:   protobuf.ToTimestamp(state.ExitedAt),
	}
	return result, nil
}
// Get returns the state of the task of container r.ContainerID, or of the
// exec process named by r.ExecID when that field is set.
func (l *local) Get(ctx context.Context, r *api.GetRequest, _ ...grpc.CallOption) (*api.GetResponse, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	// Report on the task itself unless an exec ID selects another process.
	var proc runtime.Process = tsk
	if r.ExecID != "" {
		proc, err = tsk.Process(ctx, r.ExecID)
		if err != nil {
			return nil, errdefs.ToGRPC(err)
		}
	}

	info, err := getProcessState(ctx, proc)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return &api.GetResponse{Process: info}, nil
}
// List returns the state of the tasks reported by the runtime.
func (l *local) List(ctx context.Context, r *api.ListTasksRequest, _ ...grpc.CallOption) (*api.ListTasksResponse, error) {
	found, err := l.v2Runtime.Tasks(ctx, false)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	var out api.ListTasksResponse
	addTasks(ctx, &out, found)
	return &out, nil
}
// addTasks appends the protobuf state of every task in tasks to r.Tasks.
// Tasks whose state cannot be obtained are skipped; the failure is logged
// unless it is a NotFound error.
func addTasks(ctx context.Context, r *api.ListTasksResponse, tasks []runtime.Task) {
	for _, tsk := range tasks {
		proc, err := getProcessState(ctx, tsk)
		if err == nil {
			r.Tasks = append(r.Tasks, proc)
			continue
		}
		if errdefs.IsNotFound(err) {
			// handle race with deletion
			continue
		}
		log.G(ctx).WithError(err).WithField("id", tsk.ID()).Error("converting task to protobuf")
	}
}
// Pause pauses the task of container r.ContainerID.
func (l *local) Pause(ctx context.Context, r *api.PauseTaskRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}
	if pauseErr := tsk.Pause(ctx); pauseErr != nil {
		return nil, errdefs.ToGRPC(pauseErr)
	}
	return empty, nil
}
// Resume resumes the task of container r.ContainerID.
func (l *local) Resume(ctx context.Context, r *api.ResumeTaskRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}
	if resumeErr := tsk.Resume(ctx); resumeErr != nil {
		return nil, errdefs.ToGRPC(resumeErr)
	}
	return empty, nil
}
// Kill sends r.Signal to the task of container r.ContainerID, or to the
// exec process named by r.ExecID when that field is set. r.All is passed
// through to the process's Kill call.
func (l *local) Kill(ctx context.Context, r *api.KillRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	// Signal the task itself unless an exec ID selects another process.
	var proc runtime.Process = tsk
	if r.ExecID != "" {
		proc, err = tsk.Process(ctx, r.ExecID)
		if err != nil {
			return nil, errdefs.ToGRPC(err)
		}
	}

	if killErr := proc.Kill(ctx, r.Signal, r.All); killErr != nil {
		return nil, errdefs.ToGRPC(killErr)
	}
	return empty, nil
}
// ListPids returns the processes reported by the task of container
// r.ContainerID. Each entry carries the pid and, when the runtime supplied
// additional info, that info marshalled into a protobuf Any.
func (l *local) ListPids(ctx context.Context, r *api.ListPidsRequest, _ ...grpc.CallOption) (*api.ListPidsResponse, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	pids, err := tsk.Pids(ctx)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	var out []*task.ProcessInfo
	for _, entry := range pids {
		info := &task.ProcessInfo{Pid: entry.Pid}
		if entry.Info != nil {
			marshalled, mErr := protobuf.MarshalAnyToProto(entry.Info)
			if mErr != nil {
				return nil, fmt.Errorf("failed to marshal process %d info: %w", entry.Pid, mErr)
			}
			info.Info = marshalled
		}
		out = append(out, info)
	}

	return &api.ListPidsResponse{Processes: out}, nil
}
// Exec adds a new exec process, identified by r.ExecID, to the task of
// container r.ContainerID. An empty exec ID is rejected with
// InvalidArgument.
func (l *local) Exec(ctx context.Context, r *api.ExecProcessRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
	if r.ExecID == "" {
		return nil, status.Errorf(codes.InvalidArgument, "exec id cannot be empty")
	}

	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	execOpts := runtime.ExecOpts{
		Spec: r.Spec,
		IO: runtime.IO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
	}
	_, err = tsk.Exec(ctx, r.ExecID, execOpts)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}
// ResizePty resizes the pty of the task of container r.ContainerID, or of
// the exec process named by r.ExecID when that field is set, to
// r.Width x r.Height.
func (l *local) ResizePty(ctx context.Context, r *api.ResizePtyRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	// Resize the task itself unless an exec ID selects another process.
	var proc runtime.Process = tsk
	if r.ExecID != "" {
		proc, err = tsk.Process(ctx, r.ExecID)
		if err != nil {
			return nil, errdefs.ToGRPC(err)
		}
	}

	size := runtime.ConsoleSize{
		Width:  r.Width,
		Height: r.Height,
	}
	if resizeErr := proc.ResizePty(ctx, size); resizeErr != nil {
		return nil, errdefs.ToGRPC(resizeErr)
	}
	return empty, nil
}
// CloseIO closes the IO of the task of container r.ContainerID, or of the
// exec process named by r.ExecID when that field is set. The close is
// performed only when r.Stdin is true; the process is resolved either way.
func (l *local) CloseIO(ctx context.Context, r *api.CloseIORequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	// Act on the task itself unless an exec ID selects another process.
	var proc runtime.Process = tsk
	if r.ExecID != "" {
		proc, err = tsk.Process(ctx, r.ExecID)
		if err != nil {
			return nil, errdefs.ToGRPC(err)
		}
	}

	if !r.Stdin {
		return empty, nil
	}
	if closeErr := proc.CloseIO(ctx); closeErr != nil {
		return nil, errdefs.ToGRPC(closeErr)
	}
	return empty, nil
}
// Checkpoint checkpoints the task of container r.ContainerID.
//
// When r.Options names an image path, the checkpoint is written there and
// an empty response is returned. Otherwise the checkpoint is written to a
// temporary directory (removed on return), archived into the content store
// together with the container spec, and the two resulting descriptors are
// returned.
func (l *local) Checkpoint(ctx context.Context, r *api.CheckpointTaskRequest, _ ...grpc.CallOption) (*api.CheckpointTaskResponse, error) {
	container, err := l.getContainer(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}
	tsk, err := l.getTaskFromContainer(ctx, container)
	if err != nil {
		return nil, err
	}
	imagePath, err := getCheckpointPath(container.Runtime.Name, r.Options)
	if err != nil {
		return nil, err
	}

	// commitToStore is true only when the caller supplied no image path, in
	// which case a temporary directory is used and its contents are committed.
	commitToStore := imagePath == ""
	if commitToStore {
		imagePath, err = os.MkdirTemp(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint")
		if err != nil {
			return nil, errdefs.ToGRPC(err)
		}
		defer os.RemoveAll(imagePath)
	}

	if cpErr := tsk.Checkpoint(ctx, imagePath, r.Options); cpErr != nil {
		return nil, errdefs.ToGRPC(cpErr)
	}

	// A caller-supplied image path already holds the result; nothing to commit.
	if !commitToStore {
		return &api.CheckpointTaskResponse{}, nil
	}

	// Archive the checkpoint directory into the content store.
	archiveStream := archive.Diff(ctx, "", imagePath)
	checkpointDesc, writeErr := l.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, imagePath, archiveStream)
	// The archive stream is closed before the write result is inspected.
	if closeErr := archiveStream.Close(); closeErr != nil {
		return nil, closeErr
	}
	if writeErr != nil {
		return nil, writeErr
	}

	// Store the container spec alongside the checkpoint.
	specBytes, err := proto.Marshal(protobuf.FromAny(container.Spec))
	if err != nil {
		return nil, err
	}
	specDesc, err := l.writeContent(ctx, images.MediaTypeContainerd1CheckpointConfig, filepath.Join(imagePath, "spec"), bytes.NewReader(specBytes))
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	return &api.CheckpointTaskResponse{
		Descriptors: []*types.Descriptor{checkpointDesc, specDesc},
	}, nil
}
// Update applies r.Resources and r.Annotations to the task of container
// r.ContainerID.
func (l *local) Update(ctx context.Context, r *api.UpdateTaskRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}
	err = tsk.Update(ctx, r.Resources, r.Annotations)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}
// Metrics collects metrics for the tasks reported by the runtime that match
// r.Filters.
//
// Errors from parsing the filters and from listing the tasks are converted
// with errdefs.ToGRPC, in line with the other methods of this service.
// Failures to collect the metrics of an individual task do not fail the
// call; see getTasksMetrics.
func (l *local) Metrics(ctx context.Context, r *api.MetricsRequest, _ ...grpc.CallOption) (*api.MetricsResponse, error) {
	filter, err := filters.ParseAll(r.Filters...)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	tasks, err := l.v2Runtime.Tasks(ctx, false)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	var resp api.MetricsResponse
	getTasksMetrics(ctx, filter, tasks, &resp)
	return &resp, nil
}
// Wait waits on the task of container r.ContainerID, or on the exec process
// named by r.ExecID when that field is set, and returns its exit status and
// exit time.
func (l *local) Wait(ctx context.Context, r *api.WaitRequest, _ ...grpc.CallOption) (*api.WaitResponse, error) {
	tsk, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}

	// Wait on the task itself unless an exec ID selects another process.
	var proc runtime.Process = tsk
	if r.ExecID != "" {
		proc, err = tsk.Process(ctx, r.ExecID)
		if err != nil {
			return nil, errdefs.ToGRPC(err)
		}
	}

	exit, err := proc.Wait(ctx)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	resp := &api.WaitResponse{
		ExitStatus: exit.Status,
		ExitedAt:   protobuf.ToTimestamp(exit.Timestamp),
	}
	return resp, nil
}
// getTasksMetrics appends one metric entry to r.Metrics for every task in
// tasks that matches filter. The filter can match on the "id" and
// "namespace" fields; every other field, including "runtime", is reported
// as absent. Tasks whose stats cannot be collected are skipped, and the
// failure is logged unless it is a NotFound error.
func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime.Task, r *api.MetricsResponse) {
	for _, tk := range tasks {
		// adapter exposes the fields of the current task to the filter.
		adapter := filters.AdapterFunc(func(fieldpath []string) (string, bool) {
			cur := tk
			if fieldpath[0] == "id" {
				return cur.ID(), true
			}
			if fieldpath[0] == "namespace" {
				return cur.Namespace(), true
			}
			return "", false
		})
		if !filter.Match(adapter) {
			continue
		}

		// The timestamp is taken before the stats are requested.
		collectedAt := time.Now()
		data, err := tk.Stats(ctx)
		if err != nil {
			if !errdefs.IsNotFound(err) {
				log.G(ctx).WithError(err).Errorf("collecting metrics for %s", tk.ID())
			}
			continue
		}

		metric := &types.Metric{
			Timestamp: protobuf.ToTimestamp(collectedAt),
			ID:        tk.ID(),
			Data:      data,
		}
		r.Metrics = append(r.Metrics, metric)
	}
}
// writeContent copies r into the content store under reference ref,
// commits it, and returns a descriptor carrying mediaType, the digest
// reported by the writer and the number of bytes copied.
func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
	target := ocispec.Descriptor{MediaType: mediaType}
	w, err := l.store.Writer(ctx, content.WithRef(ref), content.WithDescriptor(target))
	if err != nil {
		return nil, err
	}
	defer w.Close()

	written, err := io.Copy(w, r)
	if err != nil {
		return nil, err
	}
	// Commit is called with size 0 and an empty expected digest.
	if err = w.Commit(ctx, 0, ""); err != nil {
		return nil, err
	}

	desc := &types.Descriptor{
		MediaType:   mediaType,
		Digest:      w.Digest().String(),
		Size:        written,
		Annotations: map[string]string{},
	}
	return desc, nil
}
// getContainer loads the container with the given id from the container
// store. Lookup errors are converted with errdefs.ToGRPC.
func (l *local) getContainer(ctx context.Context, id string) (*containers.Container, error) {
	found, err := l.containers.Get(ctx, id)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return &found, nil
}
// getTask resolves the container with the given id and returns the task
// that the runtime holds for it.
func (l *local) getTask(ctx context.Context, id string) (runtime.Task, error) {
	ctr, lookupErr := l.getContainer(ctx, id)
	if lookupErr != nil {
		return nil, lookupErr
	}
	return l.getTaskFromContainer(ctx, ctr)
}
// getTaskFromContainer returns the task that the runtime holds for
// container. Any lookup failure is reported to the caller as a NotFound
// status.
func (l *local) getTaskFromContainer(ctx context.Context, container *containers.Container) (runtime.Task, error) {
	tsk, lookupErr := l.v2Runtime.Get(ctx, container.ID)
	if lookupErr == nil {
		return tsk, nil
	}
	return nil, status.Errorf(codes.NotFound, "task %v not found", container.ID)
}
// getCheckpointPath extracts the checkpoint image path from the given task
// checkpoint option. A nil option yields an empty path. The option must
// decode to *options.CheckpointOptions, so this is only suitable for the
// runc runtime for now; runtime is used in the error message only.
func getCheckpointPath(runtime string, option *ptypes.Any) (string, error) {
	if option == nil {
		return "", nil
	}

	decoded, err := typeurl.UnmarshalAny(option)
	if err != nil {
		return "", err
	}

	if cpOpts, ok := decoded.(*options.CheckpointOptions); ok {
		return cpOpts.ImagePath, nil
	}
	return "", fmt.Errorf("invalid task checkpoint option for %s", runtime)
}
// getRestorePath extracts the CRIU image path from the given task create
// option. A nil option yields an empty path. The option must decode to
// *options.Options, so this is only suitable for the runc runtime for now;
// runtime is used in the error message only.
func getRestorePath(runtime string, option *ptypes.Any) (string, error) {
	if option == nil {
		return "", nil
	}

	decoded, err := typeurl.UnmarshalAny(option)
	if err != nil {
		return "", err
	}

	if createOpts, ok := decoded.(*options.Options); ok {
		return createOpts.CriuImagePath, nil
	}
	return "", fmt.Errorf("invalid task create option for %s", runtime)
}