Merge pull request #7453 from swagatbora90/trace-cri-image

Add tracing spans in CRI image service and pull.go
This commit is contained in:
Maksym Pavlenko 2022-11-03 20:57:17 -07:00 committed by GitHub
commit b2a01eedf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 308 additions and 74 deletions

View File

@ -82,7 +82,7 @@ func clientWithTrace() error {
defer cancel()
ctx, span := tracing.StartSpan(ctx, "OPERATION NAME")
defer span.End()
defer tracing.StopSpan(span)
...
}
```

View File

@ -33,6 +33,8 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
@ -50,6 +52,8 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/net v0.1.0 // indirect
golang.org/x/sync v0.1.0 // indirect

View File

@ -258,7 +258,9 @@ github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTg
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-logr/zapr v1.2.0/go.mod h1:Qa4Bsj2Vb+FAVeAKsLD8RLQ+YRJB8YDmOAKxaBQf7Ro=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
@ -704,6 +706,7 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0/go.mod h1:J0dBVrt7dPS/lKJyQoW0xzQiUr4r2Ik1VwPjAUWnofI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM=
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0/go.mod h1:M1hVZHNxcbkAlcvrOMlpQ4YOO3Awf+4N2dxkZL3xm04=
@ -717,6 +720,7 @@ go.opentelemetry.io/otel/sdk v1.7.0/go.mod h1:uTEOTwaqIVuTGiJN7ii13Ibp75wJmYUDe3
go.opentelemetry.io/otel/sdk/export/metric v0.20.0/go.mod h1:h7RBNMsDJ5pmI1zExLi+bJK+Dr8NQCh0qGhm1KDnNlE=
go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4/0TjTXukfxjzSTpHE=
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.16.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=

View File

@ -95,6 +95,11 @@ const (
// runtimeRunhcsV1 is the runtime type for runhcs.
runtimeRunhcsV1 = "io.containerd.runhcs.v1"
// name prefix for CRI sbserver specific spans
criSbServerSpanPrefix = "pkg.cri.sbserver"
spanDelimiter = "."
)
// makeSandboxName generates sandbox name from sandbox metadata. The name
@ -501,3 +506,10 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status)
}
return status
}
// makeSpanName returns the tracing span name for a CRI sbserver function:
// criSbServerSpanPrefix and funcName joined by spanDelimiter, e.g.
// "pkg.cri.sbserver.PullImage" for funcName "PullImage".
func makeSpanName(funcName string) string {
	segments := [...]string{criSbServerSpanPrefix, funcName}
	return strings.Join(segments[:], spanDelimiter)
}

View File

@ -48,6 +48,7 @@ import (
distribution "github.com/containerd/containerd/reference/docker"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/config"
"github.com/containerd/containerd/tracing"
)
// For image management:
@ -93,6 +94,7 @@ import (
// PullImage pulls an image with authentication config.
func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (*runtime.PullImageResponse, error) {
span := tracing.CurrentSpan(ctx)
imageRef := r.GetImage().GetImage()
namedRef, err := distribution.ParseDockerRef(imageRef)
if err != nil {
@ -133,6 +135,10 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
return nil, err
}
log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter)
span.SetAttributes(
tracing.SpanAttribute("image.ref", ref),
tracing.SpanAttribute("snapshotter.name", snapshotter),
)
pullOpts := []containerd.RemoteOpt{
containerd.WithSchema1Conversion, //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
@ -165,6 +171,7 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
if err != nil {
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
}
span.AddEvent("Pull and unpack image complete")
configDesc, err := image.Config(ctx)
if err != nil {

View File

@ -22,6 +22,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/tracing"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
@ -33,15 +34,17 @@ import (
// Remove the whole image no matter whether it's an image id or a reference. This is the
// semantic defined in CRI now.
func (c *criService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) {
span := tracing.CurrentSpan(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
// return empty without error when image not found.
return &runtime.RemoveImageResponse{}, nil
}
return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err)
}
span.SetAttributes(tracing.SpanAttribute("image.id", image.ID))
// Remove all image references.
for i, ref := range image.References {
var opts []images.DeleteOpt

View File

@ -24,6 +24,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
"github.com/containerd/containerd/tracing"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -33,14 +34,17 @@ import (
// TODO(random-liu): We should change CRI to distinguish image id and image spec. (See
// kubernetes/kubernetes#46255)
func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) {
span := tracing.CurrentSpan(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
// return empty without error when image not found.
return &runtime.ImageStatusResponse{}, nil
}
return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err)
}
span.SetAttributes(tracing.SpanAttribute("image.id", image.ID))
// TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot
// doesn't exist?

View File

@ -22,6 +22,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/tracing"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
runtime_alpha "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
@ -931,6 +932,8 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -939,6 +942,7 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
log.G(ctx).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), res.GetImageRef())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.PullImage(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -948,6 +952,8 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -956,6 +962,7 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al
log.G(ctx).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), res.GetImageRef())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.PullImageRequest
@ -986,6 +993,8 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@ -994,6 +1003,7 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v",
r.GetFilter(), res.GetImages())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ListImages(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1003,6 +1013,8 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@ -1011,6 +1023,7 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a
log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v",
r.GetFilter(), res.GetImages())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ListImagesRequest
@ -1041,6 +1054,8 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1049,6 +1064,7 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
log.G(ctx).Tracef("ImageStatus for %q returns image status %+v",
r.GetImage().GetImage(), res.GetImage())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ImageStatus(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1058,6 +1074,8 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1066,6 +1084,7 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_
log.G(ctx).Tracef("ImageStatus for %q returns image status %+v",
r.GetImage().GetImage(), res.GetImage())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ImageStatusRequest
@ -1096,6 +1115,8 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1103,6 +1124,7 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
} else {
log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
tracing.SetSpanStatus(span, err)
}()
res, err := in.c.RemoveImage(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1112,6 +1134,8 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1119,6 +1143,7 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_
} else {
log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.RemoveImageRequest
@ -1149,6 +1174,8 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo"))
defer tracing.StopSpan(span)
log.G(ctx).Debugf("ImageFsInfo")
defer func() {
if err != nil {
@ -1156,6 +1183,7 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
} else {
log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ImageFsInfo(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1165,6 +1193,8 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo"))
defer tracing.StopSpan(span)
log.G(ctx).Debugf("ImageFsInfo")
defer func() {
if err != nil {
@ -1172,6 +1202,7 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_
} else {
log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ImageFsInfoRequest

View File

@ -92,6 +92,11 @@ const (
// runtimeRunhcsV1 is the runtime type for runhcs.
runtimeRunhcsV1 = "io.containerd.runhcs.v1"
// name prefix for CRI server specific spans
criServerSpanPrefix = "pkg.cri.server"
spanDelimiter = "."
)
// makeSandboxName generates sandbox name from sandbox metadata. The name
@ -510,3 +515,10 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status)
}
return status
}
// makeSpanName returns the tracing span name for a CRI server function:
// criServerSpanPrefix and funcName joined by spanDelimiter, e.g.
// "pkg.cri.server.PullImage" for funcName "PullImage".
func makeSpanName(funcName string) string {
	segments := [...]string{criServerSpanPrefix, funcName}
	return strings.Join(segments[:], spanDelimiter)
}

View File

@ -48,6 +48,7 @@ import (
distribution "github.com/containerd/containerd/reference/docker"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/config"
"github.com/containerd/containerd/tracing"
)
// For image management:
@ -93,6 +94,7 @@ import (
// PullImage pulls an image with authentication config.
func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (*runtime.PullImageResponse, error) {
span := tracing.CurrentSpan(ctx)
imageRef := r.GetImage().GetImage()
namedRef, err := distribution.ParseDockerRef(imageRef)
if err != nil {
@ -133,7 +135,10 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
return nil, err
}
log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter)
span.SetAttributes(
tracing.SpanAttribute("image.ref", ref),
tracing.SpanAttribute("snapshotter.name", snapshotter),
)
pullOpts := []containerd.RemoteOpt{
containerd.WithSchema1Conversion, //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
containerd.WithResolver(resolver),
@ -165,6 +170,7 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
if err != nil {
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
}
span.AddEvent("Pull and unpack image complete")
configDesc, err := image.Config(ctx)
if err != nil {

View File

@ -22,6 +22,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/tracing"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
@ -33,15 +34,17 @@ import (
// Remove the whole image no matter whether it's an image id or a reference. This is the
// semantic defined in CRI now.
func (c *criService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) {
span := tracing.CurrentSpan(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
// return empty without error when image not found.
return &runtime.RemoveImageResponse{}, nil
}
return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err)
}
span.SetAttributes(tracing.SpanAttribute("image.id", image.ID))
// Remove all image references.
for i, ref := range image.References {
var opts []images.DeleteOpt

View File

@ -24,6 +24,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
"github.com/containerd/containerd/tracing"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -33,14 +34,17 @@ import (
// TODO(random-liu): We should change CRI to distinguish image id and image spec. (See
// kubernetes/kubernetes#46255)
func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) {
span := tracing.CurrentSpan(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
// return empty without error when image not found.
return &runtime.ImageStatusResponse{}, nil
}
return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err)
}
span.SetAttributes(tracing.SpanAttribute("image.id", image.ID))
// TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot
// doesn't exist?

View File

@ -22,6 +22,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/tracing"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
runtime_alpha "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
@ -931,6 +932,8 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -939,6 +942,7 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
log.G(ctx).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), res.GetImageRef())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.PullImage(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -948,6 +952,8 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -956,6 +962,7 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al
log.G(ctx).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), res.GetImageRef())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.PullImageRequest
@ -986,6 +993,8 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@ -994,6 +1003,7 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v",
r.GetFilter(), res.GetImages())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ListImages(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1003,6 +1013,8 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@ -1011,6 +1023,7 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a
log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v",
r.GetFilter(), res.GetImages())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ListImagesRequest
@ -1041,6 +1054,8 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1049,6 +1064,7 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
log.G(ctx).Tracef("ImageStatus for %q returns image status %+v",
r.GetImage().GetImage(), res.GetImage())
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ImageStatus(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1058,6 +1074,8 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus"))
defer tracing.StopSpan(span)
log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1066,6 +1084,7 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_
log.G(ctx).Tracef("ImageStatus for %q returns image status %+v",
r.GetImage().GetImage(), res.GetImage())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ImageStatusRequest
@ -1096,6 +1115,8 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1103,6 +1124,7 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
} else {
log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
tracing.SetSpanStatus(span, err)
}()
res, err := in.c.RemoveImage(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1112,6 +1134,8 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage"))
defer tracing.StopSpan(span)
log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@ -1119,6 +1143,7 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_
} else {
log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.RemoveImageRequest
@ -1149,6 +1174,8 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo"))
defer tracing.StopSpan(span)
log.G(ctx).Debugf("ImageFsInfo")
defer func() {
if err != nil {
@ -1156,6 +1183,7 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
} else {
log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
tracing.SetSpanStatus(span, err)
}()
res, err = in.c.ImageFsInfo(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
@ -1165,6 +1193,8 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_
if err := in.checkInitialized(); err != nil {
return nil, err
}
ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo"))
defer tracing.StopSpan(span)
log.G(ctx).Debugf("ImageFsInfo")
defer func() {
if err != nil {
@ -1172,6 +1202,7 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_
} else {
log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
tracing.SetSpanStatus(span, err)
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ImageFsInfoRequest

View File

@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
@ -36,6 +37,7 @@ import (
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/containerd/tracing"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -160,6 +162,11 @@ func (u *Unpacker) Unpack(h images.Handler) images.Handler {
layers = map[digest.Digest][]ocispec.Descriptor{}
)
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ctx, span := tracing.StartSpan(ctx, "pkg.unpack.unpacker.UnpackHandler")
defer span.End()
span.SetAttributes(
tracing.SpanAttribute("descriptor.media.type", desc.MediaType),
tracing.SpanAttribute("descriptor.digest", desc.Digest.String()))
unlock, err := u.lockBlobDescriptor(ctx, desc)
if err != nil {
return nil, err
@ -174,10 +181,12 @@ func (u *Unpacker) Unpack(h images.Handler) images.Handler {
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
var nonLayers []ocispec.Descriptor
var manifestLayers []ocispec.Descriptor
// Split layers from non-layers, layers will be handled after
// the config
for _, child := range children {
for i, child := range children {
span.SetAttributes(
tracing.SpanAttribute("descriptor.child."+strconv.Itoa(i), []string{child.MediaType, child.Digest.String()}),
)
if images.IsLayerType(child.MediaType) {
manifestLayers = append(manifestLayers, child)
} else {
@ -223,6 +232,8 @@ func (u *Unpacker) unpack(
layers []ocispec.Descriptor,
) error {
ctx := u.ctx
ctx, layerSpan := tracing.StartSpan(ctx, "pkg.unpack.unpacker.unpack")
defer layerSpan.End()
p, err := content.ReadBlob(ctx, u.content, config)
if err != nil {
return err
@ -398,9 +409,18 @@ func (u *Unpacker) unpack(
}
for i, desc := range layers {
_, layerSpan := tracing.StartSpan(ctx, "pkg.unpack.unpacker.unpackLayer")
layerSpan.SetAttributes(
tracing.SpanAttribute("layer.media.type", desc.MediaType),
tracing.SpanAttribute("layer.media.size", desc.Size),
tracing.SpanAttribute("layer.media.digest", desc.Digest.String()),
)
if err := doUnpackFn(i, desc); err != nil {
layerSpan.RecordError(err)
layerSpan.End()
return err
}
layerSpan.End()
}
chainID := identity.ChainID(chain).String()
@ -425,9 +445,14 @@ func (u *Unpacker) unpack(
func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error {
eg, ctx2 := errgroup.WithContext(ctx)
for i, desc := range layers {
ctx2, layerSpan := tracing.StartSpan(ctx2, "pkg.unpack.unpacker.fetchLayer")
layerSpan.SetAttributes(
tracing.SpanAttribute("layer.media.type", desc.MediaType),
tracing.SpanAttribute("layer.media.size", desc.Size),
tracing.SpanAttribute("layer.media.digest", desc.Digest.String()),
)
desc := desc
i := i
if err := u.acquire(ctx); err != nil {
return err
}
@ -451,6 +476,7 @@ func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec
return nil
})
layerSpan.End()
}
return eg.Wait()

23
pull.go
View File

@ -28,6 +28,7 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1" //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
"github.com/containerd/containerd/tracing"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/semaphore"
)
@ -35,7 +36,11 @@ import (
// Pull downloads the provided content into containerd's content store
// and returns a platform specific image object
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) {
ctx, span := tracing.StartSpan(ctx, "pull.Pull")
defer tracing.StopSpan(span)
pullCtx := defaultRemoteContext()
for _, o := range opts {
if err := o(c, pullCtx); err != nil {
return nil, err
@ -57,6 +62,14 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
}
}
span.SetAttributes(
tracing.SpanAttribute("image.ref", ref),
tracing.SpanAttribute("unpack", pullCtx.Unpack),
tracing.SpanAttribute("max.concurrent.downloads", pullCtx.MaxConcurrentDownloads),
tracing.SpanAttribute("max.concurrent.upload.layers", pullCtx.MaxConcurrentUploadedLayers),
tracing.SpanAttribute("platforms.count", len(pullCtx.Platforms)),
)
ctx, done, err := c.WithLease(ctx)
if err != nil {
return nil, err
@ -70,6 +83,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
if err != nil {
return nil, fmt.Errorf("unable to resolve snapshotter: %w", err)
}
span.SetAttributes(tracing.SpanAttribute("snapshotter.name", snapshotterName))
var uconfig UnpackConfig
for _, opt := range pullCtx.UnpackOpts {
if err := opt(ctx, &uconfig); err != nil {
@ -127,9 +141,13 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
// download).
var ur unpack.Result
if unpacker != nil {
_, unpackSpan := tracing.StartSpan(ctx, "pull.UnpackWait")
if ur, err = unpacker.Wait(); err != nil {
unpackSpan.RecordError(err)
tracing.StopSpan(unpackSpan)
return nil, err
}
tracing.StopSpan(unpackSpan)
}
img, err = c.createNewImage(ctx, img)
@ -138,6 +156,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
}
i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
span.SetAttributes(tracing.SpanAttribute("image.ref", i.Name()))
if unpacker != nil && ur.Unpacks == 0 {
// Unpack was tried previously but nothing was unpacked
@ -151,6 +170,8 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
}
func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) {
ctx, span := tracing.StartSpan(ctx, "pull.fetch")
defer tracing.StopSpan(span)
store := c.ContentStore()
name, desc, err := rCtx.Resolver.Resolve(ctx, ref)
if err != nil {
@ -254,6 +275,8 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
}
func (c *Client) createNewImage(ctx context.Context, img images.Image) (images.Image, error) {
ctx, span := tracing.StartSpan(ctx, "pull.createNewImage")
defer tracing.StopSpan(span)
is := c.ImageService()
for {
if created, err := is.Create(ctx, img); err != nil {

View File

@ -34,10 +34,13 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker/schema1" //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
remoteerrors "github.com/containerd/containerd/remotes/errors"
"github.com/containerd/containerd/tracing"
"github.com/containerd/containerd/version"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
oteltrace "go.opentelemetry.io/otel/trace"
)
var (
@ -566,11 +569,20 @@ func (r *request) do(ctx context.Context) (*http.Response, error) {
return nil
}
}
_, httpSpan := tracing.StartSpan(
ctx,
"remotes.docker.resolver.HTTPRequest",
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(semconv.HTTPClientAttributesFromHTTPRequest(req)...),
)
resp, err := client.Do(req)
if err != nil {
httpSpan.RecordError(err)
tracing.StopSpan(httpSpan)
return nil, fmt.Errorf("failed to do request: %w", err)
}
httpSpan.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...)
tracing.StopSpan(httpSpan)
log.G(ctx).WithFields(responseFields(resp)).Debug("fetch response received")
return resp, nil
}

85
tracing/helpers.go Normal file
View File

@ -0,0 +1,85 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tracing
import (
"encoding/json"
"fmt"
"go.opentelemetry.io/otel/attribute"
)
// any converts an arbitrary key/value pair into an attribute.KeyValue.
//
// A nil value is rendered as the string "<nil>". Booleans, signed integers,
// float64, strings and slices of those types map onto the matching typed
// attribute; the narrower integer widths are widened first (int8/int16 to int,
// int32 to int64). Any other value that implements fmt.Stringer is stored as
// its String() result. Everything else is JSON-encoded, and if marshalling
// fails (or yields no bytes) the value is formatted with fmt's %v verb.
func any(k string, v interface{}) attribute.KeyValue {
	switch val := v.(type) {
	case nil:
		return attribute.String(k, "<nil>")

	case string:
		return attribute.String(k, val)
	case []string:
		return attribute.StringSlice(k, val)

	case bool:
		return attribute.Bool(k, val)
	case []bool:
		return attribute.BoolSlice(k, val)

	case float64:
		return attribute.Float64(k, val)
	case []float64:
		return attribute.Float64Slice(k, val)

	case int:
		return attribute.Int(k, val)
	case []int:
		return attribute.IntSlice(k, val)

	case int8:
		return attribute.Int(k, int(val))
	case []int8:
		widened := make([]int, len(val))
		for idx, n := range val {
			widened[idx] = int(n)
		}
		return attribute.IntSlice(k, widened)

	case int16:
		return attribute.Int(k, int(val))
	case []int16:
		widened := make([]int, len(val))
		for idx, n := range val {
			widened[idx] = int(n)
		}
		return attribute.IntSlice(k, widened)

	case int32:
		return attribute.Int64(k, int64(val))
	case []int32:
		widened := make([]int64, len(val))
		for idx, n := range val {
			widened[idx] = int64(n)
		}
		return attribute.Int64Slice(k, widened)

	case int64:
		return attribute.Int64(k, val)
	case []int64:
		return attribute.Int64Slice(k, val)

	// Checked after every concrete type above, mirroring the order in which
	// the conversions are attempted.
	case fmt.Stringer:
		return attribute.String(k, val.String())
	}

	// No direct mapping: prefer a JSON rendering, then plain %v formatting.
	encoded, err := json.Marshal(v)
	if err != nil || encoded == nil {
		return attribute.String(k, fmt.Sprintf("%v", v))
	}
	return attribute.String(k, string(encoded))
}

View File

@ -17,9 +17,6 @@
package tracing
import (
"encoding/json"
"fmt"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -67,64 +64,3 @@ func logrusDataToAttrs(data logrus.Fields) []attribute.KeyValue {
}
return attrs
}
// any converts an arbitrary key/value pair into an attribute.KeyValue.
//
// A nil value is rendered as the string "<nil>". Booleans, signed integers,
// float64, strings and slices of those types map onto the matching typed
// attribute; the narrower integer widths are widened first (int8/int16 to int,
// int32 to int64). Any other value that implements fmt.Stringer is stored as
// its String() result. Everything else is JSON-encoded, and if marshalling
// fails (or yields no bytes) the value is formatted with fmt's %v verb.
func any(k string, v interface{}) attribute.KeyValue {
	switch val := v.(type) {
	case nil:
		return attribute.String(k, "<nil>")

	case string:
		return attribute.String(k, val)
	case []string:
		return attribute.StringSlice(k, val)

	case bool:
		return attribute.Bool(k, val)
	case []bool:
		return attribute.BoolSlice(k, val)

	case float64:
		return attribute.Float64(k, val)
	case []float64:
		return attribute.Float64Slice(k, val)

	case int:
		return attribute.Int(k, val)
	case []int:
		return attribute.IntSlice(k, val)

	case int8:
		return attribute.Int(k, int(val))
	case []int8:
		widened := make([]int, len(val))
		for idx, n := range val {
			widened[idx] = int(n)
		}
		return attribute.IntSlice(k, widened)

	case int16:
		return attribute.Int(k, int(val))
	case []int16:
		widened := make([]int, len(val))
		for idx, n := range val {
			widened[idx] = int(n)
		}
		return attribute.IntSlice(k, widened)

	case int32:
		return attribute.Int64(k, int64(val))
	case []int32:
		widened := make([]int64, len(val))
		for idx, n := range val {
			widened[idx] = int64(n)
		}
		return attribute.Int64Slice(k, widened)

	case int64:
		return attribute.Int64(k, val)
	case []int64:
		return attribute.Int64Slice(k, val)

	// Checked after every concrete type above, mirroring the order in which
	// the conversions are attempted.
	case fmt.Stringer:
		return attribute.String(k, val.String())
	}

	// No direct mapping: prefer a JSON rendering, then plain %v formatting.
	encoded, err := json.Marshal(v)
	if err != nil || encoded == nil {
		return attribute.String(k, fmt.Sprintf("%v", v))
	}
	return attribute.String(k, string(encoded))
}

View File

@ -137,6 +137,7 @@ func newTracer(ic *plugin.InitContext) (io.Closer, error) {
config := ic.Config.(*TraceConfig)
res, err := resource.New(ctx,
resource.WithHost(),
resource.WithAttributes(
// Service name used to display traces in backends
semconv.ServiceNameKey.String(config.ServiceName),
@ -146,8 +147,10 @@ func newTracer(ic *plugin.InitContext) (io.Closer, error) {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
sampler := sdktrace.ParentBased(sdktrace.TraceIDRatioBased(config.TraceSamplingRatio))
opts := []sdktrace.TracerProviderOption{
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.TraceSamplingRatio)),
sdktrace.WithSampler(sampler),
sdktrace.WithResource(res),
}
@ -175,7 +178,8 @@ func newTracer(ic *plugin.InitContext) (io.Closer, error) {
provider := sdktrace.NewTracerProvider(opts...)
otel.SetTracerProvider(provider)
otel.SetTextMapPropagator(propagation.TraceContext{})
otel.SetTextMapPropagator(propagators())
return &closer{close: func() error {
for _, p := range procs {
@ -186,3 +190,8 @@ func newTracer(ic *plugin.InitContext) (io.Closer, error) {
return nil
}}, nil
}
// propagators returns a composite TextMap propagator built from the
// TraceContext and Baggage propagators, in that order.
func propagators() propagation.TextMapPropagator {
	traceContext := propagation.TraceContext{}
	baggage := propagation.Baggage{}
	return propagation.NewCompositeTextMapPropagator(traceContext, baggage)
}

View File

@ -20,6 +20,8 @@ import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
@ -35,3 +37,23 @@ func StartSpan(ctx context.Context, opName string, opts ...trace.SpanStartOption
func StopSpan(span trace.Span) {
// Finish the span by delegating to its own End method.
span.End()
}
// CurrentSpan returns the span carried by ctx, as reported by
// trace.SpanFromContext, or a no-op span if ctx carries none.
func CurrentSpan(ctx context.Context) trace.Span {
return trace.SpanFromContext(ctx)
}
// SetSpanStatus records the outcome of an operation on the given span.
// A nil err marks the span with status Ok. A non-nil err is recorded on the
// span and the status is set to Error, using err's message as the description.
func SetSpanStatus(span trace.Span, err error) {
	if err == nil {
		span.SetStatus(codes.Ok, "")
		return
	}

	span.RecordError(err)
	span.SetStatus(codes.Error, err.Error())
}
// SpanAttribute builds an attribute.KeyValue with key k from the arbitrary
// value v. It is the exported entry point to the package-private any helper,
// which performs the actual conversion.
func SpanAttribute(k string, v interface{}) attribute.KeyValue {
return any(k, v)
}