Add tracing spans in CRI image service and pull.go

Signed-off-by: Swagat Bora <sbora@amazon.com>

Add spans around image unpack operations
Use image.ref to denote image name and image.id for the image config digest
Add top-level spand and record errors in the CRI instrumentation service
This commit is contained in:
Swagat Bora 2022-09-29 20:22:55 +00:00
parent bb0c3804c6
commit 3b87d46ce2
18 changed files with 226 additions and 10 deletions

View File

@ -82,7 +82,7 @@ func clientWithTrace() error {
defer cancel()
ctx, span := tracing.StartSpan(ctx, "OPERATION NAME")
defer span.End()
defer tracing.StopSpan(span)
...
}
```

View File

@ -33,6 +33,8 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
@ -50,6 +52,8 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/net v0.1.0 // indirect
golang.org/x/sync v0.1.0 // indirect

View File

@ -258,7 +258,9 @@ github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTg
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-logr/zapr v1.2.0/go.mod h1:Qa4Bsj2Vb+FAVeAKsLD8RLQ+YRJB8YDmOAKxaBQf7Ro=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
@ -704,6 +706,7 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0/go.mod h1:J0dBVrt7dPS/lKJyQoW0xzQiUr4r2Ik1VwPjAUWnofI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM=
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0/go.mod h1:M1hVZHNxcbkAlcvrOMlpQ4YOO3Awf+4N2dxkZL3xm04=
@ -717,6 +720,7 @@ go.opentelemetry.io/otel/sdk v1.7.0/go.mod h1:uTEOTwaqIVuTGiJN7ii13Ibp75wJmYUDe3
go.opentelemetry.io/otel/sdk/export/metric v0.20.0/go.mod h1:h7RBNMsDJ5pmI1zExLi+bJK+Dr8NQCh0qGhm1KDnNlE=
go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4/0TjTXukfxjzSTpHE=
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.16.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=

View File

@ -95,6 +95,11 @@ const (
// runtimeRunhcsV1 is the runtime type for runhcs.
runtimeRunhcsV1 = "io.containerd.runhcs.v1"
// name prefix for CRI sbserver specific spans
criSbServerSpanPrefix = "pkg.cri.sbserver"
spanDelimiter = "."
)
// makeSandboxName generates sandbox name from sandbox metadata. The name
@ -501,3 +506,10 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status)
}
return status
}
func makeSpanName(funcName string) string {
return strings.Join([]string{
criSbServerSpanPrefix,
funcName,
}, spanDelimiter)
}

View File

@ -36,6 +36,7 @@ import (
"github.com/containerd/imgcrypt"
"github.com/containerd/imgcrypt/images/encryption"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"go.opentelemetry.io/otel/attribute"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd"
@ -48,6 +49,7 @@ import (
distribution "github.com/containerd/containerd/reference/docker"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/config"
"github.com/containerd/containerd/tracing"
)
// For image management:
@ -93,6 +95,7 @@ import (
// PullImage pulls an image with authentication config.
func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (*runtime.PullImageResponse, error) {
span := tracing.CurrentSpan(ctx)
imageRef := r.GetImage().GetImage()
namedRef, err := distribution.ParseDockerRef(imageRef)
if err != nil {
@ -133,6 +136,10 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
return nil, err
}
log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter)
span.SetAttributes(
attribute.String("image.ref", ref),
attribute.String("snapshotter.name", snapshotter),
)
pullOpts := []containerd.RemoteOpt{
containerd.WithSchema1Conversion, //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
@ -165,6 +172,7 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
if err != nil {
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
}
span.AddEvent("Pull and unpack image complete")
configDesc, err := image.Config(ctx)
if err != nil {

View File

@ -22,6 +22,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/tracing"
"go.opentelemetry.io/otel/attribute"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
@ -33,15 +35,17 @@ import (
// Remove the whole image no matter the it's image id or reference. This is the
// semantic defined in CRI now.
func (c *criService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) {
span := tracing.CurrentSpan(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
// return empty without error when image not found.
return &runtime.RemoveImageResponse{}, nil
}
return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err)
}
span.SetAttributes(attribute.String("image.id", image.ID))
// Remove all image references.
for i, ref := range image.References {
var opts []images.DeleteOpt

View File

@ -24,6 +24,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
"github.com/containerd/containerd/tracing"
"go.opentelemetry.io/otel/attribute"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -33,14 +35,17 @@ import (
// TODO(random-liu): We should change CRI to distinguish image id and image spec. (See
// kubernetes/kubernetes#46255)
func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) {
span := tracing.CurrentSpan(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
// return empty without error when image not found.
return &runtime.ImageStatusResponse{}, nil
}
return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err)
}
span.SetAttributes(attribute.String("image.id", image.ID))
// TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot
// doesn't exist?

View File

@ -22,6 +22,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/tracing"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
runtime_alpha "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
@ -931,6 +932,8 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -939,6 +942,7 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
log.G(ctx).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), res.GetImageRef())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.PullImage(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -948,6 +952,8 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -956,6 +962,7 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al
log.G(ctx).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), res.GetImageRef())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.PullImageRequest
@ -986,6 +993,8 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@ -994,6 +1003,7 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v",
r.GetFilter(), res.GetImages())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ListImages(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1003,6 +1013,8 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@ -1011,6 +1023,7 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a
log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v",
r.GetFilter(), res.GetImages())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ListImagesRequest
@ -1041,6 +1054,8 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1049,6 +1064,7 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
log.G(ctx).Tracef("ImageStatus for %q returns image status %+v",
r.GetImage().GetImage(), res.GetImage())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ImageStatus(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1058,6 +1074,8 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1066,6 +1084,7 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_
log.G(ctx).Tracef("ImageStatus for %q returns image status %+v",
r.GetImage().GetImage(), res.GetImage())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ImageStatusRequest
@ -1096,6 +1115,8 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1103,6 +1124,7 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
} else {
log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
tracing.SetSpanStatus(span, err)
}()
res, err := in.c.RemoveImage(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1112,6 +1134,8 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1119,6 +1143,7 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_
} else {
log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.RemoveImageRequest
@ -1149,6 +1174,8 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo"))
defer tracing.StopSpan(span)
log.G(ctx).Debugf("ImageFsInfo")
defer func() {
if err != nil {
@ -1156,6 +1183,7 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
} else {
log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ImageFsInfo(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1165,6 +1193,8 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo"))
defer tracing.StopSpan(span)
log.G(ctx).Debugf("ImageFsInfo")
defer func() {
if err != nil {
@ -1172,6 +1202,7 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_
} else {
log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ImageFsInfoRequest

View File

@ -92,6 +92,11 @@ const (
// runtimeRunhcsV1 is the runtime type for runhcs.
runtimeRunhcsV1 = "io.containerd.runhcs.v1"
// name prefix for CRI server specific spans
criServerSpanPrefix = "pkg.cri.server"
spanDelimiter = "."
)
// makeSandboxName generates sandbox name from sandbox metadata. The name
@ -510,3 +515,10 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status)
}
return status
}
func makeSpanName(funcName string) string {
return strings.Join([]string{
criServerSpanPrefix,
funcName,
}, spanDelimiter)
}

View File

@ -36,6 +36,7 @@ import (
"github.com/containerd/imgcrypt"
"github.com/containerd/imgcrypt/images/encryption"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"go.opentelemetry.io/otel/attribute"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd"
@ -48,6 +49,7 @@ import (
distribution "github.com/containerd/containerd/reference/docker"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/config"
"github.com/containerd/containerd/tracing"
)
// For image management:
@ -93,6 +95,7 @@ import (
// PullImage pulls an image with authentication config.
func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (*runtime.PullImageResponse, error) {
span := tracing.CurrentSpan(ctx)
imageRef := r.GetImage().GetImage()
namedRef, err := distribution.ParseDockerRef(imageRef)
if err != nil {
@ -133,7 +136,10 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
return nil, err
}
log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter)
span.SetAttributes(
attribute.String("image.ref", ref),
attribute.String("snapshotter.name", snapshotter),
)
pullOpts := []containerd.RemoteOpt{
containerd.WithSchema1Conversion, //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
containerd.WithResolver(resolver),
@ -165,6 +171,7 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
if err != nil {
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
}
span.AddEvent("Pull and unpack image complete")
configDesc, err := image.Config(ctx)
if err != nil {

View File

@ -22,6 +22,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/tracing"
"go.opentelemetry.io/otel/attribute"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
@ -33,15 +35,17 @@ import (
// Remove the whole image no matter the it's image id or reference. This is the
// semantic defined in CRI now.
func (c *criService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) {
span := tracing.CurrentSpan(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
// return empty without error when image not found.
return &runtime.RemoveImageResponse{}, nil
}
return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err)
}
span.SetAttributes(attribute.String("image.id", image.ID))
// Remove all image references.
for i, ref := range image.References {
var opts []images.DeleteOpt

View File

@ -24,6 +24,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
"github.com/containerd/containerd/tracing"
"go.opentelemetry.io/otel/attribute"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -33,14 +35,17 @@ import (
// TODO(random-liu): We should change CRI to distinguish image id and image spec. (See
// kubernetes/kubernetes#46255)
func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) {
span := tracing.CurrentSpan(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
// return empty without error when image not found.
return &runtime.ImageStatusResponse{}, nil
}
return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err)
}
span.SetAttributes(attribute.String("image.id", image.ID))
// TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot
// doesn't exist?

View File

@ -22,6 +22,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/tracing"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
runtime_alpha "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
@ -931,6 +932,8 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -939,6 +942,7 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
log.G(ctx).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), res.GetImageRef())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.PullImage(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -948,6 +952,8 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -956,6 +962,7 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al
log.G(ctx).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), res.GetImageRef())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.PullImageRequest
@ -986,6 +993,8 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@ -994,6 +1003,7 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v",
r.GetFilter(), res.GetImages())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ListImages(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1003,6 +1013,8 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@ -1011,6 +1023,7 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a
log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v",
r.GetFilter(), res.GetImages())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ListImagesRequest
@ -1041,6 +1054,8 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1049,6 +1064,7 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
log.G(ctx).Tracef("ImageStatus for %q returns image status %+v",
r.GetImage().GetImage(), res.GetImage())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ImageStatus(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1058,6 +1074,8 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1066,6 +1084,7 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_
log.G(ctx).Tracef("ImageStatus for %q returns image status %+v",
r.GetImage().GetImage(), res.GetImage())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ImageStatusRequest
@ -1096,6 +1115,8 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1103,6 +1124,7 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
} else {
log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
tracing.SetSpanStatus(span, err)
}()
res, err := in.c.RemoveImage(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1112,6 +1134,8 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1119,6 +1143,7 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_
} else {
log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.RemoveImageRequest
@ -1149,6 +1174,8 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo"))
defer tracing.StopSpan(span)
log.G(ctx).Debugf("ImageFsInfo")
defer func() {
if err != nil {
@ -1156,6 +1183,7 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
} else {
log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ImageFsInfo(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1165,6 +1193,8 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo"))
defer tracing.StopSpan(span)
log.G(ctx).Debugf("ImageFsInfo")
defer func() {
if err != nil {
@ -1172,6 +1202,7 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_
} else {
log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ImageFsInfoRequest

View File

@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
@ -36,10 +37,12 @@ import (
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/containerd/tracing"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
@ -160,6 +163,11 @@ func (u *Unpacker) Unpack(h images.Handler) images.Handler {
layers = map[digest.Digest][]ocispec.Descriptor{}
)
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ctx, span := tracing.StartSpan(ctx, "pkg.unpack.unpacker.UnpackHandler")
defer span.End()
span.SetAttributes(
attribute.String("descriptor.media.type", desc.MediaType),
attribute.String("descriptor.digest", desc.Digest.String()))
unlock, err := u.lockBlobDescriptor(ctx, desc)
if err != nil {
return nil, err
@ -174,10 +182,12 @@ func (u *Unpacker) Unpack(h images.Handler) images.Handler {
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
var nonLayers []ocispec.Descriptor
var manifestLayers []ocispec.Descriptor
// Split layers from non-layers, layers will be handled after
// the config
for _, child := range children {
for i, child := range children {
span.SetAttributes(
attribute.StringSlice("descriptor.child."+strconv.Itoa(i), []string{child.MediaType, child.Digest.String()}),
)
if images.IsLayerType(child.MediaType) {
manifestLayers = append(manifestLayers, child)
} else {
@ -223,6 +233,8 @@ func (u *Unpacker) unpack(
layers []ocispec.Descriptor,
) error {
ctx := u.ctx
ctx, layerSpan := tracing.StartSpan(ctx, "pkg.unpack.unpacker.unpack")
defer layerSpan.End()
p, err := content.ReadBlob(ctx, u.content, config)
if err != nil {
return err
@ -398,9 +410,18 @@ func (u *Unpacker) unpack(
}
for i, desc := range layers {
_, layerSpan := tracing.StartSpan(ctx, "pkg.unpack.unpacker.unpackLayer")
layerSpan.SetAttributes(
attribute.String("layer.media.type", desc.MediaType),
attribute.Int64("layer.media.size", desc.Size),
attribute.String("layer.media.digest", desc.Digest.String()),
)
if err := doUnpackFn(i, desc); err != nil {
layerSpan.RecordError(err)
layerSpan.End()
return err
}
layerSpan.End()
}
chainID := identity.ChainID(chain).String()
@ -425,9 +446,14 @@ func (u *Unpacker) unpack(
func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error {
eg, ctx2 := errgroup.WithContext(ctx)
for i, desc := range layers {
ctx2, layerSpan := tracing.StartSpan(ctx2, "pkg.unpack.unpacker.fetchLayer")
layerSpan.SetAttributes(
attribute.String("layer.media.type", desc.MediaType),
attribute.Int64("layer.media.size", desc.Size),
attribute.String("layer.media.digest", desc.Digest.String()),
)
desc := desc
i := i
if err := u.acquire(ctx); err != nil {
return err
}
@ -451,6 +477,7 @@ func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec
return nil
})
layerSpan.End()
}
return eg.Wait()

24
pull.go
View File

@ -28,14 +28,20 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1" //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
"github.com/containerd/containerd/tracing"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/semaphore"
)
// Pull downloads the provided content into containerd's content store
// and returns a platform specific image object
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) {
ctx, span := tracing.StartSpan(ctx, "pull.Pull")
defer tracing.StopSpan(span)
pullCtx := defaultRemoteContext()
for _, o := range opts {
if err := o(c, pullCtx); err != nil {
return nil, err
@ -57,6 +63,14 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
}
}
span.SetAttributes(
attribute.String("image.ref", ref),
attribute.Bool("unpack", pullCtx.Unpack),
attribute.Int("max.concurrent.downloads", pullCtx.MaxConcurrentDownloads),
attribute.Int("max.concurrent.upload.layers", pullCtx.MaxConcurrentUploadedLayers),
attribute.Int("platforms.count", len(pullCtx.Platforms)),
)
ctx, done, err := c.WithLease(ctx)
if err != nil {
return nil, err
@ -70,6 +84,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
if err != nil {
return nil, fmt.Errorf("unable to resolve snapshotter: %w", err)
}
span.SetAttributes(attribute.String("snapshotter.name", snapshotterName))
var uconfig UnpackConfig
for _, opt := range pullCtx.UnpackOpts {
if err := opt(ctx, &uconfig); err != nil {
@ -127,9 +142,13 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
// download).
var ur unpack.Result
if unpacker != nil {
_, unpackSpan := tracing.StartSpan(ctx, "pull.UnpackWait")
if ur, err = unpacker.Wait(); err != nil {
unpackSpan.RecordError(err)
tracing.StopSpan(unpackSpan)
return nil, err
}
tracing.StopSpan(unpackSpan)
}
img, err = c.createNewImage(ctx, img)
@ -138,6 +157,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
}
i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
span.SetAttributes(attribute.String("image.ref", i.Name()))
if unpacker != nil && ur.Unpacks == 0 {
// Unpack was tried previously but nothing was unpacked
@ -151,6 +171,8 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
}
func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) {
ctx, span := tracing.StartSpan(ctx, "pull.fetch")
defer tracing.StopSpan(span)
store := c.ContentStore()
name, desc, err := rCtx.Resolver.Resolve(ctx, ref)
if err != nil {
@ -254,6 +276,8 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
}
func (c *Client) createNewImage(ctx context.Context, img images.Image) (images.Image, error) {
ctx, span := tracing.StartSpan(ctx, "pull.createNewImage")
defer tracing.StopSpan(span)
is := c.ImageService()
for {
if created, err := is.Create(ctx, img); err != nil {

View File

@ -34,10 +34,13 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker/schema1" //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
remoteerrors "github.com/containerd/containerd/remotes/errors"
"github.com/containerd/containerd/tracing"
"github.com/containerd/containerd/version"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
oteltrace "go.opentelemetry.io/otel/trace"
)
var (
@ -566,11 +569,20 @@ func (r *request) do(ctx context.Context) (*http.Response, error) {
return nil
}
}
_, httpSpan := tracing.StartSpan(
ctx,
"remotes.docker.resolver.HTTPRequest",
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(semconv.HTTPClientAttributesFromHTTPRequest(req)...),
)
resp, err := client.Do(req)
if err != nil {
httpSpan.RecordError(err)
tracing.StopSpan(httpSpan)
return nil, fmt.Errorf("failed to do request: %w", err)
}
httpSpan.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...)
tracing.StopSpan(httpSpan)
log.G(ctx).WithFields(responseFields(resp)).Debug("fetch response received")
return resp, nil
}

View File

@ -137,6 +137,7 @@ func newTracer(ic *plugin.InitContext) (io.Closer, error) {
config := ic.Config.(*TraceConfig)
res, err := resource.New(ctx,
resource.WithHost(),
resource.WithAttributes(
// Service name used to displace traces in backends
semconv.ServiceNameKey.String(config.ServiceName),
@ -146,8 +147,10 @@ func newTracer(ic *plugin.InitContext) (io.Closer, error) {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
sampler := sdktrace.ParentBased(sdktrace.TraceIDRatioBased(config.TraceSamplingRatio))
opts := []sdktrace.TracerProviderOption{
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.TraceSamplingRatio)),
sdktrace.WithSampler(sampler),
sdktrace.WithResource(res),
}
@ -175,7 +178,8 @@ func newTracer(ic *plugin.InitContext) (io.Closer, error) {
provider := sdktrace.NewTracerProvider(opts...)
otel.SetTracerProvider(provider)
otel.SetTextMapPropagator(propagation.TraceContext{})
otel.SetTextMapPropagator(propagators())
return &closer{close: func() error {
for _, p := range procs {
@ -186,3 +190,8 @@ func newTracer(ic *plugin.InitContext) (io.Closer, error) {
return nil
}}, nil
}
// Returns a composite TestMap propagator
func propagators() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
}

View File

@ -20,6 +20,7 @@ import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
@ -35,3 +36,19 @@ func StartSpan(ctx context.Context, opName string, opts ...trace.SpanStartOption
func StopSpan(span trace.Span) {
span.End()
}
// CurrentSpan returns current span from context or noopSpan if no span exists.
func CurrentSpan(ctx context.Context) trace.Span {
return trace.SpanFromContext(ctx)
}
// SetSpanStatus sets the status of the current span.
// If an error is encountered, it records the error and sets span status to Error.
func SetSpanStatus(span trace.Span, err error) {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
}