Create server package for containerd daemon

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-06-21 17:14:28 -07:00
parent c4da4ed393
commit a6e77432df
12 changed files with 398 additions and 325 deletions

View File

@ -15,9 +15,8 @@ import (
)
const (
defaultRoot = "/var/lib/containerd-test"
defaultState = "/run/containerd-test"
testImage = "docker.io/library/alpine:latest"
defaultRoot = "/var/lib/containerd-test"
testImage = "docker.io/library/alpine:latest"
)
var (
@ -58,25 +57,28 @@ func TestMain(m *testing.M) {
// setup a new containerd daemon if !testing.Short
cmd = exec.Command("containerd",
"--root", defaultRoot,
"--state", defaultState,
"--address", address,
)
cmd.Stderr = buf
if err := cmd.Start(); err != nil {
fmt.Println(err)
cmd.Wait()
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
os.Exit(1)
}
}
client, err := waitForDaemonStart(ctx, address)
if err != nil {
fmt.Fprintln(os.Stderr, "immediate fail!", err)
cmd.Wait()
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
os.Exit(1)
}
// pull a seed image
if _, err = client.Pull(ctx, testImage, WithPullUnpack); err != nil {
fmt.Fprintln(os.Stderr, err)
cmd.Process.Signal(syscall.SIGTERM)
cmd.Wait()
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
os.Exit(1)
}

View File

@ -1,11 +1,8 @@
package main
import (
"bytes"
"io"
"os"
"github.com/BurntSushi/toml"
"github.com/urfave/cli"
)
@ -17,77 +14,9 @@ var configCommand = cli.Command{
Name: "default",
Usage: "see the output of the default config",
Action: func(context *cli.Context) error {
_, err := conf.WriteTo(os.Stdout)
_, err := defaultConfig().WriteTo(os.Stdout)
return err
},
},
},
}
// loadConfig loads the config from the provided path
func loadConfig(path string) error {
md, err := toml.DecodeFile(path, conf)
if err != nil {
return err
}
conf.md = md
return nil
}
// config specifies the containerd configuration file in the TOML format.
// It contains fields to configure various subsystems and containerd as a whole.
type config struct {
// Root is the path to a directory where containerd will store persistent data
Root string `toml:"root"`
// GRPC configuration settings
GRPC grpcConfig `toml:"grpc"`
// Debug and profiling settings
Debug debug `toml:"debug"`
// Metrics and monitoring settings
Metrics metricsConfig `toml:"metrics"`
// Snapshotter specifies which snapshot driver to use
Snapshotter string `toml:"snapshotter"`
// Differ specifies which differ to use. Differ is tightly coupled with the snapshotter
// so not all combinations may work.
Differ string `toml:"differ"`
// Plugins provides plugin specific configuration for the initialization of a plugin
Plugins map[string]toml.Primitive `toml:"plugins"`
// Enable containerd as a subreaper
Subreaper bool `toml:"subreaper"`
// OOMScore adjust the containerd's oom score
OOMScore int `toml:"oom_score"`
md toml.MetaData
}
func (c *config) decodePlugin(name string, v interface{}) error {
p, ok := c.Plugins[name]
if !ok {
return nil
}
return c.md.PrimitiveDecode(p, v)
}
func (c *config) WriteTo(w io.Writer) (int64, error) {
buf := bytes.NewBuffer(nil)
e := toml.NewEncoder(buf)
if err := e.Encode(c); err != nil {
return 0, err
}
return io.Copy(w, buf)
}
type grpcConfig struct {
Address string `toml:"address"`
Uid int `toml:"uid"`
Gid int `toml:"gid"`
}
type debug struct {
Address string `toml:"address"`
Level string `toml:"level"`
}
type metricsConfig struct {
Address string `toml:"address"`
}

View File

@ -1,14 +1,17 @@
package main
import "github.com/containerd/containerd"
import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/server"
)
func defaultConfig() *config {
return &config{
func defaultConfig() *server.Config {
return &server.Config{
Root: "/var/lib/containerd",
GRPC: grpcConfig{
GRPC: server.GRPCConfig{
Address: containerd.DefaultAddress,
},
Debug: debug{
Debug: server.Debug{
Level: "info",
Address: "/run/containerd/debug.sock",
},

View File

@ -2,13 +2,15 @@
package main
func defaultConfig() *config {
return &config{
import "github.com/containerd/containerd/server"
func defaultConfig() *server.Config {
return &server.Config{
Root: "/var/lib/containerd",
GRPC: grpcConfig{
GRPC: server.GRPCConfig{
Address: "/run/containerd/containerd.sock",
},
Debug: debug{
Debug: server.Debug{
Level: "info",
Address: "/run/containerd/debug.sock",
},

View File

@ -3,15 +3,17 @@ package main
import (
"os"
"path/filepath"
"github.com/containerd/containerd/server"
)
func defaultConfig() *config {
return &config{
func defaultConfig() *server.Config {
return &server.Config{
Root: filepath.Join(os.Getenv("programfiles"), "containerd", "root"),
GRPC: grpcConfig{
GRPC: server.GRPCConfig{
Address: `\\.\pipe\containerd-containerd`,
},
Debug: debug{
Debug: server.Debug{
Level: "info",
Address: `\\.\pipe\containerd-debug`,
},

View File

@ -1,10 +1,9 @@
package main
import (
_ "expvar"
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"net"
"os"
"os/signal"
"path/filepath"
@ -12,27 +11,15 @@ import (
"time"
"github.com/boltdb/bolt"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
gocontext "golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/Sirupsen/logrus"
containersapi "github.com/containerd/containerd/api/services/containers/v1"
contentapi "github.com/containerd/containerd/api/services/content/v1"
diffapi "github.com/containerd/containerd/api/services/diff/v1"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
api "github.com/containerd/containerd/api/services/tasks/v1"
versionapi "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/server"
"github.com/containerd/containerd/sys"
"github.com/containerd/containerd/version"
metrics "github.com/docker/go-metrics"
"github.com/pkg/errors"
"github.com/urfave/cli"
)
@ -47,11 +34,6 @@ const usage = `
high performance container runtime
`
var (
conf = defaultConfig()
global = log.WithModule(gocontext.Background(), "containerd")
)
func init() {
cli.VersionPrinter = func(c *cli.Context) {
fmt.Println(c.App.Name, version.Package, c.App.Version)
@ -73,10 +55,6 @@ func main() {
Name: "log-level,l",
Usage: "set the logging level [debug, info, warn, error, fatal, panic]",
},
cli.StringFlag{
Name: "state",
Usage: "containerd state directory",
},
cli.StringFlag{
Name: "address,a",
Usage: "address for containerd's GRPC server",
@ -89,81 +67,66 @@ func main() {
app.Commands = []cli.Command{
configCommand,
}
app.Before = before
app.Action = func(context *cli.Context) error {
start := time.Now()
var (
start = time.Now()
signals = make(chan os.Signal, 2048)
ctx = log.WithModule(gocontext.Background(), "containerd")
config = defaultConfig()
)
// start the signal handler as soon as we can to make sure that
// we don't miss any signals during boot
signals := make(chan os.Signal, 2048)
signal.Notify(signals, handledSignals...)
if err := platformInit(context); err != nil {
return err
}
log.G(global).Info("starting containerd boot...")
// load all plugins into containerd
if err := plugin.Load(filepath.Join(conf.Root, "plugins")); err != nil {
if err := server.LoadConfig(context.GlobalString("config"), config); err != nil && !os.IsNotExist(err) {
return err
}
registerContentStore()
registerMetaDB()
// start debug and metrics APIs
if err := serveDebugAPI(); err != nil {
// apply flags to the config
if err := applyFlags(context, config); err != nil {
return err
}
address := config.GRPC.Address
if address == "" {
return errors.New("grpc address cannot be empty")
}
if err := platformInit(ctx, config); err != nil {
return err
}
log.G(ctx).Info("starting containerd boot...")
plugins, err := loadPlugins(config)
if err != nil {
return err
}
var (
services []plugin.Service
plugins = make(map[plugin.PluginType][]interface{})
eventEmitter = events.NewEmitter()
)
for _, init := range plugin.Graph() {
id := init.URI()
if !shouldLoad(init) {
log.G(global).WithField("type", init.Type).Infof("skipping plugin %q...", id)
continue
}
log.G(global).WithField("type", init.Type).Infof("loading plugin %q...", id)
ic := plugin.NewContext(plugins)
ic.Root = filepath.Join(conf.Root, id)
rCtx := events.WithPoster(global, eventEmitter)
ic.Context = log.WithModule(rCtx, id)
if init.Config != nil {
if err := loadPluginConfig(init.ID, init.Config, ic); err != nil {
return err
}
}
ic.Emitter = eventEmitter
p, err := init.Init(ic)
server, err := server.New(ctx, config, plugins)
if err != nil {
return err
}
if config.Debug.Address != "" {
l, err := sys.GetLocalListener(config.Debug.Address, config.Debug.Uid, config.Debug.Gid)
if err != nil {
return err
}
plugins[init.Type] = append(plugins[init.Type], p)
if s, ok := p.(plugin.Service); ok {
services = append(services, s)
}
serve(log.WithModule(ctx, "debug"), l, server.ServeDebug)
}
// start the GRPC api with the execution service registered
server := newGRPCServer()
for _, service := range services {
if err := service.Register(server); err != nil {
if config.Metrics.Address != "" {
l, err := net.Listen("tcp", config.Metrics.Address)
if err != nil {
return err
}
serve(log.WithModule(ctx, "metrics"), l, server.ServeMetrics)
}
// register metrics last after all other services
grpc_prometheus.Register(server)
log.G(global).Info("starting GRPC API server...")
if err := serveGRPC(server); err != nil {
l, err := sys.GetLocalListener(address, config.GRPC.Uid, config.GRPC.Gid)
if err != nil {
return err
}
// start the prometheus metrics API for containerd
serveMetricsAPI()
serve(log.WithModule(ctx, "grpc"), l, server.ServeGRPC)
log.G(global).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
return handleSignals(signals, server)
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
return handleSignals(ctx, signals, server)
}
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
@ -171,14 +134,21 @@ func main() {
}
}
func before(context *cli.Context) error {
err := loadConfig(context.GlobalString("config"))
if err != nil && !os.IsNotExist(err) {
return err
}
func serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) error) {
path := l.Addr().String()
log.G(ctx).WithField("address", path).Info("serving...")
go func() {
defer l.Close()
if err := serveFunc(l); err != nil {
log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure")
}
}()
}
func applyFlags(context *cli.Context, config *server.Config) error {
// the order for config vs flag values is that flags will always override
// the config values if they are set
if err := setLevel(context); err != nil {
if err := setLevel(context, config); err != nil {
return err
}
for _, v := range []struct {
@ -187,11 +157,11 @@ func before(context *cli.Context) error {
}{
{
name: "root",
d: &conf.Root,
d: &config.Root,
},
{
name: "address",
d: &conf.GRPC.Address,
d: &config.GRPC.Address,
},
} {
if s := context.GlobalString(v.name); s != "" {
@ -201,10 +171,10 @@ func before(context *cli.Context) error {
return nil
}
func setLevel(context *cli.Context) error {
func setLevel(context *cli.Context, config *server.Config) error {
l := context.GlobalString("log-level")
if l == "" {
l = conf.Debug.Level
l = config.Debug.Level
}
if l != "" {
lvl, err := logrus.ParseLevel(l)
@ -216,43 +186,16 @@ func setLevel(context *cli.Context) error {
return nil
}
func serveMetricsAPI() {
if conf.Metrics.Address != "" {
log.G(global).WithField("metrics", conf.Metrics.Address).Info("starting metrics API...")
h := newMetricsHandler()
go func() {
if err := http.ListenAndServe(conf.Metrics.Address, h); err != nil {
log.G(global).WithError(err).Fatal("serve metrics API")
}
}()
func loadPlugins(config *server.Config) ([]*plugin.Registration, error) {
// load all plugins into containerd
if err := plugin.Load(filepath.Join(config.Root, "plugins")); err != nil {
return nil, err
}
}
func newMetricsHandler() http.Handler {
m := http.NewServeMux()
m.Handle("/metrics", metrics.Handler())
return m
}
func serveDebugAPI() error {
path := conf.Debug.Address
if path == "" {
return errors.New("debug socket path cannot be empty")
}
l, err := sys.GetLocalListener(path, conf.GRPC.Uid, conf.GRPC.Gid)
if err != nil {
return err
}
log.G(global).WithField("debug", path).Info("starting debug API...")
go func() {
defer l.Close()
// pprof and expvars are imported and automatically register their endpoints
// under /debug
if err := http.Serve(l, nil); err != nil {
log.G(global).WithError(err).Fatal("serve debug API")
}
}()
return nil
// load additional plugins that don't automatically register themselves
registerContentStore()
registerMetaDB()
// return the ordered graph for plugins
return plugin.Graph(), nil
}
func registerContentStore() {
@ -278,63 +221,6 @@ func registerMetaDB() {
})
}
func newGRPCServer() *grpc.Server {
s := grpc.NewServer(
grpc.UnaryInterceptor(interceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
)
return s
}
func serveGRPC(server *grpc.Server) error {
path := conf.GRPC.Address
if path == "" {
return errors.New("--socket path cannot be empty")
}
l, err := sys.GetLocalListener(path, conf.GRPC.Uid, conf.GRPC.Gid)
if err != nil {
return err
}
go func() {
defer l.Close()
if err := server.Serve(l); err != nil {
log.G(global).WithError(err).Fatal("serve GRPC")
}
}()
return nil
}
func interceptor(ctx gocontext.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
ctx = log.WithModule(ctx, "containerd")
switch info.Server.(type) {
case api.TasksServer:
ctx = log.WithModule(ctx, "execution")
case containersapi.ContainersServer:
ctx = log.WithModule(ctx, "containers")
case contentapi.ContentServer:
ctx = log.WithModule(ctx, "content")
case imagesapi.ImagesServer:
ctx = log.WithModule(ctx, "images")
case grpc_health_v1.HealthServer:
// No need to change the context
case versionapi.VersionServer:
ctx = log.WithModule(ctx, "version")
case snapshotapi.SnapshotsServer:
ctx = log.WithModule(ctx, "snapshot")
case diffapi.DiffServer:
ctx = log.WithModule(ctx, "diff")
case namespacesapi.NamespacesServer:
ctx = log.WithModule(ctx, "namespaces")
default:
log.G(ctx).Warnf("unknown GRPC server type: %#v\n", info.Server)
}
return grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
}
func dumpStacks() {
var (
buf []byte
@ -349,23 +235,3 @@ func dumpStacks() {
buf = buf[:stackSize]
logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}
func loadPluginConfig(name string, c interface{}, ic *plugin.InitContext) error {
if err := conf.decodePlugin(name, c); err != nil {
return err
}
ic.Config = c
return nil
}
func shouldLoad(r *plugin.Registration) bool {
// only load certain plugins based on the config values
switch r.Type {
case plugin.SnapshotPlugin:
return r.URI() == conf.Snapshotter
case plugin.DiffPlugin:
return r.URI() == conf.Differ
default:
return true
}
}

View File

@ -1,15 +1,15 @@
package main
import (
"context"
"os"
"golang.org/x/sys/unix"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/server"
"github.com/containerd/containerd/sys"
"github.com/urfave/cli"
"google.golang.org/grpc"
)
const defaultConfigPath = "/etc/containerd/config.toml"
@ -21,29 +21,29 @@ var handledSignals = []os.Signal{
unix.SIGCHLD,
}
func platformInit(context *cli.Context) error {
if conf.Subreaper {
log.G(global).Info("setting subreaper...")
func platformInit(ctx context.Context, config *server.Config) error {
if config.Subreaper {
log.G(ctx).Info("setting subreaper...")
if err := sys.SetSubreaper(1); err != nil {
return err
}
}
if conf.OOMScore != 0 {
log.G(global).Infof("changing OOM score to %d", conf.OOMScore)
if err := sys.SetOOMScore(os.Getpid(), conf.OOMScore); err != nil {
if config.OOMScore != 0 {
log.G(ctx).Infof("changing OOM score to %d", config.OOMScore)
if err := sys.SetOOMScore(os.Getpid(), config.OOMScore); err != nil {
return err
}
}
return nil
}
func handleSignals(signals chan os.Signal, server *grpc.Server) error {
func handleSignals(ctx context.Context, signals chan os.Signal, server *server.Server) error {
for s := range signals {
log.G(global).WithField("signal", s).Debug("received signal")
log.G(ctx).WithField("signal", s).Debug("received signal")
switch s {
case unix.SIGCHLD:
if err := reaper.Reap(); err != nil {
log.G(global).WithError(err).Error("reap containerd processes")
log.G(ctx).WithError(err).Error("reap containerd processes")
}
case unix.SIGUSR1:
dumpStacks()

View File

@ -3,35 +3,36 @@
package main
import (
"context"
"os"
"golang.org/x/sys/unix"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/reaper"
"github.com/urfave/cli"
"google.golang.org/grpc"
"github.com/containerd/containerd/server"
)
const (
defaultConfigPath = "/etc/containerd/config.toml"
)
const defaultConfigPath = "/etc/containerd/config.toml"
var (
handledSignals = []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGUSR1, unix.SIGCHLD}
)
var handledSignals = []os.Signal{
unix.SIGTERM,
unix.SIGINT,
unix.SIGUSR1,
unix.SIGCHLD,
}
func platformInit(context *cli.Context) error {
func platformInit(context context.Context, config *server.Config) error {
return nil
}
func handleSignals(signals chan os.Signal, server *grpc.Server) error {
func handleSignals(ctx context.Context, signals chan os.Signal, server *server.Server) error {
for s := range signals {
log.G(global).WithField("signal", s).Debug("received signal")
log.G(ctx).WithField("signal", s).Debug("received signal")
switch s {
case unix.SIGCHLD:
if err := reaper.Reap(); err != nil {
log.G(global).WithError(err).Error("reap containerd processes")
log.G(ctx).WithError(err).Error("reap containerd processes")
}
case unix.SIGUSR1:
dumpStacks()

View File

@ -1,27 +1,30 @@
package main
import (
"context"
"os"
"path/filepath"
"syscall"
"github.com/containerd/containerd/log"
"github.com/urfave/cli"
"google.golang.org/grpc"
"github.com/containerd/containerd/server"
)
var (
defaultConfigPath = filepath.Join(os.Getenv("programfiles"), "containerd", "config.toml")
handledSignals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
handledSignals = []os.Signal{
syscall.SIGTERM,
syscall.SIGINT,
}
)
func platformInit(context *cli.Context) error {
func platformInit(context context.Context, config *server.Config) error {
return nil
}
func handleSignals(signals chan os.Signal, server *grpc.Server) error {
func handleSignals(ctx context.Context, signals chan os.Signal, server *server.Server) error {
for s := range signals {
log.G(global).WithField("signal", s).Debug("received signal")
log.G(ctx).WithField("signal", s).Debug("received signal")
server.Stop()
}
return nil

View File

@ -3,13 +3,17 @@ package plugin
import (
"context"
"fmt"
"path/filepath"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
)
func NewContext(plugins map[PluginType][]interface{}) *InitContext {
func NewContext(ctx context.Context, plugins map[PluginType][]interface{}, root, id string) *InitContext {
return &InitContext{
plugins: plugins,
Root: filepath.Join(root, id),
Context: log.WithModule(ctx, id),
}
}

86
server/config.go Normal file
View File

@ -0,0 +1,86 @@
package server
import (
"bytes"
"io"
"github.com/BurntSushi/toml"
)
// Config provides containerd configuration data for the server
type Config struct {
// Root is the path to a directory where containerd will store persistent data
Root string `toml:"root"`
// GRPC configuration settings
GRPC GRPCConfig `toml:"grpc"`
// Debug and profiling settings
Debug Debug `toml:"debug"`
// Metrics and monitoring settings
Metrics MetricsConfig `toml:"metrics"`
// Snapshotter specifies which snapshot driver to use
Snapshotter string `toml:"snapshotter"`
// Differ specifies which differ to use. Differ is tightly coupled with the snapshotter
// so not all combinations may work.
Differ string `toml:"differ"`
// Plugins provides plugin specific configuration for the initialization of a plugin
Plugins map[string]toml.Primitive `toml:"plugins"`
// Enable containerd as a subreaper
Subreaper bool `toml:"subreaper"`
// OOMScore adjust the containerd's oom score
OOMScore int `toml:"oom_score"`
md toml.MetaData
}
type GRPCConfig struct {
Address string `toml:"address"`
Uid int `toml:"uid"`
Gid int `toml:"gid"`
}
type Debug struct {
Address string `toml:"address"`
Uid int `toml:"uid"`
Gid int `toml:"gid"`
Level string `toml:"level"`
}
type MetricsConfig struct {
Address string `toml:"address"`
}
// Decode unmarshals a plugin specific configuration by plugin id
func (c *Config) Decode(id string, v interface{}) (interface{}, error) {
data, ok := c.Plugins[id]
if !ok {
return v, nil
}
if err := c.md.PrimitiveDecode(data, v); err != nil {
return nil, err
}
return v, nil
}
// WriteTo marshals the config to the provided writer
func (c *Config) WriteTo(w io.Writer) (int64, error) {
buf := bytes.NewBuffer(nil)
e := toml.NewEncoder(buf)
if err := e.Encode(c); err != nil {
return 0, err
}
return io.Copy(w, buf)
}
// LoadConfig loads the containerd server config from the provided path
func LoadConfig(path string, v *Config) error {
if v == nil {
v = &Config{}
}
md, err := toml.DecodeFile(path, v)
if err != nil {
return err
}
v.md = md
return nil
}

175
server/server.go Normal file
View File

@ -0,0 +1,175 @@
package server
import (
"errors"
"expvar"
"net"
"net/http"
"net/http/pprof"
"os"
containers "github.com/containerd/containerd/api/services/containers/v1"
content "github.com/containerd/containerd/api/services/content/v1"
diff "github.com/containerd/containerd/api/services/diff/v1"
images "github.com/containerd/containerd/api/services/images/v1"
namespaces "github.com/containerd/containerd/api/services/namespaces/v1"
snapshot "github.com/containerd/containerd/api/services/snapshot/v1"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
version "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin"
metrics "github.com/docker/go-metrics"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)
// New creates and initializes a new containerd server
func New(ctx context.Context, config *Config, plugins []*plugin.Registration) (*Server, error) {
if config.Root == "" {
return nil, errors.New("root must be specified")
}
if err := os.MkdirAll(config.Root, 0700); err != nil {
return nil, err
}
rpc := grpc.NewServer(
grpc.UnaryInterceptor(interceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
)
var (
services []plugin.Service
s = &Server{
rpc: rpc,
emitter: events.NewEmitter(),
}
initialized = make(map[plugin.PluginType][]interface{})
)
for _, p := range plugins {
id := p.URI()
if !shouldLoadPlugin(p, config) {
log.G(ctx).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
continue
}
log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
initContext := plugin.NewContext(
events.WithPoster(ctx, s.emitter),
initialized,
config.Root,
id,
)
initContext.Emitter = s.emitter
// load the plugin specific configuration if it is provided
if p.Config != nil {
pluginConfig, err := config.Decode(p.ID, p.Config)
if err != nil {
return nil, err
}
initContext.Config = pluginConfig
}
instance, err := p.Init(initContext)
if err != nil {
return nil, err
}
initialized[p.Type] = append(initialized[p.Type], instance)
// check for grpc services that should be registered with the server
if service, ok := instance.(plugin.Service); ok {
services = append(services, service)
}
}
// register services after all plugins have been initialized
for _, service := range services {
if err := service.Register(rpc); err != nil {
return nil, err
}
}
return s, nil
}
// Server is the containerd main daemon
type Server struct {
rpc *grpc.Server
emitter *events.Emitter
}
// ServeGRPC provides the containerd grpc APIs on the provided listener
func (s *Server) ServeGRPC(l net.Listener) error {
// before we start serving the grpc API regster the grpc_prometheus metrics
// handler. This needs to be the last service registered so that it can collect
// metrics for every other service
grpc_prometheus.Register(s.rpc)
return s.rpc.Serve(l)
}
// ServeMetrics provides a prometheus endpoint for exposing metrics
func (s *Server) ServeMetrics(l net.Listener) error {
m := http.NewServeMux()
m.Handle("/metrics", metrics.Handler())
return http.Serve(l, m)
}
// ServeDebug provides a debug endpoint
func (s *Server) ServeDebug(l net.Listener) error {
// don't use the default http server mux to make sure nothing gets registered
// that we don't want to expose via containerd
m := http.NewServeMux()
m.Handle("/debug/vars", expvar.Handler())
m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
return http.Serve(l, m)
}
// Stop gracefully stops the containerd server
func (s *Server) Stop() {
s.rpc.GracefulStop()
}
func shouldLoadPlugin(p *plugin.Registration, config *Config) bool {
switch p.Type {
case plugin.SnapshotPlugin:
return p.URI() == config.Snapshotter
case plugin.DiffPlugin:
return p.URI() == config.Differ
default:
return true
}
}
func interceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
ctx = log.WithModule(ctx, "containerd")
switch info.Server.(type) {
case tasks.TasksServer:
ctx = log.WithModule(ctx, "execution")
case containers.ContainersServer:
ctx = log.WithModule(ctx, "containers")
case content.ContentServer:
ctx = log.WithModule(ctx, "content")
case images.ImagesServer:
ctx = log.WithModule(ctx, "images")
case grpc_health_v1.HealthServer:
// No need to change the context
case version.VersionServer:
ctx = log.WithModule(ctx, "version")
case snapshot.SnapshotsServer:
ctx = log.WithModule(ctx, "snapshot")
case diff.DiffServer:
ctx = log.WithModule(ctx, "diff")
case namespaces.NamespacesServer:
ctx = log.WithModule(ctx, "namespaces")
default:
log.G(ctx).Warnf("unknown GRPC server type: %#v\n", info.Server)
}
return grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
}