Merge pull request #552 from Random-Liu/use-containerd-grpc-server

Use containerd grpc server
This commit is contained in:
Lantao Liu
2018-01-18 12:36:05 -08:00
committed by GitHub
12 changed files with 397 additions and 112 deletions

View File

@@ -17,6 +17,9 @@ limitations under the License.
package server
import (
"errors"
"github.com/containerd/containerd"
eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/services/events/v1"
containerdio "github.com/containerd/containerd/cio"
@@ -26,35 +29,45 @@ import (
"golang.org/x/net/context"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
)
// eventMonitor monitors containerd event and updates internal state correspondingly.
// TODO(random-liu): [P1] Is it possible for events to be dropped while containerd is running?
type eventMonitor struct {
c *criContainerdService
ch <-chan *events.Envelope
errCh <-chan error
closeCh chan struct{}
cancel context.CancelFunc
containerStore *containerstore.Store
sandboxStore *sandboxstore.Store
ch <-chan *events.Envelope
errCh <-chan error
ctx context.Context
cancel context.CancelFunc
}
// Create new event monitor. New event monitor will start subscribing containerd event. All events
// happen after it should be monitored.
func newEventMonitor(c *criContainerdService) *eventMonitor {
func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonitor {
ctx, cancel := context.WithCancel(context.Background())
ch, errCh := c.client.Subscribe(ctx)
return &eventMonitor{
c: c,
ch: ch,
errCh: errCh,
closeCh: make(chan struct{}),
cancel: cancel,
containerStore: c,
sandboxStore: s,
ctx: ctx,
cancel: cancel,
}
}
// subscribe starts subscribing to containerd events through the given client
// and stores the resulting event and error channels on the monitor. We separate
// subscribe from newEventMonitor because the constructor takes no containerd
// client. start returns an error while these channels are still nil, so
// subscribe must be called first.
func (em *eventMonitor) subscribe(client *containerd.Client) {
em.ch, em.errCh = client.Subscribe(em.ctx)
}
// start starts the event monitor which monitors and handles all container events. It returns
// a channel for the caller to wait for the event monitor to stop.
func (em *eventMonitor) start() <-chan struct{} {
// a channel for the caller to wait for the event monitor to stop. start must be called after
// subscribe.
func (em *eventMonitor) start() (<-chan struct{}, error) {
if em.ch == nil || em.errCh == nil {
return nil, errors.New("event channel is nil")
}
closeCh := make(chan struct{})
go func() {
for {
select {
@@ -63,22 +76,22 @@ func (em *eventMonitor) start() <-chan struct{} {
em.handleEvent(e)
case err := <-em.errCh:
logrus.WithError(err).Error("Failed to handle event stream")
close(em.closeCh)
close(closeCh)
return
}
}
}()
return em.closeCh
return closeCh, nil
}
// stop stops the event monitor by invoking the monitor's cancel function.
// It will close the event channel.
// NOTE(review): presumably the channel closes because cancel cancels the
// context that was passed to Subscribe — confirm against the containerd client.
// Once event monitor is stopped, it can't be started.
func (em *eventMonitor) stop() {
em.cancel()
}
// handleEvent handles a containerd event.
func (em *eventMonitor) handleEvent(evt *events.Envelope) {
c := em.c
any, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
logrus.WithError(err).Errorf("Failed to convert event envelope %+v", evt)
@@ -92,9 +105,9 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
case *eventtypes.TaskExit:
e := any.(*eventtypes.TaskExit)
logrus.Infof("TaskExit event %+v", e)
cntr, err := c.containerStore.Get(e.ContainerID)
cntr, err := em.containerStore.Get(e.ContainerID)
if err != nil {
if _, err := c.sandboxStore.Get(e.ContainerID); err == nil {
if _, err := em.sandboxStore.Get(e.ContainerID); err == nil {
return
}
logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID)
@@ -145,9 +158,9 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
case *eventtypes.TaskOOM:
e := any.(*eventtypes.TaskOOM)
logrus.Infof("TaskOOM event %+v", e)
cntr, err := c.containerStore.Get(e.ContainerID)
cntr, err := em.containerStore.Get(e.ContainerID)
if err != nil {
if _, err := c.sandboxStore.Get(e.ContainerID); err == nil {
if _, err := em.sandboxStore.Get(e.ContainerID); err == nil {
return
}
logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID)

View File

@@ -17,6 +17,8 @@ limitations under the License.
package server
import (
"errors"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
@@ -27,14 +29,28 @@ import (
// instrumentedService wraps service and logs each operation.
type instrumentedService struct {
*criContainerdService
c *criContainerdService
}
func newInstrumentedService(c *criContainerdService) CRIContainerdService {
return &instrumentedService{criContainerdService: c}
func newInstrumentedService(c *criContainerdService) grpcServices {
return &instrumentedService{c: c}
}
// checkInitialized reports whether the wrapped service is ready to serve.
// It returns nil once the service's initialized flag is set, and an error
// before that, so GRPC request handlers can reject traffic early.
// NOTE(random-liu): All following functions MUST check initialized at the beginning.
func (in *instrumentedService) checkInitialized() error {
	if !in.c.initialized.IsSet() {
		return errors.New("server is not initialized yet")
	}
	return nil
}
func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (res *runtime.RunPodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("RunPodSandbox with config %+v", r.GetConfig())
defer func() {
if err != nil {
@@ -43,10 +59,13 @@ func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.Run
logrus.Infof("RunPodSandbox for %+v returns sandbox id %q", r.GetConfig().GetMetadata(), res.GetPodSandboxId())
}
}()
return in.criContainerdService.RunPodSandbox(ctx, r)
return in.c.RunPodSandbox(ctx, r)
}
func (in *instrumentedService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (res *runtime.ListPodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.Tracef("ListPodSandbox with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@@ -55,10 +74,13 @@ func (in *instrumentedService) ListPodSandbox(ctx context.Context, r *runtime.Li
log.Tracef("ListPodSandbox returns sandboxes %+v", res.GetItems())
}
}()
return in.criContainerdService.ListPodSandbox(ctx, r)
return in.c.ListPodSandbox(ctx, r)
}
func (in *instrumentedService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (res *runtime.PodSandboxStatusResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.Tracef("PodSandboxStatus for %q", r.GetPodSandboxId())
defer func() {
if err != nil {
@@ -67,10 +89,13 @@ func (in *instrumentedService) PodSandboxStatus(ctx context.Context, r *runtime.
log.Tracef("PodSandboxStatus for %q returns status %+v", r.GetPodSandboxId(), res.GetStatus())
}
}()
return in.criContainerdService.PodSandboxStatus(ctx, r)
return in.c.PodSandboxStatus(ctx, r)
}
func (in *instrumentedService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (_ *runtime.StopPodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("StopPodSandbox for %q", r.GetPodSandboxId())
defer func() {
if err != nil {
@@ -79,10 +104,13 @@ func (in *instrumentedService) StopPodSandbox(ctx context.Context, r *runtime.St
logrus.Infof("StopPodSandbox for %q returns successfully", r.GetPodSandboxId())
}
}()
return in.criContainerdService.StopPodSandbox(ctx, r)
return in.c.StopPodSandbox(ctx, r)
}
func (in *instrumentedService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodSandboxRequest) (_ *runtime.RemovePodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("RemovePodSandbox for %q", r.GetPodSandboxId())
defer func() {
if err != nil {
@@ -91,10 +119,13 @@ func (in *instrumentedService) RemovePodSandbox(ctx context.Context, r *runtime.
logrus.Infof("RemovePodSandbox %q returns successfully", r.GetPodSandboxId())
}
}()
return in.criContainerdService.RemovePodSandbox(ctx, r)
return in.c.RemovePodSandbox(ctx, r)
}
func (in *instrumentedService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (res *runtime.PortForwardResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("Portforward for %q port %v", r.GetPodSandboxId(), r.GetPort())
defer func() {
if err != nil {
@@ -103,10 +134,13 @@ func (in *instrumentedService) PortForward(ctx context.Context, r *runtime.PortF
logrus.Infof("Portforward for %q returns URL %q", r.GetPodSandboxId(), res.GetUrl())
}
}()
return in.criContainerdService.PortForward(ctx, r)
return in.c.PortForward(ctx, r)
}
func (in *instrumentedService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (res *runtime.CreateContainerResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("CreateContainer within sandbox %q with container config %+v and sandbox config %+v",
r.GetPodSandboxId(), r.GetConfig(), r.GetSandboxConfig())
defer func() {
@@ -118,10 +152,13 @@ func (in *instrumentedService) CreateContainer(ctx context.Context, r *runtime.C
r.GetPodSandboxId(), r.GetConfig().GetMetadata(), res.GetContainerId())
}
}()
return in.criContainerdService.CreateContainer(ctx, r)
return in.c.CreateContainer(ctx, r)
}
func (in *instrumentedService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (_ *runtime.StartContainerResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("StartContainer for %q", r.GetContainerId())
defer func() {
if err != nil {
@@ -130,10 +167,13 @@ func (in *instrumentedService) StartContainer(ctx context.Context, r *runtime.St
logrus.Infof("StartContainer for %q returns successfully", r.GetContainerId())
}
}()
return in.criContainerdService.StartContainer(ctx, r)
return in.c.StartContainer(ctx, r)
}
func (in *instrumentedService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (res *runtime.ListContainersResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.Tracef("ListContainers with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@@ -143,10 +183,13 @@ func (in *instrumentedService) ListContainers(ctx context.Context, r *runtime.Li
r.GetFilter(), res.GetContainers())
}
}()
return in.criContainerdService.ListContainers(ctx, r)
return in.c.ListContainers(ctx, r)
}
func (in *instrumentedService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (res *runtime.ContainerStatusResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.Tracef("ContainerStatus for %q", r.GetContainerId())
defer func() {
if err != nil {
@@ -155,10 +198,13 @@ func (in *instrumentedService) ContainerStatus(ctx context.Context, r *runtime.C
log.Tracef("ContainerStatus for %q returns status %+v", r.GetContainerId(), res.GetStatus())
}
}()
return in.criContainerdService.ContainerStatus(ctx, r)
return in.c.ContainerStatus(ctx, r)
}
func (in *instrumentedService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (res *runtime.StopContainerResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("StopContainer for %q with timeout %d (s)", r.GetContainerId(), r.GetTimeout())
defer func() {
if err != nil {
@@ -167,10 +213,13 @@ func (in *instrumentedService) StopContainer(ctx context.Context, r *runtime.Sto
logrus.Infof("StopContainer for %q returns successfully", r.GetContainerId())
}
}()
return in.criContainerdService.StopContainer(ctx, r)
return in.c.StopContainer(ctx, r)
}
func (in *instrumentedService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (res *runtime.RemoveContainerResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("RemoveContainer for %q", r.GetContainerId())
defer func() {
if err != nil {
@@ -179,10 +228,13 @@ func (in *instrumentedService) RemoveContainer(ctx context.Context, r *runtime.R
logrus.Infof("RemoveContainer for %q returns successfully", r.GetContainerId())
}
}()
return in.criContainerdService.RemoveContainer(ctx, r)
return in.c.RemoveContainer(ctx, r)
}
func (in *instrumentedService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (res *runtime.ExecSyncResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("ExecSync for %q with command %+v and timeout %d (s)", r.GetContainerId(), r.GetCmd(), r.GetTimeout())
defer func() {
if err != nil {
@@ -193,10 +245,13 @@ func (in *instrumentedService) ExecSync(ctx context.Context, r *runtime.ExecSync
res.GetStdout(), res.GetStderr())
}
}()
return in.criContainerdService.ExecSync(ctx, r)
return in.c.ExecSync(ctx, r)
}
func (in *instrumentedService) Exec(ctx context.Context, r *runtime.ExecRequest) (res *runtime.ExecResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("Exec for %q with command %+v, tty %v and stdin %v",
r.GetContainerId(), r.GetCmd(), r.GetTty(), r.GetStdin())
defer func() {
@@ -206,10 +261,13 @@ func (in *instrumentedService) Exec(ctx context.Context, r *runtime.ExecRequest)
logrus.Infof("Exec for %q returns URL %q", r.GetContainerId(), res.GetUrl())
}
}()
return in.criContainerdService.Exec(ctx, r)
return in.c.Exec(ctx, r)
}
func (in *instrumentedService) Attach(ctx context.Context, r *runtime.AttachRequest) (res *runtime.AttachResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("Attach for %q with tty %v and stdin %v", r.GetContainerId(), r.GetTty(), r.GetStdin())
defer func() {
if err != nil {
@@ -218,10 +276,13 @@ func (in *instrumentedService) Attach(ctx context.Context, r *runtime.AttachRequ
logrus.Infof("Attach for %q returns URL %q", r.GetContainerId(), res.Url)
}
}()
return in.criContainerdService.Attach(ctx, r)
return in.c.Attach(ctx, r)
}
func (in *instrumentedService) UpdateContainerResources(ctx context.Context, r *runtime.UpdateContainerResourcesRequest) (res *runtime.UpdateContainerResourcesResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("UpdateContainerResources for %q with %+v", r.GetContainerId(), r.GetLinux())
defer func() {
if err != nil {
@@ -230,10 +291,13 @@ func (in *instrumentedService) UpdateContainerResources(ctx context.Context, r *
logrus.Infof("UpdateContainerResources for %q returns successfully", r.GetContainerId())
}
}()
return in.criContainerdService.UpdateContainerResources(ctx, r)
return in.c.UpdateContainerResources(ctx, r)
}
func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (res *runtime.PullImageResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("PullImage %q with auth config %+v", r.GetImage().GetImage(), r.GetAuth())
defer func() {
if err != nil {
@@ -243,10 +307,13 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
r.GetImage().GetImage(), res.GetImageRef())
}
}()
return in.criContainerdService.PullImage(ctx, r)
return in.c.PullImage(ctx, r)
}
func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (res *runtime.ListImagesResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.Tracef("ListImages with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@@ -256,10 +323,13 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
r.GetFilter(), res.GetImages())
}
}()
return in.criContainerdService.ListImages(ctx, r)
return in.c.ListImages(ctx, r)
}
func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (res *runtime.ImageStatusResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.Tracef("ImageStatus for %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@@ -269,10 +339,13 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
r.GetImage().GetImage(), res.GetImage())
}
}()
return in.criContainerdService.ImageStatus(ctx, r)
return in.c.ImageStatus(ctx, r)
}
func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (_ *runtime.RemoveImageResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if err != nil {
@@ -281,10 +354,13 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
logrus.Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
}()
return in.criContainerdService.RemoveImage(ctx, r)
return in.c.RemoveImage(ctx, r)
}
func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (res *runtime.ImageFsInfoResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Debugf("ImageFsInfo")
defer func() {
if err != nil {
@@ -293,10 +369,13 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
logrus.Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
}()
return in.criContainerdService.ImageFsInfo(ctx, r)
return in.c.ImageFsInfo(ctx, r)
}
func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.ContainerStatsRequest) (res *runtime.ContainerStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Debugf("ContainerStats for %q", r.GetContainerId())
defer func() {
if err != nil {
@@ -305,10 +384,13 @@ func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.Co
logrus.Debugf("ContainerStats for %q returns stats %+v", r.GetContainerId(), res.GetStats())
}
}()
return in.criContainerdService.ContainerStats(ctx, r)
return in.c.ContainerStats(ctx, r)
}
func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtime.ListContainerStatsRequest) (res *runtime.ListContainerStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.Tracef("ListContainerStats with filter %+v", r.GetFilter())
defer func() {
if err != nil {
@@ -317,10 +399,58 @@ func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtim
log.Tracef("ListContainerStats returns stats %+v", res.GetStats())
}
}()
return in.criContainerdService.ListContainerStats(ctx, r)
return in.c.ListContainerStats(ctx, r)
}
// Status forwards the request to the wrapped service and traces the call.
// The request is rejected with an error while the server is uninitialized.
func (in *instrumentedService) Status(ctx context.Context, r *runtime.StatusRequest) (res *runtime.StatusResponse, err error) {
	if initErr := in.checkInitialized(); initErr != nil {
		return nil, initErr
	}
	log.Tracef("Status")
	// The deferred closure inspects the named results after the call returns.
	defer func() {
		if err == nil {
			log.Tracef("Status returns status %+v", res.GetStatus())
			return
		}
		logrus.WithError(err).Error("Status failed")
	}()
	return in.c.Status(ctx, r)
}
// Version forwards the request to the wrapped service and traces both the
// client-side version from the request and the response. The request is
// rejected with an error while the server is uninitialized.
func (in *instrumentedService) Version(ctx context.Context, r *runtime.VersionRequest) (res *runtime.VersionResponse, err error) {
	if initErr := in.checkInitialized(); initErr != nil {
		return nil, initErr
	}
	log.Tracef("Version with client side version %q", r.GetVersion())
	// The deferred closure inspects the named results after the call returns.
	defer func() {
		if err == nil {
			log.Tracef("Version returns %+v", res)
			return
		}
		logrus.WithError(err).Error("Version failed")
	}()
	return in.c.Version(ctx, r)
}
// UpdateRuntimeConfig forwards the request to the wrapped service and logs
// the requested runtime config and the outcome. The request is rejected with
// an error while the server is uninitialized.
func (in *instrumentedService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateRuntimeConfigRequest) (res *runtime.UpdateRuntimeConfigResponse, err error) {
	if err := in.checkInitialized(); err != nil {
		return nil, err
	}
	logrus.Debugf("UpdateRuntimeConfig with config %+v", r.GetRuntimeConfig())
	// The deferred closure reads the named result err to log the outcome.
	defer func() {
		if err != nil {
			logrus.WithError(err).Error("UpdateRuntimeConfig failed")
		} else {
			// Message fixed: the original logged "returns returns successfully".
			logrus.Debug("UpdateRuntimeConfig returns successfully")
		}
	}()
	return in.c.UpdateRuntimeConfig(ctx, r)
}
func (in *instrumentedService) LoadImage(ctx context.Context, r *api.LoadImageRequest) (res *api.LoadImageResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
logrus.Debugf("LoadImage from file %q", r.GetFilePath())
defer func() {
if err != nil {
@@ -329,5 +459,5 @@ func (in *instrumentedService) LoadImage(ctx context.Context, r *api.LoadImageRe
logrus.Debugf("LoadImage returns images %+v", res.GetImages())
}
}()
return in.criContainerdService.LoadImage(ctx, r)
return in.c.LoadImage(ctx, r)
}

View File

@@ -39,6 +39,7 @@ import (
"github.com/containerd/cri-containerd/cmd/cri-containerd/options"
api "github.com/containerd/cri-containerd/pkg/api/v1"
"github.com/containerd/cri-containerd/pkg/atomic"
osinterface "github.com/containerd/cri-containerd/pkg/os"
"github.com/containerd/cri-containerd/pkg/registrar"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
@@ -54,15 +55,21 @@ const (
unixProtocol = "unix"
)
// CRIContainerdService is the interface implement CRI remote service server.
type CRIContainerdService interface {
Run() error
Stop()
// grpcServices are all the grpc services provided by cri containerd: the
// runtime and image service servers from the CRI `runtime` package, plus the
// cri-containerd specific service server from the `api` package.
type grpcServices interface {
// Server side of the CRI runtime service.
runtime.RuntimeServiceServer
// Server side of the CRI image service.
runtime.ImageServiceServer
// Server side of the cri-containerd specific service.
api.CRIContainerdServiceServer
}
// CRIContainerdService is the interface that implements the CRI remote service server.
type CRIContainerdService interface {
// Run starts the service; the bool argument specifies whether Run also
// starts the grpc server.
Run(bool) error
// Stop stops the service.
// NOTE(review): the implementation is not visible here — confirm semantics.
Stop()
// NOTE(review): presumably the containerd plugin service interface,
// satisfied through Register — confirm.
plugin.Service
// All grpc services provided by cri containerd.
grpcServices
}
// criContainerdService implements CRIContainerdService.
type criContainerdService struct {
// config contains all configurations.
@@ -99,15 +106,14 @@ type criContainerdService struct {
streamServer streaming.Server
// eventMonitor is the monitor monitors containerd events.
eventMonitor *eventMonitor
// initialized indicates whether the server is initialized. All GRPC services
// should return error before the server is initialized.
initialized atomic.Bool
}
// NewCRIContainerdService returns a new instance of CRIContainerdService
func NewCRIContainerdService(config options.Config) (CRIContainerdService, error) {
client, err := containerd.New(config.ContainerdConfig.Endpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
if err != nil {
return nil, fmt.Errorf("failed to initialize containerd client with endpoint %q: %v",
config.ContainerdConfig.Endpoint, err)
}
var err error
if config.CgroupPath != "" {
_, err := loadCgroup(config.CgroupPath)
if err != nil {
@@ -131,7 +137,7 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
client: client,
initialized: atomic.NewBool(false),
}
if !c.config.SkipImageFSUUID {
@@ -156,22 +162,47 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
return nil, fmt.Errorf("failed to create stream server: %v", err)
}
c.eventMonitor = newEventMonitor(c)
c.eventMonitor = newEventMonitor(c.containerStore, c.sandboxStore)
// Create the grpc server and register runtime and image services.
// To avoid race condition between `Run` and `Stop`, still create grpc server
// although we may not use it. It's just a small in-memory data structure.
// TODO(random-liu): Get rid of the grpc server when completely switch
// to plugin mode.
c.server = grpc.NewServer()
instrumented := newInstrumentedService(c)
runtime.RegisterRuntimeServiceServer(c.server, instrumented)
runtime.RegisterImageServiceServer(c.server, instrumented)
api.RegisterCRIContainerdServiceServer(c.server, instrumented)
return newInstrumentedService(c), nil
return c, nil
}
// Run starts the cri-containerd service.
func (c *criContainerdService) Run() error {
// Register attaches the runtime, image and cri-containerd grpc services to
// the supplied grpc server, all backed by a single instrumented wrapper
// around c. This is used by containerd cri plugin. It always returns nil.
func (c *criContainerdService) Register(s *grpc.Server) error {
	svc := newInstrumentedService(c)
	runtime.RegisterRuntimeServiceServer(s, svc)
	runtime.RegisterImageServiceServer(s, svc)
	api.RegisterCRIContainerdServiceServer(s, svc)
	return nil
}
// Run starts the cri-containerd service. startGRPC specifies
// whether to start grpc server in this function.
// TODO(random-liu): Remove `startGRPC=true` case when we no longer support cri-containerd
// standalone mode.
func (c *criContainerdService) Run(startGRPC bool) error {
logrus.Info("Start cri-containerd service")
// Connect containerd service here, to get rid of the containerd dependency
// in `NewCRIContainerdService`. This is required for plugin mode bootstrapping.
logrus.Info("Connect containerd service")
client, err := containerd.New(c.config.ContainerdConfig.Endpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
if err != nil {
return fmt.Errorf("failed to initialize containerd client with endpoint %q: %v",
c.config.ContainerdConfig.Endpoint, err)
}
c.client = client
logrus.Info("Start subscribing containerd event")
c.eventMonitor.subscribe(c.client)
logrus.Infof("Start recovering state")
if err := c.recover(context.Background()); err != nil {
return fmt.Errorf("failed to recover state: %v", err)
@@ -179,7 +210,10 @@ func (c *criContainerdService) Run() error {
// Start event handler.
logrus.Info("Start event monitor")
eventMonitorCloseCh := c.eventMonitor.start()
eventMonitorCloseCh, err := c.eventMonitor.start()
if err != nil {
return fmt.Errorf("failed to start event monitor: %v", err)
}
// Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer")
@@ -200,24 +234,32 @@ func (c *criContainerdService) Run() error {
close(streamServerCloseCh)
}()
// Start grpc server.
// Unlink to cleanup the previous socket file.
logrus.Info("Start grpc server")
err := syscall.Unlink(c.config.SocketPath)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to unlink socket file %q: %v", c.config.SocketPath, err)
}
l, err := net.Listen(unixProtocol, c.config.SocketPath)
if err != nil {
return fmt.Errorf("failed to listen on %q: %v", c.config.SocketPath, err)
}
// Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set()
grpcServerCloseCh := make(chan struct{})
go func() {
if err := c.server.Serve(l); err != nil {
logrus.WithError(err).Error("Failed to serve grpc grpc request")
if startGRPC {
// Create the grpc server and register runtime and image services.
c.Register(c.server) // nolint: errcheck
// Start grpc server.
// Unlink to cleanup the previous socket file.
logrus.Info("Start grpc server")
err := syscall.Unlink(c.config.SocketPath)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to unlink socket file %q: %v", c.config.SocketPath, err)
}
close(grpcServerCloseCh)
}()
l, err := net.Listen(unixProtocol, c.config.SocketPath)
if err != nil {
return fmt.Errorf("failed to listen on %q: %v", c.config.SocketPath, err)
}
go func() {
if err := c.server.Serve(l); err != nil {
logrus.WithError(err).Error("Failed to serve grpc request")
}
close(grpcServerCloseCh)
}()
}
// Keep grpcServerCloseCh open if grpc server is not started.
// Stop the whole cri-containerd service if any of the critical service exits.
select {
@@ -231,8 +273,11 @@ func (c *criContainerdService) Run() error {
logrus.Info("Event monitor stopped")
<-streamServerCloseCh
logrus.Info("Stream server stopped")
<-grpcServerCloseCh
logrus.Info("GRPC server stopped")
if startGRPC {
// Only wait for grpc server close channel when grpc server is started.
<-grpcServerCloseCh
logrus.Info("GRPC server stopped")
}
return nil
}

View File

@@ -36,6 +36,7 @@ const (
)
// Version returns the runtime name, runtime version and runtime API version.
// TODO(random-liu): Return containerd version since we are going to merge 2 daemons.
func (c *criContainerdService) Version(ctx context.Context, r *runtime.VersionRequest) (*runtime.VersionResponse, error) {
return &runtime.VersionResponse{
Version: kubeAPIVersion,