tracing: support OTLP/HTTP in addition to gRPC

This change adds OTLP/HTTP, specifically http/protobuf support.

http/protobuf is recommended in
https://github.com/open-telemetry/opentelemetry-specification/blob/v1.8.0/specification/protocol/exporter.md.

However kube-apiserver and CRI-O use gRPC, kubelet may support
gRPC in future. So we should support gRPC as well.

Signed-off-by: Kazuyoshi Kato <katokazu@amazon.com>
This commit is contained in:
Kazuyoshi Kato
2022-01-19 00:57:02 +00:00
parent a43703fcba
commit e751f1f44f
13 changed files with 1041 additions and 27 deletions

View File

@@ -20,24 +20,24 @@ import (
"context"
"fmt"
"io"
"net/url"
"time"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"google.golang.org/grpc"
)
const exporterPlugin = "otlp"
func init() {
const timeout = 5 * time.Second
plugin.Register(&plugin.Registration{
ID: exporterPlugin,
Type: plugin.TracingProcessorPlugin,
@@ -45,28 +45,9 @@ func init() {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
cfg := ic.Config.(*OTLPConfig)
if cfg.Endpoint == "" {
return nil, fmt.Errorf("otlp endpoint not set: %w", plugin.ErrSkipPlugin)
return nil, fmt.Errorf("no OpenTelemetry endpoint: %w", plugin.ErrSkipPlugin)
}
opts := []otlptracegrpc.Option{
otlptracegrpc.WithEndpoint(cfg.Endpoint),
otlptracegrpc.WithDialOption(
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
),
}
if cfg.Insecure {
opts = append(opts, otlptracegrpc.WithInsecure())
}
ctx, cancel := context.WithTimeout(ic.Context, timeout)
defer cancel()
exp, err := otlptracegrpc.New(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create otlp exporter: %w", err)
}
return sdktrace.NewBatchSpanProcessor(exp), nil
return newExporter(ic.Context, cfg)
},
})
plugin.Register(&plugin.Registration{
@@ -83,6 +64,7 @@ func init() {
// OTLPConfig holds the configurations for the built-in otlp span processor
type OTLPConfig struct {
Endpoint string `toml:"endpoint"`
Protocol string `toml:"protocol"`
Insecure bool `toml:"insecure"`
}
@@ -100,8 +82,46 @@ func (c *closer) Close() error {
return c.close()
}
// InitOpenTelemetry reads config and initializes otel middleware, sets the exporter
// propagator and global tracer provider
// newExporter creates an exporter based on the given configuration.
//
// The default protocol is http/protobuf since it is recommended by
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.8.0/specification/protocol/exporter.md#specify-protocol.
func newExporter(ctx context.Context, cfg *OTLPConfig) (*otlptrace.Exporter, error) {
const timeout = 5 * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if cfg.Protocol == "http/protobuf" || cfg.Protocol == "" {
u, err := url.Parse(cfg.Endpoint)
if err != nil {
return nil, fmt.Errorf("OpenTelemetry endpoint %q is invalid: %w", cfg.Endpoint, err)
}
opts := []otlptracehttp.Option{
otlptracehttp.WithEndpoint(u.Host),
}
if u.Scheme == "http" {
opts = append(opts, otlptracehttp.WithInsecure())
}
return otlptracehttp.New(ctx, opts...)
} else if cfg.Protocol == "grpc" {
opts := []otlptracegrpc.Option{
otlptracegrpc.WithEndpoint(cfg.Endpoint),
}
if cfg.Insecure {
opts = append(opts, otlptracegrpc.WithInsecure())
}
return otlptracegrpc.New(ctx, opts...)
} else {
// Other protocols such as "http/json" are not supported.
return nil, fmt.Errorf("OpenTelemetry protocol %q is not supported", cfg.Protocol)
}
}
// newTracer configures protocol-agonostic tracing settings such as
// its sampling ratio and returns io.Closer.
//
// Note that this function sets process-wide tracing configuration.
func newTracer(ic *plugin.InitContext) (io.Closer, error) {
ctx := ic.Context
config := ic.Config.(*TraceConfig)
@@ -130,7 +150,7 @@ func newTracer(ic *plugin.InitContext) (io.Closer, error) {
for id, pctx := range ls {
p, err := pctx.Instance()
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to init tracing processor %q", id)
log.G(ctx).WithError(err).Errorf("failed to initialize a tracing processor %q", id)
continue
}
proc := p.(sdktrace.SpanProcessor)