diff --git a/client_test.go b/client_test.go index 4c3992209..5bed570ad 100644 --- a/client_test.go +++ b/client_test.go @@ -15,9 +15,8 @@ import ( ) const ( - defaultRoot = "/var/lib/containerd-test" - defaultState = "/run/containerd-test" - testImage = "docker.io/library/alpine:latest" + defaultRoot = "/var/lib/containerd-test" + testImage = "docker.io/library/alpine:latest" ) var ( @@ -58,25 +57,28 @@ func TestMain(m *testing.M) { // setup a new containerd daemon if !testing.Short cmd = exec.Command("containerd", "--root", defaultRoot, - "--state", defaultState, "--address", address, ) cmd.Stderr = buf if err := cmd.Start(); err != nil { - fmt.Println(err) + cmd.Wait() + fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String()) os.Exit(1) } } client, err := waitForDaemonStart(ctx, address) if err != nil { - fmt.Fprintln(os.Stderr, "immediate fail!", err) + cmd.Wait() + fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String()) os.Exit(1) } // pull a seed image if _, err = client.Pull(ctx, testImage, WithPullUnpack); err != nil { - fmt.Fprintln(os.Stderr, err) + cmd.Process.Signal(syscall.SIGTERM) + cmd.Wait() + fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String()) os.Exit(1) } diff --git a/cmd/containerd/config.go b/cmd/containerd/config.go index ebeddb7cd..cac902a6a 100644 --- a/cmd/containerd/config.go +++ b/cmd/containerd/config.go @@ -1,11 +1,8 @@ package main import ( - "bytes" - "io" "os" - "github.com/BurntSushi/toml" "github.com/urfave/cli" ) @@ -17,77 +14,9 @@ var configCommand = cli.Command{ Name: "default", Usage: "see the output of the default config", Action: func(context *cli.Context) error { - _, err := conf.WriteTo(os.Stdout) + _, err := defaultConfig().WriteTo(os.Stdout) return err }, }, }, } - -// loadConfig loads the config from the provided path -func loadConfig(path string) error { - md, err := toml.DecodeFile(path, conf) - if err != nil { - return err - } - conf.md = md - return nil -} - -// config specifies the containerd configuration file in the TOML format. -// It contains fields to configure various subsystems and containerd as a whole. -type config struct { - // Root is the path to a directory where containerd will store persistent data - Root string `toml:"root"` - // GRPC configuration settings - GRPC grpcConfig `toml:"grpc"` - // Debug and profiling settings - Debug debug `toml:"debug"` - // Metrics and monitoring settings - Metrics metricsConfig `toml:"metrics"` - // Snapshotter specifies which snapshot driver to use - Snapshotter string `toml:"snapshotter"` - // Differ specifies which differ to use. Differ is tightly coupled with the snapshotter - // so not all combinations may work. - Differ string `toml:"differ"` - // Plugins provides plugin specific configuration for the initialization of a plugin - Plugins map[string]toml.Primitive `toml:"plugins"` - // Enable containerd as a subreaper - Subreaper bool `toml:"subreaper"` - // OOMScore adjust the containerd's oom score - OOMScore int `toml:"oom_score"` - - md toml.MetaData -} - -func (c *config) decodePlugin(name string, v interface{}) error { - p, ok := c.Plugins[name] - if !ok { - return nil - } - return c.md.PrimitiveDecode(p, v) -} - -func (c *config) WriteTo(w io.Writer) (int64, error) { - buf := bytes.NewBuffer(nil) - e := toml.NewEncoder(buf) - if err := e.Encode(c); err != nil { - return 0, err - } - return io.Copy(w, buf) -} - -type grpcConfig struct { - Address string `toml:"address"` - Uid int `toml:"uid"` - Gid int `toml:"gid"` -} - -type debug struct { - Address string `toml:"address"` - Level string `toml:"level"` -} - -type metricsConfig struct { - Address string `toml:"address"` -} diff --git a/cmd/containerd/config_linux.go b/cmd/containerd/config_linux.go index a562726b1..efb82309b 100644 --- a/cmd/containerd/config_linux.go +++ b/cmd/containerd/config_linux.go @@ -1,14 +1,17 @@ package main -import "github.com/containerd/containerd" +import ( + "github.com/containerd/containerd" + "github.com/containerd/containerd/server" +) -func defaultConfig() *config { - return &config{ +func defaultConfig() *server.Config { + return &server.Config{ Root: "/var/lib/containerd", - GRPC: grpcConfig{ + GRPC: server.GRPCConfig{ Address: containerd.DefaultAddress, }, - Debug: debug{ + Debug: server.Debug{ Level: "info", Address: "/run/containerd/debug.sock", }, diff --git a/cmd/containerd/config_unix.go b/cmd/containerd/config_unix.go index a96085459..287fd9ef2 100644 --- a/cmd/containerd/config_unix.go +++ b/cmd/containerd/config_unix.go @@ -2,13 +2,15 @@ package main -func defaultConfig() *config { - return &config{ +import "github.com/containerd/containerd/server" + +func defaultConfig() *server.Config { + return &server.Config{ Root: "/var/lib/containerd", - GRPC: grpcConfig{ + GRPC: server.GRPCConfig{ Address: "/run/containerd/containerd.sock", }, - Debug: debug{ + Debug: server.Debug{ Level: "info", Address: "/run/containerd/debug.sock", }, diff --git a/cmd/containerd/config_windows.go b/cmd/containerd/config_windows.go index 4c11627cd..4bf3b802e 100644 --- a/cmd/containerd/config_windows.go +++ b/cmd/containerd/config_windows.go @@ -3,15 +3,17 @@ package main import ( "os" "path/filepath" + + "github.com/containerd/containerd/server" ) -func defaultConfig() *config { - return &config{ +func defaultConfig() *server.Config { + return &server.Config{ Root: filepath.Join(os.Getenv("programfiles"), "containerd", "root"), - GRPC: grpcConfig{ + GRPC: server.GRPCConfig{ Address: `\\.\pipe\containerd-containerd`, }, - Debug: debug{ + Debug: server.Debug{ Level: "info", Address: `\\.\pipe\containerd-debug`, }, diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index bef5a0461..bfdf4fc69 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -1,10 +1,9 @@ package main import ( - _ "expvar" + "context" "fmt" - "net/http" - _ "net/http/pprof" + "net" "os" "os/signal" "path/filepath" @@ -12,27 +11,15 @@ import ( "time" "github.com/boltdb/bolt" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" gocontext "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/health/grpc_health_v1" "github.com/Sirupsen/logrus" - containersapi "github.com/containerd/containerd/api/services/containers/v1" - contentapi "github.com/containerd/containerd/api/services/content/v1" - diffapi "github.com/containerd/containerd/api/services/diff/v1" - imagesapi "github.com/containerd/containerd/api/services/images/v1" - namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1" - snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1" - api "github.com/containerd/containerd/api/services/tasks/v1" - versionapi "github.com/containerd/containerd/api/services/version/v1" "github.com/containerd/containerd/content" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/server" "github.com/containerd/containerd/sys" "github.com/containerd/containerd/version" - metrics "github.com/docker/go-metrics" "github.com/pkg/errors" "github.com/urfave/cli" ) @@ -47,11 +34,6 @@ const usage = ` high performance container runtime ` -var ( - conf = defaultConfig() - global = log.WithModule(gocontext.Background(), "containerd") -) - func init() { cli.VersionPrinter = func(c *cli.Context) { fmt.Println(c.App.Name, version.Package, c.App.Version) @@ -73,10 +55,6 @@ func main() { Name: "log-level,l", Usage: "set the logging level [debug, info, warn, error, fatal, panic]", }, - cli.StringFlag{ - Name: "state", - Usage: "containerd state directory", - }, cli.StringFlag{ Name: "address,a", Usage: "address for containerd's GRPC server", @@ -89,81 +67,66 @@ func main() { app.Commands = []cli.Command{ configCommand, } - app.Before = before app.Action = func(context *cli.Context) error { - start := time.Now() + var ( + start = time.Now() + signals = make(chan os.Signal, 2048) + ctx = log.WithModule(gocontext.Background(), "containerd") + config = defaultConfig() + ) // start the signal handler as soon as we can to make sure that // we don't miss any signals during boot - signals := make(chan os.Signal, 2048) signal.Notify(signals, handledSignals...) - if err := platformInit(context); err != nil { - return err - } - log.G(global).Info("starting containerd boot...") - // load all plugins into containerd - if err := plugin.Load(filepath.Join(conf.Root, "plugins")); err != nil { + if err := server.LoadConfig(context.GlobalString("config"), config); err != nil && !os.IsNotExist(err) { return err } - registerContentStore() - registerMetaDB() - // start debug and metrics APIs - if err := serveDebugAPI(); err != nil { + // apply flags to the config + if err := applyFlags(context, config); err != nil { + return err + } + address := config.GRPC.Address + if address == "" { + return errors.New("grpc address cannot be empty") + } + + if err := platformInit(ctx, config); err != nil { + return err + } + log.G(ctx).Info("starting containerd boot...") + + plugins, err := loadPlugins(config) + if err != nil { return err } - var ( - services []plugin.Service - plugins = make(map[plugin.PluginType][]interface{}) - eventEmitter = events.NewEmitter() - ) - for _, init := range plugin.Graph() { - id := init.URI() - if !shouldLoad(init) { - log.G(global).WithField("type", init.Type).Infof("skipping plugin %q...", id) - continue - } - log.G(global).WithField("type", init.Type).Infof("loading plugin %q...", id) - ic := plugin.NewContext(plugins) - ic.Root = filepath.Join(conf.Root, id) - - rCtx := events.WithPoster(global, eventEmitter) - ic.Context = log.WithModule(rCtx, id) - if init.Config != nil { - if err := loadPluginConfig(init.ID, init.Config, ic); err != nil { - return err - } - } - ic.Emitter = eventEmitter - - p, err := init.Init(ic) + server, err := server.New(ctx, config, plugins) + if err != nil { + return err + } + if config.Debug.Address != "" { + l, err := sys.GetLocalListener(config.Debug.Address, config.Debug.Uid, config.Debug.Gid) if err != nil { return err } - plugins[init.Type] = append(plugins[init.Type], p) - if s, ok := p.(plugin.Service); ok { - services = append(services, s) - } + serve(log.WithModule(ctx, "debug"), l, server.ServeDebug) } - - // start the GRPC api with the execution service registered - server := newGRPCServer() - for _, service := range services { - if err := service.Register(server); err != nil { + if config.Metrics.Address != "" { + l, err := net.Listen("tcp", config.Metrics.Address) + if err != nil { return err } + serve(log.WithModule(ctx, "metrics"), l, server.ServeMetrics) } - // register metrics last after all other services - grpc_prometheus.Register(server) - log.G(global).Info("starting GRPC API server...") - if err := serveGRPC(server); err != nil { + + l, err := sys.GetLocalListener(address, config.GRPC.Uid, config.GRPC.Gid) + if err != nil { return err } - // start the prometheus metrics API for containerd - serveMetricsAPI() + serve(log.WithModule(ctx, "grpc"), l, server.ServeGRPC) - log.G(global).Infof("containerd successfully booted in %fs", time.Since(start).Seconds()) - return handleSignals(signals, server) + log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds()) + return handleSignals(ctx, signals, server) } if err := app.Run(os.Args); err != nil { fmt.Fprintf(os.Stderr, "containerd: %s\n", err) @@ -171,14 +134,21 @@ func main() { } } -func before(context *cli.Context) error { - err := loadConfig(context.GlobalString("config")) - if err != nil && !os.IsNotExist(err) { - return err - } +func serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) error) { + path := l.Addr().String() + log.G(ctx).WithField("address", path).Info("serving...") + go func() { + defer l.Close() + if err := serveFunc(l); err != nil { + log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure") + } + }() +} + +func applyFlags(context *cli.Context, config *server.Config) error { // the order for config vs flag values is that flags will always override // the config values if they are set - if err := setLevel(context); err != nil { + if err := setLevel(context, config); err != nil { return err } for _, v := range []struct { @@ -187,11 +157,11 @@ func before(context *cli.Context) error { }{ { name: "root", - d: &conf.Root, + d: &config.Root, }, { name: "address", - d: &conf.GRPC.Address, + d: &config.GRPC.Address, }, } { if s := context.GlobalString(v.name); s != "" { @@ -201,10 +171,10 @@ func before(context *cli.Context) error { return nil } -func setLevel(context *cli.Context) error { +func setLevel(context *cli.Context, config *server.Config) error { l := context.GlobalString("log-level") if l == "" { - l = conf.Debug.Level + l = config.Debug.Level } if l != "" { lvl, err := logrus.ParseLevel(l) @@ -216,43 +186,16 @@ func setLevel(context *cli.Context) error { return nil } -func serveMetricsAPI() { - if conf.Metrics.Address != "" { - log.G(global).WithField("metrics", conf.Metrics.Address).Info("starting metrics API...") - h := newMetricsHandler() - go func() { - if err := http.ListenAndServe(conf.Metrics.Address, h); err != nil { - log.G(global).WithError(err).Fatal("serve metrics API") - } - }() +func loadPlugins(config *server.Config) ([]*plugin.Registration, error) { + // load all plugins into containerd + if err := plugin.Load(filepath.Join(config.Root, "plugins")); err != nil { + return nil, err } -} - -func newMetricsHandler() http.Handler { - m := http.NewServeMux() - m.Handle("/metrics", metrics.Handler()) - return m -} - -func serveDebugAPI() error { - path := conf.Debug.Address - if path == "" { - return errors.New("debug socket path cannot be empty") - } - l, err := sys.GetLocalListener(path, conf.GRPC.Uid, conf.GRPC.Gid) - if err != nil { - return err - } - log.G(global).WithField("debug", path).Info("starting debug API...") - go func() { - defer l.Close() - // pprof and expvars are imported and automatically register their endpoints - // under /debug - if err := http.Serve(l, nil); err != nil { - log.G(global).WithError(err).Fatal("serve debug API") - } - }() - return nil + // load additional plugins that don't automatically register themselves + registerContentStore() + registerMetaDB() + // return the ordered graph for plugins + return plugin.Graph(), nil } func registerContentStore() { @@ -278,63 +221,6 @@ func registerMetaDB() { }) } -func newGRPCServer() *grpc.Server { - s := grpc.NewServer( - grpc.UnaryInterceptor(interceptor), - grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), - ) - return s -} - -func serveGRPC(server *grpc.Server) error { - path := conf.GRPC.Address - if path == "" { - return errors.New("--socket path cannot be empty") - } - l, err := sys.GetLocalListener(path, conf.GRPC.Uid, conf.GRPC.Gid) - if err != nil { - return err - } - go func() { - defer l.Close() - if err := server.Serve(l); err != nil { - log.G(global).WithError(err).Fatal("serve GRPC") - } - }() - return nil -} - -func interceptor(ctx gocontext.Context, - req interface{}, - info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler, -) (interface{}, error) { - ctx = log.WithModule(ctx, "containerd") - switch info.Server.(type) { - case api.TasksServer: - ctx = log.WithModule(ctx, "execution") - case containersapi.ContainersServer: - ctx = log.WithModule(ctx, "containers") - case contentapi.ContentServer: - ctx = log.WithModule(ctx, "content") - case imagesapi.ImagesServer: - ctx = log.WithModule(ctx, "images") - case grpc_health_v1.HealthServer: - // No need to change the context - case versionapi.VersionServer: - ctx = log.WithModule(ctx, "version") - case snapshotapi.SnapshotsServer: - ctx = log.WithModule(ctx, "snapshot") - case diffapi.DiffServer: - ctx = log.WithModule(ctx, "diff") - case namespacesapi.NamespacesServer: - ctx = log.WithModule(ctx, "namespaces") - default: - log.G(ctx).Warnf("unknown GRPC server type: %#v\n", info.Server) - } - return grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler) -} - func dumpStacks() { var ( buf []byte @@ -349,23 +235,3 @@ func dumpStacks() { buf = buf[:stackSize] logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) } - -func loadPluginConfig(name string, c interface{}, ic *plugin.InitContext) error { - if err := conf.decodePlugin(name, c); err != nil { - return err - } - ic.Config = c - return nil -} - -func shouldLoad(r *plugin.Registration) bool { - // only load certain plugins based on the config values - switch r.Type { - case plugin.SnapshotPlugin: - return r.URI() == conf.Snapshotter - case plugin.DiffPlugin: - return r.URI() == conf.Differ - default: - return true - } -} diff --git a/cmd/containerd/main_linux.go b/cmd/containerd/main_linux.go index 440b3745b..f17be7a86 100644 --- a/cmd/containerd/main_linux.go +++ b/cmd/containerd/main_linux.go @@ -1,15 +1,15 @@ package main import ( + "context" "os" "golang.org/x/sys/unix" "github.com/containerd/containerd/log" "github.com/containerd/containerd/reaper" + "github.com/containerd/containerd/server" "github.com/containerd/containerd/sys" - "github.com/urfave/cli" - "google.golang.org/grpc" ) const defaultConfigPath = "/etc/containerd/config.toml" @@ -21,29 +21,29 @@ var handledSignals = []os.Signal{ unix.SIGCHLD, } -func platformInit(context *cli.Context) error { - if conf.Subreaper { - log.G(global).Info("setting subreaper...") +func platformInit(ctx context.Context, config *server.Config) error { + if config.Subreaper { + log.G(ctx).Info("setting subreaper...") if err := sys.SetSubreaper(1); err != nil { return err } } - if conf.OOMScore != 0 { - log.G(global).Infof("changing OOM score to %d", conf.OOMScore) - if err := sys.SetOOMScore(os.Getpid(), conf.OOMScore); err != nil { + if config.OOMScore != 0 { + log.G(ctx).Infof("changing OOM score to %d", config.OOMScore) + if err := sys.SetOOMScore(os.Getpid(), config.OOMScore); err != nil { return err } } return nil } -func handleSignals(signals chan os.Signal, server *grpc.Server) error { +func handleSignals(ctx context.Context, signals chan os.Signal, server *server.Server) error { for s := range signals { - log.G(global).WithField("signal", s).Debug("received signal") + log.G(ctx).WithField("signal", s).Debug("received signal") switch s { case unix.SIGCHLD: if err := reaper.Reap(); err != nil { - log.G(global).WithError(err).Error("reap containerd processes") + log.G(ctx).WithError(err).Error("reap containerd processes") } case unix.SIGUSR1: dumpStacks() diff --git a/cmd/containerd/main_unix.go b/cmd/containerd/main_unix.go index d82139dbd..f90fbc584 100644 --- a/cmd/containerd/main_unix.go +++ b/cmd/containerd/main_unix.go @@ -3,35 +3,36 @@ package main import ( + "context" "os" "golang.org/x/sys/unix" "github.com/containerd/containerd/log" "github.com/containerd/containerd/reaper" - "github.com/urfave/cli" - "google.golang.org/grpc" + "github.com/containerd/containerd/server" ) -const ( - defaultConfigPath = "/etc/containerd/config.toml" -) +const defaultConfigPath = "/etc/containerd/config.toml" -var ( - handledSignals = []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGUSR1, unix.SIGCHLD} -) +var handledSignals = []os.Signal{ + unix.SIGTERM, + unix.SIGINT, + unix.SIGUSR1, + unix.SIGCHLD, +} -func platformInit(context *cli.Context) error { +func platformInit(context context.Context, config *server.Config) error { return nil } -func handleSignals(signals chan os.Signal, server *grpc.Server) error { +func handleSignals(ctx context.Context, signals chan os.Signal, server *server.Server) error { for s := range signals { - log.G(global).WithField("signal", s).Debug("received signal") + log.G(ctx).WithField("signal", s).Debug("received signal") switch s { case unix.SIGCHLD: if err := reaper.Reap(); err != nil { - log.G(global).WithError(err).Error("reap containerd processes") + log.G(ctx).WithError(err).Error("reap containerd processes") } case unix.SIGUSR1: dumpStacks() diff --git a/cmd/containerd/main_windows.go b/cmd/containerd/main_windows.go index 47ef17736..d5df67281 100644 --- a/cmd/containerd/main_windows.go +++ b/cmd/containerd/main_windows.go @@ -1,27 +1,30 @@ package main import ( + "context" "os" "path/filepath" "syscall" "github.com/containerd/containerd/log" - "github.com/urfave/cli" - "google.golang.org/grpc" + "github.com/containerd/containerd/server" ) var ( defaultConfigPath = filepath.Join(os.Getenv("programfiles"), "containerd", "config.toml") - handledSignals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} + handledSignals = []os.Signal{ + syscall.SIGTERM, + syscall.SIGINT, + } ) -func platformInit(context *cli.Context) error { +func platformInit(context context.Context, config *server.Config) error { return nil } -func handleSignals(signals chan os.Signal, server *grpc.Server) error { +func handleSignals(ctx context.Context, signals chan os.Signal, server *server.Server) error { for s := range signals { - log.G(global).WithField("signal", s).Debug("received signal") + log.G(ctx).WithField("signal", s).Debug("received signal") server.Stop() } return nil diff --git a/plugin/context.go b/plugin/context.go index 0cc36c901..6e6d0fc9c 100644 --- a/plugin/context.go +++ b/plugin/context.go @@ -3,13 +3,17 @@ package plugin import ( "context" "fmt" + "path/filepath" "github.com/containerd/containerd/events" + "github.com/containerd/containerd/log" ) -func NewContext(plugins map[PluginType][]interface{}) *InitContext { +func NewContext(ctx context.Context, plugins map[PluginType][]interface{}, root, id string) *InitContext { return &InitContext{ plugins: plugins, + Root: filepath.Join(root, id), + Context: log.WithModule(ctx, id), } } diff --git a/server/config.go b/server/config.go new file mode 100644 index 000000000..3fa53dea1 --- /dev/null +++ b/server/config.go @@ -0,0 +1,86 @@ +package server + +import ( + "bytes" + "io" + + "github.com/BurntSushi/toml" +) + +// Config provides containerd configuration data for the server +type Config struct { + // Root is the path to a directory where containerd will store persistent data + Root string `toml:"root"` + // GRPC configuration settings + GRPC GRPCConfig `toml:"grpc"` + // Debug and profiling settings + Debug Debug `toml:"debug"` + // Metrics and monitoring settings + Metrics MetricsConfig `toml:"metrics"` + // Snapshotter specifies which snapshot driver to use + Snapshotter string `toml:"snapshotter"` + // Differ specifies which differ to use. Differ is tightly coupled with the snapshotter + // so not all combinations may work. + Differ string `toml:"differ"` + // Plugins provides plugin specific configuration for the initialization of a plugin + Plugins map[string]toml.Primitive `toml:"plugins"` + // Enable containerd as a subreaper + Subreaper bool `toml:"subreaper"` + // OOMScore adjust the containerd's oom score + OOMScore int `toml:"oom_score"` + + md toml.MetaData +} + +type GRPCConfig struct { + Address string `toml:"address"` + Uid int `toml:"uid"` + Gid int `toml:"gid"` +} + +type Debug struct { + Address string `toml:"address"` + Uid int `toml:"uid"` + Gid int `toml:"gid"` + Level string `toml:"level"` +} + +type MetricsConfig struct { + Address string `toml:"address"` +} + +// Decode unmarshals a plugin specific configuration by plugin id +func (c *Config) Decode(id string, v interface{}) (interface{}, error) { + data, ok := c.Plugins[id] + if !ok { + return v, nil + } + if err := c.md.PrimitiveDecode(data, v); err != nil { + return nil, err + } + return v, nil +} + +// WriteTo marshals the config to the provided writer +func (c *Config) WriteTo(w io.Writer) (int64, error) { + buf := bytes.NewBuffer(nil) + e := toml.NewEncoder(buf) + if err := e.Encode(c); err != nil { + return 0, err + } + return io.Copy(w, buf) +} + +// LoadConfig loads the containerd server config from the provided path +func LoadConfig(path string, v *Config) error { + if v == nil { + v = &Config{} + } + md, err := toml.DecodeFile(path, v) + if err != nil { + return err + } + v.md = md + return nil + +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 000000000..cfc34c5c2 --- /dev/null +++ b/server/server.go @@ -0,0 +1,175 @@ +package server + +import ( + "errors" + "expvar" + "net" + "net/http" + "net/http/pprof" + "os" + + containers "github.com/containerd/containerd/api/services/containers/v1" + content "github.com/containerd/containerd/api/services/content/v1" + diff "github.com/containerd/containerd/api/services/diff/v1" + images "github.com/containerd/containerd/api/services/images/v1" + namespaces "github.com/containerd/containerd/api/services/namespaces/v1" + snapshot "github.com/containerd/containerd/api/services/snapshot/v1" + tasks "github.com/containerd/containerd/api/services/tasks/v1" + version "github.com/containerd/containerd/api/services/version/v1" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/plugin" + metrics "github.com/docker/go-metrics" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "golang.org/x/net/context" + + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" +) + +// New creates and initializes a new containerd server +func New(ctx context.Context, config *Config, plugins []*plugin.Registration) (*Server, error) { + if config.Root == "" { + return nil, errors.New("root must be specified") + } + if err := os.MkdirAll(config.Root, 0700); err != nil { + return nil, err + } + rpc := grpc.NewServer( + grpc.UnaryInterceptor(interceptor), + grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), + ) + var ( + services []plugin.Service + s = &Server{ + rpc: rpc, + emitter: events.NewEmitter(), + } + initialized = make(map[plugin.PluginType][]interface{}) + ) + for _, p := range plugins { + id := p.URI() + if !shouldLoadPlugin(p, config) { + log.G(ctx).WithField("type", p.Type).Infof("skip loading plugin %q...", id) + continue + } + log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id) + + initContext := plugin.NewContext( + events.WithPoster(ctx, s.emitter), + initialized, + config.Root, + id, + ) + initContext.Emitter = s.emitter + + // load the plugin specific configuration if it is provided + if p.Config != nil { + pluginConfig, err := config.Decode(p.ID, p.Config) + if err != nil { + return nil, err + } + initContext.Config = pluginConfig + } + instance, err := p.Init(initContext) + if err != nil { + return nil, err + } + initialized[p.Type] = append(initialized[p.Type], instance) + // check for grpc services that should be registered with the server + if service, ok := instance.(plugin.Service); ok { + services = append(services, service) + } + } + // register services after all plugins have been initialized + for _, service := range services { + if err := service.Register(rpc); err != nil { + return nil, err + } + } + return s, nil +} + +// Server is the containerd main daemon +type Server struct { + rpc *grpc.Server + emitter *events.Emitter +} + +// ServeGRPC provides the containerd grpc APIs on the provided listener +func (s *Server) ServeGRPC(l net.Listener) error { + // before we start serving the grpc API regster the grpc_prometheus metrics + // handler. This needs to be the last service registered so that it can collect + // metrics for every other service + grpc_prometheus.Register(s.rpc) + return s.rpc.Serve(l) +} + +// ServeMetrics provides a prometheus endpoint for exposing metrics +func (s *Server) ServeMetrics(l net.Listener) error { + m := http.NewServeMux() + m.Handle("/metrics", metrics.Handler()) + return http.Serve(l, m) +} + +// ServeDebug provides a debug endpoint +func (s *Server) ServeDebug(l net.Listener) error { + // don't use the default http server mux to make sure nothing gets registered + // that we don't want to expose via containerd + m := http.NewServeMux() + m.Handle("/debug/vars", expvar.Handler()) + m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + return http.Serve(l, m) +} + +// Stop gracefully stops the containerd server +func (s *Server) Stop() { + s.rpc.GracefulStop() +} + +func shouldLoadPlugin(p *plugin.Registration, config *Config) bool { + switch p.Type { + case plugin.SnapshotPlugin: + return p.URI() == config.Snapshotter + case plugin.DiffPlugin: + return p.URI() == config.Differ + default: + return true + } +} + +func interceptor( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (interface{}, error) { + ctx = log.WithModule(ctx, "containerd") + switch info.Server.(type) { + case tasks.TasksServer: + ctx = log.WithModule(ctx, "execution") + case containers.ContainersServer: + ctx = log.WithModule(ctx, "containers") + case content.ContentServer: + ctx = log.WithModule(ctx, "content") + case images.ImagesServer: + ctx = log.WithModule(ctx, "images") + case grpc_health_v1.HealthServer: + // No need to change the context + case version.VersionServer: + ctx = log.WithModule(ctx, "version") + case snapshot.SnapshotsServer: + ctx = log.WithModule(ctx, "snapshot") + case diff.DiffServer: + ctx = log.WithModule(ctx, "diff") + case namespaces.NamespacesServer: + ctx = log.WithModule(ctx, "namespaces") + default: + log.G(ctx).Warnf("unknown GRPC server type: %#v\n", info.Server) + } + return grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler) +}