494 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			494 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
 | 
						|
 | 
						|
package server
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"expvar"
 | 
						|
	"io"
 | 
						|
	"net"
 | 
						|
	"net/http"
 | 
						|
	"net/http/pprof"
 | 
						|
	"os"
 | 
						|
	"path/filepath"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	csapi "github.com/containerd/containerd/api/services/content/v1"
 | 
						|
	ssapi "github.com/containerd/containerd/api/services/snapshots/v1"
 | 
						|
	"github.com/containerd/containerd/content"
 | 
						|
	"github.com/containerd/containerd/content/local"
 | 
						|
	csproxy "github.com/containerd/containerd/content/proxy"
 | 
						|
	"github.com/containerd/containerd/defaults"
 | 
						|
	"github.com/containerd/containerd/diff"
 | 
						|
	"github.com/containerd/containerd/events/exchange"
 | 
						|
	"github.com/containerd/containerd/log"
 | 
						|
	"github.com/containerd/containerd/metadata"
 | 
						|
	"github.com/containerd/containerd/pkg/dialer"
 | 
						|
	"github.com/containerd/containerd/pkg/timeout"
 | 
						|
	"github.com/containerd/containerd/plugin"
 | 
						|
	srvconfig "github.com/containerd/containerd/services/server/config"
 | 
						|
	"github.com/containerd/containerd/snapshots"
 | 
						|
	ssproxy "github.com/containerd/containerd/snapshots/proxy"
 | 
						|
	"github.com/containerd/containerd/sys"
 | 
						|
	"github.com/containerd/ttrpc"
 | 
						|
	metrics "github.com/docker/go-metrics"
 | 
						|
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	bolt "go.etcd.io/bbolt"
 | 
						|
	"google.golang.org/grpc"
 | 
						|
	"google.golang.org/grpc/backoff"
 | 
						|
	"google.golang.org/grpc/credentials"
 | 
						|
)
 | 
						|
 | 
						|
// CreateTopLevelDirectories creates the top-level root and state directories.
 | 
						|
func CreateTopLevelDirectories(config *srvconfig.Config) error {
 | 
						|
	switch {
 | 
						|
	case config.Root == "":
 | 
						|
		return errors.New("root must be specified")
 | 
						|
	case config.State == "":
 | 
						|
		return errors.New("state must be specified")
 | 
						|
	case config.Root == config.State:
 | 
						|
		return errors.New("root and state must be different paths")
 | 
						|
	}
 | 
						|
 | 
						|
	if err := sys.MkdirAllWithACL(config.Root, 0711); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return sys.MkdirAllWithACL(config.State, 0711)
 | 
						|
}
 | 
						|
 | 
						|
// New creates and initializes a new containerd server
 | 
						|
func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
 | 
						|
	if err := apply(ctx, config); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	for key, sec := range config.Timeouts {
 | 
						|
		d, err := time.ParseDuration(sec)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.Errorf("unable to parse %s into a time duration", sec)
 | 
						|
		}
 | 
						|
		timeout.Set(key, d)
 | 
						|
	}
 | 
						|
	plugins, err := LoadPlugins(ctx, config)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	for id, p := range config.StreamProcessors {
 | 
						|
		diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args))
 | 
						|
	}
 | 
						|
 | 
						|
	serverOpts := []grpc.ServerOption{
 | 
						|
		grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
 | 
						|
		grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
 | 
						|
	}
 | 
						|
	if config.GRPC.MaxRecvMsgSize > 0 {
 | 
						|
		serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))
 | 
						|
	}
 | 
						|
	if config.GRPC.MaxSendMsgSize > 0 {
 | 
						|
		serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))
 | 
						|
	}
 | 
						|
	ttrpcServer, err := newTTRPCServer()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	tcpServerOpts := serverOpts
 | 
						|
	if config.GRPC.TCPTLSCert != "" {
 | 
						|
		log.G(ctx).Info("setting up tls on tcp GRPC services...")
 | 
						|
		creds, err := credentials.NewServerTLSFromFile(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		tcpServerOpts = append(tcpServerOpts, grpc.Creds(creds))
 | 
						|
	}
 | 
						|
	var (
 | 
						|
		grpcServer = grpc.NewServer(serverOpts...)
 | 
						|
		tcpServer  = grpc.NewServer(tcpServerOpts...)
 | 
						|
 | 
						|
		grpcServices  []plugin.Service
 | 
						|
		tcpServices   []plugin.TCPService
 | 
						|
		ttrpcServices []plugin.TTRPCService
 | 
						|
 | 
						|
		s = &Server{
 | 
						|
			grpcServer:  grpcServer,
 | 
						|
			tcpServer:   tcpServer,
 | 
						|
			ttrpcServer: ttrpcServer,
 | 
						|
			events:      exchange.NewExchange(),
 | 
						|
			config:      config,
 | 
						|
		}
 | 
						|
		initialized = plugin.NewPluginSet()
 | 
						|
		required    = make(map[string]struct{})
 | 
						|
	)
 | 
						|
	for _, r := range config.RequiredPlugins {
 | 
						|
		required[r] = struct{}{}
 | 
						|
	}
 | 
						|
	for _, p := range plugins {
 | 
						|
		id := p.URI()
 | 
						|
		reqID := id
 | 
						|
		if config.GetVersion() == 1 {
 | 
						|
			reqID = p.ID
 | 
						|
		}
 | 
						|
		log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
 | 
						|
 | 
						|
		initContext := plugin.NewContext(
 | 
						|
			ctx,
 | 
						|
			p,
 | 
						|
			initialized,
 | 
						|
			config.Root,
 | 
						|
			config.State,
 | 
						|
		)
 | 
						|
		initContext.Events = s.events
 | 
						|
		initContext.Address = config.GRPC.Address
 | 
						|
		initContext.TTRPCAddress = config.TTRPC.Address
 | 
						|
 | 
						|
		// load the plugin specific configuration if it is provided
 | 
						|
		if p.Config != nil {
 | 
						|
			pc, err := config.Decode(p)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
			initContext.Config = pc
 | 
						|
		}
 | 
						|
		result := p.Init(initContext)
 | 
						|
		if err := initialized.Add(result); err != nil {
 | 
						|
			return nil, errors.Wrapf(err, "could not add plugin result to plugin set")
 | 
						|
		}
 | 
						|
 | 
						|
		instance, err := result.Instance()
 | 
						|
		if err != nil {
 | 
						|
			if plugin.IsSkipPlugin(err) {
 | 
						|
				log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
 | 
						|
			} else {
 | 
						|
				log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
 | 
						|
			}
 | 
						|
			if _, ok := required[reqID]; ok {
 | 
						|
				return nil, errors.Wrapf(err, "load required plugin %s", id)
 | 
						|
			}
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		delete(required, reqID)
 | 
						|
		// check for grpc services that should be registered with the server
 | 
						|
		if src, ok := instance.(plugin.Service); ok {
 | 
						|
			grpcServices = append(grpcServices, src)
 | 
						|
		}
 | 
						|
		if src, ok := instance.(plugin.TTRPCService); ok {
 | 
						|
			ttrpcServices = append(ttrpcServices, src)
 | 
						|
		}
 | 
						|
		if service, ok := instance.(plugin.TCPService); ok {
 | 
						|
			tcpServices = append(tcpServices, service)
 | 
						|
		}
 | 
						|
 | 
						|
		s.plugins = append(s.plugins, result)
 | 
						|
	}
 | 
						|
	if len(required) != 0 {
 | 
						|
		var missing []string
 | 
						|
		for id := range required {
 | 
						|
			missing = append(missing, id)
 | 
						|
		}
 | 
						|
		return nil, errors.Errorf("required plugin %s not included", missing)
 | 
						|
	}
 | 
						|
 | 
						|
	// register services after all plugins have been initialized
 | 
						|
	for _, service := range grpcServices {
 | 
						|
		if err := service.Register(grpcServer); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	for _, service := range ttrpcServices {
 | 
						|
		if err := service.RegisterTTRPC(ttrpcServer); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	for _, service := range tcpServices {
 | 
						|
		if err := service.RegisterTCP(tcpServer); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return s, nil
 | 
						|
}
 | 
						|
 | 
						|
// Server is the containerd main daemon
 | 
						|
type Server struct {
 | 
						|
	grpcServer  *grpc.Server
 | 
						|
	ttrpcServer *ttrpc.Server
 | 
						|
	tcpServer   *grpc.Server
 | 
						|
	events      *exchange.Exchange
 | 
						|
	config      *srvconfig.Config
 | 
						|
	plugins     []*plugin.Plugin
 | 
						|
}
 | 
						|
 | 
						|
// ServeGRPC provides the containerd grpc APIs on the provided listener
 | 
						|
func (s *Server) ServeGRPC(l net.Listener) error {
 | 
						|
	if s.config.Metrics.GRPCHistogram {
 | 
						|
		// enable grpc time histograms to measure rpc latencies
 | 
						|
		grpc_prometheus.EnableHandlingTimeHistogram()
 | 
						|
	}
 | 
						|
	// before we start serving the grpc API register the grpc_prometheus metrics
 | 
						|
	// handler.  This needs to be the last service registered so that it can collect
 | 
						|
	// metrics for every other service
 | 
						|
	grpc_prometheus.Register(s.grpcServer)
 | 
						|
	return trapClosedConnErr(s.grpcServer.Serve(l))
 | 
						|
}
 | 
						|
 | 
						|
// ServeTTRPC provides the containerd ttrpc APIs on the provided listener
 | 
						|
func (s *Server) ServeTTRPC(l net.Listener) error {
 | 
						|
	return trapClosedConnErr(s.ttrpcServer.Serve(context.Background(), l))
 | 
						|
}
 | 
						|
 | 
						|
// ServeMetrics provides a prometheus endpoint for exposing metrics
 | 
						|
func (s *Server) ServeMetrics(l net.Listener) error {
 | 
						|
	m := http.NewServeMux()
 | 
						|
	m.Handle("/v1/metrics", metrics.Handler())
 | 
						|
	return trapClosedConnErr(http.Serve(l, m))
 | 
						|
}
 | 
						|
 | 
						|
// ServeTCP allows services to serve over tcp
 | 
						|
func (s *Server) ServeTCP(l net.Listener) error {
 | 
						|
	grpc_prometheus.Register(s.tcpServer)
 | 
						|
	return trapClosedConnErr(s.tcpServer.Serve(l))
 | 
						|
}
 | 
						|
 | 
						|
// ServeDebug provides a debug endpoint
 | 
						|
func (s *Server) ServeDebug(l net.Listener) error {
 | 
						|
	// don't use the default http server mux to make sure nothing gets registered
 | 
						|
	// that we don't want to expose via containerd
 | 
						|
	m := http.NewServeMux()
 | 
						|
	m.Handle("/debug/vars", expvar.Handler())
 | 
						|
	m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
 | 
						|
	m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
 | 
						|
	m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
 | 
						|
	m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
 | 
						|
	m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
 | 
						|
	return trapClosedConnErr(http.Serve(l, m))
 | 
						|
}
 | 
						|
 | 
						|
// Stop the containerd server canceling any open connections
 | 
						|
func (s *Server) Stop() {
 | 
						|
	s.grpcServer.Stop()
 | 
						|
	for i := len(s.plugins) - 1; i >= 0; i-- {
 | 
						|
		p := s.plugins[i]
 | 
						|
		instance, err := p.Instance()
 | 
						|
		if err != nil {
 | 
						|
			log.L.WithError(err).WithField("id", p.Registration.URI()).
 | 
						|
				Errorf("could not get plugin instance")
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		closer, ok := instance.(io.Closer)
 | 
						|
		if !ok {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if err := closer.Close(); err != nil {
 | 
						|
			log.L.WithError(err).WithField("id", p.Registration.URI()).
 | 
						|
				Errorf("failed to close plugin")
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// LoadPlugins loads all plugins into containerd and generates an ordered graph
 | 
						|
// of all plugins.
 | 
						|
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {
 | 
						|
	// load all plugins into containerd
 | 
						|
	path := config.PluginDir
 | 
						|
	if path == "" {
 | 
						|
		path = filepath.Join(config.Root, "plugins")
 | 
						|
	}
 | 
						|
	if err := plugin.Load(path); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	// load additional plugins that don't automatically register themselves
 | 
						|
	plugin.Register(&plugin.Registration{
 | 
						|
		Type: plugin.ContentPlugin,
 | 
						|
		ID:   "content",
 | 
						|
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
 | 
						|
			ic.Meta.Exports["root"] = ic.Root
 | 
						|
			return local.NewStore(ic.Root)
 | 
						|
		},
 | 
						|
	})
 | 
						|
	plugin.Register(&plugin.Registration{
 | 
						|
		Type: plugin.MetadataPlugin,
 | 
						|
		ID:   "bolt",
 | 
						|
		Requires: []plugin.Type{
 | 
						|
			plugin.ContentPlugin,
 | 
						|
			plugin.SnapshotPlugin,
 | 
						|
		},
 | 
						|
		Config: &srvconfig.BoltConfig{
 | 
						|
			ContentSharingPolicy: srvconfig.SharingPolicyShared,
 | 
						|
		},
 | 
						|
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
 | 
						|
			if err := os.MkdirAll(ic.Root, 0711); err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
			cs, err := ic.Get(plugin.ContentPlugin)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
 | 
						|
			snapshottersRaw, err := ic.GetByType(plugin.SnapshotPlugin)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
 | 
						|
			snapshotters := make(map[string]snapshots.Snapshotter)
 | 
						|
			for name, sn := range snapshottersRaw {
 | 
						|
				sn, err := sn.Instance()
 | 
						|
				if err != nil {
 | 
						|
					if !plugin.IsSkipPlugin(err) {
 | 
						|
						log.G(ic.Context).WithError(err).
 | 
						|
							Warnf("could not use snapshotter %v in metadata plugin", name)
 | 
						|
					}
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				snapshotters[name] = sn.(snapshots.Snapshotter)
 | 
						|
			}
 | 
						|
 | 
						|
			shared := true
 | 
						|
			ic.Meta.Exports["policy"] = srvconfig.SharingPolicyShared
 | 
						|
			if cfg, ok := ic.Config.(*srvconfig.BoltConfig); ok {
 | 
						|
				if cfg.ContentSharingPolicy != "" {
 | 
						|
					if err := cfg.Validate(); err != nil {
 | 
						|
						return nil, err
 | 
						|
					}
 | 
						|
					if cfg.ContentSharingPolicy == srvconfig.SharingPolicyIsolated {
 | 
						|
						ic.Meta.Exports["policy"] = srvconfig.SharingPolicyIsolated
 | 
						|
						shared = false
 | 
						|
					}
 | 
						|
 | 
						|
					log.L.WithField("policy", cfg.ContentSharingPolicy).Info("metadata content store policy set")
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			path := filepath.Join(ic.Root, "meta.db")
 | 
						|
			ic.Meta.Exports["path"] = path
 | 
						|
 | 
						|
			db, err := bolt.Open(path, 0644, nil)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
 | 
						|
			var dbopts []metadata.DBOpt
 | 
						|
			if !shared {
 | 
						|
				dbopts = append(dbopts, metadata.WithPolicyIsolated)
 | 
						|
			}
 | 
						|
			mdb := metadata.NewDB(db, cs.(content.Store), snapshotters, dbopts...)
 | 
						|
			if err := mdb.Init(ic.Context); err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
			return mdb, nil
 | 
						|
		},
 | 
						|
	})
 | 
						|
 | 
						|
	clients := &proxyClients{}
 | 
						|
	for name, pp := range config.ProxyPlugins {
 | 
						|
		var (
 | 
						|
			t plugin.Type
 | 
						|
			f func(*grpc.ClientConn) interface{}
 | 
						|
 | 
						|
			address = pp.Address
 | 
						|
		)
 | 
						|
 | 
						|
		switch pp.Type {
 | 
						|
		case string(plugin.SnapshotPlugin), "snapshot":
 | 
						|
			t = plugin.SnapshotPlugin
 | 
						|
			ssname := name
 | 
						|
			f = func(conn *grpc.ClientConn) interface{} {
 | 
						|
				return ssproxy.NewSnapshotter(ssapi.NewSnapshotsClient(conn), ssname)
 | 
						|
			}
 | 
						|
 | 
						|
		case string(plugin.ContentPlugin), "content":
 | 
						|
			t = plugin.ContentPlugin
 | 
						|
			f = func(conn *grpc.ClientConn) interface{} {
 | 
						|
				return csproxy.NewContentStore(csapi.NewContentClient(conn))
 | 
						|
			}
 | 
						|
		default:
 | 
						|
			log.G(ctx).WithField("type", pp.Type).Warn("unknown proxy plugin type")
 | 
						|
		}
 | 
						|
 | 
						|
		plugin.Register(&plugin.Registration{
 | 
						|
			Type: t,
 | 
						|
			ID:   name,
 | 
						|
			InitFn: func(ic *plugin.InitContext) (interface{}, error) {
 | 
						|
				ic.Meta.Exports["address"] = address
 | 
						|
				conn, err := clients.getClient(address)
 | 
						|
				if err != nil {
 | 
						|
					return nil, err
 | 
						|
				}
 | 
						|
				return f(conn), nil
 | 
						|
			},
 | 
						|
		})
 | 
						|
 | 
						|
	}
 | 
						|
 | 
						|
	filter := srvconfig.V2DisabledFilter
 | 
						|
	if config.GetVersion() == 1 {
 | 
						|
		filter = srvconfig.V1DisabledFilter
 | 
						|
	}
 | 
						|
	// return the ordered graph for plugins
 | 
						|
	return plugin.Graph(filter(config.DisabledPlugins)), nil
 | 
						|
}
 | 
						|
 | 
						|
type proxyClients struct {
 | 
						|
	m       sync.Mutex
 | 
						|
	clients map[string]*grpc.ClientConn
 | 
						|
}
 | 
						|
 | 
						|
func (pc *proxyClients) getClient(address string) (*grpc.ClientConn, error) {
 | 
						|
	pc.m.Lock()
 | 
						|
	defer pc.m.Unlock()
 | 
						|
	if pc.clients == nil {
 | 
						|
		pc.clients = map[string]*grpc.ClientConn{}
 | 
						|
	} else if c, ok := pc.clients[address]; ok {
 | 
						|
		return c, nil
 | 
						|
	}
 | 
						|
 | 
						|
	backoffConfig := backoff.DefaultConfig
 | 
						|
	backoffConfig.MaxDelay = 3 * time.Second
 | 
						|
	connParams := grpc.ConnectParams{
 | 
						|
		Backoff: backoffConfig,
 | 
						|
	}
 | 
						|
	gopts := []grpc.DialOption{
 | 
						|
		grpc.WithInsecure(),
 | 
						|
		grpc.WithConnectParams(connParams),
 | 
						|
		grpc.WithContextDialer(dialer.ContextDialer),
 | 
						|
 | 
						|
		// TODO(stevvooe): We may need to allow configuration of this on the client.
 | 
						|
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
 | 
						|
		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
 | 
						|
	}
 | 
						|
 | 
						|
	conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Wrapf(err, "failed to dial %q", address)
 | 
						|
	}
 | 
						|
 | 
						|
	pc.clients[address] = conn
 | 
						|
 | 
						|
	return conn, nil
 | 
						|
}
 | 
						|
 | 
						|
// trapClosedConnErr filters the error returned by a Serve call: a nil error
// and any error whose message contains "use of closed network connection"
// are reported as nil, every other error is returned unchanged.
func trapClosedConnErr(err error) error {
	if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
		return err
	}
	return nil
}
 |