containerd/runtime/v2/shim/shim.go
Wei Fu d895118c7c runtime/v2/runc: fix leaking socket path
When the runC shim v2 starts, the StartShim interface re-execs the shim binary as a
long-running process, which reads the `address` file during its initialization.

```happycase
Process

containerd-shim-runc-v1/v2 start             containerd-shim-runc-v1/v2

	initializing socket

	reexec containerd-shim-runc-v1/v2

	write address into file

						initializing

							read address

	write back to containerd daemon

						serving

						...

						remove address in Shutdown call
```

However, there is no synchronization between the parent and the re-exec'd
process, so the following data race can occur:

```leaking-case
Process

containerd-shim-runc-v1/v2 start             containerd-shim-runc-v1/v2

	initializing socket

	reexec containerd-shim-runc-v1/v2

						initializing

							read address

	write address into file

	write back to containerd daemon

						serving

						...

						fail to remove address
						because of empty address
```

The `address` should therefore be written into the file before the re-exec.
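
A minimal sketch of that ordering, assuming a StartShim-style start path. The function name, its arguments, and the direct use of `os.WriteFile`/`exec.Command` are illustrative only, not the actual containerd implementation:

```go
package shimfix

import (
	"os"
	"os/exec"
	"path/filepath"
)

// writeAddressThenReexec shows the intended ordering: persist the socket
// address to the "address" file in the bundle directory *before* re-exec'ing
// the long-running shim, so the re-exec'd process can never read an empty
// address. Names and layout are illustrative.
func writeAddressThenReexec(bundlePath, socketAddress string, args []string) error {
	// 1. Write the address file first.
	addressPath := filepath.Join(bundlePath, "address")
	if err := os.WriteFile(addressPath, []byte(socketAddress), 0o600); err != nil {
		return err
	}
	// 2. Only then spawn the long-running shim (args is assumed non-empty);
	//    by the time it initializes and reads "address", the file is
	//    guaranteed to be populated.
	cmd := exec.Command(args[0], args[1:]...)
	cmd.Dir = bundlePath
	return cmd.Start()
}
```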

Additionally, if the whole ttrpc service is shut down before the temporary
resources (such as the socket file) are cleaned up, the Shutdown caller may
receive `ttrpc: closed`, depending on the Go runtime scheduler. That also
leaks socket files.
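
For illustration, a sketch of the safer ordering inside a Shutdown handler. The `service` struct and its fields are made up for the example; `RemoveSocket`, `ShutdownRequest`, and `Empty` come from packages this file already depends on. This is one possible shape of the cleanup, not necessarily the exact change in this commit:

```go
package shimfix

import (
	"context"

	"github.com/containerd/containerd/runtime/v2/shim"
	shimapi "github.com/containerd/containerd/runtime/v2/task"
	ptypes "github.com/gogo/protobuf/types"
)

// service is a minimal stand-in for a shim task service; only the two
// fields this sketch needs are shown, and their names are invented.
type service struct {
	socketAddress string             // address the shim's ttrpc server listens on
	cancel        context.CancelFunc // cancels the serving context
}

// Shutdown sketches the safer ordering: remove temporary resources such as
// the socket *before* tearing the service down, so the caller receives the
// Shutdown response instead of "ttrpc: closed" and no socket file is leaked.
func (s *service) Shutdown(ctx context.Context, _ *shimapi.ShutdownRequest) (*ptypes.Empty, error) {
	_ = shim.RemoveSocket(s.socketAddress) // best-effort cleanup first
	s.cancel()                             // then let the server wind down
	return &ptypes.Empty{}, nil
}
```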

Since the shim v2 Delete binary API must be called to clean up the shim's
temporary resources, and the shim v2 runC-v1 shim doesn't support grouping
multiple containers in one shim, it is safe to remove the socket file in the
Delete binary call for runC-v1. For the runC-v2 shim, we still clean up the
socket in Shutdown. Hopefully we can find a way to clean up the socket in the
Delete binary call there as well.
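
A sketch of what that cleanup can look like in the "delete" binary path for a single-container (runC-v1-style) shim. `ReadAddress` and `RemoveSocket` are the exported helpers this file itself uses in its serve path; the wrapper function is illustrative:

```go
package shimfix

import "github.com/containerd/containerd/runtime/v2/shim"

// cleanupSocket sketches the runC-v1 approach: because that shim never groups
// multiple containers, the "delete" binary call (Cleanup) can safely remove
// the shim socket unconditionally.
func cleanupSocket() {
	// "address" is read relative to the shim's working directory (the bundle
	// path), matching how run() in this file reads it.
	if address, err := shim.ReadAddress("address"); err == nil {
		_ = shim.RemoveSocket(address)
	}
}
```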

Fix: #5173

Signed-off-by: Wei Fu <fuweid89@gmail.com>
2021-03-15 18:32:00 +08:00


/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package shim

import (
	"context"
	"flag"
	"fmt"
	"io"
	"os"
	"runtime"
	"runtime/debug"
	"strings"
	"time"

	"github.com/containerd/containerd/events"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/namespaces"
	shimapi "github.com/containerd/containerd/runtime/v2/task"
	"github.com/containerd/containerd/version"
	"github.com/containerd/ttrpc"
	"github.com/gogo/protobuf/proto"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// Client for a shim server
type Client struct {
	service shimapi.TaskService
	context context.Context
	signals chan os.Signal
}

// Publisher for events
type Publisher interface {
	events.Publisher
	io.Closer
}

// Init func for the creation of a shim server
type Init func(context.Context, string, Publisher, func()) (Shim, error)

// Shim server interface
type Shim interface {
	shimapi.TaskService
	Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
	StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error)
}

// OptsKey is the context key for the Opts value.
type OptsKey struct{}

// Opts are context options associated with the shim invocation.
type Opts struct {
	BundlePath string
	Debug      bool
}

// BinaryOpts allows the configuration of a shims binary setup
type BinaryOpts func(*Config)

// Config of shim binary options provided by shim implementations
type Config struct {
	// NoSubreaper disables setting the shim as a child subreaper
	NoSubreaper bool
	// NoReaper disables the shim binary from reaping any child process implicitly
	NoReaper bool
	// NoSetupLogger disables automatic configuration of logrus to use the shim FIFO
	NoSetupLogger bool
}

var (
	debugFlag            bool
	versionFlag          bool
	idFlag               string
	namespaceFlag        string
	socketFlag           string
	bundlePath           string
	addressFlag          string
	containerdBinaryFlag string
	action               string
)

const (
	ttrpcAddressEnv = "TTRPC_ADDRESS"
)

func parseFlags() {
	flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
	flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit")
	flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
	flag.StringVar(&idFlag, "id", "", "id of the task")
	flag.StringVar(&socketFlag, "socket", "", "socket path to serve")
	flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
	flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
	flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")

	flag.Parse()
	action = flag.Arg(0)
}

func setRuntime() {
	debug.SetGCPercent(40)
	go func() {
		for range time.Tick(30 * time.Second) {
			debug.FreeOSMemory()
		}
	}()
	if os.Getenv("GOMAXPROCS") == "" {
		// If GOMAXPROCS hasn't been set, we default to a value of 2 to reduce
		// the number of Go stacks present in the shim.
		runtime.GOMAXPROCS(2)
	}
}

func setLogger(ctx context.Context, id string) error {
	logrus.SetFormatter(&logrus.TextFormatter{
		TimestampFormat: log.RFC3339NanoFixed,
		FullTimestamp:   true,
	})
	if debugFlag {
		logrus.SetLevel(logrus.DebugLevel)
	}
	f, err := openLog(ctx, id)
	if err != nil {
		return err
	}
	logrus.SetOutput(f)
	return nil
}

// Run initializes and runs a shim server
func Run(id string, initFunc Init, opts ...BinaryOpts) {
	var config Config
	for _, o := range opts {
		o(&config)
	}
	if err := run(id, initFunc, config); err != nil {
		fmt.Fprintf(os.Stderr, "%s: %s\n", id, err)
		os.Exit(1)
	}
}

func run(id string, initFunc Init, config Config) error {
	parseFlags()
	if versionFlag {
		fmt.Printf("%s:\n", os.Args[0])
		fmt.Println(" Version: ", version.Version)
		fmt.Println(" Revision:", version.Revision)
		fmt.Println(" Go version:", version.GoVersion)
		fmt.Println("")
		return nil
	}

	setRuntime()

	signals, err := setupSignals(config)
	if err != nil {
		return err
	}

	if !config.NoSubreaper {
		if err := subreaper(); err != nil {
			return err
		}
	}

	ttrpcAddress := os.Getenv(ttrpcAddressEnv)

	publisher, err := NewPublisher(ttrpcAddress)
	if err != nil {
		return err
	}
	defer publisher.Close()

	if namespaceFlag == "" {
		return fmt.Errorf("shim namespace cannot be empty")
	}
	ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
	ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
	ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
	ctx, cancel := context.WithCancel(ctx)

	service, err := initFunc(ctx, idFlag, publisher, cancel)
	if err != nil {
		return err
	}

	switch action {
	case "delete":
		logger := logrus.WithFields(logrus.Fields{
			"pid":       os.Getpid(),
			"namespace": namespaceFlag,
		})
		go handleSignals(ctx, logger, signals)
		response, err := service.Cleanup(ctx)
		if err != nil {
			return err
		}
		data, err := proto.Marshal(response)
		if err != nil {
			return err
		}
		if _, err := os.Stdout.Write(data); err != nil {
			return err
		}
		return nil
	case "start":
		address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag, ttrpcAddress)
		if err != nil {
			return err
		}
		if _, err := os.Stdout.WriteString(address); err != nil {
			return err
		}
		return nil
	default:
		if !config.NoSetupLogger {
			if err := setLogger(ctx, idFlag); err != nil {
				return err
			}
		}
		client := NewShimClient(ctx, service, signals)
		if err := client.Serve(); err != nil {
			if err != context.Canceled {
				return err
			}
		}

		// NOTE: If the shim server is down (like oom killer), the address
		// socket might be leaking.
		if address, err := ReadAddress("address"); err == nil {
			_ = RemoveSocket(address)
		}

		select {
		case <-publisher.Done():
			return nil
		case <-time.After(5 * time.Second):
			return errors.New("publisher not closed")
		}
	}
}

// NewShimClient creates a new shim server client
func NewShimClient(ctx context.Context, svc shimapi.TaskService, signals chan os.Signal) *Client {
	s := &Client{
		service: svc,
		context: ctx,
		signals: signals,
	}
	return s
}

// Serve the shim server
func (s *Client) Serve() error {
	dump := make(chan os.Signal, 32)
	setupDumpStacks(dump)

	path, err := os.Getwd()
	if err != nil {
		return err
	}

	server, err := newServer()
	if err != nil {
		return errors.Wrap(err, "failed creating server")
	}

	logrus.Debug("registering ttrpc server")
	shimapi.RegisterTaskService(server, s.service)

	if err := serve(s.context, server, socketFlag); err != nil {
		return err
	}
	logger := logrus.WithFields(logrus.Fields{
		"pid":       os.Getpid(),
		"path":      path,
		"namespace": namespaceFlag,
	})
	go func() {
		for range dump {
			dumpStacks(logger)
		}
	}()
	return handleSignals(s.context, logger, s.signals)
}

// serve serves the ttrpc API over a unix socket at the provided path
// this function does not block
func serve(ctx context.Context, server *ttrpc.Server, path string) error {
	l, err := serveListener(path)
	if err != nil {
		return err
	}
	go func() {
		defer l.Close()
		if err := server.Serve(ctx, l); err != nil &&
			!strings.Contains(err.Error(), "use of closed network connection") {
			logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure")
		}
	}()
	return nil
}

func dumpStacks(logger *logrus.Entry) {
	var (
		buf       []byte
		stackSize int
	)
	bufferLen := 16384
	for stackSize == len(buf) {
		buf = make([]byte, bufferLen)
		stackSize = runtime.Stack(buf, true)
		bufferLen *= 2
	}
	buf = buf[:stackSize]
	logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}