Merge pull request #2415 from dmcgowan/proxy-plugins
Add support for proxy plugins
This commit is contained in:
		| @@ -17,6 +17,7 @@ | |||||||
| package command | package command | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	gocontext "context" | ||||||
| 	"io" | 	"io" | ||||||
| 	"os" | 	"os" | ||||||
|  |  | ||||||
| @@ -48,7 +49,7 @@ var configCommand = cli.Command{ | |||||||
| 				config := &Config{ | 				config := &Config{ | ||||||
| 					Config: defaultConfig(), | 					Config: defaultConfig(), | ||||||
| 				} | 				} | ||||||
| 				plugins, err := server.LoadPlugins(config.Config) | 				plugins, err := server.LoadPlugins(gocontext.Background(), config.Config) | ||||||
| 				if err != nil { | 				if err != nil { | ||||||
| 					return err | 					return err | ||||||
| 				} | 				} | ||||||
|   | |||||||
| @@ -43,6 +43,8 @@ type Config struct { | |||||||
| 	OOMScore int `toml:"oom_score"` | 	OOMScore int `toml:"oom_score"` | ||||||
| 	// Cgroup specifies cgroup information for the containerd daemon process | 	// Cgroup specifies cgroup information for the containerd daemon process | ||||||
| 	Cgroup CgroupConfig `toml:"cgroup"` | 	Cgroup CgroupConfig `toml:"cgroup"` | ||||||
|  | 	// ProxyPlugins configures plugins which are communicated to over GRPC | ||||||
|  | 	ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"` | ||||||
|  |  | ||||||
| 	md toml.MetaData | 	md toml.MetaData | ||||||
| } | } | ||||||
| @@ -75,6 +77,12 @@ type CgroupConfig struct { | |||||||
| 	Path string `toml:"path"` | 	Path string `toml:"path"` | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // ProxyPlugin provides a proxy plugin configuration | ||||||
|  | type ProxyPlugin struct { | ||||||
|  | 	Type    string `toml:"type"` | ||||||
|  | 	Address string `toml:"address"` | ||||||
|  | } | ||||||
|  |  | ||||||
| // Decode unmarshals a plugin specific configuration by plugin id | // Decode unmarshals a plugin specific configuration by plugin id | ||||||
| func (c *Config) Decode(id string, v interface{}) (interface{}, error) { | func (c *Config) Decode(id string, v interface{}) (interface{}, error) { | ||||||
| 	data, ok := c.Plugins[id] | 	data, ok := c.Plugins[id] | ||||||
|   | |||||||
| @@ -26,19 +26,26 @@ import ( | |||||||
| 	"os" | 	"os" | ||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
| 	"strings" | 	"strings" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/boltdb/bolt" | 	"github.com/boltdb/bolt" | ||||||
|  | 	csapi "github.com/containerd/containerd/api/services/content/v1" | ||||||
|  | 	ssapi "github.com/containerd/containerd/api/services/snapshots/v1" | ||||||
| 	"github.com/containerd/containerd/content" | 	"github.com/containerd/containerd/content" | ||||||
| 	"github.com/containerd/containerd/content/local" | 	"github.com/containerd/containerd/content/local" | ||||||
|  | 	csproxy "github.com/containerd/containerd/content/proxy" | ||||||
|  | 	"github.com/containerd/containerd/defaults" | ||||||
| 	"github.com/containerd/containerd/events/exchange" | 	"github.com/containerd/containerd/events/exchange" | ||||||
| 	"github.com/containerd/containerd/log" | 	"github.com/containerd/containerd/log" | ||||||
| 	"github.com/containerd/containerd/metadata" | 	"github.com/containerd/containerd/metadata" | ||||||
|  | 	"github.com/containerd/containerd/pkg/dialer" | ||||||
| 	"github.com/containerd/containerd/plugin" | 	"github.com/containerd/containerd/plugin" | ||||||
| 	"github.com/containerd/containerd/snapshots" | 	"github.com/containerd/containerd/snapshots" | ||||||
|  | 	ssproxy "github.com/containerd/containerd/snapshots/proxy" | ||||||
| 	metrics "github.com/docker/go-metrics" | 	metrics "github.com/docker/go-metrics" | ||||||
| 	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" | 	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" | ||||||
| 	"github.com/pkg/errors" | 	"github.com/pkg/errors" | ||||||
|  |  | ||||||
| 	"google.golang.org/grpc" | 	"google.golang.org/grpc" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -62,7 +69,7 @@ func New(ctx context.Context, config *Config) (*Server, error) { | |||||||
| 	if err := apply(ctx, config); err != nil { | 	if err := apply(ctx, config); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	plugins, err := LoadPlugins(config) | 	plugins, err := LoadPlugins(ctx, config) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @@ -204,7 +211,7 @@ func (s *Server) Stop() { | |||||||
|  |  | ||||||
| // LoadPlugins loads all plugins into containerd and generates an ordered graph | // LoadPlugins loads all plugins into containerd and generates an ordered graph | ||||||
| // of all plugins. | // of all plugins. | ||||||
| func LoadPlugins(config *Config) ([]*plugin.Registration, error) { | func LoadPlugins(ctx context.Context, config *Config) ([]*plugin.Registration, error) { | ||||||
| 	// load all plugins into containerd | 	// load all plugins into containerd | ||||||
| 	if err := plugin.Load(filepath.Join(config.Root, "plugins")); err != nil { | 	if err := plugin.Load(filepath.Join(config.Root, "plugins")); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @@ -265,10 +272,85 @@ func LoadPlugins(config *Config) ([]*plugin.Registration, error) { | |||||||
| 		}, | 		}, | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
|  | 	clients := &proxyClients{} | ||||||
|  | 	for name, pp := range config.ProxyPlugins { | ||||||
|  | 		var ( | ||||||
|  | 			t plugin.Type | ||||||
|  | 			f func(*grpc.ClientConn) interface{} | ||||||
|  |  | ||||||
|  | 			address = pp.Address | ||||||
|  | 		) | ||||||
|  |  | ||||||
|  | 		switch pp.Type { | ||||||
|  | 		case string(plugin.SnapshotPlugin), "snapshot": | ||||||
|  | 			t = plugin.SnapshotPlugin | ||||||
|  | 			ssname := name | ||||||
|  | 			f = func(conn *grpc.ClientConn) interface{} { | ||||||
|  | 				return ssproxy.NewSnapshotter(ssapi.NewSnapshotsClient(conn), ssname) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 		case string(plugin.ContentPlugin), "content": | ||||||
|  | 			t = plugin.ContentPlugin | ||||||
|  | 			f = func(conn *grpc.ClientConn) interface{} { | ||||||
|  | 				return csproxy.NewContentStore(csapi.NewContentClient(conn)) | ||||||
|  | 			} | ||||||
|  | 		default: | ||||||
|  | 			log.G(ctx).WithField("type", pp.Type).Warn("unknown proxy plugin type") | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		plugin.Register(&plugin.Registration{ | ||||||
|  | 			Type: t, | ||||||
|  | 			ID:   name, | ||||||
|  | 			InitFn: func(ic *plugin.InitContext) (interface{}, error) { | ||||||
|  | 				ic.Meta.Exports["address"] = address | ||||||
|  | 				conn, err := clients.getClient(address) | ||||||
|  | 				if err != nil { | ||||||
|  | 					return nil, err | ||||||
|  | 				} | ||||||
|  | 				return f(conn), nil | ||||||
|  | 			}, | ||||||
|  | 		}) | ||||||
|  |  | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// return the ordered graph for plugins | 	// return the ordered graph for plugins | ||||||
| 	return plugin.Graph(config.DisabledPlugins), nil | 	return plugin.Graph(config.DisabledPlugins), nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | type proxyClients struct { | ||||||
|  | 	m       sync.Mutex | ||||||
|  | 	clients map[string]*grpc.ClientConn | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (pc *proxyClients) getClient(address string) (*grpc.ClientConn, error) { | ||||||
|  | 	pc.m.Lock() | ||||||
|  | 	defer pc.m.Unlock() | ||||||
|  | 	if pc.clients == nil { | ||||||
|  | 		pc.clients = map[string]*grpc.ClientConn{} | ||||||
|  | 	} else if c, ok := pc.clients[address]; ok { | ||||||
|  | 		return c, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	gopts := []grpc.DialOption{ | ||||||
|  | 		grpc.WithInsecure(), | ||||||
|  | 		grpc.WithBackoffMaxDelay(3 * time.Second), | ||||||
|  | 		grpc.WithDialer(dialer.Dialer), | ||||||
|  |  | ||||||
|  | 		// TODO(stevvooe): We may need to allow configuration of this on the client. | ||||||
|  | 		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), | ||||||
|  | 		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	conn, err := grpc.Dial(dialer.DialAddress(address), gopts...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, errors.Wrapf(err, "failed to dial %q", address) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	pc.clients[address] = conn | ||||||
|  |  | ||||||
|  | 	return conn, nil | ||||||
|  | } | ||||||
|  |  | ||||||
| func trapClosedConnErr(err error) error { | func trapClosedConnErr(err error) error { | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
| 		return nil | 		return nil | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Akihiro Suda
					Akihiro Suda