shim: Move ttrpc interceptors to plugins
This makes it so we don't need to import otelttrpc unless the shim is compiled with the `shim_tracing` build tag. This way otel is no longer compiled into the binary at all unless `shim_tracing` is set. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
parent
6ffdabf725
commit
b2681dfbdb
@ -74,7 +74,7 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
_ = shim.TTRPCServerOptioner(&taskServiceWithFp{})
|
_ = shim.TTRPCServerUnaryOptioner(&taskServiceWithFp{})
|
||||||
)
|
)
|
||||||
|
|
||||||
type taskServiceWithFp struct {
|
type taskServiceWithFp struct {
|
||||||
@ -87,7 +87,7 @@ func (s *taskServiceWithFp) RegisterTTRPC(server *ttrpc.Server) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskServiceWithFp) UnaryInterceptor() ttrpc.UnaryServerInterceptor {
|
func (s *taskServiceWithFp) UnaryServerInterceptor() ttrpc.UnaryServerInterceptor {
|
||||||
return func(ctx context.Context, unmarshal ttrpc.Unmarshaler, info *ttrpc.UnaryServerInfo, method ttrpc.Method) (interface{}, error) {
|
return func(ctx context.Context, unmarshal ttrpc.Unmarshaler, info *ttrpc.UnaryServerInfo, method ttrpc.Method) (interface{}, error) {
|
||||||
methodName := filepath.Base(info.FullMethod)
|
methodName := filepath.Base(info.FullMethod)
|
||||||
if fp, ok := s.fps[methodName]; ok {
|
if fp, ok := s.fps[methodName]; ok {
|
||||||
|
@ -43,7 +43,6 @@ import (
|
|||||||
"github.com/containerd/containerd/v2/plugins"
|
"github.com/containerd/containerd/v2/plugins"
|
||||||
"github.com/containerd/containerd/v2/version"
|
"github.com/containerd/containerd/v2/version"
|
||||||
"github.com/containerd/log"
|
"github.com/containerd/log"
|
||||||
"github.com/containerd/otelttrpc"
|
|
||||||
"github.com/containerd/plugin"
|
"github.com/containerd/plugin"
|
||||||
"github.com/containerd/plugin/registry"
|
"github.com/containerd/plugin/registry"
|
||||||
"github.com/containerd/ttrpc"
|
"github.com/containerd/ttrpc"
|
||||||
@ -112,10 +111,12 @@ type TTRPCService interface {
|
|||||||
RegisterTTRPC(*ttrpc.Server) error
|
RegisterTTRPC(*ttrpc.Server) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type TTRPCServerOptioner interface {
|
type TTRPCServerUnaryOptioner interface {
|
||||||
TTRPCService
|
UnaryServerInterceptor() ttrpc.UnaryServerInterceptor
|
||||||
|
}
|
||||||
|
|
||||||
UnaryInterceptor() ttrpc.UnaryServerInterceptor
|
type TTRPCClientUnaryOptioner interface {
|
||||||
|
UnaryClientInterceptor() ttrpc.UnaryClientInterceptor
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -249,13 +250,6 @@ func run(ctx context.Context, manager Manager, config Config) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
||||||
publisher, err := NewPublisher(ttrpcAddress, WithPublishTTRPCOpts(
|
|
||||||
ttrpc.WithUnaryClientInterceptor(otelttrpc.UnaryClientInterceptor()),
|
|
||||||
))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer publisher.Close()
|
|
||||||
|
|
||||||
ctx = namespaces.WithNamespace(ctx, namespaceFlag)
|
ctx = namespaces.WithNamespace(ctx, namespaceFlag)
|
||||||
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
|
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
|
||||||
@ -333,7 +327,15 @@ func run(ctx context.Context, manager Manager, config Config) error {
|
|||||||
Type: plugins.EventPlugin,
|
Type: plugins.EventPlugin,
|
||||||
ID: "publisher",
|
ID: "publisher",
|
||||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||||
return publisher, nil
|
return NewPublisher(ttrpcAddress, func(cfg *publisherConfig) {
|
||||||
|
p, _ := ic.GetByID(plugins.TTRPCPlugin, "otelttrpc")
|
||||||
|
if p == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := ttrpc.WithUnaryClientInterceptor(p.(TTRPCClientUnaryOptioner).UnaryClientInterceptor())
|
||||||
|
WithPublishTTRPCOpts(opts)(cfg)
|
||||||
|
})
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -389,11 +391,12 @@ func run(ctx context.Context, manager Manager, config Config) error {
|
|||||||
if src, ok := instance.(TTRPCService); ok {
|
if src, ok := instance.(TTRPCService); ok {
|
||||||
log.G(ctx).WithField("id", pID).Debug("registering ttrpc service")
|
log.G(ctx).WithField("id", pID).Debug("registering ttrpc service")
|
||||||
ttrpcServices = append(ttrpcServices, src)
|
ttrpcServices = append(ttrpcServices, src)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if src, ok := instance.(TTRPCServerOptioner); ok {
|
if src, ok := instance.(TTRPCServerUnaryOptioner); ok {
|
||||||
ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, src.UnaryInterceptor())
|
ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, src.UnaryServerInterceptor())
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -401,8 +404,6 @@ func run(ctx context.Context, manager Manager, config Config) error {
|
|||||||
return fmt.Errorf("required that ttrpc service")
|
return fmt.Errorf("required that ttrpc service")
|
||||||
}
|
}
|
||||||
|
|
||||||
ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, otelttrpc.UnaryServerInterceptor())
|
|
||||||
|
|
||||||
unaryInterceptor := chainUnaryServerInterceptors(ttrpcUnaryInterceptors...)
|
unaryInterceptor := chainUnaryServerInterceptors(ttrpcUnaryInterceptors...)
|
||||||
server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor))
|
server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
47
pkg/tracing/plugin/ttrpc.go
Normal file
47
pkg/tracing/plugin/ttrpc.go
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
/*
|
||||||
|
Copyright The containerd Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package plugin
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/containerd/containerd/v2/plugins"
|
||||||
|
"github.com/containerd/otelttrpc"
|
||||||
|
"github.com/containerd/plugin"
|
||||||
|
"github.com/containerd/plugin/registry"
|
||||||
|
"github.com/containerd/ttrpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
const pluginName = "otelttrpc"
|
||||||
|
|
||||||
|
registry.Register(&plugin.Registration{
|
||||||
|
ID: pluginName,
|
||||||
|
Type: plugins.TTRPCPlugin,
|
||||||
|
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||||
|
return otelttrpcopts{}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type otelttrpcopts struct{}
|
||||||
|
|
||||||
|
func (otelttrpcopts) UnaryServerInterceptor() ttrpc.UnaryServerInterceptor {
|
||||||
|
return otelttrpc.UnaryServerInterceptor()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (otelttrpcopts) UnaryClientInterceptor() ttrpc.UnaryClientInterceptor {
|
||||||
|
return otelttrpc.UnaryClientInterceptor()
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user