@@ -42,6 +42,7 @@ import (
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/containerd/containerd/runtime"
|
||||
"github.com/containerd/containerd/runtime/v2"
|
||||
"github.com/containerd/containerd/services"
|
||||
"github.com/containerd/typeurl"
|
||||
ptypes "github.com/gogo/protobuf/types"
|
||||
@@ -63,7 +64,9 @@ func init() {
|
||||
ID: services.TasksService,
|
||||
Requires: []plugin.Type{
|
||||
plugin.RuntimePlugin,
|
||||
plugin.RuntimePluginV2,
|
||||
plugin.MetadataPlugin,
|
||||
plugin.TaskMonitorPlugin,
|
||||
},
|
||||
InitFn: initFunc,
|
||||
})
|
||||
@@ -75,11 +78,15 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
v2r, err := ic.Get(plugin.RuntimePluginV2)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m, err := ic.Get(plugin.MetadataPlugin)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cs := m.(*metadata.DB).ContentStore()
|
||||
runtimes := make(map[string]runtime.PlatformRuntime)
|
||||
for _, rr := range rt {
|
||||
ri, err := rr.Instance()
|
||||
@@ -94,12 +101,33 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
|
||||
if len(runtimes) == 0 {
|
||||
return nil, errors.New("no runtimes available to create task service")
|
||||
}
|
||||
return &local{
|
||||
monitor, err := ic.Get(plugin.TaskMonitorPlugin)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return nil, err
|
||||
}
|
||||
monitor = runtime.NewNoopMonitor()
|
||||
}
|
||||
|
||||
cs := m.(*metadata.DB).ContentStore()
|
||||
l := &local{
|
||||
runtimes: runtimes,
|
||||
db: m.(*metadata.DB),
|
||||
store: cs,
|
||||
publisher: ic.Events,
|
||||
}, nil
|
||||
monitor: monitor.(runtime.TaskMonitor),
|
||||
v2Runtime: v2r.(*v2.TaskManager),
|
||||
}
|
||||
for _, r := range runtimes {
|
||||
tasks, err := r.Tasks(ic.Context, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, t := range tasks {
|
||||
l.monitor.Monitor(t)
|
||||
}
|
||||
}
|
||||
return l, nil
|
||||
}
|
||||
|
||||
type local struct {
|
||||
@@ -107,6 +135,9 @@ type local struct {
|
||||
db *metadata.DB
|
||||
store content.Store
|
||||
publisher events.Publisher
|
||||
|
||||
monitor runtime.TaskMonitor
|
||||
v2Runtime *v2.TaskManager
|
||||
}
|
||||
|
||||
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
|
||||
@@ -136,7 +167,6 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
container, err := l.getContainer(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
@@ -149,8 +179,10 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
|
||||
Stderr: r.Stderr,
|
||||
Terminal: r.Terminal,
|
||||
},
|
||||
Checkpoint: checkpointPath,
|
||||
Options: r.Options,
|
||||
Checkpoint: checkpointPath,
|
||||
Runtime: container.Runtime.Name,
|
||||
RuntimeOptions: container.Runtime.Options,
|
||||
TaskOptions: r.Options,
|
||||
}
|
||||
for _, m := range r.Rootfs {
|
||||
opts.Rootfs = append(opts.Rootfs, mount.Mount{
|
||||
@@ -167,11 +199,14 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
// TODO: fast path for getting pid on create
|
||||
if err := l.monitor.Monitor(c); err != nil {
|
||||
return nil, errors.Wrap(err, "monitor task")
|
||||
}
|
||||
state, err := c.State(ctx)
|
||||
if err != nil {
|
||||
log.G(ctx).Error(err)
|
||||
}
|
||||
|
||||
return &api.CreateTaskResponse{
|
||||
ContainerID: r.ContainerID,
|
||||
Pid: state.Pid,
|
||||
@@ -206,13 +241,12 @@ func (l *local) Delete(ctx context.Context, r *api.DeleteTaskRequest, _ ...grpc.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
runtime, err := l.getRuntime(t.Info().Runtime)
|
||||
if err != nil {
|
||||
if err := l.monitor.Stop(t); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exit, err := runtime.Delete(ctx, t)
|
||||
exit, err := t.Delete(ctx)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
return nil, err
|
||||
}
|
||||
return &api.DeleteResponse{
|
||||
ExitStatus: exit.Status,
|
||||
@@ -226,7 +260,11 @@ func (l *local) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exit, err := t.DeleteProcess(ctx, r.ExecID)
|
||||
process, err := t.Process(ctx, r.ExecID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exit, err := process.Delete(ctx)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
@@ -293,8 +331,8 @@ func (l *local) Get(ctx context.Context, r *api.GetRequest, _ ...grpc.CallOption
|
||||
|
||||
func (l *local) List(ctx context.Context, r *api.ListTasksRequest, _ ...grpc.CallOption) (*api.ListTasksResponse, error) {
|
||||
resp := &api.ListTasksResponse{}
|
||||
for _, r := range l.runtimes {
|
||||
tasks, err := r.Tasks(ctx)
|
||||
for _, r := range l.allRuntimes() {
|
||||
tasks, err := r.Tasks(ctx, false)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
@@ -508,8 +546,8 @@ func (l *local) Metrics(ctx context.Context, r *api.MetricsRequest, _ ...grpc.Ca
|
||||
return nil, err
|
||||
}
|
||||
var resp api.MetricsResponse
|
||||
for _, r := range l.runtimes {
|
||||
tasks, err := r.Tasks(ctx)
|
||||
for _, r := range l.allRuntimes() {
|
||||
tasks, err := r.Tasks(ctx, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -547,32 +585,26 @@ func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime
|
||||
case "id":
|
||||
return t.ID(), true
|
||||
case "namespace":
|
||||
return t.Info().Namespace, true
|
||||
// return t.Info().Namespace, true
|
||||
case "runtime":
|
||||
return t.Info().Runtime, true
|
||||
// return t.Info().Runtime, true
|
||||
}
|
||||
return "", false
|
||||
})) {
|
||||
continue
|
||||
}
|
||||
|
||||
collected := time.Now()
|
||||
metrics, err := tk.Metrics(ctx)
|
||||
stats, err := tk.Stats(ctx)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
log.G(ctx).WithError(err).Errorf("collecting metrics for %s", tk.ID())
|
||||
}
|
||||
continue
|
||||
}
|
||||
data, err := typeurl.MarshalAny(metrics)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("marshal metrics for %s", tk.ID())
|
||||
continue
|
||||
}
|
||||
r.Metrics = append(r.Metrics, &types.Metric{
|
||||
ID: tk.ID(),
|
||||
Timestamp: collected,
|
||||
Data: data,
|
||||
ID: tk.ID(),
|
||||
Data: stats,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -633,7 +665,16 @@ func (l *local) getTaskFromContainer(ctx context.Context, container *containers.
|
||||
func (l *local) getRuntime(name string) (runtime.PlatformRuntime, error) {
|
||||
runtime, ok := l.runtimes[name]
|
||||
if !ok {
|
||||
return nil, status.Errorf(codes.NotFound, "unknown runtime %q", name)
|
||||
// one runtime to rule them all
|
||||
return l.v2Runtime, nil
|
||||
}
|
||||
return runtime, nil
|
||||
}
|
||||
|
||||
func (l *local) allRuntimes() (o []runtime.PlatformRuntime) {
|
||||
for _, r := range l.runtimes {
|
||||
o = append(o, r)
|
||||
}
|
||||
o = append(o, l.v2Runtime)
|
||||
return o
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user