From 62e692114578dc6b169ab54aa3b4cee599faa19f Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Thu, 18 Jan 2018 03:27:21 +0000 Subject: [PATCH 1/2] Use containerd grpc server Signed-off-by: Lantao Liu --- cmd/cri-containerd/cri_containerd.go | 4 +- cri.go | 22 ++-- pkg/atomic/atomic_boolean.go | 54 ++++++++ pkg/atomic/atomic_boolean_test.go | 32 +++++ pkg/server/events.go | 55 +++++--- pkg/server/instrumented_service.go | 184 +++++++++++++++++++++++---- pkg/server/service.go | 121 ++++++++++++------ pkg/server/version.go | 1 + 8 files changed, 375 insertions(+), 98 deletions(-) create mode 100644 pkg/atomic/atomic_boolean.go create mode 100644 pkg/atomic/atomic_boolean_test.go diff --git a/cmd/cri-containerd/cri_containerd.go b/cmd/cri-containerd/cri_containerd.go index b54a5f863..52327b06a 100644 --- a/cmd/cri-containerd/cri_containerd.go +++ b/cmd/cri-containerd/cri_containerd.go @@ -110,8 +110,8 @@ func main() { // Pass in non-empty final function to avoid os.Exit(1). We expect `Run` // to return itself. h := interrupt.New(func(os.Signal) {}, s.Stop) - if err := h.Run(func() error { return s.Run() }); err != nil { - return fmt.Errorf("failed to run cri-containerd grpc server: %v", err) + if err := h.Run(func() error { return s.Run(true) }); err != nil { + return fmt.Errorf("failed to run cri-containerd with grpc server: %v", err) } return nil } diff --git a/cri.go b/cri.go index b61aa3bb1..c894d8f7e 100644 --- a/cri.go +++ b/cri.go @@ -19,11 +19,13 @@ package cri import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/plugin" + "github.com/pkg/errors" "github.com/containerd/cri-containerd/cmd/cri-containerd/options" "github.com/containerd/cri-containerd/pkg/server" ) +// TODO(random-liu): Use github.com/pkg/errors for our errors. // Register CRI service plugin func init() { plugin.Register(&plugin.Registration{ @@ -48,24 +50,24 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { // TODO(random-liu): Support Config through Registration.Config. // TODO(random-liu): Validate the configuration. // TODO(random-liu): Leverage other fields in InitContext, such as Root. - // TODO(random-liu): Register GRPC service onto containerd GRPC server. // TODO(random-liu): Separate cri plugin config from cri-containerd server config, // because many options only make sense to cri-containerd server. // TODO(random-liu): Handle graceful stop. - // TODO(random-liu): Make grpc interceptor pluggable, and add and use cri context. c := options.DefaultConfig() log.G(ctx).Infof("Start cri plugin with config %+v", c) + + s, err := server.NewCRIContainerdService(c) + if err != nil { + return nil, errors.Wrap(err, "failed to create CRI service") + } + // Use a goroutine to start cri service. The reason is that currently // cri service requires containerd to be running. - // TODO(random-liu): Resolve the circular dependency. go func() { - s, err := server.NewCRIContainerdService(c) - if err != nil { - log.G(ctx).WithError(err).Fatal("Failed to create CRI service") - } - if err := s.Run(); err != nil { - log.G(ctx).WithError(err).Fatal("Failed to run CRI grpc server") + if err := s.Run(false); err != nil { + log.G(ctx).WithError(err).Fatal("Failed to run CRI service") } + // TODO(random-liu): Whether and how we can stop containerd. }() - return nil, nil + return s, nil } diff --git a/pkg/atomic/atomic_boolean.go b/pkg/atomic/atomic_boolean.go new file mode 100644 index 000000000..5efc188b0 --- /dev/null +++ b/pkg/atomic/atomic_boolean.go @@ -0,0 +1,54 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package atomic + +import "sync/atomic" + +// Bool is an atomic Boolean, +// Its methods are all atomic, thus safe to be called by +// multiple goroutines simultaneously. +type Bool interface { + Set() + Unset() + IsSet() bool +} + +// NewBool creates an Bool with given default value +func NewBool(ok bool) Bool { + ab := new(atomicBool) + if ok { + ab.Set() + } + return ab +} + +type atomicBool int32 + +// Set sets the Boolean to true +func (ab *atomicBool) Set() { + atomic.StoreInt32((*int32)(ab), 1) +} + +// Unset sets the Boolean to false +func (ab *atomicBool) Unset() { + atomic.StoreInt32((*int32)(ab), 0) +} + +// IsSet returns whether the Boolean is true +func (ab *atomicBool) IsSet() bool { + return atomic.LoadInt32((*int32)(ab)) == 1 +} diff --git a/pkg/atomic/atomic_boolean_test.go b/pkg/atomic/atomic_boolean_test.go new file mode 100644 index 000000000..9e8ae4af4 --- /dev/null +++ b/pkg/atomic/atomic_boolean_test.go @@ -0,0 +1,32 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package atomic + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBoolean(t *testing.T) { + ab := NewBool(true) + assert.True(t, ab.IsSet()) + ab.Unset() + assert.False(t, ab.IsSet()) + ab.Set() + assert.True(t, ab.IsSet()) +} diff --git a/pkg/server/events.go b/pkg/server/events.go index 120c2443c..4a00853ab 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -17,6 +17,9 @@ limitations under the License. package server import ( + "errors" + + "github.com/containerd/containerd" eventtypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/services/events/v1" containerdio "github.com/containerd/containerd/cio" @@ -26,35 +29,45 @@ import ( "golang.org/x/net/context" containerstore "github.com/containerd/cri-containerd/pkg/store/container" + sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" ) // eventMonitor monitors containerd event and updates internal state correspondingly. // TODO(random-liu): [P1] Is it possible to drop event during containerd is running? type eventMonitor struct { - c *criContainerdService - ch <-chan *events.Envelope - errCh <-chan error - closeCh chan struct{} - cancel context.CancelFunc + containerStore *containerstore.Store + sandboxStore *sandboxstore.Store + ch <-chan *events.Envelope + errCh <-chan error + ctx context.Context + cancel context.CancelFunc } // Create new event monitor. New event monitor will start subscribing containerd event. All events // happen after it should be monitored. -func newEventMonitor(c *criContainerdService) *eventMonitor { +func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonitor { ctx, cancel := context.WithCancel(context.Background()) - ch, errCh := c.client.Subscribe(ctx) return &eventMonitor{ - c: c, - ch: ch, - errCh: errCh, - closeCh: make(chan struct{}), - cancel: cancel, + containerStore: c, + sandboxStore: s, + ctx: ctx, + cancel: cancel, } } +// subscribe starts subsribe containerd events. We separate subscribe from +func (em *eventMonitor) subscribe(client *containerd.Client) { + em.ch, em.errCh = client.Subscribe(em.ctx) +} + // start starts the event monitor which monitors and handles all container events. It returns -// a channel for the caller to wait for the event monitor to stop. -func (em *eventMonitor) start() <-chan struct{} { +// a channel for the caller to wait for the event monitor to stop. start must be called after +// subscribe. +func (em *eventMonitor) start() (<-chan struct{}, error) { + if em.ch == nil || em.errCh == nil { + return nil, errors.New("event channel is nil") + } + closeCh := make(chan struct{}) go func() { for { select { @@ -63,22 +76,22 @@ func (em *eventMonitor) start() <-chan struct{} { em.handleEvent(e) case err := <-em.errCh: logrus.WithError(err).Error("Failed to handle event stream") - close(em.closeCh) + close(closeCh) return } } }() - return em.closeCh + return closeCh, nil } // stop stops the event monitor. It will close the event channel. +// Once event monitor is stopped, it can't be started. func (em *eventMonitor) stop() { em.cancel() } // handleEvent handles a containerd event. func (em *eventMonitor) handleEvent(evt *events.Envelope) { - c := em.c any, err := typeurl.UnmarshalAny(evt.Event) if err != nil { logrus.WithError(err).Errorf("Failed to convert event envelope %+v", evt) @@ -92,9 +105,9 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { case *eventtypes.TaskExit: e := any.(*eventtypes.TaskExit) logrus.Infof("TaskExit event %+v", e) - cntr, err := c.containerStore.Get(e.ContainerID) + cntr, err := em.containerStore.Get(e.ContainerID) if err != nil { - if _, err := c.sandboxStore.Get(e.ContainerID); err == nil { + if _, err := em.sandboxStore.Get(e.ContainerID); err == nil { return } logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID) @@ -145,9 +158,9 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { case *eventtypes.TaskOOM: e := any.(*eventtypes.TaskOOM) logrus.Infof("TaskOOM event %+v", e) - cntr, err := c.containerStore.Get(e.ContainerID) + cntr, err := em.containerStore.Get(e.ContainerID) if err != nil { - if _, err := c.sandboxStore.Get(e.ContainerID); err == nil { + if _, err := em.sandboxStore.Get(e.ContainerID); err == nil { return } logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID) diff --git a/pkg/server/instrumented_service.go b/pkg/server/instrumented_service.go index 6f2ce041d..b7de2fe81 100644 --- a/pkg/server/instrumented_service.go +++ b/pkg/server/instrumented_service.go @@ -17,6 +17,8 @@ limitations under the License. package server import ( + "errors" + "github.com/sirupsen/logrus" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -27,14 +29,28 @@ import ( // instrumentedService wraps service and logs each operation. type instrumentedService struct { - *criContainerdService + c *criContainerdService } -func newInstrumentedService(c *criContainerdService) CRIContainerdService { - return &instrumentedService{criContainerdService: c} +func newInstrumentedService(c *criContainerdService) grpcServices { + return &instrumentedService{c: c} +} + +// checkInitialized returns error if the server is not fully initialized. +// GRPC service request handlers should return error before server is fully +// initialized. +// NOTE(random-liu): All following functions MUST check initialized at the beginning. +func (in *instrumentedService) checkInitialized() error { + if in.c.initialized.IsSet() { + return nil + } + return errors.New("server is not initialized yet") } func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (res *runtime.RunPodSandboxResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("RunPodSandbox with config %+v", r.GetConfig()) defer func() { if err != nil { @@ -43,10 +59,13 @@ func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.Run logrus.Infof("RunPodSandbox for %+v returns sandbox id %q", r.GetConfig().GetMetadata(), res.GetPodSandboxId()) } }() - return in.criContainerdService.RunPodSandbox(ctx, r) + return in.c.RunPodSandbox(ctx, r) } func (in *instrumentedService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (res *runtime.ListPodSandboxResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } log.Tracef("ListPodSandbox with filter %+v", r.GetFilter()) defer func() { if err != nil { @@ -55,10 +74,13 @@ func (in *instrumentedService) ListPodSandbox(ctx context.Context, r *runtime.Li log.Tracef("ListPodSandbox returns sandboxes %+v", res.GetItems()) } }() - return in.criContainerdService.ListPodSandbox(ctx, r) + return in.c.ListPodSandbox(ctx, r) } func (in *instrumentedService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (res *runtime.PodSandboxStatusResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } log.Tracef("PodSandboxStatus for %q", r.GetPodSandboxId()) defer func() { if err != nil { @@ -67,10 +89,13 @@ func (in *instrumentedService) PodSandboxStatus(ctx context.Context, r *runtime. log.Tracef("PodSandboxStatus for %q returns status %+v", r.GetPodSandboxId(), res.GetStatus()) } }() - return in.criContainerdService.PodSandboxStatus(ctx, r) + return in.c.PodSandboxStatus(ctx, r) } func (in *instrumentedService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (_ *runtime.StopPodSandboxResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("StopPodSandbox for %q", r.GetPodSandboxId()) defer func() { if err != nil { @@ -79,10 +104,13 @@ func (in *instrumentedService) StopPodSandbox(ctx context.Context, r *runtime.St logrus.Infof("StopPodSandbox for %q returns successfully", r.GetPodSandboxId()) } }() - return in.criContainerdService.StopPodSandbox(ctx, r) + return in.c.StopPodSandbox(ctx, r) } func (in *instrumentedService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodSandboxRequest) (_ *runtime.RemovePodSandboxResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("RemovePodSandbox for %q", r.GetPodSandboxId()) defer func() { if err != nil { @@ -91,10 +119,13 @@ func (in *instrumentedService) RemovePodSandbox(ctx context.Context, r *runtime. logrus.Infof("RemovePodSandbox %q returns successfully", r.GetPodSandboxId()) } }() - return in.criContainerdService.RemovePodSandbox(ctx, r) + return in.c.RemovePodSandbox(ctx, r) } func (in *instrumentedService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (res *runtime.PortForwardResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("Portforward for %q port %v", r.GetPodSandboxId(), r.GetPort()) defer func() { if err != nil { @@ -103,10 +134,13 @@ func (in *instrumentedService) PortForward(ctx context.Context, r *runtime.PortF logrus.Infof("Portforward for %q returns URL %q", r.GetPodSandboxId(), res.GetUrl()) } }() - return in.criContainerdService.PortForward(ctx, r) + return in.c.PortForward(ctx, r) } func (in *instrumentedService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (res *runtime.CreateContainerResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("CreateContainer within sandbox %q with container config %+v and sandbox config %+v", r.GetPodSandboxId(), r.GetConfig(), r.GetSandboxConfig()) defer func() { @@ -118,10 +152,13 @@ func (in *instrumentedService) CreateContainer(ctx context.Context, r *runtime.C r.GetPodSandboxId(), r.GetConfig().GetMetadata(), res.GetContainerId()) } }() - return in.criContainerdService.CreateContainer(ctx, r) + return in.c.CreateContainer(ctx, r) } func (in *instrumentedService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (_ *runtime.StartContainerResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("StartContainer for %q", r.GetContainerId()) defer func() { if err != nil { @@ -130,10 +167,13 @@ func (in *instrumentedService) StartContainer(ctx context.Context, r *runtime.St logrus.Infof("StartContainer for %q returns successfully", r.GetContainerId()) } }() - return in.criContainerdService.StartContainer(ctx, r) + return in.c.StartContainer(ctx, r) } func (in *instrumentedService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (res *runtime.ListContainersResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } log.Tracef("ListContainers with filter %+v", r.GetFilter()) defer func() { if err != nil { @@ -143,10 +183,13 @@ func (in *instrumentedService) ListContainers(ctx context.Context, r *runtime.Li r.GetFilter(), res.GetContainers()) } }() - return in.criContainerdService.ListContainers(ctx, r) + return in.c.ListContainers(ctx, r) } func (in *instrumentedService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (res *runtime.ContainerStatusResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } log.Tracef("ContainerStatus for %q", r.GetContainerId()) defer func() { if err != nil { @@ -155,10 +198,13 @@ func (in *instrumentedService) ContainerStatus(ctx context.Context, r *runtime.C log.Tracef("ContainerStatus for %q returns status %+v", r.GetContainerId(), res.GetStatus()) } }() - return in.criContainerdService.ContainerStatus(ctx, r) + return in.c.ContainerStatus(ctx, r) } func (in *instrumentedService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (res *runtime.StopContainerResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("StopContainer for %q with timeout %d (s)", r.GetContainerId(), r.GetTimeout()) defer func() { if err != nil { @@ -167,10 +213,13 @@ func (in *instrumentedService) StopContainer(ctx context.Context, r *runtime.Sto logrus.Infof("StopContainer for %q returns successfully", r.GetContainerId()) } }() - return in.criContainerdService.StopContainer(ctx, r) + return in.c.StopContainer(ctx, r) } func (in *instrumentedService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (res *runtime.RemoveContainerResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("RemoveContainer for %q", r.GetContainerId()) defer func() { if err != nil { @@ -179,10 +228,13 @@ func (in *instrumentedService) RemoveContainer(ctx context.Context, r *runtime.R logrus.Infof("RemoveContainer for %q returns successfully", r.GetContainerId()) } }() - return in.criContainerdService.RemoveContainer(ctx, r) + return in.c.RemoveContainer(ctx, r) } func (in *instrumentedService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (res *runtime.ExecSyncResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("ExecSync for %q with command %+v and timeout %d (s)", r.GetContainerId(), r.GetCmd(), r.GetTimeout()) defer func() { if err != nil { @@ -193,10 +245,13 @@ func (in *instrumentedService) ExecSync(ctx context.Context, r *runtime.ExecSync res.GetStdout(), res.GetStderr()) } }() - return in.criContainerdService.ExecSync(ctx, r) + return in.c.ExecSync(ctx, r) } func (in *instrumentedService) Exec(ctx context.Context, r *runtime.ExecRequest) (res *runtime.ExecResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("Exec for %q with command %+v, tty %v and stdin %v", r.GetContainerId(), r.GetCmd(), r.GetTty(), r.GetStdin()) defer func() { @@ -206,10 +261,13 @@ func (in *instrumentedService) Exec(ctx context.Context, r *runtime.ExecRequest) logrus.Infof("Exec for %q returns URL %q", r.GetContainerId(), res.GetUrl()) } }() - return in.criContainerdService.Exec(ctx, r) + return in.c.Exec(ctx, r) } func (in *instrumentedService) Attach(ctx context.Context, r *runtime.AttachRequest) (res *runtime.AttachResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("Attach for %q with tty %v and stdin %v", r.GetContainerId(), r.GetTty(), r.GetStdin()) defer func() { if err != nil { @@ -218,10 +276,13 @@ func (in *instrumentedService) Attach(ctx context.Context, r *runtime.AttachRequ logrus.Infof("Attach for %q returns URL %q", r.GetContainerId(), res.Url) } }() - return in.criContainerdService.Attach(ctx, r) + return in.c.Attach(ctx, r) } func (in *instrumentedService) UpdateContainerResources(ctx context.Context, r *runtime.UpdateContainerResourcesRequest) (res *runtime.UpdateContainerResourcesResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("UpdateContainerResources for %q with %+v", r.GetContainerId(), r.GetLinux()) defer func() { if err != nil { @@ -230,10 +291,13 @@ func (in *instrumentedService) UpdateContainerResources(ctx context.Context, r * logrus.Infof("UpdateContainerResources for %q returns successfully", r.GetContainerId()) } }() - return in.criContainerdService.UpdateContainerResources(ctx, r) + return in.c.UpdateContainerResources(ctx, r) } func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (res *runtime.PullImageResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("PullImage %q with auth config %+v", r.GetImage().GetImage(), r.GetAuth()) defer func() { if err != nil { @@ -243,10 +307,13 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma r.GetImage().GetImage(), res.GetImageRef()) } }() - return in.criContainerdService.PullImage(ctx, r) + return in.c.PullImage(ctx, r) } func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (res *runtime.ListImagesResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } log.Tracef("ListImages with filter %+v", r.GetFilter()) defer func() { if err != nil { @@ -256,10 +323,13 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm r.GetFilter(), res.GetImages()) } }() - return in.criContainerdService.ListImages(ctx, r) + return in.c.ListImages(ctx, r) } func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (res *runtime.ImageStatusResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } log.Tracef("ImageStatus for %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -269,10 +339,13 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image r.GetImage().GetImage(), res.GetImage()) } }() - return in.criContainerdService.ImageStatus(ctx, r) + return in.c.ImageStatus(ctx, r) } func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (_ *runtime.RemoveImageResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Infof("RemoveImage %q", r.GetImage().GetImage()) defer func() { if err != nil { @@ -281,10 +354,13 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov logrus.Infof("RemoveImage %q returns successfully", r.GetImage().GetImage()) } }() - return in.criContainerdService.RemoveImage(ctx, r) + return in.c.RemoveImage(ctx, r) } func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (res *runtime.ImageFsInfoResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Debugf("ImageFsInfo") defer func() { if err != nil { @@ -293,10 +369,13 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image logrus.Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems) } }() - return in.criContainerdService.ImageFsInfo(ctx, r) + return in.c.ImageFsInfo(ctx, r) } func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.ContainerStatsRequest) (res *runtime.ContainerStatsResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Debugf("ContainerStats for %q", r.GetContainerId()) defer func() { if err != nil { @@ -305,10 +384,13 @@ func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.Co logrus.Debugf("ContainerStats for %q returns stats %+v", r.GetContainerId(), res.GetStats()) } }() - return in.criContainerdService.ContainerStats(ctx, r) + return in.c.ContainerStats(ctx, r) } func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtime.ListContainerStatsRequest) (res *runtime.ListContainerStatsResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } log.Tracef("ListContainerStats with filter %+v", r.GetFilter()) defer func() { if err != nil { @@ -317,10 +399,58 @@ func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtim log.Tracef("ListContainerStats returns stats %+v", res.GetStats()) } }() - return in.criContainerdService.ListContainerStats(ctx, r) + return in.c.ListContainerStats(ctx, r) +} + +func (in *instrumentedService) Status(ctx context.Context, r *runtime.StatusRequest) (res *runtime.StatusResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } + log.Tracef("Status") + defer func() { + if err != nil { + logrus.WithError(err).Error("Status failed") + } else { + log.Tracef("Status returns status %+v", res.GetStatus()) + } + }() + return in.c.Status(ctx, r) +} + +func (in *instrumentedService) Version(ctx context.Context, r *runtime.VersionRequest) (res *runtime.VersionResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } + log.Tracef("Version with client side version %q", r.GetVersion()) + defer func() { + if err != nil { + logrus.WithError(err).Error("Version failed") + } else { + log.Tracef("Version returns %+v", res) + } + }() + return in.c.Version(ctx, r) +} + +func (in *instrumentedService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateRuntimeConfigRequest) (res *runtime.UpdateRuntimeConfigResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } + logrus.Debugf("UpdateRuntimeConfig with config %+v", r.GetRuntimeConfig()) + defer func() { + if err != nil { + logrus.WithError(err).Error("UpdateRuntimeConfig failed") + } else { + logrus.Debug("UpdateRuntimeConfig returns returns successfully") + } + }() + return in.c.UpdateRuntimeConfig(ctx, r) } func (in *instrumentedService) LoadImage(ctx context.Context, r *api.LoadImageRequest) (res *api.LoadImageResponse, err error) { + if err := in.checkInitialized(); err != nil { + return nil, err + } logrus.Debugf("LoadImage from file %q", r.GetFilePath()) defer func() { if err != nil { @@ -329,5 +459,5 @@ func (in *instrumentedService) LoadImage(ctx context.Context, r *api.LoadImageRe logrus.Debugf("LoadImage returns images %+v", res.GetImages()) } }() - return in.criContainerdService.LoadImage(ctx, r) + return in.c.LoadImage(ctx, r) } diff --git a/pkg/server/service.go b/pkg/server/service.go index e931d6ff6..1c50da521 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -39,6 +39,7 @@ import ( "github.com/containerd/cri-containerd/cmd/cri-containerd/options" api "github.com/containerd/cri-containerd/pkg/api/v1" + "github.com/containerd/cri-containerd/pkg/atomic" osinterface "github.com/containerd/cri-containerd/pkg/os" "github.com/containerd/cri-containerd/pkg/registrar" containerstore "github.com/containerd/cri-containerd/pkg/store/container" @@ -54,15 +55,21 @@ const ( unixProtocol = "unix" ) -// CRIContainerdService is the interface implement CRI remote service server. -type CRIContainerdService interface { - Run() error - Stop() +// grpcServices are all the grpc services provided by cri containerd. +type grpcServices interface { runtime.RuntimeServiceServer runtime.ImageServiceServer api.CRIContainerdServiceServer } +// CRIContainerdService is the interface implement CRI remote service server. +type CRIContainerdService interface { + Run(bool) error + Stop() + plugin.Service + grpcServices +} + // criContainerdService implements CRIContainerdService. type criContainerdService struct { // config contains all configurations. @@ -99,15 +106,14 @@ type criContainerdService struct { streamServer streaming.Server // eventMonitor is the monitor monitors containerd events. eventMonitor *eventMonitor + // initialized indicates whether the server is initialized. All GRPC services + // should return error before the server is initialized. + initialized atomic.Bool } // NewCRIContainerdService returns a new instance of CRIContainerdService func NewCRIContainerdService(config options.Config) (CRIContainerdService, error) { - client, err := containerd.New(config.ContainerdConfig.Endpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace)) - if err != nil { - return nil, fmt.Errorf("failed to initialize containerd client with endpoint %q: %v", - config.ContainerdConfig.Endpoint, err) - } + var err error if config.CgroupPath != "" { _, err := loadCgroup(config.CgroupPath) if err != nil { @@ -131,7 +137,7 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error snapshotStore: snapshotstore.NewStore(), sandboxNameIndex: registrar.NewRegistrar(), containerNameIndex: registrar.NewRegistrar(), - client: client, + initialized: atomic.NewBool(false), } if !c.config.SkipImageFSUUID { @@ -156,22 +162,47 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error return nil, fmt.Errorf("failed to create stream server: %v", err) } - c.eventMonitor = newEventMonitor(c) + c.eventMonitor = newEventMonitor(c.containerStore, c.sandboxStore) - // Create the grpc server and register runtime and image services. + // To avoid race condition between `Run` and `Stop`, still create grpc server + // although we may not use it. It's just a small in-memory data structure. + // TODO(random-liu): Get rid of the grpc server when completely switch + // to plugin mode. c.server = grpc.NewServer() - instrumented := newInstrumentedService(c) - runtime.RegisterRuntimeServiceServer(c.server, instrumented) - runtime.RegisterImageServiceServer(c.server, instrumented) - api.RegisterCRIContainerdServiceServer(c.server, instrumented) - return newInstrumentedService(c), nil + return c, nil } -// Run starts the cri-containerd service. -func (c *criContainerdService) Run() error { +// Register registers all required services onto a specific grpc server. +// This is used by containerd cri plugin. +func (c *criContainerdService) Register(s *grpc.Server) error { + instrumented := newInstrumentedService(c) + runtime.RegisterRuntimeServiceServer(s, instrumented) + runtime.RegisterImageServiceServer(s, instrumented) + api.RegisterCRIContainerdServiceServer(s, instrumented) + return nil +} + +// Run starts the cri-containerd service. startGRPC specifies +// whether to start grpc server in this function. +// TODO(random-liu): Remove `startRPC=true` case when we no longer support cri-containerd +// standalone mode. +func (c *criContainerdService) Run(startGRPC bool) error { logrus.Info("Start cri-containerd service") + // Connect containerd service here, to get rid of the containerd dependency + // in `NewCRIContainerdService`. This is required for plugin mode bootstrapping. + logrus.Info("Connect containerd service") + client, err := containerd.New(c.config.ContainerdConfig.Endpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace)) + if err != nil { + return fmt.Errorf("failed to initialize containerd client with endpoint %q: %v", + c.config.ContainerdConfig.Endpoint, err) + } + c.client = client + + logrus.Info("Start subscribing containerd event") + c.eventMonitor.subscribe(c.client) + logrus.Infof("Start recovering state") if err := c.recover(context.Background()); err != nil { return fmt.Errorf("failed to recover state: %v", err) @@ -179,7 +210,10 @@ func (c *criContainerdService) Run() error { // Start event handler. logrus.Info("Start event monitor") - eventMonitorCloseCh := c.eventMonitor.start() + eventMonitorCloseCh, err := c.eventMonitor.start() + if err != nil { + return fmt.Errorf("failed to start event monitor: %v", err) + } // Start snapshot stats syncer, it doesn't need to be stopped. logrus.Info("Start snapshots syncer") @@ -200,24 +234,32 @@ func (c *criContainerdService) Run() error { close(streamServerCloseCh) }() - // Start grpc server. - // Unlink to cleanup the previous socket file. - logrus.Info("Start grpc server") - err := syscall.Unlink(c.config.SocketPath) - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to unlink socket file %q: %v", c.config.SocketPath, err) - } - l, err := net.Listen(unixProtocol, c.config.SocketPath) - if err != nil { - return fmt.Errorf("failed to listen on %q: %v", c.config.SocketPath, err) - } + // Set the server as initialized. GRPC services could start serving traffic. + c.initialized.Set() + grpcServerCloseCh := make(chan struct{}) - go func() { - if err := c.server.Serve(l); err != nil { - logrus.WithError(err).Error("Failed to serve grpc grpc request") + if startGRPC { + // Create the grpc server and register runtime and image services. + c.Register(c.server) // nolint: errcheck + // Start grpc server. + // Unlink to cleanup the previous socket file. + logrus.Info("Start grpc server") + err := syscall.Unlink(c.config.SocketPath) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to unlink socket file %q: %v", c.config.SocketPath, err) } - close(grpcServerCloseCh) - }() + l, err := net.Listen(unixProtocol, c.config.SocketPath) + if err != nil { + return fmt.Errorf("failed to listen on %q: %v", c.config.SocketPath, err) + } + go func() { + if err := c.server.Serve(l); err != nil { + logrus.WithError(err).Error("Failed to serve grpc request") + } + close(grpcServerCloseCh) + }() + } + // Keep grpcServerCloseCh open if grpc server is not started. // Stop the whole cri-containerd service if any of the critical service exits. select { @@ -231,8 +273,11 @@ func (c *criContainerdService) Run() error { logrus.Info("Event monitor stopped") <-streamServerCloseCh logrus.Info("Stream server stopped") - <-grpcServerCloseCh - logrus.Info("GRPC server stopped") + if startGRPC { + // Only wait for grpc server close channel when grpc server is started. + <-grpcServerCloseCh + logrus.Info("GRPC server stopped") + } return nil } diff --git a/pkg/server/version.go b/pkg/server/version.go index d7eb2737b..b07bfeef7 100644 --- a/pkg/server/version.go +++ b/pkg/server/version.go @@ -36,6 +36,7 @@ const ( ) // Version returns the runtime name, runtime version and runtime API version. +// TODO(random-liu): Return containerd version since we are going to merge 2 daemons. func (c *criContainerdService) Version(ctx context.Context, r *runtime.VersionRequest) (*runtime.VersionResponse, error) { return &runtime.VersionResponse{ Version: kubeAPIVersion, From a9d846af23ccaf8a0268a1b05a701e57fdfe1817 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Thu, 18 Jan 2018 03:27:28 +0000 Subject: [PATCH 2/2] Upgrade test framework to talk to containerd sock. Signed-off-by: Lantao Liu --- hack/test-integration.sh | 5 ++++- hack/test-utils.sh | 3 +++ integration/restart_test.go | 4 ++-- integration/test_utils.go | 24 +++++++++++++----------- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/hack/test-integration.sh b/hack/test-integration.sh index 64cbd97c6..9d8738e1b 100755 --- a/hack/test-integration.sh +++ b/hack/test-integration.sh @@ -31,7 +31,10 @@ test_setup ${REPORT_DIR} # Run integration test. # Set STANDALONE_CRI_CONTAINERD so that integration test can see it. # Some integration test needs the env to skip itself. -sudo STANDALONE_CRI_CONTAINERD=${STANDALONE_CRI_CONTAINERD} ${ROOT}/_output/integration.test --test.run="${FOCUS}" --test.v +sudo ${ROOT}/_output/integration.test --test.run="${FOCUS}" --test.v \ + --standalone-cri-containerd=${STANDALONE_CRI_CONTAINERD} \ + --cri-containerd-endpoint=${CRICONTAINERD_SOCK} + test_exit_code=$? test_teardown diff --git a/hack/test-utils.sh b/hack/test-utils.sh index 5b61fbb4c..705376914 100644 --- a/hack/test-utils.sh +++ b/hack/test-utils.sh @@ -25,6 +25,9 @@ RESTART_WAIT_PERIOD=${RESTART_WAIT_PERIOD:-10} STANDALONE_CRI_CONTAINERD=${STANDALONE_CRI_CONTAINERD:-true} CRICONTAINERD_SOCK=/var/run/cri-containerd.sock +if ! ${STANDALONE_CRI_CONTAINERD}; then + CRICONTAINERD_SOCK=/var/run/containerd/containerd.sock +fi cri_containerd_pid= containerd_pid= diff --git a/integration/restart_test.go b/integration/restart_test.go index 494f92b8e..af06b56b3 100644 --- a/integration/restart_test.go +++ b/integration/restart_test.go @@ -34,7 +34,7 @@ import ( // NOTE(random-liu): Current restart test only support standalone cri-containerd mode. func TestSandboxAcrossCRIContainerdRestart(t *testing.T) { - if os.Getenv(standaloneEnvKey) == "false" { + if !*standaloneCRIContainerd { t.Skip("Skip because cri-containerd does not run in standalone mode") } ctx := context.Background() @@ -153,7 +153,7 @@ func TestSandboxAcrossCRIContainerdRestart(t *testing.T) { // teardown the network properly. // This test uses host network sandbox to avoid resource leakage. func TestSandboxDeletionAcrossCRIContainerdRestart(t *testing.T) { - if os.Getenv(standaloneEnvKey) == "false" { + if !*standaloneCRIContainerd { t.Skip("Skip because cri-containerd does not run in standalone mode") } ctx := context.Background() diff --git a/integration/test_utils.go b/integration/test_utils.go index 900442b20..4f096fc18 100644 --- a/integration/test_utils.go +++ b/integration/test_utils.go @@ -18,6 +18,7 @@ package integration import ( "errors" + "flag" "fmt" "os/exec" "time" @@ -34,14 +35,11 @@ import ( ) const ( - sock = "/var/run/cri-containerd.sock" - timeout = 1 * time.Minute - pauseImage = "gcr.io/google_containers/pause:3.0" // This is the same with default sandbox image. - k8sNamespace = "k8s.io" // This is the same with server.k8sContainerdNamespace. - containerdEndpoint = "/run/containerd/containerd.sock" - criContainerdEndpoint = "/var/run/cri-containerd.sock" - criContainerdRoot = "/var/lib/cri-containerd" - standaloneEnvKey = "STANDALONE_CRI_CONTAINERD" + timeout = 1 * time.Minute + pauseImage = "gcr.io/google_containers/pause:3.0" // This is the same with default sandbox image. + k8sNamespace = "k8s.io" // This is the same with server.k8sContainerdNamespace. + containerdEndpoint = "/run/containerd/containerd.sock" + criContainerdRoot = "/var/lib/cri-containerd" ) var ( @@ -51,7 +49,11 @@ var ( criContainerdClient api.CRIContainerdServiceClient ) +var standaloneCRIContainerd = flag.Bool("standalone-cri-containerd", true, "Whether cri-containerd is running in standalone mode.") +var criContainerdEndpoint = flag.String("cri-containerd-endpoint", "/var/run/cri-containerd.sock", "The endpoint of cri-containerd.") + func init() { + flag.Parse() if err := ConnectDaemons(); err != nil { logrus.WithError(err).Fatalf("Failed to connect daemons") } @@ -60,11 +62,11 @@ func init() { // ConnectDaemons connect cri-containerd and containerd, and initialize the clients. func ConnectDaemons() error { var err error - runtimeService, err = remote.NewRemoteRuntimeService(sock, timeout) + runtimeService, err = remote.NewRemoteRuntimeService(*criContainerdEndpoint, timeout) if err != nil { return fmt.Errorf("failed to create runtime service: %v", err) } - imageService, err = remote.NewRemoteImageService(sock, timeout) + imageService, err = remote.NewRemoteImageService(*criContainerdEndpoint, timeout) if err != nil { return fmt.Errorf("failed to create image service: %v", err) } @@ -83,7 +85,7 @@ func ConnectDaemons() error { if err != nil { return fmt.Errorf("failed to connect containerd: %v", err) } - criContainerdClient, err = client.NewCRIContainerdClient(criContainerdEndpoint, timeout) + criContainerdClient, err = client.NewCRIContainerdClient(*criContainerdEndpoint, timeout) if err != nil { return fmt.Errorf("failed to connect cri-containerd: %v", err) }