diff --git a/cmd/containerd/command/main.go b/cmd/containerd/command/main.go index ca6acfe39..21967a17b 100644 --- a/cmd/containerd/command/main.go +++ b/cmd/containerd/command/main.go @@ -34,10 +34,12 @@ import ( "github.com/containerd/containerd/services/server" srvconfig "github.com/containerd/containerd/services/server/config" "github.com/containerd/containerd/sys" + "github.com/containerd/containerd/tracing" "github.com/containerd/containerd/version" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" + "go.opentelemetry.io/otel" "google.golang.org/grpc/grpclog" ) @@ -130,6 +132,20 @@ can be used and modified as necessary as a custom configuration.` return err } + // Initialize OpenTelemetry tracing + shutdown, err := tracing.InitOpenTelemetry(config) + if err != nil { + errors.Wrap(err, "failed to initialize OpenTelemetry tracing") + } + if shutdown != nil { + defer shutdown() + } + + // Get a tracer + ctrdTracer := otel.Tracer("containerd") + ctx, mainCtrdSpan := ctrdTracer.Start(ctx, "containerd-exporter") + defer mainCtrdSpan.End() + // Make sure top-level directories are created early. if err := server.CreateTopLevelDirectories(config); err != nil { return err @@ -243,6 +259,9 @@ can be used and modified as necessary as a custom configuration.` func serve(ctx gocontext.Context, l net.Listener, serveFunc func(net.Listener) error) { path := l.Addr().String() log.G(ctx).WithField("address", path).Info("serving...") + serveSpan, ctx := tracing.StartSpan(ctx, l.Addr().String()) + defer tracing.StopSpan(serveSpan) + go func() { defer l.Close() if err := serveFunc(l); err != nil { diff --git a/services/server/config/config.go b/services/server/config/config.go index f4b9f2ab9..5bf0cae13 100644 --- a/services/server/config/config.go +++ b/services/server/config/config.go @@ -67,6 +67,8 @@ type Config struct { Timeouts map[string]string `toml:"timeouts"` // Imports are additional file path list to config files that can overwrite main config file fields Imports []string `toml:"imports"` + // OpenTelemetry configuration + OpenTelemetry OpenTelemetryConfig `toml:"otel"` StreamProcessors map[string]StreamProcessor `toml:"stream_processors"` } @@ -165,6 +167,14 @@ type ProxyPlugin struct { Address string `toml:"address"` } +// OpenTelemetryConfig provides open telemetry configuration +type OpenTelemetryConfig struct { + ServiceName string `toml:"service_name"` + ExporterName string `toml:"exporter_name"` + ExporterEndpoint string `toml:"exporter_endpoint"` + TraceSamplingRatio float64 `toml:"trace_sampling_ratio"` +} + // BoltConfig defines the configuration values for the bolt plugin, which is // loaded here, rather than back registered in the metadata package. type BoltConfig struct { @@ -203,6 +213,24 @@ func (bc *BoltConfig) Validate() error { } } +const ( + // ExporterTypeOTLP represents the open telemetry exporter OTLP + ExporterTypeOTLP = "otlp" +) + +// Validate OpenTelemetry config +func (cfg *OpenTelemetryConfig) Validate() error { + switch cfg.ExporterName { + case ExporterTypeOTLP: + if cfg.ServiceName == "" { + return errors.Wrapf(errdefs.ErrInvalidArgument, "missing service name in config %+v", cfg) + } + return nil + default: + return errors.Wrapf(errdefs.ErrInvalidArgument, "unsupported exporter: %+v", cfg) + } +} + // Decode unmarshals a plugin specific configuration by plugin id func (c *Config) Decode(p *plugin.Registration) (interface{}, error) { id := p.URI() diff --git a/services/server/server.go b/services/server/server.go index 444e64f59..bc3c025f2 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -51,9 +51,11 @@ import ( "github.com/containerd/containerd/sys" "github.com/containerd/ttrpc" metrics "github.com/docker/go-metrics" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/pkg/errors" bolt "go.etcd.io/bbolt" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" @@ -98,8 +100,14 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { } serverOpts := []grpc.ServerOption{ - grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), - grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + otelgrpc.StreamServerInterceptor(), + grpc.StreamServerInterceptor(grpc_prometheus.StreamServerInterceptor), + )), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + otelgrpc.UnaryServerInterceptor(), + grpc.UnaryServerInterceptor(grpc_prometheus.UnaryServerInterceptor), + )), } if config.GRPC.MaxRecvMsgSize > 0 { serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize)) diff --git a/tracing/tracing.go b/tracing/tracing.go new file mode 100644 index 000000000..d95ea7da6 --- /dev/null +++ b/tracing/tracing.go @@ -0,0 +1,109 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package tracing + +import ( + "context" + + srvconfig "github.com/containerd/containerd/services/server/config" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" +) + +// InitOpenTelemetry reads config and initializes otel middleware, sets the exporter +// propagator and global tracer provider +func InitOpenTelemetry(config *srvconfig.Config) (func(), error) { + ctx := context.Background() + + // Check if tracing is configured + if config.OpenTelemetry == (srvconfig.OpenTelemetryConfig{}) { + logrus.Info("OpenTelemetry configuration not found, tracing is disabled") + return nil, nil + } + + // Validate configuration + if err := config.OpenTelemetry.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid open telemetry configuration") + } + + res, err := resource.New(ctx, + resource.WithAttributes( + // Service name used to displace traces in backends + semconv.ServiceNameKey.String(config.OpenTelemetry.ServiceName), + ), + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create resource") + } + + // Configure OTLP trace exporter and set it up to connect to OpenTelemetry collector + // running on a local host. + ctrdTraceExporter, err := otlptracegrpc.New(ctx, + otlptracegrpc.WithEndpoint(config.OpenTelemetry.ExporterEndpoint), + otlptracegrpc.WithDialOption(grpc.WithBlock()), + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create trace exporter") + } + + // Register the trace exporter with a TracerProvider, using a batch span + // process to aggregate spans before export. + ctrdBatchSpanProcessor := sdktrace.NewBatchSpanProcessor(ctrdTraceExporter) + ctrdTracerProvider := sdktrace.NewTracerProvider( + // We use TraceIDRatioBased sampling. Ratio read from config translated into following + // if sampling ratio < 0 it is interpreted as 0. If ratio >= 1, it will always sample. + sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.OpenTelemetry.TraceSamplingRatio)), + sdktrace.WithResource(res), + sdktrace.WithSpanProcessor(ctrdBatchSpanProcessor), + ) + otel.SetTracerProvider(ctrdTracerProvider) + + // set global propagator to tracecontext + otel.SetTextMapPropagator(propagation.TraceContext{}) + + return func() { + // Shutdown will flush any remaining spans and shut down the exporter. + err := ctrdTracerProvider.Shutdown(ctx) + if err != nil { + logrus.WithError(err).Errorf("failed to shutdown TracerProvider") + } + }, nil +} + +// StartSpan starts child span in a context. +func StartSpan(ctx context.Context, opName string, opts ...trace.SpanStartOption) (trace.Span, context.Context) { + parentSpan := trace.SpanFromContext(ctx) + tracer := trace.NewNoopTracerProvider().Tracer("") + if parentSpan.SpanContext().IsValid() { + tracer = parentSpan.TracerProvider().Tracer("") + } + ctx, span := tracer.Start(ctx, opName, opts...) + return span, ctx +} + +// StopSpan ends the span specified +func StopSpan(span trace.Span) { + span.End() +}