Use containerd grpc server

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2018-01-18 03:27:21 +00:00
parent ca3b73899a
commit 62e6921145
8 changed files with 375 additions and 98 deletions

View File

@@ -39,6 +39,7 @@ import (
"github.com/containerd/cri-containerd/cmd/cri-containerd/options"
api "github.com/containerd/cri-containerd/pkg/api/v1"
"github.com/containerd/cri-containerd/pkg/atomic"
osinterface "github.com/containerd/cri-containerd/pkg/os"
"github.com/containerd/cri-containerd/pkg/registrar"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
@@ -54,15 +55,21 @@ const (
unixProtocol = "unix"
)
// CRIContainerdService is the interface implement CRI remote service server.
type CRIContainerdService interface {
Run() error
Stop()
// grpcServices are all the grpc services provided by cri containerd.
type grpcServices interface {
runtime.RuntimeServiceServer
runtime.ImageServiceServer
api.CRIContainerdServiceServer
}
// CRIContainerdService is the interface implement CRI remote service server.
type CRIContainerdService interface {
Run(bool) error
Stop()
plugin.Service
grpcServices
}
// criContainerdService implements CRIContainerdService.
type criContainerdService struct {
// config contains all configurations.
@@ -99,15 +106,14 @@ type criContainerdService struct {
streamServer streaming.Server
// eventMonitor is the monitor monitors containerd events.
eventMonitor *eventMonitor
// initialized indicates whether the server is initialized. All GRPC services
// should return error before the server is initialized.
initialized atomic.Bool
}
// NewCRIContainerdService returns a new instance of CRIContainerdService
func NewCRIContainerdService(config options.Config) (CRIContainerdService, error) {
client, err := containerd.New(config.ContainerdConfig.Endpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
if err != nil {
return nil, fmt.Errorf("failed to initialize containerd client with endpoint %q: %v",
config.ContainerdConfig.Endpoint, err)
}
var err error
if config.CgroupPath != "" {
_, err := loadCgroup(config.CgroupPath)
if err != nil {
@@ -131,7 +137,7 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
client: client,
initialized: atomic.NewBool(false),
}
if !c.config.SkipImageFSUUID {
@@ -156,22 +162,47 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
return nil, fmt.Errorf("failed to create stream server: %v", err)
}
c.eventMonitor = newEventMonitor(c)
c.eventMonitor = newEventMonitor(c.containerStore, c.sandboxStore)
// Create the grpc server and register runtime and image services.
// To avoid race condition between `Run` and `Stop`, still create grpc server
// although we may not use it. It's just a small in-memory data structure.
// TODO(random-liu): Get rid of the grpc server when completely switch
// to plugin mode.
c.server = grpc.NewServer()
instrumented := newInstrumentedService(c)
runtime.RegisterRuntimeServiceServer(c.server, instrumented)
runtime.RegisterImageServiceServer(c.server, instrumented)
api.RegisterCRIContainerdServiceServer(c.server, instrumented)
return newInstrumentedService(c), nil
return c, nil
}
// Run starts the cri-containerd service.
func (c *criContainerdService) Run() error {
// Register registers all required services onto a specific grpc server.
// This is used by containerd cri plugin.
func (c *criContainerdService) Register(s *grpc.Server) error {
instrumented := newInstrumentedService(c)
runtime.RegisterRuntimeServiceServer(s, instrumented)
runtime.RegisterImageServiceServer(s, instrumented)
api.RegisterCRIContainerdServiceServer(s, instrumented)
return nil
}
// Run starts the cri-containerd service. startGRPC specifies
// whether to start grpc server in this function.
// TODO(random-liu): Remove `startRPC=true` case when we no longer support cri-containerd
// standalone mode.
func (c *criContainerdService) Run(startGRPC bool) error {
logrus.Info("Start cri-containerd service")
// Connect containerd service here, to get rid of the containerd dependency
// in `NewCRIContainerdService`. This is required for plugin mode bootstrapping.
logrus.Info("Connect containerd service")
client, err := containerd.New(c.config.ContainerdConfig.Endpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
if err != nil {
return fmt.Errorf("failed to initialize containerd client with endpoint %q: %v",
c.config.ContainerdConfig.Endpoint, err)
}
c.client = client
logrus.Info("Start subscribing containerd event")
c.eventMonitor.subscribe(c.client)
logrus.Infof("Start recovering state")
if err := c.recover(context.Background()); err != nil {
return fmt.Errorf("failed to recover state: %v", err)
@@ -179,7 +210,10 @@ func (c *criContainerdService) Run() error {
// Start event handler.
logrus.Info("Start event monitor")
eventMonitorCloseCh := c.eventMonitor.start()
eventMonitorCloseCh, err := c.eventMonitor.start()
if err != nil {
return fmt.Errorf("failed to start event monitor: %v", err)
}
// Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer")
@@ -200,24 +234,32 @@ func (c *criContainerdService) Run() error {
close(streamServerCloseCh)
}()
// Start grpc server.
// Unlink to cleanup the previous socket file.
logrus.Info("Start grpc server")
err := syscall.Unlink(c.config.SocketPath)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to unlink socket file %q: %v", c.config.SocketPath, err)
}
l, err := net.Listen(unixProtocol, c.config.SocketPath)
if err != nil {
return fmt.Errorf("failed to listen on %q: %v", c.config.SocketPath, err)
}
// Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set()
grpcServerCloseCh := make(chan struct{})
go func() {
if err := c.server.Serve(l); err != nil {
logrus.WithError(err).Error("Failed to serve grpc grpc request")
if startGRPC {
// Create the grpc server and register runtime and image services.
c.Register(c.server) // nolint: errcheck
// Start grpc server.
// Unlink to cleanup the previous socket file.
logrus.Info("Start grpc server")
err := syscall.Unlink(c.config.SocketPath)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to unlink socket file %q: %v", c.config.SocketPath, err)
}
close(grpcServerCloseCh)
}()
l, err := net.Listen(unixProtocol, c.config.SocketPath)
if err != nil {
return fmt.Errorf("failed to listen on %q: %v", c.config.SocketPath, err)
}
go func() {
if err := c.server.Serve(l); err != nil {
logrus.WithError(err).Error("Failed to serve grpc request")
}
close(grpcServerCloseCh)
}()
}
// Keep grpcServerCloseCh open if grpc server is not started.
// Stop the whole cri-containerd service if any of the critical service exits.
select {
@@ -231,8 +273,11 @@ func (c *criContainerdService) Run() error {
logrus.Info("Event monitor stopped")
<-streamServerCloseCh
logrus.Info("Stream server stopped")
<-grpcServerCloseCh
logrus.Info("GRPC server stopped")
if startGRPC {
// Only wait for grpc server close channel when grpc server is started.
<-grpcServerCloseCh
logrus.Info("GRPC server stopped")
}
return nil
}