From 7def13dde3e15743209400ea574e254c5005bb65 Mon Sep 17 00:00:00 2001 From: Swagat Bora Date: Wed, 9 Nov 2022 21:47:44 +0000 Subject: [PATCH] Add a thin wrapper around otel Span object Signed-off-by: Swagat Bora --- docs/tracing.md | 3 +- pkg/cri/sbserver/helpers.go | 11 +---- pkg/cri/sbserver/image_pull.go | 6 +-- pkg/cri/sbserver/image_remove.go | 4 +- pkg/cri/sbserver/image_status.go | 4 +- pkg/cri/sbserver/instrumented_service.go | 60 ++++++++++++------------ pkg/cri/server/helpers.go | 11 +---- pkg/cri/server/image_pull.go | 6 +-- pkg/cri/server/image_remove.go | 4 +- pkg/cri/server/image_status.go | 4 +- pkg/cri/server/instrumented_service.go | 60 ++++++++++++------------ pkg/unpack/unpacker.go | 29 ++++++------ pull.go | 38 ++++++++------- remotes/docker/resolver.go | 8 ++-- tracing/helpers.go | 9 ++++ tracing/tracing.go | 57 ++++++++++++++++------ 16 files changed, 168 insertions(+), 146 deletions(-) diff --git a/docs/tracing.md b/docs/tracing.md index 4e23ce61a..60a8263b5 100644 --- a/docs/tracing.md +++ b/docs/tracing.md @@ -82,7 +82,6 @@ func clientWithTrace() error { defer cancel() ctx, span := tracing.StartSpan(ctx, "OPERATION NAME") - defer tracing.StopSpan(span) + defer span.End() ... } -``` diff --git a/pkg/cri/sbserver/helpers.go b/pkg/cri/sbserver/helpers.go index 58b12328b..e7523b889 100644 --- a/pkg/cri/sbserver/helpers.go +++ b/pkg/cri/sbserver/helpers.go @@ -97,9 +97,7 @@ const ( runtimeRunhcsV1 = "io.containerd.runhcs.v1" // name prefix for CRI sbserver specific spans - criSbServerSpanPrefix = "pkg.cri.sbserver" - - spanDelimiter = "." + criSpanPrefix = "pkg.cri.sbserver" ) // makeSandboxName generates sandbox name from sandbox metadata. The name @@ -506,10 +504,3 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status) } return status } - -func makeSpanName(funcName string) string { - return strings.Join([]string{ - criSbServerSpanPrefix, - funcName, - }, spanDelimiter) -} diff --git a/pkg/cri/sbserver/image_pull.go b/pkg/cri/sbserver/image_pull.go index 2151221fe..b13d8e2d4 100644 --- a/pkg/cri/sbserver/image_pull.go +++ b/pkg/cri/sbserver/image_pull.go @@ -94,7 +94,7 @@ import ( // PullImage pulls an image with authentication config. func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (*runtime.PullImageResponse, error) { - span := tracing.CurrentSpan(ctx) + span := tracing.SpanFromContext(ctx) imageRef := r.GetImage().GetImage() namedRef, err := distribution.ParseDockerRef(imageRef) if err != nil { @@ -136,8 +136,8 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) } log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter) span.SetAttributes( - tracing.SpanAttribute("image.ref", ref), - tracing.SpanAttribute("snapshotter.name", snapshotter), + tracing.Attribute("image.ref", ref), + tracing.Attribute("snapshotter.name", snapshotter), ) pullOpts := []containerd.RemoteOpt{ diff --git a/pkg/cri/sbserver/image_remove.go b/pkg/cri/sbserver/image_remove.go index 7806b5dd7..b43655c49 100644 --- a/pkg/cri/sbserver/image_remove.go +++ b/pkg/cri/sbserver/image_remove.go @@ -34,7 +34,7 @@ import ( // Remove the whole image no matter the it's image id or reference. This is the // semantic defined in CRI now. func (c *criService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) { - span := tracing.CurrentSpan(ctx) + span := tracing.SpanFromContext(ctx) image, err := c.localResolve(r.GetImage().GetImage()) if err != nil { if errdefs.IsNotFound(err) { @@ -44,7 +44,7 @@ func (c *criService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequ } return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err) } - span.SetAttributes(tracing.SpanAttribute("image.id", image.ID)) + span.SetAttributes(tracing.Attribute("image.id", image.ID)) // Remove all image references. for i, ref := range image.References { var opts []images.DeleteOpt diff --git a/pkg/cri/sbserver/image_status.go b/pkg/cri/sbserver/image_status.go index 5caebfdf9..f874889b6 100644 --- a/pkg/cri/sbserver/image_status.go +++ b/pkg/cri/sbserver/image_status.go @@ -34,7 +34,7 @@ import ( // TODO(random-liu): We should change CRI to distinguish image id and image spec. (See // kubernetes/kubernetes#46255) func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) { - span := tracing.CurrentSpan(ctx) + span := tracing.SpanFromContext(ctx) image, err := c.localResolve(r.GetImage().GetImage()) if err != nil { if errdefs.IsNotFound(err) { @@ -44,7 +44,7 @@ func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequ } return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err) } - span.SetAttributes(tracing.SpanAttribute("image.id", image.ID)) + span.SetAttributes(tracing.Attribute("image.id", image.ID)) // TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot // doesn't exist? diff --git a/pkg/cri/sbserver/instrumented_service.go b/pkg/cri/sbserver/instrumented_service.go index 4692dfafb..e19457cdd 100644 --- a/pkg/cri/sbserver/instrumented_service.go +++ b/pkg/cri/sbserver/instrumented_service.go @@ -932,8 +932,8 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "PullImage")) + defer span.End() log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -942,7 +942,7 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma log.G(ctx).Infof("PullImage %q returns image reference %q", r.GetImage().GetImage(), res.GetImageRef()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err = in.c.PullImage(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -952,8 +952,8 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "PullImage")) + defer span.End() log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -962,7 +962,7 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al log.G(ctx).Infof("PullImage %q returns image reference %q", r.GetImage().GetImage(), res.GetImageRef()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.PullImageRequest @@ -993,8 +993,8 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ListImages")) + defer span.End() log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter()) defer func() { if err != nil { @@ -1003,7 +1003,7 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v", r.GetFilter(), res.GetImages()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err = in.c.ListImages(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -1013,8 +1013,8 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ListImages")) + defer span.End() log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter()) defer func() { if err != nil { @@ -1023,7 +1023,7 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v", r.GetFilter(), res.GetImages()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.ListImagesRequest @@ -1054,8 +1054,8 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ImageStatus")) + defer span.End() log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -1064,7 +1064,7 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image log.G(ctx).Tracef("ImageStatus for %q returns image status %+v", r.GetImage().GetImage(), res.GetImage()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err = in.c.ImageStatus(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -1074,8 +1074,8 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_ if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ImageStatus")) + defer span.End() log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -1084,7 +1084,7 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_ log.G(ctx).Tracef("ImageStatus for %q returns image status %+v", r.GetImage().GetImage(), res.GetImage()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.ImageStatusRequest @@ -1115,8 +1115,8 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "RemoveImage")) + defer span.End() log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -1124,7 +1124,7 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov } else { log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err := in.c.RemoveImage(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -1134,8 +1134,8 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_ if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "RemoveImage")) + defer span.End() log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -1143,7 +1143,7 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_ } else { log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.RemoveImageRequest @@ -1174,8 +1174,8 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ImageFsInfo")) + defer span.End() log.G(ctx).Debugf("ImageFsInfo") defer func() { if err != nil { @@ -1183,7 +1183,7 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image } else { log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err = in.c.ImageFsInfo(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -1193,8 +1193,8 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_ if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ImageFsInfo")) + defer span.End() log.G(ctx).Debugf("ImageFsInfo") defer func() { if err != nil { @@ -1202,7 +1202,7 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_ } else { log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.ImageFsInfoRequest diff --git a/pkg/cri/server/helpers.go b/pkg/cri/server/helpers.go index 91d76c349..06d7b588e 100644 --- a/pkg/cri/server/helpers.go +++ b/pkg/cri/server/helpers.go @@ -94,9 +94,7 @@ const ( runtimeRunhcsV1 = "io.containerd.runhcs.v1" // name prefix for CRI server specific spans - criServerSpanPrefix = "pkg.cri.server" - - spanDelimiter = "." + criSpanPrefix = "pkg.cri.server" ) // makeSandboxName generates sandbox name from sandbox metadata. The name @@ -515,10 +513,3 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status) } return status } - -func makeSpanName(funcName string) string { - return strings.Join([]string{ - criServerSpanPrefix, - funcName, - }, spanDelimiter) -} diff --git a/pkg/cri/server/image_pull.go b/pkg/cri/server/image_pull.go index e885a95b6..69747d962 100644 --- a/pkg/cri/server/image_pull.go +++ b/pkg/cri/server/image_pull.go @@ -94,7 +94,7 @@ import ( // PullImage pulls an image with authentication config. func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (*runtime.PullImageResponse, error) { - span := tracing.CurrentSpan(ctx) + span := tracing.SpanFromContext(ctx) imageRef := r.GetImage().GetImage() namedRef, err := distribution.ParseDockerRef(imageRef) if err != nil { @@ -136,8 +136,8 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) } log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter) span.SetAttributes( - tracing.SpanAttribute("image.ref", ref), - tracing.SpanAttribute("snapshotter.name", snapshotter), + tracing.Attribute("image.ref", ref), + tracing.Attribute("snapshotter.name", snapshotter), ) pullOpts := []containerd.RemoteOpt{ containerd.WithSchema1Conversion, //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. diff --git a/pkg/cri/server/image_remove.go b/pkg/cri/server/image_remove.go index df071a1d1..e10062a31 100644 --- a/pkg/cri/server/image_remove.go +++ b/pkg/cri/server/image_remove.go @@ -34,7 +34,7 @@ import ( // Remove the whole image no matter the it's image id or reference. This is the // semantic defined in CRI now. func (c *criService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) { - span := tracing.CurrentSpan(ctx) + span := tracing.SpanFromContext(ctx) image, err := c.localResolve(r.GetImage().GetImage()) if err != nil { if errdefs.IsNotFound(err) { @@ -44,7 +44,7 @@ func (c *criService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequ } return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err) } - span.SetAttributes(tracing.SpanAttribute("image.id", image.ID)) + span.SetAttributes(tracing.Attribute("image.id", image.ID)) // Remove all image references. for i, ref := range image.References { var opts []images.DeleteOpt diff --git a/pkg/cri/server/image_status.go b/pkg/cri/server/image_status.go index dc96d2835..f53750228 100644 --- a/pkg/cri/server/image_status.go +++ b/pkg/cri/server/image_status.go @@ -34,7 +34,7 @@ import ( // TODO(random-liu): We should change CRI to distinguish image id and image spec. (See // kubernetes/kubernetes#46255) func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) { - span := tracing.CurrentSpan(ctx) + span := tracing.SpanFromContext(ctx) image, err := c.localResolve(r.GetImage().GetImage()) if err != nil { if errdefs.IsNotFound(err) { @@ -44,7 +44,7 @@ func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequ } return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err) } - span.SetAttributes(tracing.SpanAttribute("image.id", image.ID)) + span.SetAttributes(tracing.Attribute("image.id", image.ID)) // TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot // doesn't exist? diff --git a/pkg/cri/server/instrumented_service.go b/pkg/cri/server/instrumented_service.go index 2f8ced526..6976e0c4d 100644 --- a/pkg/cri/server/instrumented_service.go +++ b/pkg/cri/server/instrumented_service.go @@ -932,8 +932,8 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "PullImage")) + defer span.End() log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -942,7 +942,7 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma log.G(ctx).Infof("PullImage %q returns image reference %q", r.GetImage().GetImage(), res.GetImageRef()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err = in.c.PullImage(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -952,8 +952,8 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("PullImage")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "PullImage")) + defer span.End() log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -962,7 +962,7 @@ func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_al log.G(ctx).Infof("PullImage %q returns image reference %q", r.GetImage().GetImage(), res.GetImageRef()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.PullImageRequest @@ -993,8 +993,8 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ListImages")) + defer span.End() log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter()) defer func() { if err != nil { @@ -1003,7 +1003,7 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v", r.GetFilter(), res.GetImages()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err = in.c.ListImages(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -1013,8 +1013,8 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ListImages")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ListImages")) + defer span.End() log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter()) defer func() { if err != nil { @@ -1023,7 +1023,7 @@ func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_a log.G(ctx).Tracef("ListImages with filter %+v returns image list %+v", r.GetFilter(), res.GetImages()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.ListImagesRequest @@ -1054,8 +1054,8 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ImageStatus")) + defer span.End() log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -1064,7 +1064,7 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image log.G(ctx).Tracef("ImageStatus for %q returns image status %+v", r.GetImage().GetImage(), res.GetImage()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err = in.c.ImageStatus(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -1074,8 +1074,8 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_ if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageStatus")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ImageStatus")) + defer span.End() log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -1084,7 +1084,7 @@ func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_ log.G(ctx).Tracef("ImageStatus for %q returns image status %+v", r.GetImage().GetImage(), res.GetImage()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.ImageStatusRequest @@ -1115,8 +1115,8 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "RemoveImage")) + defer span.End() log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -1124,7 +1124,7 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov } else { log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err := in.c.RemoveImage(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -1134,8 +1134,8 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_ if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("RemoveImage")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "RemoveImage")) + defer span.End() log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -1143,7 +1143,7 @@ func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_ } else { log.G(ctx).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage()) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.RemoveImageRequest @@ -1174,8 +1174,8 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ImageFsInfo")) + defer span.End() log.G(ctx).Debugf("ImageFsInfo") defer func() { if err != nil { @@ -1183,7 +1183,7 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image } else { log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() res, err = in.c.ImageFsInfo(ctrdutil.WithNamespace(ctx), r) return res, errdefs.ToGRPC(err) @@ -1193,8 +1193,8 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_ if err := in.checkInitialized(); err != nil { return nil, err } - ctx, span := tracing.StartSpan(ctx, makeSpanName("ImageFsInfo")) - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(criSpanPrefix, "ImageFsInfo")) + defer span.End() log.G(ctx).Debugf("ImageFsInfo") defer func() { if err != nil { @@ -1202,7 +1202,7 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_ } else { log.G(ctx).Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems) } - tracing.SetSpanStatus(span, err) + span.SetStatus(err) }() // converts request and response for earlier CRI version to call and get response from the current version var v1r runtime.ImageFsInfoRequest diff --git a/pkg/unpack/unpacker.go b/pkg/unpack/unpacker.go index a9f93182a..c7b664fb3 100644 --- a/pkg/unpack/unpacker.go +++ b/pkg/unpack/unpacker.go @@ -48,6 +48,7 @@ import ( const ( labelSnapshotRef = "containerd.io/snapshot.ref" + unpackSpanPrefix = "pkg.unpack.unpacker" ) // Result returns information about the unpacks which were completed. @@ -162,11 +163,11 @@ func (u *Unpacker) Unpack(h images.Handler) images.Handler { layers = map[digest.Digest][]ocispec.Descriptor{} ) return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - ctx, span := tracing.StartSpan(ctx, "pkg.unpack.unpacker.UnpackHandler") + ctx, span := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "UnpackHandler")) defer span.End() span.SetAttributes( - tracing.SpanAttribute("descriptor.media.type", desc.MediaType), - tracing.SpanAttribute("descriptor.digest", desc.Digest.String())) + tracing.Attribute("descriptor.media.type", desc.MediaType), + tracing.Attribute("descriptor.digest", desc.Digest.String())) unlock, err := u.lockBlobDescriptor(ctx, desc) if err != nil { return nil, err @@ -185,7 +186,7 @@ func (u *Unpacker) Unpack(h images.Handler) images.Handler { // the config for i, child := range children { span.SetAttributes( - tracing.SpanAttribute("descriptor.child."+strconv.Itoa(i), []string{child.MediaType, child.Digest.String()}), + tracing.Attribute("descriptor.child."+strconv.Itoa(i), []string{child.MediaType, child.Digest.String()}), ) if images.IsLayerType(child.MediaType) { manifestLayers = append(manifestLayers, child) @@ -232,7 +233,7 @@ func (u *Unpacker) unpack( layers []ocispec.Descriptor, ) error { ctx := u.ctx - ctx, layerSpan := tracing.StartSpan(ctx, "pkg.unpack.unpacker.unpack") + ctx, layerSpan := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "unpack")) defer layerSpan.End() p, err := content.ReadBlob(ctx, u.content, config) if err != nil { @@ -409,14 +410,14 @@ func (u *Unpacker) unpack( } for i, desc := range layers { - _, layerSpan := tracing.StartSpan(ctx, "pkg.unpack.unpacker.unpackLayer") + _, layerSpan := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "unpackLayer")) layerSpan.SetAttributes( - tracing.SpanAttribute("layer.media.type", desc.MediaType), - tracing.SpanAttribute("layer.media.size", desc.Size), - tracing.SpanAttribute("layer.media.digest", desc.Digest.String()), + tracing.Attribute("layer.media.type", desc.MediaType), + tracing.Attribute("layer.media.size", desc.Size), + tracing.Attribute("layer.media.digest", desc.Digest.String()), ) if err := doUnpackFn(i, desc); err != nil { - layerSpan.RecordError(err) + layerSpan.SetStatus(err) layerSpan.End() return err } @@ -445,11 +446,11 @@ func (u *Unpacker) unpack( func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error { eg, ctx2 := errgroup.WithContext(ctx) for i, desc := range layers { - ctx2, layerSpan := tracing.StartSpan(ctx2, "pkg.unpack.unpacker.fetchLayer") + ctx2, layerSpan := tracing.StartSpan(ctx2, tracing.Name(unpackSpanPrefix, "fetchLayer")) layerSpan.SetAttributes( - tracing.SpanAttribute("layer.media.type", desc.MediaType), - tracing.SpanAttribute("layer.media.size", desc.Size), - tracing.SpanAttribute("layer.media.digest", desc.Digest.String()), + tracing.Attribute("layer.media.type", desc.MediaType), + tracing.Attribute("layer.media.size", desc.Size), + tracing.Attribute("layer.media.digest", desc.Digest.String()), ) desc := desc i := i diff --git a/pull.go b/pull.go index 293e5e5ce..b79055a60 100644 --- a/pull.go +++ b/pull.go @@ -33,11 +33,15 @@ import ( "golang.org/x/sync/semaphore" ) +const ( + pullSpanPrefix = "pull" +) + // Pull downloads the provided content into containerd's content store // and returns a platform specific image object func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) { - ctx, span := tracing.StartSpan(ctx, "pull.Pull") - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(pullSpanPrefix, "Pull")) + defer span.End() pullCtx := defaultRemoteContext() @@ -63,11 +67,11 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima } span.SetAttributes( - tracing.SpanAttribute("image.ref", ref), - tracing.SpanAttribute("unpack", pullCtx.Unpack), - tracing.SpanAttribute("max.concurrent.downloads", pullCtx.MaxConcurrentDownloads), - tracing.SpanAttribute("max.concurrent.upload.layers", pullCtx.MaxConcurrentUploadedLayers), - tracing.SpanAttribute("platforms.count", len(pullCtx.Platforms)), + tracing.Attribute("image.ref", ref), + tracing.Attribute("unpack", pullCtx.Unpack), + tracing.Attribute("max.concurrent.downloads", pullCtx.MaxConcurrentDownloads), + tracing.Attribute("max.concurrent.upload.layers", pullCtx.MaxConcurrentUploadedLayers), + tracing.Attribute("platforms.count", len(pullCtx.Platforms)), ) ctx, done, err := c.WithLease(ctx) @@ -83,7 +87,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima if err != nil { return nil, fmt.Errorf("unable to resolve snapshotter: %w", err) } - span.SetAttributes(tracing.SpanAttribute("snapshotter.name", snapshotterName)) + span.SetAttributes(tracing.Attribute("snapshotter.name", snapshotterName)) var uconfig UnpackConfig for _, opt := range pullCtx.UnpackOpts { if err := opt(ctx, &uconfig); err != nil { @@ -141,13 +145,13 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima // download). var ur unpack.Result if unpacker != nil { - _, unpackSpan := tracing.StartSpan(ctx, "pull.UnpackWait") + _, unpackSpan := tracing.StartSpan(ctx, tracing.Name(pullSpanPrefix, "UnpackWait")) if ur, err = unpacker.Wait(); err != nil { - unpackSpan.RecordError(err) - tracing.StopSpan(unpackSpan) + unpackSpan.SetStatus(err) + unpackSpan.End() return nil, err } - tracing.StopSpan(unpackSpan) + unpackSpan.End() } img, err = c.createNewImage(ctx, img) @@ -156,7 +160,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima } i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher) - span.SetAttributes(tracing.SpanAttribute("image.ref", i.Name())) + span.SetAttributes(tracing.Attribute("image.ref", i.Name())) if unpacker != nil && ur.Unpacks == 0 { // Unpack was tried previously but nothing was unpacked @@ -170,8 +174,8 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima } func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) { - ctx, span := tracing.StartSpan(ctx, "pull.fetch") - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(pullSpanPrefix, "fetch")) + defer span.End() store := c.ContentStore() name, desc, err := rCtx.Resolver.Resolve(ctx, ref) if err != nil { @@ -275,8 +279,8 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim } func (c *Client) createNewImage(ctx context.Context, img images.Image) (images.Image, error) { - ctx, span := tracing.StartSpan(ctx, "pull.createNewImage") - defer tracing.StopSpan(span) + ctx, span := tracing.StartSpan(ctx, tracing.Name(pullSpanPrefix, "pull.createNewImage")) + defer span.End() is := c.ImageService() for { if created, err := is.Create(ctx, img); err != nil { diff --git a/remotes/docker/resolver.go b/remotes/docker/resolver.go index 0ad06ef62..931b6b19a 100644 --- a/remotes/docker/resolver.go +++ b/remotes/docker/resolver.go @@ -571,18 +571,18 @@ func (r *request) do(ctx context.Context) (*http.Response, error) { } _, httpSpan := tracing.StartSpan( ctx, - "remotes.docker.resolver.HTTPRequest", + tracing.Name("remotes.docker.resolver", "HTTPRequest"), oteltrace.WithSpanKind(oteltrace.SpanKindClient), oteltrace.WithAttributes(semconv.HTTPClientAttributesFromHTTPRequest(req)...), ) resp, err := client.Do(req) if err != nil { - httpSpan.RecordError(err) - tracing.StopSpan(httpSpan) + httpSpan.SetStatus(err) + httpSpan.End() return nil, fmt.Errorf("failed to do request: %w", err) } httpSpan.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...) - tracing.StopSpan(httpSpan) + httpSpan.End() log.G(ctx).WithFields(responseFields(resp)).Debug("fetch response received") return resp, nil } diff --git a/tracing/helpers.go b/tracing/helpers.go index 035770998..981da6c79 100644 --- a/tracing/helpers.go +++ b/tracing/helpers.go @@ -19,10 +19,19 @@ package tracing import ( "encoding/json" "fmt" + "strings" "go.opentelemetry.io/otel/attribute" ) +const ( + spanDelimiter = "." +) + +func makeSpanName(names ...string) string { + return strings.Join(names, spanDelimiter) +} + func any(k string, v interface{}) attribute.KeyValue { if v == nil { return attribute.String(k, "") diff --git a/tracing/tracing.go b/tracing/tracing.go index 37366e4fc..d9cb7cc0c 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -25,35 +25,62 @@ import ( "go.opentelemetry.io/otel/trace" ) -// StartSpan starts child span in a context. -func StartSpan(ctx context.Context, opName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { +// StartSan starts child span in a context. +func StartSpan(ctx context.Context, opName string, opts ...trace.SpanStartOption) (context.Context, *Span) { + tracer := otel.Tracer("") if parent := trace.SpanFromContext(ctx); parent != nil && parent.SpanContext().IsValid() { - return parent.TracerProvider().Tracer("").Start(ctx, opName, opts...) + tracer = parent.TracerProvider().Tracer("") } - return otel.Tracer("").Start(ctx, opName, opts...) + ctx, span := tracer.Start(ctx, opName, opts...) + return ctx, &Span{otelSpan: span} } -// StopSpan ends the span specified -func StopSpan(span trace.Span) { - span.End() +// SpanFromContext returns the current Span from the context. +func SpanFromContext(ctx context.Context) *Span { + return &Span{ + otelSpan: trace.SpanFromContext(ctx), + } } -// CurrentSpan returns current span from context or noopSpan if no span exists. -func CurrentSpan(ctx context.Context) trace.Span { - return trace.SpanFromContext(ctx) +// Span is wrapper around otel trace.Span. +// Span is the individual component of a trace. It represents a +// single named and timed operation of a workflow that is traced. +type Span struct { + otelSpan trace.Span +} + +// End completes the span. +func (s *Span) End() { + s.otelSpan.End() +} + +// AddEvent adds an event with provided name and options. +func (s *Span) AddEvent(name string, options ...trace.EventOption) { + s.otelSpan.AddEvent(name, options...) } // SetSpanStatus sets the status of the current span. // If an error is encountered, it records the error and sets span status to Error. -func SetSpanStatus(span trace.Span, err error) { +func (s *Span) SetStatus(err error) { if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) + s.otelSpan.RecordError(err) + s.otelSpan.SetStatus(codes.Error, err.Error()) } else { - span.SetStatus(codes.Ok, "") + s.otelSpan.SetStatus(codes.Ok, "") } } -func SpanAttribute(k string, v interface{}) attribute.KeyValue { +// SetAttributes sets kv as attributes of the span. +func (s *Span) SetAttributes(kv ...attribute.KeyValue) { + s.otelSpan.SetAttributes(kv...) +} + +// Name sets the span name by joining a list of strings in dot separated format. +func Name(names ...string) string { + return makeSpanName(names...) +} + +// Attribute takes a key value pair and returns attribute.KeyValue type. +func Attribute(k string, v interface{}) attribute.KeyValue { return any(k, v) }