ref: #1464 This tries to solve issues with races around process state. First it adds the process mutex around the state call so that any state changes, deletions, etc will be handled in order. Second, for IsNoExist errors from the runtime, return a stopped state if a process has been removed from the underlying OCI runtime but not from the shim yet. This shouldn't happen with the lock from above but its hare to verify this issue. Third, handle shim disconnections and return an ErrNotFound. Forth, don't abort returning all tasks if one task is unable to return its state. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
577 lines
14 KiB
Go
577 lines
14 KiB
Go
package tasks
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/boltdb/bolt"
|
|
api "github.com/containerd/containerd/api/services/tasks/v1"
|
|
"github.com/containerd/containerd/api/types"
|
|
"github.com/containerd/containerd/api/types/task"
|
|
"github.com/containerd/containerd/archive"
|
|
"github.com/containerd/containerd/containers"
|
|
"github.com/containerd/containerd/content"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/events"
|
|
"github.com/containerd/containerd/filters"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/metadata"
|
|
"github.com/containerd/containerd/mount"
|
|
"github.com/containerd/containerd/plugin"
|
|
"github.com/containerd/containerd/runtime"
|
|
"github.com/containerd/containerd/typeurl"
|
|
google_protobuf "github.com/golang/protobuf/ptypes/empty"
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
)
|
|
|
|
var (
|
|
_ = (api.TasksServer)(&Service{})
|
|
empty = &google_protobuf.Empty{}
|
|
)
|
|
|
|
func init() {
|
|
plugin.Register(&plugin.Registration{
|
|
Type: plugin.GRPCPlugin,
|
|
ID: "tasks",
|
|
Requires: []plugin.PluginType{
|
|
plugin.RuntimePlugin,
|
|
plugin.MetadataPlugin,
|
|
plugin.ContentPlugin,
|
|
},
|
|
Init: New,
|
|
})
|
|
}
|
|
|
|
func New(ic *plugin.InitContext) (interface{}, error) {
|
|
rt, err := ic.GetAll(plugin.RuntimePlugin)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m, err := ic.Get(plugin.MetadataPlugin)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ct, err := ic.Get(plugin.ContentPlugin)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cs := metadata.NewContentStore(m.(*bolt.DB), ct.(content.Store))
|
|
runtimes := make(map[string]runtime.Runtime)
|
|
for _, rr := range rt {
|
|
r := rr.(runtime.Runtime)
|
|
runtimes[r.ID()] = r
|
|
}
|
|
return &Service{
|
|
runtimes: runtimes,
|
|
db: m.(*bolt.DB),
|
|
store: cs,
|
|
publisher: ic.Events,
|
|
}, nil
|
|
}
|
|
|
|
type Service struct {
|
|
runtimes map[string]runtime.Runtime
|
|
db *bolt.DB
|
|
store content.Store
|
|
publisher events.Publisher
|
|
}
|
|
|
|
func (s *Service) Register(server *grpc.Server) error {
|
|
api.RegisterTasksServer(server, s)
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
|
|
var (
|
|
checkpointPath string
|
|
err error
|
|
)
|
|
if r.Checkpoint != nil {
|
|
checkpointPath, err = ioutil.TempDir("", "ctrd-checkpoint")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
|
|
return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
|
|
}
|
|
reader, err := s.store.ReaderAt(ctx, r.Checkpoint.Digest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader))
|
|
reader.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
container, err := s.getContainer(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
opts := runtime.CreateOpts{
|
|
Spec: container.Spec,
|
|
IO: runtime.IO{
|
|
Stdin: r.Stdin,
|
|
Stdout: r.Stdout,
|
|
Stderr: r.Stderr,
|
|
Terminal: r.Terminal,
|
|
},
|
|
Checkpoint: checkpointPath,
|
|
Options: r.Options,
|
|
}
|
|
for _, m := range r.Rootfs {
|
|
opts.Rootfs = append(opts.Rootfs, mount.Mount{
|
|
Type: m.Type,
|
|
Source: m.Source,
|
|
Options: m.Options,
|
|
})
|
|
}
|
|
runtime, err := s.getRuntime(container.Runtime.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c, err := runtime.Create(ctx, r.ContainerID, opts)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
state, err := c.State(ctx)
|
|
if err != nil {
|
|
log.G(ctx).Error(err)
|
|
}
|
|
|
|
return &api.CreateTaskResponse{
|
|
ContainerID: r.ContainerID,
|
|
Pid: state.Pid,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*api.StartResponse, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
p := runtime.Process(t)
|
|
if r.ExecID != "" {
|
|
if p, err = t.Process(ctx, r.ExecID); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
}
|
|
if err := p.Start(ctx); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
state, err := p.State(ctx)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return &api.StartResponse{
|
|
Pid: state.Pid,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) Delete(ctx context.Context, r *api.DeleteTaskRequest) (*api.DeleteResponse, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
runtime, err := s.getRuntime(t.Info().Runtime)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
exit, err := runtime.Delete(ctx, t)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return &api.DeleteResponse{
|
|
ExitStatus: exit.Status,
|
|
ExitedAt: exit.Timestamp,
|
|
Pid: exit.Pid,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest) (*api.DeleteResponse, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
exit, err := t.DeleteProcess(ctx, r.ExecID)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return &api.DeleteResponse{
|
|
ID: r.ExecID,
|
|
ExitStatus: exit.Status,
|
|
ExitedAt: exit.Timestamp,
|
|
Pid: exit.Pid,
|
|
}, nil
|
|
}
|
|
|
|
func processFromContainerd(ctx context.Context, p runtime.Process) (*task.Process, error) {
|
|
state, err := p.State(ctx)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
var status task.Status
|
|
switch state.Status {
|
|
case runtime.CreatedStatus:
|
|
status = task.StatusCreated
|
|
case runtime.RunningStatus:
|
|
status = task.StatusRunning
|
|
case runtime.StoppedStatus:
|
|
status = task.StatusStopped
|
|
case runtime.PausedStatus:
|
|
status = task.StatusPaused
|
|
case runtime.PausingStatus:
|
|
status = task.StatusPausing
|
|
default:
|
|
log.G(ctx).WithField("status", state.Status).Warn("unknown status")
|
|
}
|
|
return &task.Process{
|
|
ID: p.ID(),
|
|
Pid: state.Pid,
|
|
Status: status,
|
|
Stdin: state.Stdin,
|
|
Stdout: state.Stdout,
|
|
Stderr: state.Stderr,
|
|
Terminal: state.Terminal,
|
|
ExitStatus: state.ExitStatus,
|
|
ExitedAt: state.ExitedAt,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) Get(ctx context.Context, r *api.GetRequest) (*api.GetResponse, error) {
|
|
task, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
p := runtime.Process(task)
|
|
if r.ExecID != "" {
|
|
if p, err = task.Process(ctx, r.ExecID); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
}
|
|
t, err := processFromContainerd(ctx, p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &api.GetResponse{
|
|
Process: t,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) List(ctx context.Context, r *api.ListTasksRequest) (*api.ListTasksResponse, error) {
|
|
resp := &api.ListTasksResponse{}
|
|
for _, r := range s.runtimes {
|
|
tasks, err := r.Tasks(ctx)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
addTasks(ctx, resp, tasks)
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func addTasks(ctx context.Context, r *api.ListTasksResponse, tasks []runtime.Task) {
|
|
for _, t := range tasks {
|
|
tt, err := processFromContainerd(ctx, t)
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf")
|
|
continue
|
|
}
|
|
r.Tasks = append(r.Tasks, tt)
|
|
}
|
|
}
|
|
|
|
func (s *Service) Pause(ctx context.Context, r *api.PauseTaskRequest) (*google_protobuf.Empty, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = t.Pause(ctx)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
func (s *Service) Resume(ctx context.Context, r *api.ResumeTaskRequest) (*google_protobuf.Empty, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = t.Resume(ctx)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobuf.Empty, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
p := runtime.Process(t)
|
|
if r.ExecID != "" {
|
|
if p, err = t.Process(ctx, r.ExecID); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
}
|
|
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
func (s *Service) ListPids(ctx context.Context, r *api.ListPidsRequest) (*api.ListPidsResponse, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pids, err := t.Pids(ctx)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return &api.ListPidsResponse{
|
|
Pids: pids,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) Exec(ctx context.Context, r *api.ExecProcessRequest) (*google_protobuf.Empty, error) {
|
|
if r.ExecID == "" {
|
|
return nil, grpc.Errorf(codes.InvalidArgument, "exec id cannot be empty")
|
|
}
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if _, err := t.Exec(ctx, r.ExecID, runtime.ExecOpts{
|
|
Spec: r.Spec,
|
|
IO: runtime.IO{
|
|
Stdin: r.Stdin,
|
|
Stdout: r.Stdout,
|
|
Stderr: r.Stderr,
|
|
Terminal: r.Terminal,
|
|
},
|
|
}); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
func (s *Service) ResizePty(ctx context.Context, r *api.ResizePtyRequest) (*google_protobuf.Empty, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
p := runtime.Process(t)
|
|
if r.ExecID != "" {
|
|
if p, err = t.Process(ctx, r.ExecID); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
}
|
|
if err := p.ResizePty(ctx, runtime.ConsoleSize{
|
|
Width: r.Width,
|
|
Height: r.Height,
|
|
}); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
func (s *Service) CloseIO(ctx context.Context, r *api.CloseIORequest) (*google_protobuf.Empty, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
p := runtime.Process(t)
|
|
if r.ExecID != "" {
|
|
if p, err = t.Process(ctx, r.ExecID); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
}
|
|
if r.Stdin {
|
|
if err := p.CloseIO(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointTaskRequest) (*api.CheckpointTaskResponse, error) {
|
|
container, err := s.getContainer(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
t, err := s.getTaskFromContainer(ctx, container)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
image, err := ioutil.TempDir("", "ctd-checkpoint")
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
defer os.RemoveAll(image)
|
|
if err := t.Checkpoint(ctx, image, r.Options); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
// write checkpoint to the content store
|
|
tar := archive.Diff(ctx, "", image)
|
|
cp, err := s.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, image, tar)
|
|
// close tar first after write
|
|
if err := tar.Close(); err != nil {
|
|
return nil, err
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// write the config to the content store
|
|
data, err := container.Spec.Marshal()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
spec := bytes.NewReader(data)
|
|
specD, err := s.writeContent(ctx, images.MediaTypeContainerd1CheckpointConfig, filepath.Join(image, "spec"), spec)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return &api.CheckpointTaskResponse{
|
|
Descriptors: []*types.Descriptor{
|
|
cp,
|
|
specD,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) Update(ctx context.Context, r *api.UpdateTaskRequest) (*google_protobuf.Empty, error) {
|
|
t, err := s.getTask(ctx, r.ContainerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := t.Update(ctx, r.Resources); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return empty, nil
|
|
}
|
|
|
|
func (s *Service) Metrics(ctx context.Context, r *api.MetricsRequest) (*api.MetricsResponse, error) {
|
|
filter, err := filters.ParseAll(r.Filters...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var resp api.MetricsResponse
|
|
for _, r := range s.runtimes {
|
|
tasks, err := r.Tasks(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
getTasksMetrics(ctx, filter, tasks, &resp)
|
|
}
|
|
return &resp, nil
|
|
}
|
|
|
|
func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime.Task, r *api.MetricsResponse) {
|
|
for _, tk := range tasks {
|
|
if !filter.Match(filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
|
t := tk
|
|
switch fieldpath[0] {
|
|
case "id":
|
|
return t.ID(), true
|
|
case "namespace":
|
|
return t.Info().Namespace, true
|
|
case "runtime":
|
|
return t.Info().Runtime, true
|
|
}
|
|
return "", false
|
|
})) {
|
|
continue
|
|
}
|
|
|
|
collected := time.Now()
|
|
metrics, err := tk.Metrics(ctx)
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Errorf("collecting metrics for %s", tk.ID())
|
|
continue
|
|
}
|
|
data, err := typeurl.MarshalAny(metrics)
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Errorf("marshal metrics for %s", tk.ID())
|
|
continue
|
|
}
|
|
r.Metrics = append(r.Metrics, &types.Metric{
|
|
ID: tk.ID(),
|
|
Timestamp: collected,
|
|
Data: data,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
|
|
writer, err := s.store.Writer(ctx, ref, 0, "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer writer.Close()
|
|
size, err := io.Copy(writer, r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := writer.Commit(ctx, 0, ""); err != nil {
|
|
return nil, err
|
|
}
|
|
return &types.Descriptor{
|
|
MediaType: mediaType,
|
|
Digest: writer.Digest(),
|
|
Size_: size,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) getContainer(ctx context.Context, id string) (*containers.Container, error) {
|
|
var container containers.Container
|
|
if err := s.db.View(func(tx *bolt.Tx) error {
|
|
store := metadata.NewContainerStore(tx)
|
|
var err error
|
|
container, err = store.Get(ctx, id)
|
|
return err
|
|
}); err != nil {
|
|
return nil, errdefs.ToGRPC(err)
|
|
}
|
|
return &container, nil
|
|
}
|
|
|
|
func (s *Service) getTask(ctx context.Context, id string) (runtime.Task, error) {
|
|
container, err := s.getContainer(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.getTaskFromContainer(ctx, container)
|
|
}
|
|
|
|
func (s *Service) getTaskFromContainer(ctx context.Context, container *containers.Container) (runtime.Task, error) {
|
|
runtime, err := s.getRuntime(container.Runtime.Name)
|
|
if err != nil {
|
|
return nil, errdefs.ToGRPCf(err, "runtime for task %s", container.Runtime.Name)
|
|
}
|
|
t, err := runtime.Get(ctx, container.ID)
|
|
if err != nil {
|
|
return nil, grpc.Errorf(codes.NotFound, "task %v not found", container.ID)
|
|
}
|
|
return t, nil
|
|
}
|
|
|
|
func (s *Service) getRuntime(name string) (runtime.Runtime, error) {
|
|
runtime, ok := s.runtimes[name]
|
|
if !ok {
|
|
return nil, grpc.Errorf(codes.NotFound, "unknown runtime %q", name)
|
|
}
|
|
return runtime, nil
|
|
}
|