526 lines
13 KiB
Go
526 lines
13 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package shim
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"runtime/debug"
|
|
"time"
|
|
|
|
shimapi "github.com/containerd/containerd/api/runtime/task/v2"
|
|
"github.com/containerd/containerd/events"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/namespaces"
|
|
"github.com/containerd/containerd/pkg/shutdown"
|
|
"github.com/containerd/containerd/plugin"
|
|
"github.com/containerd/containerd/protobuf"
|
|
"github.com/containerd/containerd/protobuf/proto"
|
|
"github.com/containerd/containerd/version"
|
|
"github.com/containerd/ttrpc"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Publisher is the event publisher handed to shim implementations (see Init).
// It combines events.Publisher with io.Closer so the publisher can be closed
// once it is no longer needed.
type Publisher interface {
	events.Publisher
	io.Closer
}
|
|
|
|
// StartOpts describes shim start configuration received from containerd.
// For the "start" action, run fills in Address, TTRPCAddress and Debug before
// calling Manager.Start.
type StartOpts struct {
	// ID is the task id. shimToManager.Start copies its id argument into this
	// field before delegating to Shim.StartShim.
	ID string // TODO(2.0): Remove ID, passed directly to start for call symmetry
	// ContainerdBinary is not populated by run in this file; see the TODO.
	ContainerdBinary string // TODO(2.0): Remove ContainerdBinary, use the TTRPC_ADDRESS env to forward events
	// Address is taken from the -address flag.
	Address string
	// TTRPCAddress is taken from the TTRPC_ADDRESS environment variable.
	TTRPCAddress string
	// Debug is taken from the -debug flag.
	Debug bool
}
|
|
|
|
// StopStatus is the result returned by Manager.Stop. For the "delete" action,
// run copies these fields into a shimapi.DeleteResponse that is marshaled and
// written to stdout.
type StopStatus struct {
	// Pid is reported as DeleteResponse.Pid (converted to uint32).
	Pid int
	// ExitStatus is reported as DeleteResponse.ExitStatus (converted to uint32).
	ExitStatus int
	// ExitedAt is reported as DeleteResponse.ExitedAt via protobuf.ToTimestamp.
	ExitedAt time.Time
}
|
|
|
|
// Init is the constructor for a legacy Shim server. run calls it with the
// shim context, the task id from the -id flag, the event Publisher and the
// function that triggers shutdown of the shim.
// TODO(2.0): Remove init function
type Init func(context.Context, string, Publisher, func()) (Shim, error)
|
|
|
|
// Shim is the legacy, unified shim server interface: the task service API plus
// the process-management hooks that shimToManager adapts to Manager.
// TODO(2.0): Remove unified shim interface
type Shim interface {
	shimapi.TaskService
	// Cleanup is invoked through shimToManager.Stop; its DeleteResponse is
	// converted into a StopStatus.
	Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
	// StartShim is invoked through shimToManager.Start; run writes the returned
	// string to stdout for the "start" action.
	StartShim(ctx context.Context, opts StartOpts) (string, error)
}
|
|
|
|
// Manager is the interface which manages the shim process
type Manager interface {
	// Name is used by RunManager as the "runtime" log field and as the prefix
	// of the error message printed to stderr.
	Name() string
	// Start handles the "start" action. The returned string is written to
	// stdout by run.
	Start(ctx context.Context, id string, opts StartOpts) (string, error)
	// Stop handles the "delete" action. The returned StopStatus is written to
	// stdout by run as a marshaled shimapi.DeleteResponse.
	Stop(ctx context.Context, id string) (StopStatus, error)
}
|
|
|
|
// OptsKey is the context key for the Opts value. run stores an Opts under
// OptsKey{} with context.WithValue before invoking the shim implementation.
type OptsKey struct{}
|
|
|
|
// Opts are context options associated with the shim invocation.
type Opts struct {
	// BundlePath is the value of the -bundle flag.
	BundlePath string
	// Debug is the value of the -debug flag.
	Debug bool
}
|
|
|
|
// BinaryOpts allows the configuration of a shims binary setup. Run and
// RunManager apply each option, in order, to a zero-valued Config.
type BinaryOpts func(*Config)
|
|
|
|
// Config of shim binary options provided by shim implementations
type Config struct {
	// NoSubreaper disables setting the shim as a child subreaper.
	// When false, run calls subreaper() during startup.
	NoSubreaper bool
	// NoReaper disables the shim binary from reaping any child process implicitly
	NoReaper bool
	// NoSetupLogger disables automatic configuration of logrus to use the shim FIFO.
	// When false, run calls setLogger before loading plugins.
	NoSetupLogger bool
}
|
|
|
|
// ttrpcService is implemented by plugin instances that expose a ttrpc API.
// run collects every loaded plugin instance that satisfies it and calls
// RegisterTTRPC with the shim's ttrpc server.
type ttrpcService interface {
	RegisterTTRPC(*ttrpc.Server) error
}
|
|
|
|
// ttrpcServerOptioner is a ttrpcService that also contributes a unary server
// interceptor. run gathers these interceptors from loaded plugins and chains
// them when creating the ttrpc server.
type ttrpcServerOptioner interface {
	ttrpcService

	UnaryInterceptor() ttrpc.UnaryServerInterceptor
}
|
|
|
|
// taskService wraps a shimapi.TaskService so that it satisfies ttrpcService.
// run uses it to expose a legacy Shim as the "task" ttrpc plugin.
type taskService struct {
	shimapi.TaskService
}
|
|
|
|
// RegisterTTRPC registers the wrapped task service on server. It always
// returns nil.
func (t taskService) RegisterTTRPC(server *ttrpc.Server) error {
	shimapi.RegisterTaskService(server, t.TaskService)
	return nil
}
|
|
|
|
// Command-line state populated by parseFlags.
var (
	// debugFlag is set by -debug.
	debugFlag bool
	// versionFlag is set by -v.
	versionFlag bool
	// id is set by -id (id of the task).
	id string
	// namespaceFlag is set by -namespace; run rejects an empty value.
	namespaceFlag string
	// socketFlag is set by -socket and passed to serveListener by serve.
	socketFlag string
	// bundlePath is set by -bundle.
	bundlePath string
	// addressFlag is set by -address.
	addressFlag string
	// containerdBinaryFlag is set by -publish-binary; it is not read elsewhere
	// in the code visible here.
	containerdBinaryFlag string
	// action is the first positional argument (flag.Arg(0)); run handles
	// "delete" and "start" explicitly.
	action string
)
|
|
|
|
const (
	// ttrpcAddressEnv is the name of the environment variable from which run
	// reads the ttrpc address passed to NewPublisher and to StartOpts.
	ttrpcAddressEnv = "TTRPC_ADDRESS"
)
|
|
|
|
func parseFlags() {
|
|
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
|
|
flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit")
|
|
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
|
|
flag.StringVar(&id, "id", "", "id of the task")
|
|
flag.StringVar(&socketFlag, "socket", "", "socket path to serve")
|
|
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
|
|
|
|
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
|
|
flag.StringVar(&containerdBinaryFlag, "publish-binary", "",
|
|
fmt.Sprintf("path to publish binary (used for publishing events), but %s will ignore this flag, please use the %s env", os.Args[0], ttrpcAddressEnv),
|
|
)
|
|
|
|
flag.Parse()
|
|
action = flag.Arg(0)
|
|
}
|
|
|
|
// setRuntime tunes the Go runtime for the shim process: it sets the GC target
// percentage to 40, starts a background goroutine that calls
// debug.FreeOSMemory every 30 seconds, and caps GOMAXPROCS at 2 unless the
// GOMAXPROCS environment variable is set to a non-empty value.
func setRuntime() {
	debug.SetGCPercent(40)

	// The goroutine below runs for the lifetime of the process.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		for range ticker.C {
			debug.FreeOSMemory()
		}
	}()

	if procs := os.Getenv("GOMAXPROCS"); len(procs) > 0 {
		// An explicit GOMAXPROCS setting is left untouched.
		return
	}
	// If GOMAXPROCS hasn't been set, we default to a value of 2 to reduce
	// the number of Go stacks present in the shim.
	runtime.GOMAXPROCS(2)
}
|
|
|
|
func setLogger(ctx context.Context, id string) (context.Context, error) {
|
|
l := log.G(ctx)
|
|
l.Logger.SetFormatter(&logrus.TextFormatter{
|
|
TimestampFormat: log.RFC3339NanoFixed,
|
|
FullTimestamp: true,
|
|
})
|
|
if debugFlag {
|
|
l.Logger.SetLevel(logrus.DebugLevel)
|
|
}
|
|
f, err := openLog(ctx, id)
|
|
if err != nil {
|
|
return ctx, err
|
|
}
|
|
l.Logger.SetOutput(f)
|
|
return log.WithLogger(ctx, l), nil
|
|
}
|
|
|
|
// Run initializes and runs a shim server
|
|
// TODO(2.0): Remove function
|
|
func Run(name string, initFunc Init, opts ...BinaryOpts) {
|
|
var config Config
|
|
for _, o := range opts {
|
|
o(&config)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", name))
|
|
|
|
if err := run(ctx, nil, initFunc, name, config); err != nil {
|
|
fmt.Fprintf(os.Stderr, "%s: %s", name, err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// shimToManager adapts a legacy Shim to the Manager interface. run wraps the
// Shim returned by an Init function in it when no Manager was supplied.
// TODO(2.0): Remove this type
type shimToManager struct {
	// shim is the wrapped legacy implementation.
	shim Shim
	// name is the value returned by Name.
	name string
}
|
|
|
|
// Name returns the name the adapter was constructed with.
func (stm shimToManager) Name() string {
	return stm.name
}
|
|
|
|
func (stm shimToManager) Start(ctx context.Context, id string, opts StartOpts) (string, error) {
|
|
opts.ID = id
|
|
return stm.shim.StartShim(ctx, opts)
|
|
}
|
|
|
|
func (stm shimToManager) Stop(ctx context.Context, id string) (StopStatus, error) {
|
|
// shim must already have id
|
|
dr, err := stm.shim.Cleanup(ctx)
|
|
if err != nil {
|
|
return StopStatus{}, err
|
|
}
|
|
return StopStatus{
|
|
Pid: int(dr.Pid),
|
|
ExitStatus: int(dr.ExitStatus),
|
|
ExitedAt: protobuf.FromTimestamp(dr.ExitedAt),
|
|
}, nil
|
|
}
|
|
|
|
// RunManager initialzes and runs a shim server
|
|
// TODO(2.0): Rename to Run
|
|
func RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts) {
|
|
var config Config
|
|
for _, o := range opts {
|
|
o(&config)
|
|
}
|
|
|
|
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", manager.Name()))
|
|
|
|
if err := run(ctx, manager, nil, "", config); err != nil {
|
|
fmt.Fprintf(os.Stderr, "%s: %s", manager.Name(), err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func run(ctx context.Context, manager Manager, initFunc Init, name string, config Config) error {
|
|
parseFlags()
|
|
if versionFlag {
|
|
fmt.Printf("%s:\n", filepath.Base(os.Args[0]))
|
|
fmt.Println(" Version: ", version.Version)
|
|
fmt.Println(" Revision:", version.Revision)
|
|
fmt.Println(" Go version:", version.GoVersion)
|
|
fmt.Println("")
|
|
return nil
|
|
}
|
|
|
|
if namespaceFlag == "" {
|
|
return fmt.Errorf("shim namespace cannot be empty")
|
|
}
|
|
|
|
setRuntime()
|
|
|
|
signals, err := setupSignals(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !config.NoSubreaper {
|
|
if err := subreaper(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
|
publisher, err := NewPublisher(ttrpcAddress)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer publisher.Close()
|
|
|
|
ctx = namespaces.WithNamespace(ctx, namespaceFlag)
|
|
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
|
|
ctx, sd := shutdown.WithShutdown(ctx)
|
|
defer sd.Shutdown()
|
|
|
|
if manager == nil {
|
|
service, err := initFunc(ctx, id, publisher, sd.Shutdown)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
plugin.Register(&plugin.Registration{
|
|
Type: plugin.TTRPCPlugin,
|
|
ID: "task",
|
|
Requires: []plugin.Type{
|
|
plugin.EventPlugin,
|
|
},
|
|
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
|
return taskService{service}, nil
|
|
},
|
|
})
|
|
manager = shimToManager{
|
|
shim: service,
|
|
name: name,
|
|
}
|
|
}
|
|
|
|
// Handle explicit actions
|
|
switch action {
|
|
case "delete":
|
|
if debugFlag {
|
|
logrus.SetLevel(logrus.DebugLevel)
|
|
}
|
|
logger := log.G(ctx).WithFields(log.Fields{
|
|
"pid": os.Getpid(),
|
|
"namespace": namespaceFlag,
|
|
})
|
|
go reap(ctx, logger, signals)
|
|
ss, err := manager.Stop(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
data, err := proto.Marshal(&shimapi.DeleteResponse{
|
|
Pid: uint32(ss.Pid),
|
|
ExitStatus: uint32(ss.ExitStatus),
|
|
ExitedAt: protobuf.ToTimestamp(ss.ExitedAt),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := os.Stdout.Write(data); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
case "start":
|
|
opts := StartOpts{
|
|
Address: addressFlag,
|
|
TTRPCAddress: ttrpcAddress,
|
|
Debug: debugFlag,
|
|
}
|
|
|
|
address, err := manager.Start(ctx, id, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := os.Stdout.WriteString(address); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if !config.NoSetupLogger {
|
|
ctx, err = setLogger(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
plugin.Register(&plugin.Registration{
|
|
Type: plugin.InternalPlugin,
|
|
ID: "shutdown",
|
|
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
|
return sd, nil
|
|
},
|
|
})
|
|
|
|
// Register event plugin
|
|
plugin.Register(&plugin.Registration{
|
|
Type: plugin.EventPlugin,
|
|
ID: "publisher",
|
|
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
|
return publisher, nil
|
|
},
|
|
})
|
|
|
|
var (
|
|
initialized = plugin.NewPluginSet()
|
|
ttrpcServices = []ttrpcService{}
|
|
|
|
ttrpcUnaryInterceptors = []ttrpc.UnaryServerInterceptor{}
|
|
)
|
|
plugins := plugin.Graph(func(*plugin.Registration) bool { return false })
|
|
for _, p := range plugins {
|
|
id := p.URI()
|
|
log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
|
|
|
|
initContext := plugin.NewContext(
|
|
ctx,
|
|
p,
|
|
initialized,
|
|
// NOTE: Root is empty since the shim does not support persistent storage,
|
|
// shim plugins should make use state directory for writing files to disk.
|
|
// The state directory will be destroyed when the shim if cleaned up or
|
|
// on reboot
|
|
"",
|
|
bundlePath,
|
|
)
|
|
initContext.Address = addressFlag
|
|
initContext.TTRPCAddress = ttrpcAddress
|
|
|
|
// load the plugin specific configuration if it is provided
|
|
// TODO: Read configuration passed into shim, or from state directory?
|
|
// if p.Config != nil {
|
|
// pc, err := config.Decode(p)
|
|
// if err != nil {
|
|
// return nil, err
|
|
// }
|
|
// initContext.Config = pc
|
|
// }
|
|
|
|
result := p.Init(initContext)
|
|
if err := initialized.Add(result); err != nil {
|
|
return fmt.Errorf("could not add plugin result to plugin set: %w", err)
|
|
}
|
|
|
|
instance, err := result.Instance()
|
|
if err != nil {
|
|
if plugin.IsSkipPlugin(err) {
|
|
log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
|
|
continue
|
|
}
|
|
return fmt.Errorf("failed to load plugin %s: %w", id, err)
|
|
}
|
|
|
|
if src, ok := instance.(ttrpcService); ok {
|
|
logrus.WithField("id", id).Debug("registering ttrpc service")
|
|
ttrpcServices = append(ttrpcServices, src)
|
|
|
|
}
|
|
|
|
if src, ok := instance.(ttrpcServerOptioner); ok {
|
|
ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, src.UnaryInterceptor())
|
|
}
|
|
}
|
|
|
|
if len(ttrpcServices) == 0 {
|
|
return fmt.Errorf("required that ttrpc service")
|
|
}
|
|
|
|
unaryInterceptor := chainUnaryServerInterceptors(ttrpcUnaryInterceptors...)
|
|
server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor))
|
|
if err != nil {
|
|
return fmt.Errorf("failed creating server: %w", err)
|
|
}
|
|
|
|
for _, srv := range ttrpcServices {
|
|
if err := srv.RegisterTTRPC(server); err != nil {
|
|
return fmt.Errorf("failed to register service: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := serve(ctx, server, signals, sd.Shutdown); err != nil {
|
|
if err != shutdown.ErrShutdown {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// NOTE: If the shim server is down(like oom killer), the address
|
|
// socket might be leaking.
|
|
if address, err := ReadAddress("address"); err == nil {
|
|
_ = RemoveSocket(address)
|
|
}
|
|
|
|
select {
|
|
case <-publisher.Done():
|
|
return nil
|
|
case <-time.After(5 * time.Second):
|
|
return errors.New("publisher not closed")
|
|
}
|
|
}
|
|
|
|
// serve serves the ttrpc API over a unix socket in the current working directory
|
|
// and blocks until the context is canceled
|
|
func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, shutdown func()) error {
|
|
dump := make(chan os.Signal, 32)
|
|
setupDumpStacks(dump)
|
|
|
|
path, err := os.Getwd()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
l, err := serveListener(socketFlag)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go func() {
|
|
defer l.Close()
|
|
if err := server.Serve(ctx, l); err != nil && !errors.Is(err, net.ErrClosed) {
|
|
log.G(ctx).WithError(err).Fatal("containerd-shim: ttrpc server failure")
|
|
}
|
|
}()
|
|
logger := log.G(ctx).WithFields(log.Fields{
|
|
"pid": os.Getpid(),
|
|
"path": path,
|
|
"namespace": namespaceFlag,
|
|
})
|
|
go func() {
|
|
for range dump {
|
|
dumpStacks(logger)
|
|
}
|
|
}()
|
|
|
|
go handleExitSignals(ctx, logger, shutdown)
|
|
return reap(ctx, logger, signals)
|
|
}
|
|
|
|
func dumpStacks(logger *logrus.Entry) {
|
|
var (
|
|
buf []byte
|
|
stackSize int
|
|
)
|
|
bufferLen := 16384
|
|
for stackSize == len(buf) {
|
|
buf = make([]byte, bufferLen)
|
|
stackSize = runtime.Stack(buf, true)
|
|
bufferLen *= 2
|
|
}
|
|
buf = buf[:stackSize]
|
|
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
|
|
}
|