diff --git a/integration/failpoint/cmd/containerd-shim-runc-fp-v1/plugin_linux.go b/integration/failpoint/cmd/containerd-shim-runc-fp-v1/plugin_linux.go index 322c64ed1..7183f5032 100644 --- a/integration/failpoint/cmd/containerd-shim-runc-fp-v1/plugin_linux.go +++ b/integration/failpoint/cmd/containerd-shim-runc-fp-v1/plugin_linux.go @@ -74,7 +74,7 @@ func init() { } var ( - _ = shim.TTRPCServerOptioner(&taskServiceWithFp{}) + _ = shim.TTRPCServerUnaryOptioner(&taskServiceWithFp{}) ) type taskServiceWithFp struct { @@ -87,7 +87,7 @@ func (s *taskServiceWithFp) RegisterTTRPC(server *ttrpc.Server) error { return nil } -func (s *taskServiceWithFp) UnaryInterceptor() ttrpc.UnaryServerInterceptor { +func (s *taskServiceWithFp) UnaryServerInterceptor() ttrpc.UnaryServerInterceptor { return func(ctx context.Context, unmarshal ttrpc.Unmarshaler, info *ttrpc.UnaryServerInfo, method ttrpc.Method) (interface{}, error) { methodName := filepath.Base(info.FullMethod) if fp, ok := s.fps[methodName]; ok { diff --git a/pkg/shim/shim.go b/pkg/shim/shim.go index 915b766fa..ad87092c1 100644 --- a/pkg/shim/shim.go +++ b/pkg/shim/shim.go @@ -43,7 +43,6 @@ import ( "github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/version" "github.com/containerd/log" - "github.com/containerd/otelttrpc" "github.com/containerd/plugin" "github.com/containerd/plugin/registry" "github.com/containerd/ttrpc" @@ -112,10 +111,12 @@ type TTRPCService interface { RegisterTTRPC(*ttrpc.Server) error } -type TTRPCServerOptioner interface { - TTRPCService +type TTRPCServerUnaryOptioner interface { + UnaryServerInterceptor() ttrpc.UnaryServerInterceptor +} - UnaryInterceptor() ttrpc.UnaryServerInterceptor +type TTRPCClientUnaryOptioner interface { + UnaryClientInterceptor() ttrpc.UnaryClientInterceptor } var ( @@ -249,13 +250,6 @@ func run(ctx context.Context, manager Manager, config Config) error { } ttrpcAddress := os.Getenv(ttrpcAddressEnv) - publisher, err := NewPublisher(ttrpcAddress, WithPublishTTRPCOpts( - ttrpc.WithUnaryClientInterceptor(otelttrpc.UnaryClientInterceptor()), - )) - if err != nil { - return err - } - defer publisher.Close() ctx = namespaces.WithNamespace(ctx, namespaceFlag) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) @@ -333,7 +327,15 @@ func run(ctx context.Context, manager Manager, config Config) error { Type: plugins.EventPlugin, ID: "publisher", InitFn: func(ic *plugin.InitContext) (interface{}, error) { - return publisher, nil + return NewPublisher(ttrpcAddress, func(cfg *publisherConfig) { + p, _ := ic.GetByID(plugins.TTRPCPlugin, "otelttrpc") + if p == nil { + return + } + + opts := ttrpc.WithUnaryClientInterceptor(p.(TTRPCClientUnaryOptioner).UnaryClientInterceptor()) + WithPublishTTRPCOpts(opts)(cfg) + }) }, }) @@ -389,11 +391,12 @@ func run(ctx context.Context, manager Manager, config Config) error { if src, ok := instance.(TTRPCService); ok { log.G(ctx).WithField("id", pID).Debug("registering ttrpc service") ttrpcServices = append(ttrpcServices, src) - } - if src, ok := instance.(TTRPCServerOptioner); ok { - ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, src.UnaryInterceptor()) + if src, ok := instance.(TTRPCServerUnaryOptioner); ok { + ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, src.UnaryServerInterceptor()) + } + } } @@ -401,8 +404,6 @@ func run(ctx context.Context, manager Manager, config Config) error { return fmt.Errorf("required that ttrpc service") } - ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, otelttrpc.UnaryServerInterceptor()) - unaryInterceptor := chainUnaryServerInterceptors(ttrpcUnaryInterceptors...) server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor)) if err != nil { diff --git a/pkg/tracing/plugin/ttrpc.go b/pkg/tracing/plugin/ttrpc.go new file mode 100644 index 000000000..9383e411d --- /dev/null +++ b/pkg/tracing/plugin/ttrpc.go @@ -0,0 +1,47 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package plugin + +import ( + "github.com/containerd/containerd/v2/plugins" + "github.com/containerd/otelttrpc" + "github.com/containerd/plugin" + "github.com/containerd/plugin/registry" + "github.com/containerd/ttrpc" +) + +func init() { + const pluginName = "otelttrpc" + + registry.Register(&plugin.Registration{ + ID: pluginName, + Type: plugins.TTRPCPlugin, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return otelttrpcopts{}, nil + }, + }) +} + +type otelttrpcopts struct{} + +func (otelttrpcopts) UnaryServerInterceptor() ttrpc.UnaryServerInterceptor { + return otelttrpc.UnaryServerInterceptor() +} + +func (otelttrpcopts) UnaryClientInterceptor() ttrpc.UnaryClientInterceptor { + return otelttrpc.UnaryClientInterceptor() +}