[otel-tracing] Initial opentelemetry support

Add basic initialization of opentelemetry including minimum support to
be able to read opentelemetry config from config.toml and initialize
exporter. Tracer is initialized and ready to be used for creating
spans, sub spans etc. With no opentelemetry configuration enabled in
config file, this patch is a no-op.

To use opentelemetry, add the following basic config stub to
config.toml. We use the otlp exporter with default port 4317.

[otel]
  exporter_name = "otlp"
  exporter_endpoint = "0.0.0.1:4317"

otel-collector binary needs to run listening at the same port.

Signed-off-by: Alakesh Haloi <alakeshh@amazon.com>
This commit is contained in:
Alakesh Haloi 2021-07-06 11:25:03 -07:00
parent 10824eaf2e
commit 3597ac859d
4 changed files with 166 additions and 2 deletions

View File

@ -34,10 +34,12 @@ import (
"github.com/containerd/containerd/services/server"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/containerd/containerd/sys"
"github.com/containerd/containerd/tracing"
"github.com/containerd/containerd/version"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/grpclog"
)
@ -130,6 +132,20 @@ can be used and modified as necessary as a custom configuration.`
return err
}
// Initialize OpenTelemetry tracing. InitOpenTelemetry only returns an
// error when tracing has been configured, so propagate it instead of
// silently continuing without tracing.
shutdown, err := tracing.InitOpenTelemetry(config)
if err != nil {
	return errors.Wrap(err, "failed to initialize OpenTelemetry tracing")
}
// shutdown is nil when tracing is not configured.
if shutdown != nil {
	defer shutdown()
}
// Get a tracer
ctrdTracer := otel.Tracer("containerd")
ctx, mainCtrdSpan := ctrdTracer.Start(ctx, "containerd-exporter")
defer mainCtrdSpan.End()
// Make sure top-level directories are created early.
if err := server.CreateTopLevelDirectories(config); err != nil {
return err
@ -243,6 +259,9 @@ can be used and modified as necessary as a custom configuration.`
func serve(ctx gocontext.Context, l net.Listener, serveFunc func(net.Listener) error) {
path := l.Addr().String()
log.G(ctx).WithField("address", path).Info("serving...")
serveSpan, ctx := tracing.StartSpan(ctx, l.Addr().String())
defer tracing.StopSpan(serveSpan)
go func() {
defer l.Close()
if err := serveFunc(l); err != nil {

View File

@ -67,6 +67,8 @@ type Config struct {
Timeouts map[string]string `toml:"timeouts"`
// Imports are additional file path list to config files that can overwrite main config file fields
Imports []string `toml:"imports"`
// OpenTelemetry configuration
OpenTelemetry OpenTelemetryConfig `toml:"otel"`
StreamProcessors map[string]StreamProcessor `toml:"stream_processors"`
}
@ -165,6 +167,14 @@ type ProxyPlugin struct {
Address string `toml:"address"`
}
// OpenTelemetryConfig holds the settings read from the [otel] section of the
// containerd config file.
type OpenTelemetryConfig struct {
	// ServiceName is the service name attached to exported traces.
	ServiceName string `toml:"service_name"`
	// ExporterName selects the trace exporter.
	ExporterName string `toml:"exporter_name"`
	// ExporterEndpoint is the address the exporter sends traces to.
	ExporterEndpoint string `toml:"exporter_endpoint"`
	// TraceSamplingRatio is the ratio handed to the trace ID ratio sampler.
	TraceSamplingRatio float64 `toml:"trace_sampling_ratio"`
}
// BoltConfig defines the configuration values for the bolt plugin, which is
// loaded here, rather than back registered in the metadata package.
type BoltConfig struct {
@ -203,6 +213,24 @@ func (bc *BoltConfig) Validate() error {
}
}
// ExporterTypeOTLP is the exporter name that selects the OpenTelemetry
// OTLP exporter.
const ExporterTypeOTLP = "otlp"
// Validate checks that the OpenTelemetry configuration names a supported
// exporter and carries the settings that exporter requires. It returns an
// error wrapping errdefs.ErrInvalidArgument when the exporter is not
// supported, or when the OTLP exporter is selected without a service name.
func (cfg *OpenTelemetryConfig) Validate() error {
	// OTLP is the only exporter accepted here.
	if cfg.ExporterName != ExporterTypeOTLP {
		return errors.Wrapf(errdefs.ErrInvalidArgument, "unsupported exporter: %+v", cfg)
	}
	if cfg.ServiceName == "" {
		return errors.Wrapf(errdefs.ErrInvalidArgument, "missing service name in config %+v", cfg)
	}
	return nil
}
// Decode unmarshals a plugin specific configuration by plugin id
func (c *Config) Decode(p *plugin.Registration) (interface{}, error) {
id := p.URI()

View File

@ -51,9 +51,11 @@ import (
"github.com/containerd/containerd/sys"
"github.com/containerd/ttrpc"
metrics "github.com/docker/go-metrics"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
@ -98,8 +100,14 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
}
serverOpts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(),
grpc.StreamServerInterceptor(grpc_prometheus.StreamServerInterceptor),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(),
grpc.UnaryServerInterceptor(grpc_prometheus.UnaryServerInterceptor),
)),
}
if config.GRPC.MaxRecvMsgSize > 0 {
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))

109
tracing/tracing.go Normal file
View File

@ -0,0 +1,109 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tracing
import (
"context"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
)
// InitOpenTelemetry configures OpenTelemetry tracing from the daemon config.
//
// When the OpenTelemetry section of config is the zero value, tracing is left
// disabled and both return values are nil. Otherwise the section is validated,
// an OTLP gRPC trace exporter is created for the configured endpoint, and a
// tracer provider backed by that exporter is installed as the global provider
// together with the tracecontext propagator.
//
// On success the returned function shuts the tracer provider down; any error
// from that shutdown is logged rather than returned.
func InitOpenTelemetry(config *srvconfig.Config) (func(), error) {
	otelCfg := &config.OpenTelemetry

	// An all-zero section means tracing was never configured.
	if *otelCfg == (srvconfig.OpenTelemetryConfig{}) {
		logrus.Info("OpenTelemetry configuration not found, tracing is disabled")
		return nil, nil
	}

	if err := otelCfg.Validate(); err != nil {
		return nil, errors.Wrap(err, "invalid open telemetry configuration")
	}

	ctx := context.Background()

	// The service name is what tracing backends use to display traces.
	serviceAttrs := resource.WithAttributes(
		semconv.ServiceNameKey.String(otelCfg.ServiceName),
	)
	svcResource, err := resource.New(ctx, serviceAttrs)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create resource")
	}

	// OTLP trace exporter that connects to the OpenTelemetry collector at the
	// configured endpoint.
	// NOTE(review): grpc.WithBlock combined with a background context
	// presumably makes this call wait until the collector is reachable —
	// confirm this is the intended startup behavior.
	exporter, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithEndpoint(otelCfg.ExporterEndpoint),
		otlptracegrpc.WithDialOption(grpc.WithBlock()),
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create trace exporter")
	}

	provider := sdktrace.NewTracerProvider(
		// TraceIDRatioBased sampling with the ratio from config: a ratio
		// below 0 is interpreted as 0, a ratio >= 1 always samples.
		sdktrace.WithSampler(sdktrace.TraceIDRatioBased(otelCfg.TraceSamplingRatio)),
		sdktrace.WithResource(svcResource),
		// A batch span processor aggregates spans before they are exported.
		sdktrace.WithSpanProcessor(sdktrace.NewBatchSpanProcessor(exporter)),
	)
	otel.SetTracerProvider(provider)

	// Use tracecontext as the global propagator.
	otel.SetTextMapPropagator(propagation.TraceContext{})

	shutdown := func() {
		// Shutdown flushes any remaining spans and shuts down the exporter.
		if err := provider.Shutdown(ctx); err != nil {
			logrus.WithError(err).Errorf("failed to shutdown TracerProvider")
		}
	}
	return shutdown, nil
}
// StartSpan starts a span named opName using the tracer provider of the span
// already carried by ctx. When ctx carries no span with a valid span context,
// a no-op tracer provider is used instead. It returns the new span and the
// context produced by starting it, in that order.
func StartSpan(ctx context.Context, opName string, opts ...trace.SpanStartOption) (trace.Span, context.Context) {
	var provider trace.TracerProvider
	if parent := trace.SpanFromContext(ctx); parent.SpanContext().IsValid() {
		provider = parent.TracerProvider()
	} else {
		provider = trace.NewNoopTracerProvider()
	}

	spanCtx, span := provider.Tracer("").Start(ctx, opName, opts...)
	return span, spanCtx
}
// StopSpan ends the given span by calling its End method. It is the
// counterpart to StartSpan and is intended to be deferred by callers.
func StopSpan(span trace.Span) {
span.End()
}