Merge pull request #6001 from cpuguy83/trace_exporter_plugin

Move tracing to plugin
This commit is contained in:
Phil Estes 2021-09-23 15:10:43 -04:00 committed by GitHub
commit c23f52af30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 150 additions and 121 deletions

View File

@ -35,4 +35,5 @@ import (
_ "github.com/containerd/containerd/services/snapshots"
_ "github.com/containerd/containerd/services/tasks"
_ "github.com/containerd/containerd/services/version"
_ "github.com/containerd/containerd/tracing/plugin"
)

View File

@ -35,12 +35,10 @@ import (
"github.com/containerd/containerd/services/server"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/containerd/containerd/sys"
"github.com/containerd/containerd/tracing"
"github.com/containerd/containerd/version"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/grpclog"
)
@ -135,20 +133,6 @@ can be used and modified as necessary as a custom configuration.`
return err
}
// Initialize OpenTelemetry tracing
shutdown, err := tracing.InitOpenTelemetry(config)
if err != nil {
errors.Wrap(err, "failed to initialize OpenTelemetry tracing")
}
if shutdown != nil {
defer shutdown()
}
// Get a tracer
ctrdTracer := otel.Tracer("containerd")
ctx, mainCtrdSpan := ctrdTracer.Start(ctx, "containerd-exporter")
defer mainCtrdSpan.End()
// Make sure top-level directories are created early.
if err := server.CreateTopLevelDirectories(config); err != nil {
return err
@ -300,9 +284,6 @@ can be used and modified as necessary as a custom configuration.`
func serve(ctx gocontext.Context, l net.Listener, serveFunc func(net.Listener) error) {
path := l.Addr().String()
log.G(ctx).WithField("address", path).Info("serving...")
serveSpan, ctx := tracing.StartSpan(ctx, l.Addr().String())
defer tracing.StopSpan(serveSpan)
go func() {
defer l.Close()
if err := serveFunc(l); err != nil {

View File

@ -77,6 +77,8 @@ const (
GCPlugin Type = "io.containerd.gc.v1"
// EventPlugin implements event handling
EventPlugin Type = "io.containerd.event.v1"
// TracingProcessorPlugin implements a open telemetry span processor
TracingProcessorPlugin Type = "io.containerd.tracing.processor.v1"
)
const (

View File

@ -67,8 +67,6 @@ type Config struct {
Timeouts map[string]string `toml:"timeouts"`
// Imports are additional file path list to config files that can overwrite main config file fields
Imports []string `toml:"imports"`
// OpenTelemetry configuration
OpenTelemetry OpenTelemetryConfig `toml:"otel"`
StreamProcessors map[string]StreamProcessor `toml:"stream_processors"`
}
@ -167,14 +165,6 @@ type ProxyPlugin struct {
Address string `toml:"address"`
}
// OpenTelemetryConfig provides open telemetry configuration
type OpenTelemetryConfig struct {
ServiceName string `toml:"service_name"`
ExporterName string `toml:"exporter_name"`
ExporterEndpoint string `toml:"exporter_endpoint"`
TraceSamplingRatio float64 `toml:"trace_sampling_ratio"`
}
// BoltConfig defines the configuration values for the bolt plugin, which is
// loaded here, rather than back registered in the metadata package.
type BoltConfig struct {
@ -213,24 +203,6 @@ func (bc *BoltConfig) Validate() error {
}
}
const (
// ExporterTypeOTLP represents the open telemetry exporter OTLP
ExporterTypeOTLP = "otlp"
)
// Validate OpenTelemetry config
func (cfg *OpenTelemetryConfig) Validate() error {
switch cfg.ExporterName {
case ExporterTypeOTLP:
if cfg.ServiceName == "" {
return errors.Wrapf(errdefs.ErrInvalidArgument, "missing service name in config %+v", cfg)
}
return nil
default:
return errors.Wrapf(errdefs.ErrInvalidArgument, "unsupported exporter: %+v", cfg)
}
}
// Decode unmarshals a plugin specific configuration by plugin id
func (c *Config) Decode(p *plugin.Registration) (interface{}, error) {
id := p.URI()

143
tracing/plugin/otlp.go Normal file
View File

@ -0,0 +1,143 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"io"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"google.golang.org/grpc"
)
const exporterPlugin = "otlp"
func init() {
plugin.Register(&plugin.Registration{
ID: exporterPlugin,
Type: plugin.TracingProcessorPlugin,
Config: &OTLPConfig{},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
cfg := ic.Config.(*OTLPConfig)
if cfg.Endpoint == "" {
return nil, errors.Wrap(plugin.ErrSkipPlugin, "otlp endpoint not set")
}
dialOpts := []grpc.DialOption{grpc.WithBlock()}
if cfg.Insecure {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
exp, err := otlptracegrpc.New(ic.Context,
otlptracegrpc.WithEndpoint(cfg.Endpoint),
otlptracegrpc.WithDialOption(dialOpts...),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create otlp exporter")
}
return sdktrace.NewBatchSpanProcessor(exp), nil
},
})
plugin.Register(&plugin.Registration{
ID: "tracing",
Type: plugin.InternalPlugin,
Requires: []plugin.Type{plugin.TracingProcessorPlugin},
Config: &TraceConfig{ServiceName: "containerd"},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return newTracer(ic)
},
})
}
// OTLPConfig holds the configurations for the built-in otlp span processor
type OTLPConfig struct {
Endpoint string `toml:"endpoint"`
Insecure bool `toml:"insecure"`
}
// TraceConfig is the common configuration for open telemetry.
type TraceConfig struct {
ServiceName string `toml:"service_name"`
TraceSamplingRatio float64 `toml:"sampling_ratio"`
}
type closer struct {
close func() error
}
func (c *closer) Close() error {
return c.close()
}
// InitOpenTelemetry reads config and initializes otel middleware, sets the exporter
// propagator and global tracer provider
func newTracer(ic *plugin.InitContext) (io.Closer, error) {
ctx := ic.Context
config := ic.Config.(*TraceConfig)
res, err := resource.New(ctx,
resource.WithAttributes(
// Service name used to displace traces in backends
semconv.ServiceNameKey.String(config.ServiceName),
),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create resource")
}
opts := []sdktrace.TracerProviderOption{
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.TraceSamplingRatio)),
sdktrace.WithResource(res),
}
ls, err := ic.GetByType(plugin.TracingProcessorPlugin)
if err != nil {
return nil, errors.Wrap(err, "failed to get tracing processors")
}
procs := make([]sdktrace.SpanProcessor, 0, len(ls))
for id, pctx := range ls {
p, err := pctx.Instance()
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to init tracing processor %q", id)
continue
}
proc := p.(sdktrace.SpanProcessor)
opts = append(opts, sdktrace.WithSpanProcessor(proc))
procs = append(procs, proc)
}
provider := sdktrace.NewTracerProvider(opts...)
otel.SetTracerProvider(provider)
otel.SetTextMapPropagator(propagation.TraceContext{})
return &closer{close: func() error {
for _, p := range procs {
if err := p.Shutdown(ctx); err != nil {
return err
}
}
return nil
}}, nil
}

View File

@ -19,87 +19,17 @@ package tracing
import (
"context"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
)
// InitOpenTelemetry reads config and initializes otel middleware, sets the exporter
// propagator and global tracer provider
func InitOpenTelemetry(config *srvconfig.Config) (func(), error) {
ctx := context.Background()
// Check if tracing is configured
if config.OpenTelemetry == (srvconfig.OpenTelemetryConfig{}) {
logrus.Info("OpenTelemetry configuration not found, tracing is disabled")
return nil, nil
}
// Validate configuration
if err := config.OpenTelemetry.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid open telemetry configuration")
}
res, err := resource.New(ctx,
resource.WithAttributes(
// Service name used to displace traces in backends
semconv.ServiceNameKey.String(config.OpenTelemetry.ServiceName),
),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create resource")
}
// Configure OTLP trace exporter and set it up to connect to OpenTelemetry collector
// running on a local host.
ctrdTraceExporter, err := otlptracegrpc.New(ctx,
otlptracegrpc.WithEndpoint(config.OpenTelemetry.ExporterEndpoint),
otlptracegrpc.WithDialOption(grpc.WithBlock()),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create trace exporter")
}
// Register the trace exporter with a TracerProvider, using a batch span
// process to aggregate spans before export.
ctrdBatchSpanProcessor := sdktrace.NewBatchSpanProcessor(ctrdTraceExporter)
ctrdTracerProvider := sdktrace.NewTracerProvider(
// We use TraceIDRatioBased sampling. Ratio read from config translated into following
// if sampling ratio < 0 it is interpreted as 0. If ratio >= 1, it will always sample.
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.OpenTelemetry.TraceSamplingRatio)),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(ctrdBatchSpanProcessor),
)
otel.SetTracerProvider(ctrdTracerProvider)
// set global propagator to tracecontext
otel.SetTextMapPropagator(propagation.TraceContext{})
return func() {
// Shutdown will flush any remaining spans and shut down the exporter.
err := ctrdTracerProvider.Shutdown(ctx)
if err != nil {
logrus.WithError(err).Errorf("failed to shutdown TracerProvider")
}
}, nil
}
// StartSpan starts child span in a context.
func StartSpan(ctx context.Context, opName string, opts ...trace.SpanStartOption) (trace.Span, context.Context) {
parentSpan := trace.SpanFromContext(ctx)
tracer := trace.NewNoopTracerProvider().Tracer("")
if parentSpan.SpanContext().IsValid() {
tracer = parentSpan.TracerProvider().Tracer("")
if parent := trace.SpanFromContext(ctx); parent != nil && parent.SpanContext().IsValid() {
ctx, span := parent.TracerProvider().Tracer("").Start(ctx, opName, opts...)
return span, ctx
}
ctx, span := tracer.Start(ctx, opName, opts...)
ctx, span := otel.Tracer("").Start(ctx, opName, opts...)
return span, ctx
}