Let cri-containerd exit with containerd

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2017-08-25 23:40:24 +00:00
parent 82ee80d0fa
commit c3e8c69aff
5 changed files with 138 additions and 136 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/util/interrupt"
"github.com/kubernetes-incubator/cri-containerd/cmd/cri-containerd/options" "github.com/kubernetes-incubator/cri-containerd/cmd/cri-containerd/options"
"github.com/kubernetes-incubator/cri-containerd/pkg/server" "github.com/kubernetes-incubator/cri-containerd/pkg/server"
@ -38,7 +39,8 @@ func main() {
} }
glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.SocketPath) glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.SocketPath)
service, err := server.NewCRIContainerdService( s, err := server.NewCRIContainerdService(
o.SocketPath,
o.ContainerdEndpoint, o.ContainerdEndpoint,
o.ContainerdSnapshotter, o.ContainerdSnapshotter,
o.RootDir, o.RootDir,
@ -51,10 +53,11 @@ func main() {
if err != nil { if err != nil {
glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err) glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err)
} }
service.Start() // Use interrupt handler to make sure the server to be stopped properly.
// Pass in non-empty final function to avoid os.Exit(1). We expect `Run`
s := server.NewCRIContainerdServer(o.SocketPath, service, service) // to return itself.
if err := s.Run(); err != nil { h := interrupt.New(func(os.Signal) {}, s.Stop)
if err := h.Run(func() error { return s.Run() }); err != nil {
glog.Exitf("Failed to run cri-containerd grpc server: %v", err) glog.Exitf("Failed to run cri-containerd grpc server: %v", err)
} }
} }

View File

@ -17,72 +17,69 @@ limitations under the License.
package server package server
import ( import (
"time" "fmt"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/typeurl" "github.com/containerd/containerd/typeurl"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/jpillora/backoff"
"golang.org/x/net/context" "golang.org/x/net/context"
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
) )
const ( // eventMonitor monitors containerd event and updates internal state correspondingly.
// minRetryInterval is the minimum retry interval when lost connection with containerd.
minRetryInterval = 100 * time.Millisecond
// maxRetryInterval is the maximum retry interval when lost connection with containerd.
maxRetryInterval = 30 * time.Second
// exponentialFactor is the exponential backoff factor.
exponentialFactor = 2.0
)
// startEventMonitor starts an event monitor which monitors and handles all
// container events.
// TODO(random-liu): [P1] Is it possible to drop event during containerd is running? // TODO(random-liu): [P1] Is it possible to drop event during containerd is running?
func (c *criContainerdService) startEventMonitor() { type eventMonitor struct {
b := backoff.Backoff{ c *criContainerdService
Min: minRetryInterval, eventstream events.Events_SubscribeClient
Max: maxRetryInterval, cancel context.CancelFunc
Factor: exponentialFactor, closeCh chan struct{}
} }
// newEventMonitor creates a new event monitor. The new event monitor starts subscribing to containerd
// events on creation, so all events that happen after it is created should be monitored.
func newEventMonitor(c *criContainerdService) (*eventMonitor, error) {
ctx, cancel := context.WithCancel(context.Background())
e, err := c.client.Events(ctx)
if err != nil {
return nil, fmt.Errorf("failed to subscribe containerd event: %v", err)
}
return &eventMonitor{
c: c,
eventstream: e,
cancel: cancel,
closeCh: make(chan struct{}),
}, nil
}
// start starts the event monitor which monitors and handles all container events. It returns
// a channel for the caller to wait for the event monitor to stop.
func (em *eventMonitor) start() <-chan struct{} {
go func() { go func() {
for { for {
eventstream, err := c.eventService.Subscribe(context.Background(), &events.SubscribeRequest{}) e, err := em.eventstream.Recv()
if err != nil { if err != nil {
glog.Errorf("Failed to connect to containerd event stream: %v", err)
time.Sleep(b.Duration())
continue
}
// Successfully connect with containerd, reset backoff.
b.Reset()
// TODO(random-liu): Relist to recover state, should prevent other operations
// until state is fully recovered.
for {
if err := c.handleEventStream(eventstream); err != nil {
glog.Errorf("Failed to handle event stream: %v", err) glog.Errorf("Failed to handle event stream: %v", err)
break close(em.closeCh)
} return
}
}
}()
}
// handleEventStream receives an event from containerd and handles the event.
func (c *criContainerdService) handleEventStream(eventstream events.Events_SubscribeClient) error {
e, err := eventstream.Recv()
if err != nil {
return err
} }
glog.V(4).Infof("Received container event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic) glog.V(4).Infof("Received container event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
c.handleEvent(e) em.handleEvent(e)
return nil }
}()
return em.closeCh
}
// stop stops the event monitor. It will close the event channel.
func (em *eventMonitor) stop() {
em.cancel()
} }
// handleEvent handles a containerd event. // handleEvent handles a containerd event.
func (c *criContainerdService) handleEvent(evt *events.Envelope) { func (em *eventMonitor) handleEvent(evt *events.Envelope) {
c := em.c
any, err := typeurl.UnmarshalAny(evt.Event) any, err := typeurl.UnmarshalAny(evt.Event)
if err != nil { if err != nil {
glog.Errorf("Failed to convert event envelope %+v: %v", evt, err) glog.Errorf("Failed to convert event envelope %+v: %v", evt, err)

View File

@ -1,74 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"fmt"
"net"
"os"
"syscall"
"github.com/golang/glog"
"google.golang.org/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/util/interrupt"
)
// unixProtocol is the network protocol of unix socket. It is the network
// name handed to net.Listen when Run opens the serving socket.
const unixProtocol = "unix"
// CRIContainerdServer is the grpc server of cri-containerd. It bundles the
// address to listen on with the runtime and image services that are
// registered on the grpc server when Run is called.
type CRIContainerdServer struct {
// addr is the address to serve on. Run treats it as a unix socket path:
// it is unlinked and then listened on using unixProtocol.
addr string
// runtimeService is the cri-containerd runtime service registered on the
// grpc server in Run.
runtimeService runtime.RuntimeServiceServer
// imageService is the cri-containerd image service registered on the
// grpc server in Run.
imageService runtime.ImageServiceServer
// server is the grpc server. NewCRIContainerdServer leaves it unset; it is
// assigned in Run.
server *grpc.Server
}
// NewCRIContainerdServer creates the cri-containerd grpc server.
//
// addr is the address the server will listen on, r is the runtime service and
// i is the image service to expose. Only these three fields are populated
// here; the underlying grpc server is created later by Run.
func NewCRIContainerdServer(addr string, r runtime.RuntimeServiceServer, i runtime.ImageServiceServer) *CRIContainerdServer {
	srv := new(CRIContainerdServer)
	srv.addr = addr
	srv.runtimeService = r
	srv.imageService = i
	return srv
}
// Run runs the cri-containerd grpc server.
//
// It removes any socket file already present at s.addr, listens on that
// address as a unix socket, registers the runtime and image services on a
// fresh grpc server and serves requests under an interrupt handler. It
// returns an error if the old socket file cannot be removed, if listening
// fails, or whatever error the interrupt handler's Run reports.
func (s *CRIContainerdServer) Run() error {
	glog.V(2).Infof("Start cri-containerd grpc server")

	// Clean up the socket file from a previous run; a missing file is not an
	// error.
	if unlinkErr := syscall.Unlink(s.addr); unlinkErr != nil && !os.IsNotExist(unlinkErr) {
		return fmt.Errorf("failed to unlink socket file %q: %v", s.addr, unlinkErr)
	}

	listener, listenErr := net.Listen(unixProtocol, s.addr)
	if listenErr != nil {
		return fmt.Errorf("failed to listen on %q: %v", s.addr, listenErr)
	}

	// Build the grpc server and attach both CRI services to it.
	s.server = grpc.NewServer()
	runtime.RegisterRuntimeServiceServer(s.server, s.runtimeService)
	runtime.RegisterImageServiceServer(s.server, s.imageService)

	// Serve under an interrupt handler so that the grpc server is stopped
	// properly when the process is interrupted.
	serve := func() error { return s.server.Serve(listener) }
	handler := interrupt.New(nil, s.server.Stop)
	return handler.Run(serve)
}

View File

@ -18,14 +18,17 @@ package server
import ( import (
"fmt" "fmt"
"net"
"os"
"syscall"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
"github.com/cri-o/ocicni" "github.com/cri-o/ocicni"
"github.com/golang/glog" "github.com/golang/glog"
"google.golang.org/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/kubelet/server/streaming"
@ -36,18 +39,27 @@ import (
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox" sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
) )
const (
// k8sContainerdNamespace is the namespace we use to connect containerd. // k8sContainerdNamespace is the namespace we use to connect containerd.
const k8sContainerdNamespace = "k8s.io" k8sContainerdNamespace = "k8s.io"
// unixProtocol is the network protocol of unix socket.
unixProtocol = "unix"
)
// CRIContainerdService is the interface implement CRI remote service server. // CRIContainerdService is the interface implement CRI remote service server.
type CRIContainerdService interface { type CRIContainerdService interface {
Start() Run() error
Stop()
runtime.RuntimeServiceServer runtime.RuntimeServiceServer
runtime.ImageServiceServer runtime.ImageServiceServer
} }
// criContainerdService implements CRIContainerdService. // criContainerdService implements CRIContainerdService.
type criContainerdService struct { type criContainerdService struct {
// serverAddress is the grpc server unix path.
serverAddress string
// server is the grpc server.
server *grpc.Server
// os is an interface for all required os operations. // os is an interface for all required os operations.
os osinterface.OS os osinterface.OS
// rootDir is the directory for managing cri-containerd files. // rootDir is the directory for managing cri-containerd files.
@ -76,8 +88,6 @@ type criContainerdService struct {
// imageStoreService is the containerd service to store and track // imageStoreService is the containerd service to store and track
// image metadata. // image metadata.
imageStoreService images.Store imageStoreService images.Store
// eventsService is the containerd task service client
eventService events.EventsClient
// netPlugin is used to setup and teardown network when run/stop pod sandbox. // netPlugin is used to setup and teardown network when run/stop pod sandbox.
netPlugin ocicni.CNIPlugin netPlugin ocicni.CNIPlugin
// client is an instance of the containerd client // client is an instance of the containerd client
@ -86,11 +96,14 @@ type criContainerdService struct {
streamServer streaming.Server streamServer streaming.Server
// cgroupPath in which the cri-containerd is placed in // cgroupPath in which the cri-containerd is placed in
cgroupPath string cgroupPath string
// eventMonitor is the monitor which monitors containerd events.
eventMonitor *eventMonitor
} }
// NewCRIContainerdService returns a new instance of CRIContainerdService // NewCRIContainerdService returns a new instance of CRIContainerdService
// TODO(random-liu): Add cri-containerd server config to get rid of the long arg list. // TODO(random-liu): Add cri-containerd server config to get rid of the long arg list.
func NewCRIContainerdService( func NewCRIContainerdService(
serverAddress,
containerdEndpoint, containerdEndpoint,
containerdSnapshotter, containerdSnapshotter,
rootDir, rootDir,
@ -113,6 +126,7 @@ func NewCRIContainerdService(
} }
c := &criContainerdService{ c := &criContainerdService{
serverAddress: serverAddress,
os: osinterface.RealOS{}, os: osinterface.RealOS{},
rootDir: rootDir, rootDir: rootDir,
sandboxImage: defaultSandboxImage, sandboxImage: defaultSandboxImage,
@ -124,7 +138,6 @@ func NewCRIContainerdService(
containerNameIndex: registrar.NewRegistrar(), containerNameIndex: registrar.NewRegistrar(),
taskService: client.TaskService(), taskService: client.TaskService(),
imageStoreService: client.ImageService(), imageStoreService: client.ImageService(),
eventService: client.EventService(),
contentStoreService: client.ContentStore(), contentStoreService: client.ContentStore(),
client: client, client: client,
cgroupPath: cgroupPath, cgroupPath: cgroupPath,
@ -142,14 +155,78 @@ func NewCRIContainerdService(
return nil, fmt.Errorf("failed to create stream server: %v", err) return nil, fmt.Errorf("failed to create stream server: %v", err)
} }
c.eventMonitor, err = newEventMonitor(c)
if err != nil {
return nil, fmt.Errorf("failed to create event monitor: %v", err)
}
// Create the grpc server and register runtime and image services.
c.server = grpc.NewServer()
runtime.RegisterRuntimeServiceServer(c.server, newInstrumentedService(c))
runtime.RegisterImageServiceServer(c.server, newInstrumentedService(c))
return newInstrumentedService(c), nil return newInstrumentedService(c), nil
} }
func (c *criContainerdService) Start() { // Run starts the cri-containerd service.
c.startEventMonitor() func (c *criContainerdService) Run() error {
glog.V(2).Info("Start cri-containerd service")
// TODO(random-liu): Recover state.
// Start event handler.
glog.V(2).Info("Start event monitor")
eventMonitorCloseCh := c.eventMonitor.start()
// Start streaming server.
glog.V(2).Info("Start streaming server")
streamServerCloseCh := make(chan struct{})
go func() { go func() {
if err := c.streamServer.Start(true); err != nil { if err := c.streamServer.Start(true); err != nil {
glog.Errorf("Failed to start streaming server: %v", err) glog.Errorf("Failed to start streaming server: %v", err)
} }
close(streamServerCloseCh)
}() }()
// Start grpc server.
// Unlink to cleanup the previous socket file.
glog.V(2).Info("Start grpc server")
err := syscall.Unlink(c.serverAddress)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to unlink socket file %q: %v", c.serverAddress, err)
}
l, err := net.Listen(unixProtocol, c.serverAddress)
if err != nil {
return fmt.Errorf("failed to listen on %q: %v", c.serverAddress, err)
}
grpcServerCloseCh := make(chan struct{})
go func() {
if err := c.server.Serve(l); err != nil {
glog.Errorf("Failed to serve grpc grpc request: %v", err)
}
close(grpcServerCloseCh)
}()
// Stop the whole cri-containerd service if any of the critical service exits.
select {
case <-eventMonitorCloseCh:
case <-streamServerCloseCh:
case <-grpcServerCloseCh:
}
c.Stop()
<-eventMonitorCloseCh
glog.V(2).Info("Event monitor stopped")
<-streamServerCloseCh
glog.V(2).Info("Stream server stopped")
<-grpcServerCloseCh
glog.V(2).Info("GRPC server stopped")
return nil
}
// Stop stops the cri-containerd service.
func (c *criContainerdService) Stop() {
glog.V(2).Info("Stop cri-containerd service")
c.eventMonitor.stop()
c.streamServer.Stop() // nolint: errcheck
c.server.Stop()
} }

View File

@ -26,7 +26,6 @@ github.com/go-openapi/jsonreference 13c6e3589ad90f49bd3e3bbe2c2cb3d7a4142272
github.com/go-openapi/spec 6aced65f8501fe1217321abf0749d354824ba2ff github.com/go-openapi/spec 6aced65f8501fe1217321abf0749d354824ba2ff
github.com/go-openapi/swag 1d0bd113de87027671077d3c71eb3ac5d7dbba72 github.com/go-openapi/swag 1d0bd113de87027671077d3c71eb3ac5d7dbba72
github.com/godbus/dbus 97646858c46433e4afb3432ad28c12e968efa298 github.com/godbus/dbus 97646858c46433e4afb3432ad28c12e968efa298
github.com/jpillora/backoff 06c7a16c845dc8e0bf575fafeeca0f5462f5eb4d
github.com/juju/ratelimit 5b9ff866471762aa2ab2dced63c9fb6f53921342 github.com/juju/ratelimit 5b9ff866471762aa2ab2dced63c9fb6f53921342
github.com/mailru/easyjson d5b7844b561a7bc640052f1b935f7b800330d7e0 github.com/mailru/easyjson d5b7844b561a7bc640052f1b935f7b800330d7e0
github.com/Microsoft/go-winio v0.4.4 github.com/Microsoft/go-winio v0.4.4