Merge pull request #1468 from crosbymichael/stats

Add Metrics endpoint to tasks
This commit is contained in:
Kenfe-Mickaël Laventure
2017-09-06 14:59:55 -07:00
committed by GitHub
37 changed files with 5270 additions and 367 deletions

View File

@@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/tasks/v1"
@@ -17,12 +18,14 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/typeurl"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
"google.golang.org/grpc"
@@ -454,6 +457,58 @@ func (s *Service) Update(ctx context.Context, r *api.UpdateTaskRequest) (*google
return empty, nil
}
func (s *Service) Metrics(ctx context.Context, r *api.MetricsRequest) (*api.MetricsResponse, error) {
filter, err := filters.ParseAll(r.Filters...)
if err != nil {
return nil, err
}
var resp api.MetricsResponse
for _, r := range s.runtimes {
tasks, err := r.Tasks(ctx)
if err != nil {
return nil, err
}
getTasksMetrics(ctx, filter, tasks, &resp)
}
return &resp, nil
}
func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime.Task, r *api.MetricsResponse) {
for _, tk := range tasks {
if !filter.Match(filters.AdapterFunc(func(fieldpath []string) (string, bool) {
t := tk
switch fieldpath[0] {
case "id":
return t.ID(), true
case "namespace":
return t.Info().Namespace, true
case "runtime":
return t.Info().Runtime, true
}
return "", false
})) {
continue
}
collected := time.Now()
metrics, err := tk.Metrics(ctx)
if err != nil {
log.G(ctx).WithError(err).Errorf("collecting metrics for %s", tk.ID())
continue
}
data, err := typeurl.MarshalAny(metrics)
if err != nil {
log.G(ctx).WithError(err).Errorf("marshal metrics for %s", tk.ID())
continue
}
r.Metrics = append(r.Metrics, &types.Metric{
ID: tk.ID(),
Timestamp: collected,
Data: data,
})
}
}
func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
writer, err := s.store.Writer(ctx, ref, 0, "")
if err != nil {