diff --git a/cmd/containerd/command/config.go b/cmd/containerd/command/config.go index eebe2ba5b..d6133f9a1 100644 --- a/cmd/containerd/command/config.go +++ b/cmd/containerd/command/config.go @@ -17,6 +17,7 @@ package command import ( + gocontext "context" "io" "os" @@ -48,7 +49,7 @@ var configCommand = cli.Command{ config := &Config{ Config: defaultConfig(), } - plugins, err := server.LoadPlugins(config.Config) + plugins, err := server.LoadPlugins(gocontext.Background(), config.Config) if err != nil { return err } diff --git a/services/server/config.go b/services/server/config.go index 41d08b43c..a91d5fffd 100644 --- a/services/server/config.go +++ b/services/server/config.go @@ -43,6 +43,8 @@ type Config struct { OOMScore int `toml:"oom_score"` // Cgroup specifies cgroup information for the containerd daemon process Cgroup CgroupConfig `toml:"cgroup"` + // ProxyPlugins configures plugins which are communicated to over GRPC + ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"` md toml.MetaData } @@ -75,6 +77,12 @@ type CgroupConfig struct { Path string `toml:"path"` } +// ProxyPlugin provides a proxy plugin configuration +type ProxyPlugin struct { + Type string `toml:"type"` + Address string `toml:"address"` +} + // Decode unmarshals a plugin specific configuration by plugin id func (c *Config) Decode(id string, v interface{}) (interface{}, error) { data, ok := c.Plugins[id] diff --git a/services/server/server.go b/services/server/server.go index 33d4b6c75..3b2852e42 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -26,19 +26,26 @@ import ( "os" "path/filepath" "strings" + "sync" + "time" "github.com/boltdb/bolt" + csapi "github.com/containerd/containerd/api/services/content/v1" + ssapi "github.com/containerd/containerd/api/services/snapshots/v1" "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" + csproxy "github.com/containerd/containerd/content/proxy" + "github.com/containerd/containerd/defaults" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/pkg/dialer" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/snapshots" + ssproxy "github.com/containerd/containerd/snapshots/proxy" metrics "github.com/docker/go-metrics" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/pkg/errors" - "google.golang.org/grpc" ) @@ -62,7 +69,7 @@ func New(ctx context.Context, config *Config) (*Server, error) { if err := apply(ctx, config); err != nil { return nil, err } - plugins, err := LoadPlugins(config) + plugins, err := LoadPlugins(ctx, config) if err != nil { return nil, err } @@ -204,7 +211,7 @@ func (s *Server) Stop() { // LoadPlugins loads all plugins into containerd and generates an ordered graph // of all plugins. -func LoadPlugins(config *Config) ([]*plugin.Registration, error) { +func LoadPlugins(ctx context.Context, config *Config) ([]*plugin.Registration, error) { // load all plugins into containerd if err := plugin.Load(filepath.Join(config.Root, "plugins")); err != nil { return nil, err @@ -265,10 +272,85 @@ func LoadPlugins(config *Config) ([]*plugin.Registration, error) { }, }) + clients := &proxyClients{} + for name, pp := range config.ProxyPlugins { + var ( + t plugin.Type + f func(*grpc.ClientConn) interface{} + + address = pp.Address + ) + + switch pp.Type { + case string(plugin.SnapshotPlugin), "snapshot": + t = plugin.SnapshotPlugin + ssname := name + f = func(conn *grpc.ClientConn) interface{} { + return ssproxy.NewSnapshotter(ssapi.NewSnapshotsClient(conn), ssname) + } + + case string(plugin.ContentPlugin), "content": + t = plugin.ContentPlugin + f = func(conn *grpc.ClientConn) interface{} { + return csproxy.NewContentStore(csapi.NewContentClient(conn)) + } + default: + log.G(ctx).WithField("type", pp.Type).Warn("unknown proxy plugin type") + } + + plugin.Register(&plugin.Registration{ + Type: t, + ID: name, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + ic.Meta.Exports["address"] = address + conn, err := clients.getClient(address) + if err != nil { + return nil, err + } + return f(conn), nil + }, + }) + + } + // return the ordered graph for plugins return plugin.Graph(config.DisabledPlugins), nil } +type proxyClients struct { + m sync.Mutex + clients map[string]*grpc.ClientConn +} + +func (pc *proxyClients) getClient(address string) (*grpc.ClientConn, error) { + pc.m.Lock() + defer pc.m.Unlock() + if pc.clients == nil { + pc.clients = map[string]*grpc.ClientConn{} + } else if c, ok := pc.clients[address]; ok { + return c, nil + } + + gopts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBackoffMaxDelay(3 * time.Second), + grpc.WithDialer(dialer.Dialer), + + // TODO(stevvooe): We may need to allow configuration of this on the client. + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), + } + + conn, err := grpc.Dial(dialer.DialAddress(address), gopts...) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", address) + } + + pc.clients[address] = conn + + return conn, nil +} + func trapClosedConnErr(err error) error { if err == nil { return nil