containerd/cmd/containerd-shim/main_unix.go
Stephen J Day 5764bf1bad
cmd/containerd-shim: set GOMAXPROCS to 2
The shim doesn't need massive concurrency and a bunch of CPUs to do its
job correctly. We can reduce the number of threads to save memory at
little cost to performance.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-11-27 13:40:35 -08:00

218 lines
5.6 KiB
Go

// +build !windows
package main
import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"net"
"os"
"os/exec"
"runtime"
"strings"
"sync"
"syscall"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/linux/proc"
"github.com/containerd/containerd/linux/shim"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/reaper"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
)
// Command-line options of the shim. Every variable is bound to a flag of the
// corresponding name in init.
var (
	// Boolean switches.
	debugFlag,
	systemdCgroupFlag bool

	// String options.
	namespaceFlag, // namespace that owns the shim
	socketFlag, // abstract socket path to serve
	addressFlag, // grpc address back to main containerd
	workdirFlag, // directory for large temporary data
	runtimeRootFlag, // root directory for the runtime
	criuFlag, // path to the criu binary
	containerdBinaryFlag string // path to the containerd binary used for `containerd publish`
)
// init registers the shim's command-line flags on the default flag set and
// parses the process arguments, so that the package-level *Flag variables are
// populated before main runs.
func init() {
	flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
	flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
	flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
	flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
	// Help text previously read "storge"; fixed so the usage output is correct.
	flag.StringVar(&workdirFlag, "workdir", "", "path used to store large temporary data")
	flag.StringVar(&runtimeRootFlag, "runtime-root", proc.RuncRoot, "root directory for the runtime")
	flag.StringVar(&criuFlag, "criu", "", "path to criu binary")
	flag.BoolVar(&systemdCgroupFlag, "systemd-cgroup", false, "set runtime to use systemd-cgroup")
	// currently, the `containerd publish` utility is embedded in the daemon binary.
	// The daemon invokes `containerd-shim -containerd-binary ...` with its own os.Executable() path.
	flag.StringVar(&containerdBinaryFlag, "containerd-binary", "containerd", "path to containerd binary (used for `containerd publish`)")
	flag.Parse()
}
// main configures logging and the Go scheduler, then runs the shim. Any error
// returned by executeShim is printed to stderr and the process exits with
// status 1.
func main() {
	if debugFlag {
		logrus.SetLevel(logrus.DebugLevel)
	}

	// Unless the user chose a value through the environment, cap GOMAXPROCS
	// at 2 to reduce the number of Go stacks present in the shim.
	if len(os.Getenv("GOMAXPROCS")) == 0 {
		runtime.GOMAXPROCS(2)
	}

	err := executeShim()
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
	os.Exit(1)
}
// executeShim wires the shim together: it installs the signal handlers,
// creates the shim service for the current working directory, registers it on
// a grpc server, starts serving on the configured socket and finally blocks in
// the signal loop. The first error encountered during setup is returned.
func executeShim() error {
	// Signal handling is set up before anything else so that children are
	// reaped properly even when the runtime exits before the handler loop
	// below is reached.
	signals, err := setupSignals()
	if err != nil {
		return err
	}

	cwd, err := os.Getwd()
	if err != nil {
		return err
	}

	server := newServer()

	config := shim.Config{
		Path:          cwd,
		Namespace:     namespaceFlag,
		WorkDir:       workdirFlag,
		Criu:          criuFlag,
		SystemdCgroup: systemdCgroupFlag,
		RuntimeRoot:   runtimeRootFlag,
	}
	publisher := &remoteEventsPublisher{address: addressFlag}
	service, err := shim.NewService(config, publisher)
	if err != nil {
		return err
	}

	logrus.Debug("registering grpc server")
	shimapi.RegisterShimServer(server, service)

	if err = serve(server, socketFlag); err != nil {
		return err
	}

	fields := logrus.Fields{
		"pid":       os.Getpid(),
		"path":      cwd,
		"namespace": namespaceFlag,
	}
	return handleSignals(logrus.WithFields(fields), signals, server, service)
}
// serve serves the grpc API over a unix socket and does not block: the accept
// loop runs in a background goroutine.
//
// When path is empty the listener is created from file descriptor 3, which is
// presumably a socket handed down by the parent process. Otherwise path names
// an abstract unix socket (it is prefixed with a NUL byte before listening).
//
// An error is returned only if the listener cannot be created. A failure of
// the grpc server itself, other than one caused by the listener being closed,
// is logged as fatal from the background goroutine.
func serve(server *grpc.Server, path string) error {
	var (
		l   net.Listener
		err error
	)
	if path == "" {
		f := os.NewFile(3, "socket")
		l, err = net.FileListener(f)
		// FileListener operates on a duplicate of the descriptor and closing
		// the listener does not affect f, so release the original right away
		// instead of leaving the inherited socket open until a finalizer runs.
		f.Close()
		path = "[inherited from parent]"
	} else {
		l, err = net.Listen("unix", "\x00"+path)
	}
	if err != nil {
		return err
	}
	logrus.WithField("socket", path).Debug("serving api on unix socket")
	go func() {
		defer l.Close()
		// Serve reports an error when the listener is closed during shutdown;
		// that case is expected and must not be treated as a failure.
		if err := server.Serve(l); err != nil &&
			!strings.Contains(err.Error(), "use of closed network connection") {
			logrus.WithError(err).Fatal("containerd-shim: GRPC server failure")
		}
	}()
	return nil
}
// handleSignals is the shim's main loop. It consumes the signals channel and
// reacts to each signal:
//
//   - SIGCHLD: reap exited children; a reap failure is only logged.
//   - SIGTERM / SIGINT: shut down once, in the background: stop the grpc
//     server, SIGKILL every process of the service, delete it, then end the loop.
//   - SIGUSR1: log a dump of all goroutine stacks.
//
// Other signals are ignored. The function returns nil once shutdown completes.
func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *grpc.Server, sv *shim.Service) error {
	var (
		shutdownOnce sync.Once
		finished     = make(chan struct{})
	)

	// shutdown tears the shim down; results of Kill and Delete are ignored.
	shutdown := func() {
		server.Stop()
		// Ensure our child is dead if any
		sv.Kill(context.Background(), &shimapi.KillRequest{
			Signal: uint32(syscall.SIGKILL),
			All:    true,
		})
		sv.Delete(context.Background(), &ptypes.Empty{})
		close(finished)
	}

	for {
		select {
		case <-finished:
			return nil
		case sig := <-signals:
			switch {
			case sig == unix.SIGCHLD:
				if err := reaper.Reap(); err != nil {
					logger.WithError(err).Error("reap exit status")
				}
			case sig == unix.SIGTERM || sig == unix.SIGINT:
				// Run in a goroutine so the loop keeps reaping children while
				// the shutdown sequence is in progress.
				go shutdownOnce.Do(shutdown)
			case sig == unix.SIGUSR1:
				dumpStacks(logger)
			}
		}
	}
}
// dumpStacks writes the stack traces of all goroutines to logger at info
// level. The capture buffer starts at 16 KiB and is doubled until the trace
// no longer fills it completely, i.e. until the output is known not to be
// truncated.
func dumpStacks(logger *logrus.Entry) {
	var trace []byte
	for size := 16384; ; size *= 2 {
		trace = make([]byte, size)
		written := runtime.Stack(trace, true)
		if written != len(trace) {
			trace = trace[:written]
			break
		}
		// The buffer was filled to the brim: the dump may be cut off, retry
		// with twice the space.
	}
	logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", trace)
}
// remoteEventsPublisher publishes events by invoking the containerd binary's
// `publish` subcommand.
type remoteEventsPublisher struct {
	// address is passed to the containerd binary as its --address argument.
	address string
}

// Publish marshals event and pipes the encoded bytes to the standard input of
// `<containerd-binary> --address <address> publish --topic <topic>
// --namespace <ns>`, where ns is the namespace found in ctx (empty if none).
// It waits for the command to exit and returns an error if marshalling fails,
// the command cannot be started, or it exits with a non-zero status.
func (p *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
	ns, _ := namespaces.Namespace(ctx)

	packed, err := typeurl.MarshalAny(event)
	if err != nil {
		return err
	}
	payload, err := packed.Marshal()
	if err != nil {
		return err
	}

	args := []string{
		"--address", p.address,
		"publish",
		"--topic", topic,
		"--namespace", ns,
	}
	cmd := exec.CommandContext(ctx, containerdBinaryFlag, args...)
	cmd.Stdin = bytes.NewReader(payload)

	// The command is started through the reaper, which delivers the exit
	// event of the child on the returned channel.
	exits, err := reaper.Default.Start(cmd)
	if err != nil {
		return err
	}
	if status := (<-exits).Status; status != 0 {
		return errors.New("failed to publish event")
	}
	return nil
}