Add spans to CRI runtime service and related client methods

This adds OpenTelemetry (otel) spans to the CRI service, mainly targeting the mutating APIs, which include:
* Sandbox APIs - RunPodSandbox, StopPodSandbox, RemovePodSandbox
* Container APIs - CreateContainer, StartContainer, StopContainer, RemoveContainer
* Attach, Exec and ExecSync
* containerd client methods in container.go, client.go, process.go and task.go

Signed-off-by: Swagat Bora <sbora@amazon.com>
This commit is contained in:
Swagat Bora
2022-11-17 17:52:08 +00:00
parent 45d8917089
commit c0cdcb34f1
17 changed files with 287 additions and 13 deletions

View File

@@ -59,6 +59,7 @@ import (
"github.com/containerd/containerd/v2/pkg/dialer"
"github.com/containerd/containerd/v2/pkg/namespaces"
ptypes "github.com/containerd/containerd/v2/pkg/protobuf/types"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/errdefs"
"github.com/containerd/platforms"
@@ -284,6 +285,8 @@ func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container
// NewContainer will create a new container with the provided id.
// The id must be unique within the namespace.
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
ctx, span := tracing.StartSpan(ctx, "client.NewContainer")
defer span.End()
ctx, done, err := c.WithLease(ctx)
if err != nil {
return nil, err
@@ -301,6 +304,13 @@ func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContain
return nil, err
}
}
span.SetAttributes(
tracing.Attribute("container.id", container.ID),
tracing.Attribute("container.image.ref", container.Image),
tracing.Attribute("container.runtime.name", container.Runtime.Name),
tracing.Attribute("container.snapshotter.name", container.Snapshotter),
)
r, err := c.ContainerService().Create(ctx, container)
if err != nil {
return nil, err
@@ -310,10 +320,21 @@ func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContain
// LoadContainer fetches the stored record for the container identified by id
// from the container service and returns a Container built from that record.
//
// The lookup runs inside a "client.LoadContainer" tracing span. When the
// lookup succeeds, the span is annotated with the record's ID, image
// reference, runtime name, snapshotter and its creation/update timestamps
// (formatted as RFC 3339). When the lookup fails, the error from the
// container service is returned unchanged and no attributes are set.
func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) {
	ctx, span := tracing.StartSpan(ctx, "client.LoadContainer")
	defer span.End()

	record, err := c.ContainerService().Get(ctx, id)
	if err != nil {
		return nil, err
	}

	// Render the timestamps up front so the attribute list below stays a
	// flat list of key/value pairs.
	createdAt := record.CreatedAt.Format(time.RFC3339)
	updatedAt := record.UpdatedAt.Format(time.RFC3339)

	span.SetAttributes(
		tracing.Attribute("container.id", record.ID),
		tracing.Attribute("container.image.ref", record.Image),
		tracing.Attribute("container.runtime.name", record.Runtime.Name),
		tracing.Attribute("container.snapshotter.name", record.Snapshotter),
		tracing.Attribute("container.createdAt", createdAt),
		tracing.Attribute("container.updatedAt", updatedAt),
	)

	return containerFromRecord(c, record), nil
}

View File

@@ -32,6 +32,7 @@ import (
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/errdefs"
"github.com/containerd/fifo"
"github.com/containerd/typeurl/v2"
@@ -140,6 +141,10 @@ func (c *container) Labels(ctx context.Context) (map[string]string, error) {
}
func (c *container) SetLabels(ctx context.Context, labels map[string]string) (map[string]string, error) {
ctx, span := tracing.StartSpan(ctx, "container.SetLabels",
tracing.WithAttribute("container.id", c.id),
)
defer span.End()
container := containers.Container{
ID: c.id,
Labels: labels,
@@ -175,6 +180,10 @@ func (c *container) Spec(ctx context.Context) (*oci.Spec, error) {
// Delete deletes an existing container
// an error is returned if the container has running tasks
func (c *container) Delete(ctx context.Context, opts ...DeleteOpts) error {
ctx, span := tracing.StartSpan(ctx, "container.Delete",
tracing.WithAttribute("container.id", c.id),
)
defer span.End()
if _, err := c.loadTask(ctx, nil); err == nil {
return fmt.Errorf("cannot delete running task %v: %w", c.id, errdefs.ErrFailedPrecondition)
}
@@ -211,6 +220,8 @@ func (c *container) Image(ctx context.Context) (Image, error) {
}
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
ctx, span := tracing.StartSpan(ctx, "container.NewTask")
defer span.End()
i, err := ioCreate(c.id)
if err != nil {
return nil, err
@@ -298,16 +309,28 @@ func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...N
if info.Checkpoint != nil {
request.Checkpoint = info.Checkpoint
}
span.SetAttributes(
tracing.Attribute("task.container.id", request.ContainerID),
tracing.Attribute("task.request.options", request.Options.String()),
tracing.Attribute("task.runtime.name", info.runtime),
)
response, err := c.client.TaskService().Create(ctx, request)
if err != nil {
return nil, errdefs.FromGRPC(err)
}
span.AddEvent("task created",
tracing.Attribute("task.process.id", int(response.Pid)),
)
t.pid = response.Pid
return t, nil
}
func (c *container) Update(ctx context.Context, opts ...UpdateContainerOpts) error {
// fetch the current container config before updating it
ctx, span := tracing.StartSpan(ctx, "container.Update")
defer span.End()
r, err := c.get(ctx)
if err != nil {
return err

View File

@@ -26,6 +26,7 @@ import (
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/errdefs"
)
@@ -118,6 +119,11 @@ func (p *process) Pid() uint32 {
// Start starts the exec process
func (p *process) Start(ctx context.Context) error {
ctx, span := tracing.StartSpan(ctx, "process.Start",
tracing.WithAttribute("process.id", p.ID()),
tracing.WithAttribute("process.task.id", p.task.ID()),
)
defer span.End()
r, err := p.task.client.TaskService().Start(ctx, &tasks.StartRequest{
ContainerID: p.task.id,
ExecID: p.id,
@@ -130,11 +136,18 @@ func (p *process) Start(ctx context.Context) error {
}
return errdefs.FromGRPC(err)
}
span.SetAttributes(tracing.Attribute("process.pid", int(r.Pid)))
p.pid = r.Pid
return nil
}
func (p *process) Kill(ctx context.Context, s syscall.Signal, opts ...KillOpts) error {
ctx, span := tracing.StartSpan(ctx, "process.Kill",
tracing.WithAttribute("process.id", p.ID()),
tracing.WithAttribute("process.pid", int(p.Pid())),
tracing.WithAttribute("process.task.id", p.task.ID()),
)
defer span.End()
var i KillInfo
for _, o := range opts {
if err := o(ctx, &i); err != nil {
@@ -154,6 +167,11 @@ func (p *process) Wait(ctx context.Context) (<-chan ExitStatus, error) {
c := make(chan ExitStatus, 1)
go func() {
defer close(c)
ctx, span := tracing.StartSpan(ctx, "process.Wait",
tracing.WithAttribute("process.id", p.ID()),
tracing.WithAttribute("process.task.id", p.task.ID()),
)
defer span.End()
r, err := p.task.client.TaskService().Wait(ctx, &tasks.WaitRequest{
ContainerID: p.task.id,
ExecID: p.id,
@@ -174,6 +192,10 @@ func (p *process) Wait(ctx context.Context) (<-chan ExitStatus, error) {
}
func (p *process) CloseIO(ctx context.Context, opts ...IOCloserOpts) error {
ctx, span := tracing.StartSpan(ctx, "process.CloseIO",
tracing.WithAttribute("process.id", p.ID()),
)
defer span.End()
r := &tasks.CloseIORequest{
ContainerID: p.task.id,
ExecID: p.id,
@@ -192,6 +214,10 @@ func (p *process) IO() cio.IO {
}
func (p *process) Resize(ctx context.Context, w, h uint32) error {
ctx, span := tracing.StartSpan(ctx, "process.Resize",
tracing.WithAttribute("process.id", p.ID()),
)
defer span.End()
_, err := p.task.client.TaskService().ResizePty(ctx, &tasks.ResizePtyRequest{
ContainerID: p.task.id,
Width: w,
@@ -202,6 +228,10 @@ func (p *process) Resize(ctx context.Context, w, h uint32) error {
}
func (p *process) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStatus, error) {
ctx, span := tracing.StartSpan(ctx, "process.Delete",
tracing.WithAttribute("process.id", p.ID()),
)
defer span.End()
for _, o := range opts {
if err := o(ctx, p); err != nil {
return nil, err
@@ -238,8 +268,11 @@ func (p *process) Status(ctx context.Context) (Status, error) {
if err != nil {
return Status{}, errdefs.FromGRPC(err)
}
status := ProcessStatus(strings.ToLower(r.Process.Status.String()))
exitStatus := r.Process.ExitStatus
return Status{
Status: ProcessStatus(strings.ToLower(r.Process.Status.String())),
ExitStatus: r.Process.ExitStatus,
Status: status,
ExitStatus: exitStatus,
}, nil
}

View File

@@ -38,6 +38,7 @@ import (
"github.com/containerd/containerd/v2/pkg/protobuf"
google_protobuf "github.com/containerd/containerd/v2/pkg/protobuf/types"
"github.com/containerd/containerd/v2/pkg/rootfs"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/errdefs"
"github.com/containerd/typeurl/v2"
@@ -210,6 +211,10 @@ func (t *task) Pid() uint32 {
}
func (t *task) Start(ctx context.Context) error {
ctx, span := tracing.StartSpan(ctx, "task.Start",
tracing.WithAttribute("task.id", t.ID()),
)
defer span.End()
r, err := t.client.TaskService().Start(ctx, &tasks.StartRequest{
ContainerID: t.id,
})
@@ -220,17 +225,28 @@ func (t *task) Start(ctx context.Context) error {
}
return errdefs.FromGRPC(err)
}
span.SetAttributes(tracing.Attribute("task.pid", r.Pid))
t.pid = r.Pid
return nil
}
func (t *task) Kill(ctx context.Context, s syscall.Signal, opts ...KillOpts) error {
ctx, span := tracing.StartSpan(ctx, "task.Kill",
tracing.WithAttribute("task.id", t.ID()),
tracing.WithAttribute("task.pid", int(t.Pid())),
)
defer span.End()
var i KillInfo
for _, o := range opts {
if err := o(ctx, &i); err != nil {
return err
}
}
span.SetAttributes(
tracing.Attribute("task.exec.id", i.ExecID),
tracing.Attribute("task.exec.killall", i.All),
)
_, err := t.client.TaskService().Kill(ctx, &tasks.KillRequest{
Signal: uint32(s),
ContainerID: t.id,
@@ -244,6 +260,10 @@ func (t *task) Kill(ctx context.Context, s syscall.Signal, opts ...KillOpts) err
}
func (t *task) Pause(ctx context.Context) error {
ctx, span := tracing.StartSpan(ctx, "task.Pause",
tracing.WithAttribute("task.id", t.ID()),
)
defer span.End()
_, err := t.client.TaskService().Pause(ctx, &tasks.PauseTaskRequest{
ContainerID: t.id,
})
@@ -251,6 +271,10 @@ func (t *task) Pause(ctx context.Context) error {
}
func (t *task) Resume(ctx context.Context) error {
ctx, span := tracing.StartSpan(ctx, "task.Resume",
tracing.WithAttribute("task.id", t.ID()),
)
defer span.End()
_, err := t.client.TaskService().Resume(ctx, &tasks.ResumeTaskRequest{
ContainerID: t.id,
})
@@ -264,10 +288,14 @@ func (t *task) Status(ctx context.Context) (Status, error) {
if err != nil {
return Status{}, errdefs.FromGRPC(err)
}
status := ProcessStatus(strings.ToLower(r.Process.Status.String()))
exitStatus := r.Process.ExitStatus
exitTime := protobuf.FromTimestamp(r.Process.ExitedAt)
return Status{
Status: ProcessStatus(strings.ToLower(r.Process.Status.String())),
ExitStatus: r.Process.ExitStatus,
ExitTime: protobuf.FromTimestamp(r.Process.ExitedAt),
Status: status,
ExitStatus: exitStatus,
ExitTime: exitTime,
}, nil
}
@@ -275,6 +303,10 @@ func (t *task) Wait(ctx context.Context) (<-chan ExitStatus, error) {
c := make(chan ExitStatus, 1)
go func() {
defer close(c)
ctx, span := tracing.StartSpan(ctx, "task.Wait",
tracing.WithAttribute("task.id", t.ID()),
)
defer span.End()
r, err := t.client.TaskService().Wait(ctx, &tasks.WaitRequest{
ContainerID: t.id,
})
@@ -297,6 +329,10 @@ func (t *task) Wait(ctx context.Context) (<-chan ExitStatus, error) {
// it returns the exit status of the task and any errors that were encountered
// during cleanup
func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStatus, error) {
ctx, span := tracing.StartSpan(ctx, "task.Delete",
tracing.WithAttribute("task.id", t.ID()),
)
defer span.End()
for _, o := range opts {
if err := o(ctx, t); err != nil {
return nil, err
@@ -306,6 +342,7 @@ func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStat
if err != nil && errdefs.IsNotFound(err) {
return nil, err
}
switch status.Status {
case Stopped, Unknown, "":
case Created:
@@ -350,9 +387,14 @@ func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStat
}
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creator) (_ Process, err error) {
ctx, span := tracing.StartSpan(ctx, "task.Exec",
tracing.WithAttribute("task.id", t.ID()),
)
defer span.End()
if id == "" {
return nil, fmt.Errorf("exec id must not be empty: %w", errdefs.ErrInvalidArgument)
}
span.SetAttributes(tracing.Attribute("task.exec.id", id))
i, err := ioCreate(id)
if err != nil {
return nil, err
@@ -408,6 +450,10 @@ func (t *task) Pids(ctx context.Context) ([]ProcessInfo, error) {
}
func (t *task) CloseIO(ctx context.Context, opts ...IOCloserOpts) error {
ctx, span := tracing.StartSpan(ctx, "task.CloseIO",
tracing.WithAttribute("task.id", t.ID()),
)
defer span.End()
r := &tasks.CloseIORequest{
ContainerID: t.id,
}
@@ -416,6 +462,7 @@ func (t *task) CloseIO(ctx context.Context, opts ...IOCloserOpts) error {
o(&i)
}
r.Stdin = i.Stdin
_, err := t.client.TaskService().CloseIO(ctx, r)
return errdefs.FromGRPC(err)
}
@@ -425,6 +472,10 @@ func (t *task) IO() cio.IO {
}
func (t *task) Resize(ctx context.Context, w, h uint32) error {
ctx, span := tracing.StartSpan(ctx, "task.Resize",
tracing.WithAttribute("task.id", t.ID()),
)
defer span.End()
_, err := t.client.TaskService().ResizePty(ctx, &tasks.ResizePtyRequest{
ContainerID: t.id,
Width: w,
@@ -538,6 +589,10 @@ type UpdateTaskInfo struct {
type UpdateTaskOpts func(context.Context, *Client, *UpdateTaskInfo) error
func (t *task) Update(ctx context.Context, opts ...UpdateTaskOpts) error {
ctx, span := tracing.StartSpan(ctx, "task.Update",
tracing.WithAttribute("task.id", t.ID()),
)
defer span.End()
request := &tasks.UpdateTaskRequest{
ContainerID: t.id,
}