diff --git a/cmd/containerd/builtins.go b/cmd/containerd/builtins.go index 3d79eee09..16e5df061 100644 --- a/cmd/containerd/builtins.go +++ b/cmd/containerd/builtins.go @@ -35,4 +35,5 @@ import ( _ "github.com/containerd/containerd/services/snapshots" _ "github.com/containerd/containerd/services/tasks" _ "github.com/containerd/containerd/services/version" + _ "github.com/containerd/containerd/tracing/plugin" ) diff --git a/cmd/containerd/command/main.go b/cmd/containerd/command/main.go index ff633dff5..2815bc325 100644 --- a/cmd/containerd/command/main.go +++ b/cmd/containerd/command/main.go @@ -35,12 +35,10 @@ import ( "github.com/containerd/containerd/services/server" srvconfig "github.com/containerd/containerd/services/server/config" "github.com/containerd/containerd/sys" - "github.com/containerd/containerd/tracing" "github.com/containerd/containerd/version" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" - "go.opentelemetry.io/otel" "google.golang.org/grpc/grpclog" ) @@ -135,20 +133,6 @@ can be used and modified as necessary as a custom configuration.` return err } - // Initialize OpenTelemetry tracing - shutdown, err := tracing.InitOpenTelemetry(config) - if err != nil { - errors.Wrap(err, "failed to initialize OpenTelemetry tracing") - } - if shutdown != nil { - defer shutdown() - } - - // Get a tracer - ctrdTracer := otel.Tracer("containerd") - ctx, mainCtrdSpan := ctrdTracer.Start(ctx, "containerd-exporter") - defer mainCtrdSpan.End() - // Make sure top-level directories are created early. if err := server.CreateTopLevelDirectories(config); err != nil { return err @@ -300,9 +284,6 @@ can be used and modified as necessary as a custom configuration.` func serve(ctx gocontext.Context, l net.Listener, serveFunc func(net.Listener) error) { path := l.Addr().String() log.G(ctx).WithField("address", path).Info("serving...") - serveSpan, ctx := tracing.StartSpan(ctx, l.Addr().String()) - defer tracing.StopSpan(serveSpan) - go func() { defer l.Close() if err := serveFunc(l); err != nil { diff --git a/plugin/plugin.go b/plugin/plugin.go index b8e5157c5..f24a3c77b 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -77,6 +77,8 @@ const ( GCPlugin Type = "io.containerd.gc.v1" // EventPlugin implements event handling EventPlugin Type = "io.containerd.event.v1" + // TracingProcessorPlugin implements a open telemetry span processor + TracingProcessorPlugin Type = "io.containerd.tracing.processor.v1" ) const ( diff --git a/services/server/config/config.go b/services/server/config/config.go index 5bf0cae13..f4b9f2ab9 100644 --- a/services/server/config/config.go +++ b/services/server/config/config.go @@ -67,8 +67,6 @@ type Config struct { Timeouts map[string]string `toml:"timeouts"` // Imports are additional file path list to config files that can overwrite main config file fields Imports []string `toml:"imports"` - // OpenTelemetry configuration - OpenTelemetry OpenTelemetryConfig `toml:"otel"` StreamProcessors map[string]StreamProcessor `toml:"stream_processors"` } @@ -167,14 +165,6 @@ type ProxyPlugin struct { Address string `toml:"address"` } -// OpenTelemetryConfig provides open telemetry configuration -type OpenTelemetryConfig struct { - ServiceName string `toml:"service_name"` - ExporterName string `toml:"exporter_name"` - ExporterEndpoint string `toml:"exporter_endpoint"` - TraceSamplingRatio float64 `toml:"trace_sampling_ratio"` -} - // BoltConfig defines the configuration values for the bolt plugin, which is // loaded here, rather than back registered in the metadata package. type BoltConfig struct { @@ -213,24 +203,6 @@ func (bc *BoltConfig) Validate() error { } } -const ( - // ExporterTypeOTLP represents the open telemetry exporter OTLP - ExporterTypeOTLP = "otlp" -) - -// Validate OpenTelemetry config -func (cfg *OpenTelemetryConfig) Validate() error { - switch cfg.ExporterName { - case ExporterTypeOTLP: - if cfg.ServiceName == "" { - return errors.Wrapf(errdefs.ErrInvalidArgument, "missing service name in config %+v", cfg) - } - return nil - default: - return errors.Wrapf(errdefs.ErrInvalidArgument, "unsupported exporter: %+v", cfg) - } -} - // Decode unmarshals a plugin specific configuration by plugin id func (c *Config) Decode(p *plugin.Registration) (interface{}, error) { id := p.URI() diff --git a/tracing/plugin/otlp.go b/tracing/plugin/otlp.go new file mode 100644 index 000000000..bc52115b7 --- /dev/null +++ b/tracing/plugin/otlp.go @@ -0,0 +1,143 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package plugin + +import ( + "io" + + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/plugin" + "github.com/pkg/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "google.golang.org/grpc" +) + +const exporterPlugin = "otlp" + +func init() { + plugin.Register(&plugin.Registration{ + ID: exporterPlugin, + Type: plugin.TracingProcessorPlugin, + Config: &OTLPConfig{}, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + cfg := ic.Config.(*OTLPConfig) + if cfg.Endpoint == "" { + return nil, errors.Wrap(plugin.ErrSkipPlugin, "otlp endpoint not set") + } + dialOpts := []grpc.DialOption{grpc.WithBlock()} + if cfg.Insecure { + dialOpts = append(dialOpts, grpc.WithInsecure()) + } + + exp, err := otlptracegrpc.New(ic.Context, + otlptracegrpc.WithEndpoint(cfg.Endpoint), + otlptracegrpc.WithDialOption(dialOpts...), + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create otlp exporter") + } + return sdktrace.NewBatchSpanProcessor(exp), nil + }, + }) + plugin.Register(&plugin.Registration{ + ID: "tracing", + Type: plugin.InternalPlugin, + Requires: []plugin.Type{plugin.TracingProcessorPlugin}, + Config: &TraceConfig{ServiceName: "containerd"}, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return newTracer(ic) + }, + }) +} + +// OTLPConfig holds the configurations for the built-in otlp span processor +type OTLPConfig struct { + Endpoint string `toml:"endpoint"` + Insecure bool `toml:"insecure"` +} + +// TraceConfig is the common configuration for open telemetry. +type TraceConfig struct { + ServiceName string `toml:"service_name"` + TraceSamplingRatio float64 `toml:"sampling_ratio"` +} + +type closer struct { + close func() error +} + +func (c *closer) Close() error { + return c.close() +} + +// InitOpenTelemetry reads config and initializes otel middleware, sets the exporter +// propagator and global tracer provider +func newTracer(ic *plugin.InitContext) (io.Closer, error) { + ctx := ic.Context + config := ic.Config.(*TraceConfig) + + res, err := resource.New(ctx, + resource.WithAttributes( + // Service name used to displace traces in backends + semconv.ServiceNameKey.String(config.ServiceName), + ), + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create resource") + } + + opts := []sdktrace.TracerProviderOption{ + sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.TraceSamplingRatio)), + sdktrace.WithResource(res), + } + + ls, err := ic.GetByType(plugin.TracingProcessorPlugin) + if err != nil { + return nil, errors.Wrap(err, "failed to get tracing processors") + } + + procs := make([]sdktrace.SpanProcessor, 0, len(ls)) + for id, pctx := range ls { + p, err := pctx.Instance() + if err != nil { + log.G(ctx).WithError(err).Errorf("Failed to init tracing processor %q", id) + continue + } + proc := p.(sdktrace.SpanProcessor) + opts = append(opts, sdktrace.WithSpanProcessor(proc)) + procs = append(procs, proc) + } + + provider := sdktrace.NewTracerProvider(opts...) + + otel.SetTracerProvider(provider) + otel.SetTextMapPropagator(propagation.TraceContext{}) + + return &closer{close: func() error { + for _, p := range procs { + if err := p.Shutdown(ctx); err != nil { + return err + } + } + return nil + }}, nil +} diff --git a/tracing/tracing.go b/tracing/tracing.go index d95ea7da6..433739aa6 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -19,87 +19,17 @@ package tracing import ( "context" - srvconfig "github.com/containerd/containerd/services/server/config" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc" ) -// InitOpenTelemetry reads config and initializes otel middleware, sets the exporter -// propagator and global tracer provider -func InitOpenTelemetry(config *srvconfig.Config) (func(), error) { - ctx := context.Background() - - // Check if tracing is configured - if config.OpenTelemetry == (srvconfig.OpenTelemetryConfig{}) { - logrus.Info("OpenTelemetry configuration not found, tracing is disabled") - return nil, nil - } - - // Validate configuration - if err := config.OpenTelemetry.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid open telemetry configuration") - } - - res, err := resource.New(ctx, - resource.WithAttributes( - // Service name used to displace traces in backends - semconv.ServiceNameKey.String(config.OpenTelemetry.ServiceName), - ), - ) - if err != nil { - return nil, errors.Wrap(err, "failed to create resource") - } - - // Configure OTLP trace exporter and set it up to connect to OpenTelemetry collector - // running on a local host. - ctrdTraceExporter, err := otlptracegrpc.New(ctx, - otlptracegrpc.WithEndpoint(config.OpenTelemetry.ExporterEndpoint), - otlptracegrpc.WithDialOption(grpc.WithBlock()), - ) - if err != nil { - return nil, errors.Wrap(err, "failed to create trace exporter") - } - - // Register the trace exporter with a TracerProvider, using a batch span - // process to aggregate spans before export. - ctrdBatchSpanProcessor := sdktrace.NewBatchSpanProcessor(ctrdTraceExporter) - ctrdTracerProvider := sdktrace.NewTracerProvider( - // We use TraceIDRatioBased sampling. Ratio read from config translated into following - // if sampling ratio < 0 it is interpreted as 0. If ratio >= 1, it will always sample. - sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.OpenTelemetry.TraceSamplingRatio)), - sdktrace.WithResource(res), - sdktrace.WithSpanProcessor(ctrdBatchSpanProcessor), - ) - otel.SetTracerProvider(ctrdTracerProvider) - - // set global propagator to tracecontext - otel.SetTextMapPropagator(propagation.TraceContext{}) - - return func() { - // Shutdown will flush any remaining spans and shut down the exporter. - err := ctrdTracerProvider.Shutdown(ctx) - if err != nil { - logrus.WithError(err).Errorf("failed to shutdown TracerProvider") - } - }, nil -} - // StartSpan starts child span in a context. func StartSpan(ctx context.Context, opName string, opts ...trace.SpanStartOption) (trace.Span, context.Context) { - parentSpan := trace.SpanFromContext(ctx) - tracer := trace.NewNoopTracerProvider().Tracer("") - if parentSpan.SpanContext().IsValid() { - tracer = parentSpan.TracerProvider().Tracer("") + if parent := trace.SpanFromContext(ctx); parent != nil && parent.SpanContext().IsValid() { + ctx, span := parent.TracerProvider().Tracer("").Start(ctx, opName, opts...) + return span, ctx } - ctx, span := tracer.Start(ctx, opName, opts...) + ctx, span := otel.Tracer("").Start(ctx, opName, opts...) return span, ctx }