cmd/containerd: split package for cli.App

Signed-off-by: Akihiro Suda <suda.akihiro@lab.ntt.co.jp>
This commit is contained in:
Akihiro Suda
2018-02-14 17:44:22 +09:00
parent af593cf5ab
commit d7280ce2fb
9 changed files with 209 additions and 195 deletions

View File

@@ -0,0 +1,53 @@
package command
import (
"io"
"os"
"github.com/BurntSushi/toml"
"github.com/containerd/containerd/server"
"github.com/urfave/cli"
)
// Config is a wrapper of server config for printing out.
type Config struct {
*server.Config
// Plugins overrides `Plugins map[string]toml.Primitive` in server config.
Plugins map[string]interface{} `toml:"plugins"`
}
// WriteTo marshals the config to the provided writer
func (c *Config) WriteTo(w io.Writer) (int64, error) {
return 0, toml.NewEncoder(w).Encode(c)
}
var configCommand = cli.Command{
Name: "config",
Usage: "information on the containerd config",
Subcommands: []cli.Command{
{
Name: "default",
Usage: "see the output of the default config",
Action: func(context *cli.Context) error {
config := &Config{
Config: defaultConfig(),
}
plugins, err := server.LoadPlugins(config.Config)
if err != nil {
return err
}
if len(plugins) != 0 {
config.Plugins = make(map[string]interface{})
for _, p := range plugins {
if p.Config == nil {
continue
}
config.Plugins[p.ID] = p.Config
}
}
_, err = config.WriteTo(os.Stdout)
return err
},
},
},
}

View File

@@ -0,0 +1,16 @@
package command
import (
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/server"
)
func defaultConfig() *server.Config {
return &server.Config{
Root: defaults.DefaultRootDir,
State: defaults.DefaultStateDir,
GRPC: server.GRPCConfig{
Address: defaults.DefaultAddress,
},
}
}

View File

@@ -0,0 +1,22 @@
// +build !linux,!windows,!solaris
package command
import (
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/server"
)
func defaultConfig() *server.Config {
return &server.Config{
Root: defaults.DefaultRootDir,
State: defaults.DefaultStateDir,
GRPC: server.GRPCConfig{
Address: defaults.DefaultAddress,
},
Debug: server.Debug{
Level: "info",
Address: defaults.DefaultDebugAddress,
},
}
}

View File

@@ -0,0 +1,16 @@
package command
import (
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/server"
)
func defaultConfig() *server.Config {
return &server.Config{
Root: defaults.DefaultRootDir,
State: defaults.DefaultStateDir,
GRPC: server.GRPCConfig{
Address: defaults.DefaultAddress,
},
}
}

View File

@@ -0,0 +1,200 @@
package command
import (
"context"
"fmt"
"io/ioutil"
golog "log"
"net"
"os"
"os/signal"
"path/filepath"
"time"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/server"
"github.com/containerd/containerd/sys"
"github.com/containerd/containerd/version"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
gocontext "golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
)
const usage = `
__ _ __
_________ ____ / /_____ _(_)___ ___ _________/ /
/ ___/ __ \/ __ \/ __/ __ ` + "`" + `/ / __ \/ _ \/ ___/ __ /
/ /__/ /_/ / / / / /_/ /_/ / / / / / __/ / / /_/ /
\___/\____/_/ /_/\__/\__,_/_/_/ /_/\___/_/ \__,_/
high performance container runtime
`
func init() {
// Discard grpc logs so that they don't mess with our stdio
grpclog.SetLogger(golog.New(ioutil.Discard, "", golog.LstdFlags))
cli.VersionPrinter = func(c *cli.Context) {
fmt.Println(c.App.Name, version.Package, c.App.Version, version.Revision)
}
}
// App returns a *cli.App instance.
func App() *cli.App {
app := cli.NewApp()
app.Name = "containerd"
app.Version = version.Version
app.Usage = usage
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "config,c",
Usage: "path to the configuration file",
Value: defaultConfigPath,
},
cli.StringFlag{
Name: "log-level,l",
Usage: "set the logging level [trace, debug, info, warn, error, fatal, panic]",
},
cli.StringFlag{
Name: "address,a",
Usage: "address for containerd's GRPC server",
},
cli.StringFlag{
Name: "root",
Usage: "containerd root directory",
},
cli.StringFlag{
Name: "state",
Usage: "containerd state directory",
},
}
app.Commands = []cli.Command{
configCommand,
publishCommand,
}
app.Action = func(context *cli.Context) error {
var (
start = time.Now()
signals = make(chan os.Signal, 2048)
serverC = make(chan *server.Server, 1)
ctx = gocontext.Background()
config = defaultConfig()
)
done := handleSignals(ctx, signals, serverC)
// start the signal handler as soon as we can to make sure that
// we don't miss any signals during boot
signal.Notify(signals, handledSignals...)
if err := server.LoadConfig(context.GlobalString("config"), config); err != nil && !os.IsNotExist(err) {
return err
}
// apply flags to the config
if err := applyFlags(context, config); err != nil {
return err
}
address := config.GRPC.Address
if address == "" {
return errors.New("grpc address cannot be empty")
}
log.G(ctx).WithFields(logrus.Fields{
"version": version.Version,
"revision": version.Revision,
}).Info("starting containerd")
server, err := server.New(ctx, config)
if err != nil {
return err
}
serverC <- server
if config.Debug.Address != "" {
var l net.Listener
if filepath.IsAbs(config.Debug.Address) {
if l, err = sys.GetLocalListener(config.Debug.Address, config.Debug.UID, config.Debug.GID); err != nil {
return errors.Wrapf(err, "failed to get listener for debug endpoint")
}
} else {
if l, err = net.Listen("tcp", config.Debug.Address); err != nil {
return errors.Wrapf(err, "failed to get listener for debug endpoint")
}
}
serve(ctx, l, server.ServeDebug)
}
if config.Metrics.Address != "" {
l, err := net.Listen("tcp", config.Metrics.Address)
if err != nil {
return errors.Wrapf(err, "failed to get listener for metrics endpoint")
}
serve(ctx, l, server.ServeMetrics)
}
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
if err != nil {
return errors.Wrapf(err, "failed to get listener for main endpoint")
}
serve(ctx, l, server.ServeGRPC)
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
<-done
return nil
}
return app
}
func serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) error) {
path := l.Addr().String()
log.G(ctx).WithField("address", path).Info("serving...")
go func() {
defer l.Close()
if err := serveFunc(l); err != nil {
log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure")
}
}()
}
func applyFlags(context *cli.Context, config *server.Config) error {
// the order for config vs flag values is that flags will always override
// the config values if they are set
if err := setLevel(context, config); err != nil {
return err
}
for _, v := range []struct {
name string
d *string
}{
{
name: "root",
d: &config.Root,
},
{
name: "state",
d: &config.State,
},
{
name: "address",
d: &config.GRPC.Address,
},
} {
if s := context.GlobalString(v.name); s != "" {
*v.d = s
}
}
return nil
}
func setLevel(context *cli.Context, config *server.Config) error {
l := context.GlobalString("log-level")
if l == "" {
l = config.Debug.Level
}
if l != "" {
lvl, err := log.ParseLevel(l)
if err != nil {
return err
}
logrus.SetLevel(lvl)
}
return nil
}

View File

@@ -0,0 +1,67 @@
// +build linux darwin freebsd solaris
package command
import (
"context"
"os"
"runtime"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/server"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
const defaultConfigPath = "/etc/containerd/config.toml"
var handledSignals = []os.Signal{
unix.SIGTERM,
unix.SIGINT,
unix.SIGUSR1,
unix.SIGPIPE,
}
func handleSignals(ctx context.Context, signals chan os.Signal, serverC chan *server.Server) chan struct{} {
done := make(chan struct{}, 1)
go func() {
var server *server.Server
for {
select {
case s := <-serverC:
server = s
case s := <-signals:
log.G(ctx).WithField("signal", s).Debug("received signal")
switch s {
case unix.SIGUSR1:
dumpStacks()
case unix.SIGPIPE:
continue
default:
if server == nil {
close(done)
return
}
server.Stop()
close(done)
}
}
}
}()
return done
}
func dumpStacks() {
var (
buf []byte
stackSize int
)
bufferLen := 16384
for stackSize == len(buf) {
buf = make([]byte, bufferLen)
stackSize = runtime.Stack(buf, true)
bufferLen *= 2
}
buf = buf[:stackSize]
logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}

View File

@@ -0,0 +1,42 @@
package command
import (
"context"
"os"
"path/filepath"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/server"
"golang.org/x/sys/windows"
)
var (
defaultConfigPath = filepath.Join(os.Getenv("programfiles"), "containerd", "config.toml")
handledSignals = []os.Signal{
windows.SIGTERM,
windows.SIGINT,
}
)
func handleSignals(ctx context.Context, signals chan os.Signal, serverC chan *server.Server) chan struct{} {
done := make(chan struct{})
go func() {
var server *server.Server
for {
select {
case s := <-serverC:
server = s
case s := <-signals:
log.G(ctx).WithField("signal", s).Debug("received signal")
if server == nil {
close(done)
return
}
server.Stop()
close(done)
}
}
}()
return done
}

View File

@@ -0,0 +1,92 @@
package command
import (
gocontext "context"
"io"
"io/ioutil"
"net"
"os"
"time"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/dialer"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/urfave/cli"
"google.golang.org/grpc"
)
var publishCommand = cli.Command{
Name: "publish",
Usage: "binary to publish events to containerd",
Flags: []cli.Flag{
cli.StringFlag{
Name: "namespace",
Usage: "namespace to publish to",
},
cli.StringFlag{
Name: "topic",
Usage: "topic of the event",
},
},
Action: func(context *cli.Context) error {
ctx := namespaces.WithNamespace(gocontext.Background(), context.String("namespace"))
topic := context.String("topic")
if topic == "" {
return errors.New("topic required to publish event")
}
payload, err := getEventPayload(os.Stdin)
if err != nil {
return err
}
client, err := connectEvents(context.GlobalString("address"))
if err != nil {
return err
}
if _, err := client.Publish(ctx, &eventsapi.PublishRequest{
Topic: topic,
Event: payload,
}); err != nil {
return errdefs.FromGRPC(err)
}
return nil
},
}
func getEventPayload(r io.Reader) (*types.Any, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
var any types.Any
if err := any.Unmarshal(data); err != nil {
return nil, err
}
return &any, nil
}
func connectEvents(address string) (eventsapi.EventsClient, error) {
conn, err := connect(address, dialer.Dialer)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
return eventsapi.NewEventsClient(conn), nil
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) {
gopts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
grpc.WithTimeout(60 * time.Second),
grpc.WithDialer(d),
grpc.FailOnNonTempDialError(true),
grpc.WithBackoffMaxDelay(3 * time.Second),
}
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
return conn, nil
}