diff --git a/client/client.go b/client/client.go index 01f544d1b..70687c55a 100644 --- a/client/client.go +++ b/client/client.go @@ -59,6 +59,7 @@ import ( "github.com/containerd/containerd/v2/pkg/dialer" "github.com/containerd/containerd/v2/pkg/namespaces" ptypes "github.com/containerd/containerd/v2/pkg/protobuf/types" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/errdefs" "github.com/containerd/platforms" @@ -284,6 +285,8 @@ func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container // NewContainer will create a new container with the provided id. // The id must be unique within the namespace. func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) { + ctx, span := tracing.StartSpan(ctx, "client.NewContainer") + defer span.End() ctx, done, err := c.WithLease(ctx) if err != nil { return nil, err @@ -301,6 +304,13 @@ func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContain return nil, err } } + + span.SetAttributes( + tracing.Attribute("container.id", container.ID), + tracing.Attribute("container.image.ref", container.Image), + tracing.Attribute("container.runtime.name", container.Runtime.Name), + tracing.Attribute("container.snapshotter.name", container.Snapshotter), + ) r, err := c.ContainerService().Create(ctx, container) if err != nil { return nil, err @@ -310,10 +320,21 @@ func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContain // LoadContainer loads an existing container from metadata func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) { + ctx, span := tracing.StartSpan(ctx, "client.LoadContainer") + defer span.End() r, err := c.ContainerService().Get(ctx, id) if err != nil { return nil, err } + + span.SetAttributes( + tracing.Attribute("container.id", r.ID), + tracing.Attribute("container.image.ref", r.Image), + tracing.Attribute("container.runtime.name", r.Runtime.Name), + tracing.Attribute("container.snapshotter.name", r.Snapshotter), + tracing.Attribute("container.createdAt", r.CreatedAt.Format(time.RFC3339)), + tracing.Attribute("container.updatedAt", r.UpdatedAt.Format(time.RFC3339)), + ) return containerFromRecord(c, r), nil } diff --git a/client/container.go b/client/container.go index 41fe63ffa..2730f749d 100644 --- a/client/container.go +++ b/client/container.go @@ -32,6 +32,7 @@ import ( "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/containerd/v2/pkg/oci" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" "github.com/containerd/fifo" "github.com/containerd/typeurl/v2" @@ -140,6 +141,10 @@ func (c *container) Labels(ctx context.Context) (map[string]string, error) { } func (c *container) SetLabels(ctx context.Context, labels map[string]string) (map[string]string, error) { + ctx, span := tracing.StartSpan(ctx, "container.SetLabels", + tracing.WithAttribute("container.id", c.id), + ) + defer span.End() container := containers.Container{ ID: c.id, Labels: labels, @@ -175,6 +180,10 @@ func (c *container) Spec(ctx context.Context) (*oci.Spec, error) { // Delete deletes an existing container // an error is returned if the container has running tasks func (c *container) Delete(ctx context.Context, opts ...DeleteOpts) error { + ctx, span := tracing.StartSpan(ctx, "container.Delete", + tracing.WithAttribute("container.id", c.id), + ) + defer span.End() if _, err := c.loadTask(ctx, nil); err == nil { return fmt.Errorf("cannot delete running task %v: %w", c.id, errdefs.ErrFailedPrecondition) } @@ -211,6 +220,8 @@ func (c *container) Image(ctx context.Context) (Image, error) { } func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) { + ctx, span := tracing.StartSpan(ctx, "container.NewTask") + defer span.End() i, err := ioCreate(c.id) if err != nil { return nil, err @@ -298,16 +309,28 @@ func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...N if info.Checkpoint != nil { request.Checkpoint = info.Checkpoint } + + span.SetAttributes( + tracing.Attribute("task.container.id", request.ContainerID), + tracing.Attribute("task.request.options", request.Options.String()), + tracing.Attribute("task.runtime.name", info.runtime), + ) response, err := c.client.TaskService().Create(ctx, request) if err != nil { return nil, errdefs.FromGRPC(err) } + + span.AddEvent("task created", + tracing.Attribute("task.process.id", int(response.Pid)), + ) t.pid = response.Pid return t, nil } func (c *container) Update(ctx context.Context, opts ...UpdateContainerOpts) error { // fetch the current container config before updating it + ctx, span := tracing.StartSpan(ctx, "container.Update") + defer span.End() r, err := c.get(ctx) if err != nil { return err diff --git a/client/process.go b/client/process.go index 4cd72faf5..59f1bbc9c 100644 --- a/client/process.go +++ b/client/process.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/containerd/v2/pkg/protobuf" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" ) @@ -118,6 +119,11 @@ func (p *process) Pid() uint32 { // Start starts the exec process func (p *process) Start(ctx context.Context) error { + ctx, span := tracing.StartSpan(ctx, "process.Start", + tracing.WithAttribute("process.id", p.ID()), + tracing.WithAttribute("process.task.id", p.task.ID()), + ) + defer span.End() r, err := p.task.client.TaskService().Start(ctx, &tasks.StartRequest{ ContainerID: p.task.id, ExecID: p.id, @@ -130,11 +136,18 @@ func (p *process) Start(ctx context.Context) error { } return errdefs.FromGRPC(err) } + span.SetAttributes(tracing.Attribute("process.pid", int(r.Pid))) p.pid = r.Pid return nil } func (p *process) Kill(ctx context.Context, s syscall.Signal, opts ...KillOpts) error { + ctx, span := tracing.StartSpan(ctx, "process.Kill", + tracing.WithAttribute("process.id", p.ID()), + tracing.WithAttribute("process.pid", int(p.Pid())), + tracing.WithAttribute("process.task.id", p.task.ID()), + ) + defer span.End() var i KillInfo for _, o := range opts { if err := o(ctx, &i); err != nil { @@ -154,6 +167,11 @@ func (p *process) Wait(ctx context.Context) (<-chan ExitStatus, error) { c := make(chan ExitStatus, 1) go func() { defer close(c) + ctx, span := tracing.StartSpan(ctx, "process.Wait", + tracing.WithAttribute("process.id", p.ID()), + tracing.WithAttribute("process.task.id", p.task.ID()), + ) + defer span.End() r, err := p.task.client.TaskService().Wait(ctx, &tasks.WaitRequest{ ContainerID: p.task.id, ExecID: p.id, @@ -174,6 +192,10 @@ func (p *process) Wait(ctx context.Context) (<-chan ExitStatus, error) { } func (p *process) CloseIO(ctx context.Context, opts ...IOCloserOpts) error { + ctx, span := tracing.StartSpan(ctx, "process.CloseIO", + tracing.WithAttribute("process.id", p.ID()), + ) + defer span.End() r := &tasks.CloseIORequest{ ContainerID: p.task.id, ExecID: p.id, @@ -192,6 +214,10 @@ func (p *process) IO() cio.IO { } func (p *process) Resize(ctx context.Context, w, h uint32) error { + ctx, span := tracing.StartSpan(ctx, "process.Resize", + tracing.WithAttribute("process.id", p.ID()), + ) + defer span.End() _, err := p.task.client.TaskService().ResizePty(ctx, &tasks.ResizePtyRequest{ ContainerID: p.task.id, Width: w, @@ -202,6 +228,10 @@ func (p *process) Resize(ctx context.Context, w, h uint32) error { } func (p *process) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStatus, error) { + ctx, span := tracing.StartSpan(ctx, "process.Delete", + tracing.WithAttribute("process.id", p.ID()), + ) + defer span.End() for _, o := range opts { if err := o(ctx, p); err != nil { return nil, err @@ -238,8 +268,11 @@ func (p *process) Status(ctx context.Context) (Status, error) { if err != nil { return Status{}, errdefs.FromGRPC(err) } + status := ProcessStatus(strings.ToLower(r.Process.Status.String())) + exitStatus := r.Process.ExitStatus + return Status{ - Status: ProcessStatus(strings.ToLower(r.Process.Status.String())), - ExitStatus: r.Process.ExitStatus, + Status: status, + ExitStatus: exitStatus, }, nil } diff --git a/client/task.go b/client/task.go index 236ae4691..0f6018dbb 100644 --- a/client/task.go +++ b/client/task.go @@ -38,6 +38,7 @@ import ( "github.com/containerd/containerd/v2/pkg/protobuf" google_protobuf "github.com/containerd/containerd/v2/pkg/protobuf/types" "github.com/containerd/containerd/v2/pkg/rootfs" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/errdefs" "github.com/containerd/typeurl/v2" @@ -210,6 +211,10 @@ func (t *task) Pid() uint32 { } func (t *task) Start(ctx context.Context) error { + ctx, span := tracing.StartSpan(ctx, "task.Start", + tracing.WithAttribute("task.id", t.ID()), + ) + defer span.End() r, err := t.client.TaskService().Start(ctx, &tasks.StartRequest{ ContainerID: t.id, }) @@ -220,17 +225,28 @@ func (t *task) Start(ctx context.Context) error { } return errdefs.FromGRPC(err) } + span.SetAttributes(tracing.Attribute("task.pid", r.Pid)) t.pid = r.Pid return nil } func (t *task) Kill(ctx context.Context, s syscall.Signal, opts ...KillOpts) error { + ctx, span := tracing.StartSpan(ctx, "task.Kill", + tracing.WithAttribute("task.id", t.ID()), + tracing.WithAttribute("task.pid", int(t.Pid())), + ) + defer span.End() var i KillInfo for _, o := range opts { if err := o(ctx, &i); err != nil { return err } } + + span.SetAttributes( + tracing.Attribute("task.exec.id", i.ExecID), + tracing.Attribute("task.exec.killall", i.All), + ) _, err := t.client.TaskService().Kill(ctx, &tasks.KillRequest{ Signal: uint32(s), ContainerID: t.id, @@ -244,6 +260,10 @@ func (t *task) Kill(ctx context.Context, s syscall.Signal, opts ...KillOpts) err } func (t *task) Pause(ctx context.Context) error { + ctx, span := tracing.StartSpan(ctx, "task.Pause", + tracing.WithAttribute("task.id", t.ID()), + ) + defer span.End() _, err := t.client.TaskService().Pause(ctx, &tasks.PauseTaskRequest{ ContainerID: t.id, }) @@ -251,6 +271,10 @@ func (t *task) Pause(ctx context.Context) error { } func (t *task) Resume(ctx context.Context) error { + ctx, span := tracing.StartSpan(ctx, "task.Resume", + tracing.WithAttribute("task.id", t.ID()), + ) + defer span.End() _, err := t.client.TaskService().Resume(ctx, &tasks.ResumeTaskRequest{ ContainerID: t.id, }) @@ -264,10 +288,14 @@ func (t *task) Status(ctx context.Context) (Status, error) { if err != nil { return Status{}, errdefs.FromGRPC(err) } + status := ProcessStatus(strings.ToLower(r.Process.Status.String())) + exitStatus := r.Process.ExitStatus + exitTime := protobuf.FromTimestamp(r.Process.ExitedAt) + return Status{ - Status: ProcessStatus(strings.ToLower(r.Process.Status.String())), - ExitStatus: r.Process.ExitStatus, - ExitTime: protobuf.FromTimestamp(r.Process.ExitedAt), + Status: status, + ExitStatus: exitStatus, + ExitTime: exitTime, }, nil } @@ -275,6 +303,10 @@ func (t *task) Wait(ctx context.Context) (<-chan ExitStatus, error) { c := make(chan ExitStatus, 1) go func() { defer close(c) + ctx, span := tracing.StartSpan(ctx, "task.Wait", + tracing.WithAttribute("task.id", t.ID()), + ) + defer span.End() r, err := t.client.TaskService().Wait(ctx, &tasks.WaitRequest{ ContainerID: t.id, }) @@ -297,6 +329,10 @@ func (t *task) Wait(ctx context.Context) (<-chan ExitStatus, error) { // it returns the exit status of the task and any errors that were encountered // during cleanup func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStatus, error) { + ctx, span := tracing.StartSpan(ctx, "task.Delete", + tracing.WithAttribute("task.id", t.ID()), + ) + defer span.End() for _, o := range opts { if err := o(ctx, t); err != nil { return nil, err @@ -306,6 +342,7 @@ func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStat if err != nil && errdefs.IsNotFound(err) { return nil, err } + switch status.Status { case Stopped, Unknown, "": case Created: @@ -350,9 +387,14 @@ func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStat } func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creator) (_ Process, err error) { + ctx, span := tracing.StartSpan(ctx, "task.Exec", + tracing.WithAttribute("task.id", t.ID()), + ) + defer span.End() if id == "" { return nil, fmt.Errorf("exec id must not be empty: %w", errdefs.ErrInvalidArgument) } + span.SetAttributes(tracing.Attribute("task.exec.id", id)) i, err := ioCreate(id) if err != nil { return nil, err @@ -408,6 +450,10 @@ func (t *task) Pids(ctx context.Context) ([]ProcessInfo, error) { } func (t *task) CloseIO(ctx context.Context, opts ...IOCloserOpts) error { + ctx, span := tracing.StartSpan(ctx, "task.CloseIO", + tracing.WithAttribute("task.id", t.ID()), + ) + defer span.End() r := &tasks.CloseIORequest{ ContainerID: t.id, } @@ -416,6 +462,7 @@ func (t *task) CloseIO(ctx context.Context, opts ...IOCloserOpts) error { o(&i) } r.Stdin = i.Stdin + _, err := t.client.TaskService().CloseIO(ctx, r) return errdefs.FromGRPC(err) } @@ -425,6 +472,10 @@ func (t *task) IO() cio.IO { } func (t *task) Resize(ctx context.Context, w, h uint32) error { + ctx, span := tracing.StartSpan(ctx, "task.Resize", + tracing.WithAttribute("task.id", t.ID()), + ) + defer span.End() _, err := t.client.TaskService().ResizePty(ctx, &tasks.ResizePtyRequest{ ContainerID: t.id, Width: w, @@ -538,6 +589,10 @@ type UpdateTaskInfo struct { type UpdateTaskOpts func(context.Context, *Client, *UpdateTaskInfo) error func (t *task) Update(ctx context.Context, opts ...UpdateTaskOpts) error { + ctx, span := tracing.StartSpan(ctx, "task.Update", + tracing.WithAttribute("task.id", t.ID()), + ) + defer span.End() request := &tasks.UpdateTaskRequest{ ContainerID: t.id, } diff --git a/core/metadata/containers.go b/core/metadata/containers.go index 8f79fb8ea..d49259679 100644 --- a/core/metadata/containers.go +++ b/core/metadata/containers.go @@ -31,11 +31,16 @@ import ( "github.com/containerd/containerd/v2/pkg/namespaces" "github.com/containerd/containerd/v2/pkg/protobuf/proto" "github.com/containerd/containerd/v2/pkg/protobuf/types" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" "github.com/containerd/typeurl/v2" bolt "go.etcd.io/bbolt" ) +const ( + spanContainerPrefix = "metadata.containers" +) + type containerStore struct { db *DB } @@ -116,6 +121,11 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C } func (s *containerStore) Create(ctx context.Context, container containers.Container) (containers.Container, error) { + ctx, span := tracing.StartSpan(ctx, + tracing.Name(spanContainerPrefix, "Create"), + tracing.WithAttribute("container.id", container.ID), + ) + defer span.End() namespace, err := namespaces.NamespaceRequired(ctx) if err != nil { return containers.Container{}, err @@ -145,6 +155,9 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai return fmt.Errorf("failed to write container %q: %w", container.ID, err) } + span.SetAttributes( + tracing.Attribute("container.createdAt", container.CreatedAt.Format(time.RFC3339)), + ) return nil }); err != nil { return containers.Container{}, err @@ -154,6 +167,11 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai } func (s *containerStore) Update(ctx context.Context, container containers.Container, fieldpaths ...string) (containers.Container, error) { + ctx, span := tracing.StartSpan(ctx, + tracing.Name(spanContainerPrefix, "Update"), + tracing.WithAttribute("container.id", container.ID), + ) + defer span.End() namespace, err := namespaces.NamespaceRequired(ctx) if err != nil { return containers.Container{}, err @@ -245,6 +263,10 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai return fmt.Errorf("failed to write container %q: %w", container.ID, err) } + span.SetAttributes( + tracing.Attribute("container.createdAt", updated.CreatedAt.Format(time.RFC3339)), + tracing.Attribute("container.updatedAt", updated.UpdatedAt.Format(time.RFC3339)), + ) return nil }); err != nil { return containers.Container{}, err @@ -254,6 +276,12 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai } func (s *containerStore) Delete(ctx context.Context, id string) error { + ctx, span := tracing.StartSpan(ctx, + tracing.Name(spanContainerPrefix, "Delete"), + tracing.WithAttribute("container.id", id), + ) + defer span.End() + namespace, err := namespaces.NamespaceRequired(ctx) if err != nil { return err diff --git a/core/metadata/sandbox.go b/core/metadata/sandbox.go index 98a283a70..6659b035c 100644 --- a/core/metadata/sandbox.go +++ b/core/metadata/sandbox.go @@ -28,11 +28,16 @@ import ( "github.com/containerd/containerd/v2/pkg/filters" "github.com/containerd/containerd/v2/pkg/identifiers" "github.com/containerd/containerd/v2/pkg/namespaces" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" "github.com/containerd/typeurl/v2" "go.etcd.io/bbolt" ) +const ( + spanSandboxPrefix = "metadata.sandbox" +) + type sandboxStore struct { db *DB } @@ -46,6 +51,11 @@ func NewSandboxStore(db *DB) api.Store { // Create a sandbox record in the store func (s *sandboxStore) Create(ctx context.Context, sandbox api.Sandbox) (api.Sandbox, error) { + ctx, span := tracing.StartSpan(ctx, + tracing.Name(spanSandboxPrefix, "Create"), + tracing.WithAttribute("sandbox.id", sandbox.ID), + ) + defer span.End() ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return api.Sandbox{}, err @@ -68,6 +78,9 @@ func (s *sandboxStore) Create(ctx context.Context, sandbox api.Sandbox) (api.San return fmt.Errorf("write error: %w", err) } + span.SetAttributes( + tracing.Attribute("sandbox.CreatedAt", sandbox.CreatedAt.Format(time.RFC3339)), + ) return nil }); err != nil { return api.Sandbox{}, err @@ -78,6 +91,11 @@ func (s *sandboxStore) Create(ctx context.Context, sandbox api.Sandbox) (api.San // Update the sandbox with the provided sandbox object and fields func (s *sandboxStore) Update(ctx context.Context, sandbox api.Sandbox, fieldpaths ...string) (api.Sandbox, error) { + ctx, span := tracing.StartSpan(ctx, + tracing.Name(spanSandboxPrefix, "Update"), + tracing.WithAttribute("sandbox.id", sandbox.ID), + ) + defer span.End() ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return api.Sandbox{}, err @@ -142,6 +160,10 @@ func (s *sandboxStore) Update(ctx context.Context, sandbox api.Sandbox, fieldpat return err } + span.SetAttributes( + tracing.Attribute("sandbox.CreatedAt", updated.CreatedAt.Format(time.RFC3339)), + tracing.Attribute("sandbox.UpdatedAt", updated.UpdatedAt.Format(time.RFC3339)), + ) ret = updated return nil }); err != nil { @@ -227,6 +249,11 @@ func (s *sandboxStore) List(ctx context.Context, fields ...string) ([]api.Sandbo // Delete a sandbox from metadata store using the id func (s *sandboxStore) Delete(ctx context.Context, id string) error { + ctx, span := tracing.StartSpan(ctx, + tracing.Name(spanSandboxPrefix, "Delete"), + tracing.WithAttribute("sandbox.id", id), + ) + defer span.End() ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err diff --git a/internal/cri/server/container_attach.go b/internal/cri/server/container_attach.go index cd7bea720..01478590c 100644 --- a/internal/cri/server/container_attach.go +++ b/internal/cri/server/container_attach.go @@ -22,6 +22,7 @@ import ( "io" containerd "github.com/containerd/containerd/v2/client" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/log" "k8s.io/client-go/tools/remotecommand" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -31,10 +32,12 @@ import ( // Attach prepares a streaming endpoint to attach to a running container, and returns the address. func (c *criService) Attach(ctx context.Context, r *runtime.AttachRequest) (*runtime.AttachResponse, error) { + span := tracing.SpanFromContext(ctx) cntr, err := c.containerStore.Get(r.GetContainerId()) if err != nil { return nil, fmt.Errorf("failed to find container in store: %w", err) } + span.SetAttributes(tracing.Attribute("container.id", cntr.ID)) state := cntr.Status.Get().State() if state != runtime.ContainerState_CONTAINER_RUNNING { return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state)) diff --git a/internal/cri/server/container_create.go b/internal/cri/server/container_create.go index 7d8fc9a49..6d97048c7 100644 --- a/internal/cri/server/container_create.go +++ b/internal/cri/server/container_create.go @@ -45,6 +45,7 @@ import ( "github.com/containerd/containerd/v2/internal/cri/util" "github.com/containerd/containerd/v2/pkg/blockio" "github.com/containerd/containerd/v2/pkg/oci" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/platforms" ) @@ -55,6 +56,7 @@ func init() { // CreateContainer creates a new container in the given PodSandbox. func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (_ *runtime.CreateContainerResponse, retErr error) { + span := tracing.SpanFromContext(ctx) config := r.GetConfig() log.G(ctx).Debugf("Container config %+v", config) sandboxConfig := r.GetSandboxConfig() @@ -72,7 +74,10 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta sandboxID = cstatus.SandboxID sandboxPid = cstatus.Pid ) - + span.SetAttributes( + tracing.Attribute("sandbox.id", sandboxID), + tracing.Attribute("sandbox.pid", sandboxPid), + ) // Generate unique id and name for the container and reserve the name. // Reserve the container name to avoid concurrent `CreateContainer` request creating // the same container. @@ -87,6 +92,10 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta if err = c.containerNameIndex.Reserve(name, id); err != nil { return nil, fmt.Errorf("failed to reserve container name %q: %w", name, err) } + span.SetAttributes( + tracing.Attribute("container.id", id), + tracing.Attribute("container.name", name), + ) defer func() { // Release the name if the function returns with an error. if retErr != nil { @@ -112,7 +121,9 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta if err != nil { return nil, fmt.Errorf("failed to get image from containerd %q: %w", image.ID, err) } - + span.SetAttributes( + tracing.Attribute("container.image.ref", containerdImage.Name()), + ) start := time.Now() // Create container root directory. @@ -345,6 +356,9 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start) + span.AddEvent("container created", + tracing.Attribute("container.create.duration", time.Since(start).String()), + ) return &runtime.CreateContainerResponse{ContainerId: id}, nil } diff --git a/internal/cri/server/container_exec.go b/internal/cri/server/container_exec.go index e33a59493..3642dbbf9 100644 --- a/internal/cri/server/container_exec.go +++ b/internal/cri/server/container_exec.go @@ -20,15 +20,18 @@ import ( "context" "fmt" + "github.com/containerd/containerd/v2/pkg/tracing" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" ) // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. func (c *criService) Exec(ctx context.Context, r *runtime.ExecRequest) (*runtime.ExecResponse, error) { + span := tracing.SpanFromContext(ctx) cntr, err := c.containerStore.Get(r.GetContainerId()) if err != nil { return nil, fmt.Errorf("failed to find container %q in store: %w", r.GetContainerId(), err) } + span.SetAttributes(tracing.Attribute("container.id", cntr.ID)) state := cntr.Status.Get().State() if state != runtime.ContainerState_CONTAINER_RUNNING { return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state)) diff --git a/internal/cri/server/container_execsync.go b/internal/cri/server/container_execsync.go index 2555bfe71..c4b616a26 100644 --- a/internal/cri/server/container_execsync.go +++ b/internal/cri/server/container_execsync.go @@ -25,6 +25,7 @@ import ( "time" "github.com/containerd/containerd/v2/pkg/oci" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" "github.com/containerd/log" "k8s.io/client-go/tools/remotecommand" @@ -271,6 +272,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont // this case, the CRI plugin will still have a goroutine waiting for the exec process // to exit and log the exit code, but dockershim won't. func (c *criService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) { + span := tracing.SpanFromContext(ctx) // Get container from our container store. cntr, err := c.containerStore.Get(id) @@ -278,6 +280,7 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp return nil, fmt.Errorf("failed to find container %q in store: %w", id, err) } id = cntr.ID + span.SetAttributes(tracing.Attribute("container.id", id)) state := cntr.Status.Get().State() if state != runtime.ContainerState_CONTAINER_RUNNING { diff --git a/internal/cri/server/container_remove.go b/internal/cri/server/container_remove.go index c3e4d000e..a666456e3 100644 --- a/internal/cri/server/container_remove.go +++ b/internal/cri/server/container_remove.go @@ -24,6 +24,7 @@ import ( containerd "github.com/containerd/containerd/v2/client" containerstore "github.com/containerd/containerd/v2/internal/cri/store/container" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" "github.com/containerd/log" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -31,6 +32,7 @@ import ( // RemoveContainer removes the container. func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (_ *runtime.RemoveContainerResponse, retErr error) { + span := tracing.SpanFromContext(ctx) start := time.Now() ctrID := r.GetContainerId() container, err := c.containerStore.Get(ctrID) @@ -43,6 +45,7 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta return &runtime.RemoveContainerResponse{}, nil } id := container.ID + span.SetAttributes(tracing.Attribute("container.id", id)) i, err := container.Container.Info(ctx) if err != nil { if !errdefs.IsNotFound(err) { @@ -129,6 +132,11 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta containerRemoveTimer.WithValues(i.Runtime.Name).UpdateSince(start) + span.AddEvent("container removed", + tracing.Attribute("container.id", container.ID), + tracing.Attribute("container.remove.duration", time.Since(start).String()), + ) + return &runtime.RemoveContainerResponse{}, nil } diff --git a/internal/cri/server/container_start.go b/internal/cri/server/container_start.go index 173f2468b..1e0681392 100644 --- a/internal/cri/server/container_start.go +++ b/internal/cri/server/container_start.go @@ -23,6 +23,7 @@ import ( "io" "time" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" "github.com/containerd/log" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -38,12 +39,13 @@ import ( // StartContainer starts the container. func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) { + span := tracing.SpanFromContext(ctx) start := time.Now() cntr, err := c.containerStore.Get(r.GetContainerId()) if err != nil { return nil, fmt.Errorf("an error occurred when try to find container %q: %w", r.GetContainerId(), err) } - + span.SetAttributes(tracing.Attribute("container.id", cntr.ID)) info, err := cntr.Container.Info(ctx) if err != nil { return nil, fmt.Errorf("get container info: %w", err) @@ -87,6 +89,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain if sandbox.Status.Get().State != sandboxstore.StateReady { return nil, fmt.Errorf("sandbox container %q is not running", sandboxID) } + span.SetAttributes(tracing.Attribute("sandbox.id", sandboxID)) // Recheck target container validity in Linux namespace options. if linux := config.GetLinux(); linux != nil { @@ -190,6 +193,10 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start) + span.AddEvent("container started", + tracing.Attribute("container.start.duration", time.Since(start).String()), + ) + return &runtime.StartContainerResponse{}, nil } diff --git a/internal/cri/server/container_stop.go b/internal/cri/server/container_stop.go index 5ff793c4b..736911fbd 100644 --- a/internal/cri/server/container_stop.go +++ b/internal/cri/server/container_stop.go @@ -27,6 +27,7 @@ import ( containerstore "github.com/containerd/containerd/v2/internal/cri/store/container" ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" "github.com/containerd/containerd/v2/pkg/protobuf" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" "github.com/containerd/log" @@ -36,6 +37,7 @@ import ( // StopContainer stops a running container with a grace period (i.e., timeout). func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) { + span := tracing.SpanFromContext(ctx) start := time.Now() // Get container config from container store. container, err := c.containerStore.Get(r.GetContainerId()) @@ -49,7 +51,7 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer // https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L67-L68 return &runtime.StopContainerResponse{}, nil } - + span.SetAttributes(tracing.Attribute("container.id", container.ID)) if err := c.stopContainer(ctx, container, time.Duration(r.GetTimeout())*time.Second); err != nil { return nil, err } @@ -76,6 +78,8 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer // stopContainer stops a container based on the container metadata. func (c *criService) stopContainer(ctx context.Context, container containerstore.Container, timeout time.Duration) error { + span := tracing.SpanFromContext(ctx) + start := time.Now() id := container.ID sandboxID := container.SandboxID @@ -199,6 +203,12 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore if err != nil { return fmt.Errorf("an error occurs during waiting for container %q to be killed: %w", id, err) } + + span.AddEvent("container stopped", + tracing.Attribute("container.id", id), + tracing.Attribute("container.stop.duration", time.Since(start).String()), + ) + return nil } diff --git a/internal/cri/server/sandbox_remove.go b/internal/cri/server/sandbox_remove.go index 1829accd8..f7e1ed880 100644 --- a/internal/cri/server/sandbox_remove.go +++ b/internal/cri/server/sandbox_remove.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" "github.com/containerd/log" @@ -30,6 +31,7 @@ import ( // RemovePodSandbox removes the sandbox. If there are running containers in the // sandbox, they should be forcibly removed. func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodSandboxRequest) (*runtime.RemovePodSandboxResponse, error) { + span := tracing.SpanFromContext(ctx) start := time.Now() sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId()) if err != nil { @@ -44,6 +46,7 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS } // Use the full sandbox id. id := sandbox.ID + span.SetAttributes(tracing.Attribute("sandbox.id", id)) // If the sandbox is still running, not ready, or in an unknown state, forcibly stop it. // Even if it's in a NotReady state, this will close its network namespace, if open. @@ -110,5 +113,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS sandboxRemoveTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(start) + span.AddEvent("pod sandbox removed", + tracing.Attribute("sandbox.remove.duration", time.Since(start).String()), + ) + return &runtime.RemovePodSandboxResponse{}, nil } diff --git a/internal/cri/server/sandbox_run.go b/internal/cri/server/sandbox_run.go index 8c0cb2a5e..2ab43bb39 100644 --- a/internal/cri/server/sandbox_run.go +++ b/internal/cri/server/sandbox_run.go @@ -39,6 +39,7 @@ import ( sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" "github.com/containerd/containerd/v2/internal/cri/util" "github.com/containerd/containerd/v2/pkg/netns" + "github.com/containerd/containerd/v2/pkg/tracing" ) func init() { @@ -49,6 +50,7 @@ func init() { // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // the sandbox is in ready state. func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) { + span := tracing.SpanFromContext(ctx) config := r.GetConfig() log.G(ctx).Debugf("Sandbox config %+v", config) @@ -59,6 +61,11 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox return nil, errors.New("sandbox config must include metadata") } name := makeSandboxName(metadata) + + span.SetAttributes( + tracing.Attribute("sandbox.id", id), + tracing.Attribute("sandbox.name", name), + ) log.G(ctx).WithField("podsandboxid", id).Debugf("generated id for sandbox name %q", name) // cleanupErr records the last error returned by the critical cleanup operations in deferred functions, @@ -172,6 +179,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox // // To simplify this, in the future, we should just remove this case (podNetwork && // !userNsEnabled) and just keep the other case (podNetwork && userNsEnabled). + span.AddEvent("setup pod network") netStart := time.Now() // If it is not in host network namespace then create a namespace and set the sandbox // handle. NetNSPath in sandbox metadata and NetNS is non empty only for non host network @@ -356,6 +364,10 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err) } sandboxCreateNetworkTimer.UpdateSince(netStart) + + span.AddEvent("finished pod network setup", + tracing.Attribute("pod.network.setup.duration", time.Since(netStart).String()), + ) } // TODO: get rid of this. sandbox object should no longer have Container field. diff --git a/internal/cri/server/sandbox_stop.go b/internal/cri/server/sandbox_stop.go index 0194575f4..a5b790eba 100644 --- a/internal/cri/server/sandbox_stop.go +++ b/internal/cri/server/sandbox_stop.go @@ -22,6 +22,7 @@ import ( "fmt" "time" + "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/log" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -32,6 +33,7 @@ import ( // StopPodSandbox stops the sandbox. If there are any running containers in the // sandbox, they should be forcibly terminated. func (c *criService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (*runtime.StopPodSandboxResponse, error) { + span := tracing.SpanFromContext(ctx) sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId()) if err != nil { if !errdefs.IsNotFound(err) { @@ -44,7 +46,7 @@ func (c *criService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandb // https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L45-L46 return &runtime.StopPodSandboxResponse{}, nil } - + span.SetAttributes(tracing.Attribute("sandbox.id", sandbox.ID)) if err := c.stopPodSandbox(ctx, sandbox); err != nil { return nil, err } @@ -53,12 +55,14 @@ func (c *criService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandb } func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sandbox) error { + span := tracing.SpanFromContext(ctx) // Use the full sandbox id. id := sandbox.ID // Stop all containers inside the sandbox. This terminates the container forcibly, // and container may still be created, so production should not rely on this behavior. // TODO(random-liu): Introduce a state in sandbox to avoid future container creation. + span.AddEvent("stopping containers in the sandbox") stop := time.Now() containers := c.containerStore.List() for _, container := range containers { @@ -87,6 +91,10 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop) + span.AddEvent("sandbox container stopped", + tracing.Attribute("sandbox.stop.duration", time.Since(stop).String()), + ) + err := c.nri.StopPodSandbox(ctx, &sandbox) if err != nil { log.G(ctx).WithError(err).Errorf("NRI sandbox stop notification failed") @@ -94,6 +102,7 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa // Teardown network for sandbox. if sandbox.NetNS != nil { + span.AddEvent("start pod network teardown") netStop := time.Now() // Use empty netns path if netns is not available. This is defined in: // https://github.com/containernetworking/cni/blob/v0.7.0-alpha1/SPEC.md @@ -109,10 +118,13 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa return fmt.Errorf("failed to remove network namespace for sandbox %q: %w", id, err) } sandboxDeleteNetwork.UpdateSince(netStop) + + span.AddEvent("finished pod network teardown", + tracing.Attribute("network.teardown.duration", time.Since(netStop).String()), + ) } log.G(ctx).Infof("TearDown network for sandbox %q successfully", id) - return nil } diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index c6c6b1264..ce8a9d6ba 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -36,6 +36,14 @@ type StartConfig struct { type SpanOpt func(config *StartConfig) +// WithAttribute appends attributes to a new created span. +func WithAttribute(k string, v interface{}) SpanOpt { + return func(config *StartConfig) { + config.spanOpts = append(config.spanOpts, + trace.WithAttributes(Attribute(k, v))) + } +} + // UpdateHTTPClient updates the http client with the necessary otel transport func UpdateHTTPClient(client *http.Client, name string) { client.Transport = otelhttp.NewTransport( @@ -80,8 +88,8 @@ func (s *Span) End() { } // AddEvent adds an event with provided name and options. -func (s *Span) AddEvent(name string, options ...trace.EventOption) { - s.otelSpan.AddEvent(name, options...) +func (s *Span) AddEvent(name string, attributes ...attribute.KeyValue) { + s.otelSpan.AddEvent(name, trace.WithAttributes(attributes...)) } // SetStatus sets the status of the current span.