Merge pull request #10761 from cpuguy83/shim_remove_nethttp

More shim imports cleanup
This commit is contained in:
Phil Estes 2024-10-03 14:56:25 +00:00 committed by GitHub
commit 59ffbf4ce3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 163 additions and 55 deletions

View File

@ -96,7 +96,11 @@ GO_BUILDTAGS += ${DEBUG_TAGS}
ifneq ($(STATIC),) ifneq ($(STATIC),)
GO_BUILDTAGS += osusergo netgo static_build GO_BUILDTAGS += osusergo netgo static_build
endif endif
SHIM_GO_BUILDTAGS := $(GO_BUILDTAGS) no_grpc
GO_TAGS=$(if $(GO_BUILDTAGS),-tags "$(strip $(GO_BUILDTAGS))",) GO_TAGS=$(if $(GO_BUILDTAGS),-tags "$(strip $(GO_BUILDTAGS))",)
SHIM_GO_TAGS=$(if $(SHIM_GO_BUILDTAGS),-tags "$(strip $(SHIM_GO_BUILDTAGS))",)
GO_LDFLAGS=-ldflags '-X $(PKG)/version.Version=$(VERSION) -X $(PKG)/version.Revision=$(REVISION) -X $(PKG)/version.Package=$(PACKAGE) $(EXTRA_LDFLAGS) GO_LDFLAGS=-ldflags '-X $(PKG)/version.Version=$(VERSION) -X $(PKG)/version.Revision=$(REVISION) -X $(PKG)/version.Package=$(PACKAGE) $(EXTRA_LDFLAGS)
ifneq ($(STATIC),) ifneq ($(STATIC),)
@ -150,7 +154,6 @@ GOTEST ?= $(GO) test
OUTPUTDIR = $(join $(ROOTDIR), _output) OUTPUTDIR = $(join $(ROOTDIR), _output)
CRIDIR=$(OUTPUTDIR)/cri CRIDIR=$(OUTPUTDIR)/cri
SHIM_GO_TAGS := --tags no_grpc
.PHONY: clean all AUTHORS build binaries test integration generate protos check-protos coverage ci check help install uninstall vendor release static-release mandir install-man install-doc genman install-cri-deps cri-release cri-cni-release cri-integration install-deps bin/cri-integration.test remove-replace clean-vendor .PHONY: clean all AUTHORS build binaries test integration generate protos check-protos coverage ci check help install uninstall vendor release static-release mandir install-man install-doc genman install-cri-deps cri-release cri-cni-release cri-integration install-deps bin/cri-integration.test remove-replace clean-vendor
.DEFAULT: default .DEFAULT: default
@ -267,7 +270,7 @@ bin/gen-manpages: cmd/gen-manpages FORCE
bin/containerd-shim-runc-v2: cmd/containerd-shim-runc-v2 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220 bin/containerd-shim-runc-v2: cmd/containerd-shim-runc-v2 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220
@echo "$(WHALE) $@" @echo "$(WHALE) $@"
CGO_ENABLED=${SHIM_CGO_ENABLED} $(GO) build ${GO_BUILD_FLAGS} -o $@ ${SHIM_GO_LDFLAGS} ${GO_TAGS} ${SHIM_GO_TAGS} ./cmd/containerd-shim-runc-v2 CGO_ENABLED=${SHIM_CGO_ENABLED} $(GO) build ${GO_BUILD_FLAGS} -o $@ ${SHIM_GO_LDFLAGS} ${SHIM_GO_TAGS} ./cmd/containerd-shim-runc-v2
binaries: $(BINARIES) ## build binaries binaries: $(BINARIES) ## build binaries
@echo "$(WHALE) $@" @echo "$(WHALE) $@"

View File

@ -18,4 +18,7 @@
package main package main
import _ "github.com/containerd/containerd/v2/pkg/tracing/plugin" import (
_ "github.com/containerd/containerd/v2/internal/pprof"
_ "github.com/containerd/containerd/v2/pkg/tracing/plugin"
)

View File

@ -74,7 +74,7 @@ func init() {
} }
var ( var (
_ = shim.TTRPCServerOptioner(&taskServiceWithFp{}) _ = shim.TTRPCServerUnaryOptioner(&taskServiceWithFp{})
) )
type taskServiceWithFp struct { type taskServiceWithFp struct {
@ -87,7 +87,7 @@ func (s *taskServiceWithFp) RegisterTTRPC(server *ttrpc.Server) error {
return nil return nil
} }
func (s *taskServiceWithFp) UnaryInterceptor() ttrpc.UnaryServerInterceptor { func (s *taskServiceWithFp) UnaryServerInterceptor() ttrpc.UnaryServerInterceptor {
return func(ctx context.Context, unmarshal ttrpc.Unmarshaler, info *ttrpc.UnaryServerInfo, method ttrpc.Method) (interface{}, error) { return func(ctx context.Context, unmarshal ttrpc.Unmarshaler, info *ttrpc.UnaryServerInfo, method ttrpc.Method) (interface{}, error) {
methodName := filepath.Base(info.FullMethod) methodName := filepath.Base(info.FullMethod)
if fp, ok := s.fps[methodName]; ok { if fp, ok := s.fps[methodName]; ok {

55
internal/pprof/plugin.go Normal file
View File

@ -0,0 +1,55 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pprof
import (
"expvar"
"net/http"
"net/http/pprof"
"time"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
)
const pluginName = "pprof"
func init() {
registry.Register(&plugin.Registration{
ID: pluginName,
Type: plugins.HTTPHandler,
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return newHandler(), nil
},
})
}
func newHandler() *http.Server {
m := http.NewServeMux()
m.Handle("/debug/vars", expvar.Handler())
m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
return &http.Server{
Handler: m,
ReadHeaderTimeout: 5 * time.Minute,
}
}

View File

@ -20,13 +20,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"expvar"
"flag" "flag"
"fmt" "fmt"
"io" "io"
"net" "net"
"net/http"
"net/http/pprof"
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
@ -43,7 +40,6 @@ import (
"github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/version" "github.com/containerd/containerd/v2/version"
"github.com/containerd/log" "github.com/containerd/log"
"github.com/containerd/otelttrpc"
"github.com/containerd/plugin" "github.com/containerd/plugin"
"github.com/containerd/plugin/registry" "github.com/containerd/plugin/registry"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
@ -112,10 +108,12 @@ type TTRPCService interface {
RegisterTTRPC(*ttrpc.Server) error RegisterTTRPC(*ttrpc.Server) error
} }
type TTRPCServerOptioner interface { type TTRPCServerUnaryOptioner interface {
TTRPCService UnaryServerInterceptor() ttrpc.UnaryServerInterceptor
}
UnaryInterceptor() ttrpc.UnaryServerInterceptor type TTRPCClientUnaryOptioner interface {
UnaryClientInterceptor() ttrpc.UnaryClientInterceptor
} }
var ( var (
@ -249,13 +247,6 @@ func run(ctx context.Context, manager Manager, config Config) error {
} }
ttrpcAddress := os.Getenv(ttrpcAddressEnv) ttrpcAddress := os.Getenv(ttrpcAddressEnv)
publisher, err := NewPublisher(ttrpcAddress, WithPublishTTRPCOpts(
ttrpc.WithUnaryClientInterceptor(otelttrpc.UnaryClientInterceptor()),
))
if err != nil {
return err
}
defer publisher.Close()
ctx = namespaces.WithNamespace(ctx, namespaceFlag) ctx = namespaces.WithNamespace(ctx, namespaceFlag)
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
@ -333,7 +324,15 @@ func run(ctx context.Context, manager Manager, config Config) error {
Type: plugins.EventPlugin, Type: plugins.EventPlugin,
ID: "publisher", ID: "publisher",
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return publisher, nil return NewPublisher(ttrpcAddress, func(cfg *publisherConfig) {
p, _ := ic.GetByID(plugins.TTRPCPlugin, "otelttrpc")
if p == nil {
return
}
opts := ttrpc.WithUnaryClientInterceptor(p.(TTRPCClientUnaryOptioner).UnaryClientInterceptor())
WithPublishTTRPCOpts(opts)(cfg)
})
}, },
}) })
@ -342,6 +341,8 @@ func run(ctx context.Context, manager Manager, config Config) error {
ttrpcServices = []TTRPCService{} ttrpcServices = []TTRPCService{}
ttrpcUnaryInterceptors = []ttrpc.UnaryServerInterceptor{} ttrpcUnaryInterceptors = []ttrpc.UnaryServerInterceptor{}
pprofHandler server
) )
for _, p := range registry.Graph(func(*plugin.Registration) bool { return false }) { for _, p := range registry.Graph(func(*plugin.Registration) bool { return false }) {
@ -389,11 +390,16 @@ func run(ctx context.Context, manager Manager, config Config) error {
if src, ok := instance.(TTRPCService); ok { if src, ok := instance.(TTRPCService); ok {
log.G(ctx).WithField("id", pID).Debug("registering ttrpc service") log.G(ctx).WithField("id", pID).Debug("registering ttrpc service")
ttrpcServices = append(ttrpcServices, src) ttrpcServices = append(ttrpcServices, src)
} }
if src, ok := instance.(TTRPCServerOptioner); ok { if src, ok := instance.(TTRPCServerUnaryOptioner); ok {
ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, src.UnaryInterceptor()) ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, src.UnaryServerInterceptor())
}
if result.Registration.ID == "pprof" {
if src, ok := instance.(server); ok {
pprofHandler = src
}
} }
} }
@ -401,8 +407,6 @@ func run(ctx context.Context, manager Manager, config Config) error {
return fmt.Errorf("required that ttrpc service") return fmt.Errorf("required that ttrpc service")
} }
ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, otelttrpc.UnaryServerInterceptor())
unaryInterceptor := chainUnaryServerInterceptors(ttrpcUnaryInterceptors...) unaryInterceptor := chainUnaryServerInterceptors(ttrpcUnaryInterceptors...)
server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor)) server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor))
if err != nil { if err != nil {
@ -415,7 +419,7 @@ func run(ctx context.Context, manager Manager, config Config) error {
} }
} }
if err := serve(ctx, server, signals, sd.Shutdown); err != nil { if err := serve(ctx, server, signals, sd.Shutdown, pprofHandler); err != nil {
if !errors.Is(err, shutdown.ErrShutdown) { if !errors.Is(err, shutdown.ErrShutdown) {
cleanupSockets(ctx) cleanupSockets(ctx)
return err return err
@ -436,7 +440,7 @@ func run(ctx context.Context, manager Manager, config Config) error {
// serve serves the ttrpc API over a unix socket in the current working directory // serve serves the ttrpc API over a unix socket in the current working directory
// and blocks until the context is canceled // and blocks until the context is canceled
func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, shutdown func()) error { func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, shutdown func(), pprof server) error {
dump := make(chan os.Signal, 32) dump := make(chan os.Signal, 32)
setupDumpStacks(dump) setupDumpStacks(dump)
@ -456,9 +460,9 @@ func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, sh
} }
}() }()
if debugFlag { if debugFlag && pprof != nil {
if err := serveDebug(ctx); err != nil { if err := setupPprof(ctx, pprof); err != nil {
return err log.G(ctx).WithError(err).Warn("Could not setup pprof")
} }
} }
@ -477,31 +481,6 @@ func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, sh
return reap(ctx, logger, signals) return reap(ctx, logger, signals)
} }
func serveDebug(ctx context.Context) error {
l, err := serveListener(debugSocketFlag, 4)
if err != nil {
return err
}
go func() {
defer l.Close()
m := http.NewServeMux()
m.Handle("/debug/vars", expvar.Handler())
m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
srv := &http.Server{
Handler: m,
ReadHeaderTimeout: 5 * time.Minute,
}
if err := srv.Serve(l); err != nil && !errors.Is(err, net.ErrClosed) {
log.G(ctx).WithError(err).Fatal("containerd-shim: pprof endpoint failure")
}
}()
return nil
}
func dumpStacks(logger *log.Entry) { func dumpStacks(logger *log.Entry) {
var ( var (
buf []byte buf []byte
@ -516,3 +495,22 @@ func dumpStacks(logger *log.Entry) {
buf = buf[:stackSize] buf = buf[:stackSize]
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
} }
type server interface {
Serve(net.Listener) error
}
func setupPprof(ctx context.Context, srv server) error {
l, err := serveListener(debugSocketFlag, 4)
if err != nil {
return fmt.Errorf("could not setup pprof listener: %w", err)
}
go func() {
if err := srv.Serve(l); err != nil && !errors.Is(err, net.ErrClosed) {
log.G(ctx).WithError(err).Fatal("containerd-shim: pprof endpoint failure")
}
}()
return nil
}

View File

@ -0,0 +1,47 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/otelttrpc"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
"github.com/containerd/ttrpc"
)
func init() {
const pluginName = "otelttrpc"
registry.Register(&plugin.Registration{
ID: pluginName,
Type: plugins.TTRPCPlugin,
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return otelttrpcopts{}, nil
},
})
}
type otelttrpcopts struct{}
func (otelttrpcopts) UnaryServerInterceptor() ttrpc.UnaryServerInterceptor {
return otelttrpc.UnaryServerInterceptor()
}
func (otelttrpcopts) UnaryClientInterceptor() ttrpc.UnaryClientInterceptor {
return otelttrpc.UnaryClientInterceptor()
}

View File

@ -73,6 +73,8 @@ const (
CRIServicePlugin plugin.Type = "io.containerd.cri.v1" CRIServicePlugin plugin.Type = "io.containerd.cri.v1"
// ShimPlugin implements a shim service // ShimPlugin implements a shim service
ShimPlugin plugin.Type = "io.containerd.shim.v1" ShimPlugin plugin.Type = "io.containerd.shim.v1"
// HTTPHandler implements an http handler
HTTPHandler plugin.Type = "io.containerd.http.v1"
) )
const ( const (