diff --git a/cmd/cri-containerd/cri_containerd.go b/cmd/cri-containerd/cri_containerd.go index dffed35df..08e22113e 100644 --- a/cmd/cri-containerd/cri_containerd.go +++ b/cmd/cri-containerd/cri_containerd.go @@ -21,6 +21,7 @@ import ( "github.com/golang/glog" "github.com/spf13/pflag" + "k8s.io/kubernetes/pkg/util/interrupt" "github.com/kubernetes-incubator/cri-containerd/cmd/cri-containerd/options" "github.com/kubernetes-incubator/cri-containerd/pkg/server" @@ -38,7 +39,8 @@ func main() { } glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.SocketPath) - service, err := server.NewCRIContainerdService( + s, err := server.NewCRIContainerdService( + o.SocketPath, o.ContainerdEndpoint, o.ContainerdSnapshotter, o.RootDir, @@ -51,10 +53,11 @@ func main() { if err != nil { glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err) } - service.Start() - - s := server.NewCRIContainerdServer(o.SocketPath, service, service) - if err := s.Run(); err != nil { + // Use interrupt handler to make sure the server to be stopped properly. + // Pass in non-empty final function to avoid os.Exit(1). We expect `Run` + // to return itself. + h := interrupt.New(func(os.Signal) {}, s.Stop) + if err := h.Run(func() error { return s.Run() }); err != nil { glog.Exitf("Failed to run cri-containerd grpc server: %v", err) } } diff --git a/pkg/server/events.go b/pkg/server/events.go index 5ef8f0b41..37a68bc0b 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -17,72 +17,69 @@ limitations under the License. package server import ( - "time" + "fmt" "github.com/containerd/containerd" "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/typeurl" "github.com/golang/glog" - "github.com/jpillora/backoff" "golang.org/x/net/context" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" ) -const ( - // minRetryInterval is the minimum retry interval when lost connection with containerd. - minRetryInterval = 100 * time.Millisecond - // maxRetryInterval is the maximum retry interval when lost connection with containerd. - maxRetryInterval = 30 * time.Second - // exponentialFactor is the exponential backoff factor. - exponentialFactor = 2.0 -) - -// startEventMonitor starts an event monitor which monitors and handles all -// container events. +// eventMonitor monitors containerd event and updates internal state correspondingly. // TODO(random-liu): [P1] Is it possible to drop event during containerd is running? -func (c *criContainerdService) startEventMonitor() { - b := backoff.Backoff{ - Min: minRetryInterval, - Max: maxRetryInterval, - Factor: exponentialFactor, - } - go func() { - for { - eventstream, err := c.eventService.Subscribe(context.Background(), &events.SubscribeRequest{}) - if err != nil { - glog.Errorf("Failed to connect to containerd event stream: %v", err) - time.Sleep(b.Duration()) - continue - } - // Successfully connect with containerd, reset backoff. - b.Reset() - // TODO(random-liu): Relist to recover state, should prevent other operations - // until state is fully recovered. - for { - if err := c.handleEventStream(eventstream); err != nil { - glog.Errorf("Failed to handle event stream: %v", err) - break - } - } - } - }() +type eventMonitor struct { + c *criContainerdService + eventstream events.Events_SubscribeClient + cancel context.CancelFunc + closeCh chan struct{} } -// handleEventStream receives an event from containerd and handles the event. -func (c *criContainerdService) handleEventStream(eventstream events.Events_SubscribeClient) error { - e, err := eventstream.Recv() +// Create new event monitor. New event monitor will start subscribing containerd event. All events +// happen after it should be monitored. +func newEventMonitor(c *criContainerdService) (*eventMonitor, error) { + ctx, cancel := context.WithCancel(context.Background()) + e, err := c.client.Events(ctx) if err != nil { - return err + return nil, fmt.Errorf("failed to subscribe containerd event: %v", err) } - glog.V(4).Infof("Received container event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic) - c.handleEvent(e) - return nil + return &eventMonitor{ + c: c, + eventstream: e, + cancel: cancel, + closeCh: make(chan struct{}), + }, nil +} + +// start starts the event monitor which monitors and handles all container events. It returns +// a channel for the caller to wait for the event monitor to stop. +func (em *eventMonitor) start() <-chan struct{} { + go func() { + for { + e, err := em.eventstream.Recv() + if err != nil { + glog.Errorf("Failed to handle event stream: %v", err) + close(em.closeCh) + return + } + glog.V(4).Infof("Received container event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic) + em.handleEvent(e) + } + }() + return em.closeCh +} + +// stop stops the event monitor. It will close the event channel. +func (em *eventMonitor) stop() { + em.cancel() } // handleEvent handles a containerd event. -func (c *criContainerdService) handleEvent(evt *events.Envelope) { +func (em *eventMonitor) handleEvent(evt *events.Envelope) { + c := em.c any, err := typeurl.UnmarshalAny(evt.Event) if err != nil { glog.Errorf("Failed to convert event envelope %+v: %v", evt, err) diff --git a/pkg/server/server.go b/pkg/server/server.go deleted file mode 100644 index 7bfa0c850..000000000 --- a/pkg/server/server.go +++ /dev/null @@ -1,74 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package server - -import ( - "fmt" - "net" - "os" - "syscall" - - "github.com/golang/glog" - "google.golang.org/grpc" - "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" - "k8s.io/kubernetes/pkg/util/interrupt" -) - -// unixProtocol is the network protocol of unix socket. -const unixProtocol = "unix" - -// CRIContainerdServer is the grpc server of cri-containerd. -type CRIContainerdServer struct { - // addr is the address to serve on. - addr string - // runtimeService is the cri-containerd runtime service. - runtimeService runtime.RuntimeServiceServer - // imageService is the cri-containerd image service. - imageService runtime.ImageServiceServer - // server is the grpc server. - server *grpc.Server -} - -// NewCRIContainerdServer creates the cri-containerd grpc server. -func NewCRIContainerdServer(addr string, r runtime.RuntimeServiceServer, i runtime.ImageServiceServer) *CRIContainerdServer { - return &CRIContainerdServer{ - addr: addr, - runtimeService: r, - imageService: i, - } -} - -// Run runs the cri-containerd grpc server. -func (s *CRIContainerdServer) Run() error { - glog.V(2).Infof("Start cri-containerd grpc server") - // Unlink to cleanup the previous socket file. - err := syscall.Unlink(s.addr) - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to unlink socket file %q: %v", s.addr, err) - } - l, err := net.Listen(unixProtocol, s.addr) - if err != nil { - return fmt.Errorf("failed to listen on %q: %v", s.addr, err) - } - // Create the grpc server and register runtime and image services. - s.server = grpc.NewServer() - runtime.RegisterRuntimeServiceServer(s.server, s.runtimeService) - runtime.RegisterImageServiceServer(s.server, s.imageService) - // Use interrupt handler to make sure the server to be stopped properly. - h := interrupt.New(nil, s.server.Stop) - return h.Run(func() error { return s.server.Serve(l) }) -} diff --git a/pkg/server/service.go b/pkg/server/service.go index 5774f4cba..5599791e8 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -18,14 +18,17 @@ package server import ( "fmt" + "net" + "os" + "syscall" "github.com/containerd/containerd" - "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/cri-o/ocicni" "github.com/golang/glog" + "google.golang.org/grpc" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/server/streaming" @@ -36,18 +39,27 @@ import ( sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox" ) -// k8sContainerdNamespace is the namespace we use to connect containerd. -const k8sContainerdNamespace = "k8s.io" +const ( + // k8sContainerdNamespace is the namespace we use to connect containerd. + k8sContainerdNamespace = "k8s.io" + // unixProtocol is the network protocol of unix socket. + unixProtocol = "unix" +) // CRIContainerdService is the interface implement CRI remote service server. type CRIContainerdService interface { - Start() + Run() error + Stop() runtime.RuntimeServiceServer runtime.ImageServiceServer } // criContainerdService implements CRIContainerdService. type criContainerdService struct { + // serverAddress is the grpc server unix path. + serverAddress string + // server is the grpc server. + server *grpc.Server // os is an interface for all required os operations. os osinterface.OS // rootDir is the directory for managing cri-containerd files. @@ -76,8 +88,6 @@ type criContainerdService struct { // imageStoreService is the containerd service to store and track // image metadata. imageStoreService images.Store - // eventsService is the containerd task service client - eventService events.EventsClient // netPlugin is used to setup and teardown network when run/stop pod sandbox. netPlugin ocicni.CNIPlugin // client is an instance of the containerd client @@ -86,11 +96,14 @@ type criContainerdService struct { streamServer streaming.Server // cgroupPath in which the cri-containerd is placed in cgroupPath string + // eventMonitor is the monitor monitors containerd events. + eventMonitor *eventMonitor } // NewCRIContainerdService returns a new instance of CRIContainerdService // TODO(random-liu): Add cri-containerd server config to get rid of the long arg list. func NewCRIContainerdService( + serverAddress, containerdEndpoint, containerdSnapshotter, rootDir, @@ -113,6 +126,7 @@ func NewCRIContainerdService( } c := &criContainerdService{ + serverAddress: serverAddress, os: osinterface.RealOS{}, rootDir: rootDir, sandboxImage: defaultSandboxImage, @@ -124,7 +138,6 @@ func NewCRIContainerdService( containerNameIndex: registrar.NewRegistrar(), taskService: client.TaskService(), imageStoreService: client.ImageService(), - eventService: client.EventService(), contentStoreService: client.ContentStore(), client: client, cgroupPath: cgroupPath, @@ -142,14 +155,78 @@ func NewCRIContainerdService( return nil, fmt.Errorf("failed to create stream server: %v", err) } + c.eventMonitor, err = newEventMonitor(c) + if err != nil { + return nil, fmt.Errorf("failed to create event monitor: %v", err) + } + + // Create the grpc server and register runtime and image services. + c.server = grpc.NewServer() + runtime.RegisterRuntimeServiceServer(c.server, newInstrumentedService(c)) + runtime.RegisterImageServiceServer(c.server, newInstrumentedService(c)) + return newInstrumentedService(c), nil } -func (c *criContainerdService) Start() { - c.startEventMonitor() +// Run starts the cri-containerd service. +func (c *criContainerdService) Run() error { + glog.V(2).Info("Start cri-containerd service") + // TODO(random-liu): Recover state. + + // Start event handler. + glog.V(2).Info("Start event monitor") + eventMonitorCloseCh := c.eventMonitor.start() + + // Start streaming server. + glog.V(2).Info("Start streaming server") + streamServerCloseCh := make(chan struct{}) go func() { if err := c.streamServer.Start(true); err != nil { glog.Errorf("Failed to start streaming server: %v", err) } + close(streamServerCloseCh) }() + + // Start grpc server. + // Unlink to cleanup the previous socket file. + glog.V(2).Info("Start grpc server") + err := syscall.Unlink(c.serverAddress) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to unlink socket file %q: %v", c.serverAddress, err) + } + l, err := net.Listen(unixProtocol, c.serverAddress) + if err != nil { + return fmt.Errorf("failed to listen on %q: %v", c.serverAddress, err) + } + grpcServerCloseCh := make(chan struct{}) + go func() { + if err := c.server.Serve(l); err != nil { + glog.Errorf("Failed to serve grpc grpc request: %v", err) + } + close(grpcServerCloseCh) + }() + + // Stop the whole cri-containerd service if any of the critical service exits. + select { + case <-eventMonitorCloseCh: + case <-streamServerCloseCh: + case <-grpcServerCloseCh: + } + c.Stop() + + <-eventMonitorCloseCh + glog.V(2).Info("Event monitor stopped") + <-streamServerCloseCh + glog.V(2).Info("Stream server stopped") + <-grpcServerCloseCh + glog.V(2).Info("GRPC server stopped") + return nil +} + +// Stop stops the cri-containerd service. +func (c *criContainerdService) Stop() { + glog.V(2).Info("Stop cri-containerd service") + c.eventMonitor.stop() + c.streamServer.Stop() // nolint: errcheck + c.server.Stop() } diff --git a/vendor.conf b/vendor.conf index e368cdde4..daf884e5e 100644 --- a/vendor.conf +++ b/vendor.conf @@ -26,7 +26,6 @@ github.com/go-openapi/jsonreference 13c6e3589ad90f49bd3e3bbe2c2cb3d7a4142272 github.com/go-openapi/spec 6aced65f8501fe1217321abf0749d354824ba2ff github.com/go-openapi/swag 1d0bd113de87027671077d3c71eb3ac5d7dbba72 github.com/godbus/dbus 97646858c46433e4afb3432ad28c12e968efa298 -github.com/jpillora/backoff 06c7a16c845dc8e0bf575fafeeca0f5462f5eb4d github.com/juju/ratelimit 5b9ff866471762aa2ab2dced63c9fb6f53921342 github.com/mailru/easyjson d5b7844b561a7bc640052f1b935f7b800330d7e0 github.com/Microsoft/go-winio v0.4.4