Merge pull request #567 from Random-Liu/update-containerd

Update containerd and leverage plugin graceful stop.
This commit is contained in:
Lantao Liu 2018-01-23 16:24:25 -08:00 committed by GitHub
commit 4fcf8d91b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 188 additions and 61 deletions

View File

@ -123,7 +123,11 @@ func main() {
// Use interrupt handler to make sure the server is stopped properly.
// Pass in non-empty final function to avoid os.Exit(1). We expect `Run`
// to return itself.
h := interrupt.New(func(os.Signal) {}, s.Stop)
h := interrupt.New(func(os.Signal) {}, func() {
if err := s.Close(); err != nil {
logrus.WithError(err).Error("Failed to stop cri service")
}
})
if err := h.Run(func() error { return s.Run(true) }); err != nil {
return fmt.Errorf("failed to run cri-containerd with grpc server: %v", err)
}

1
cri.go
View File

@ -51,7 +51,6 @@ func init() {
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
ctx := ic.Context
// TODO(random-liu): Handle graceful stop.
pluginConfig := ic.Config.(*options.PluginConfig)
c := options.CRIConfig{
PluginConfig: *pluginConfig,

View File

@ -1,6 +1,6 @@
RUNC_VERSION=7f24b40cc5423969b4554ef04ba0b00e2b4ba010
RUNC_VERSION=9f9c96235cc97674e935002fc3d78361b696a69e
CNI_VERSION=v0.6.0
CONTAINERD_VERSION=4812f4be8ffa2b9558915a93cce5901004d27cb8
CONTAINERD_VERSION=12eaf13f6f12e5a4db52afe66876cf62fe7835cf
CONTAINERD_REPO=
CRITOOL_VERSION=v1.0.0-alpha.0
KUBERNETES_VERSION=v1.9.0

View File

@ -18,6 +18,7 @@ package server
import (
"fmt"
"io"
"net"
"os"
"path/filepath"
@ -65,7 +66,8 @@ type grpcServices interface {
// CRIContainerdService is the interface implement CRI remote service server.
type CRIContainerdService interface {
Run(bool) error
Stop()
// io.Closer is used by containerd to gracefully stop cri service.
io.Closer
plugin.Service
grpcServices
}
@ -263,7 +265,9 @@ func (c *criContainerdService) Run(startGRPC bool) error {
case <-streamServerCloseCh:
case <-grpcServerCloseCh:
}
c.Stop()
if err := c.Close(); err != nil {
return fmt.Errorf("failed to stop cri service: %v", err)
}
<-eventMonitorCloseCh
logrus.Info("Event monitor stopped")
@ -278,11 +282,15 @@ func (c *criContainerdService) Run(startGRPC bool) error {
}
// Stop stops the cri-containerd service.
func (c *criContainerdService) Stop() {
func (c *criContainerdService) Close() error {
logrus.Info("Stop cri-containerd service")
// TODO(random-liu): Make event monitor stop synchronous.
c.eventMonitor.stop()
c.streamServer.Stop() // nolint: errcheck
if err := c.streamServer.Stop(); err != nil {
return fmt.Errorf("failed to stop stream server: %v", err)
}
c.server.Stop()
return nil
}
// getDeviceUUID gets device uuid for a given path.

View File

@ -1,7 +1,7 @@
github.com/blang/semver v3.1.0
github.com/BurntSushi/toml v0.2.0-21-g9906417
github.com/containerd/cgroups 29da22c6171a4316169f9205ab6c49f59b5b852f
github.com/containerd/containerd 4812f4be8ffa2b9558915a93cce5901004d27cb8
github.com/containerd/containerd 12eaf13f6f12e5a4db52afe66876cf62fe7835cf
github.com/containerd/continuity cf279e6ac893682272b4479d4c67fd3abf878b4e
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/containerd/typeurl f6943554a7e7e88b3c14aad190bf05932da84788
@ -38,7 +38,7 @@ github.com/mailru/easyjson d5b7844b561a7bc640052f1b935f7b800330d7e0
github.com/Microsoft/go-winio v0.4.5
github.com/opencontainers/go-digest 21dfd564fd89c944783d00d069f33e3e7123c448
github.com/opencontainers/image-spec v1.0.1
github.com/opencontainers/runc 7f24b40cc5423969b4554ef04ba0b00e2b4ba010
github.com/opencontainers/runc 9f9c96235cc97674e935002fc3d78361b696a69e
github.com/opencontainers/runtime-spec v1.0.1
github.com/opencontainers/runtime-tools 6073aff4ac61897f75895123f7e24135204a404d
github.com/opencontainers/selinux 4a2974bf1ee960774ffd517717f1f45325af0206

View File

@ -6,8 +6,17 @@ import (
"io"
"os"
"sync"
"github.com/containerd/containerd/defaults"
)
var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
return &buffer
},
}
// Config holds the IO configurations.
type Config struct {
// Terminal is true if one has been allocated
@ -68,6 +77,7 @@ type Streams struct {
Stdout io.Writer
Stderr io.Writer
Terminal bool
FIFODir string
}
// Opt customize options for creating a Creator or Attach
@ -92,16 +102,25 @@ func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
}
}
// WithFIFODir sets the fifo directory.
// e.g. "/run/containerd/fifo", "/run/users/1001/containerd/fifo"
func WithFIFODir(dir string) Opt {
return func(opt *Streams) {
opt.FIFODir = dir
}
}
// NewCreator returns an IO creator from the options
func NewCreator(opts ...Opt) Creator {
streams := &Streams{}
for _, opt := range opts {
opt(streams)
}
if streams.FIFODir == "" {
streams.FIFODir = defaults.DefaultFIFODir
}
return func(id string) (IO, error) {
// TODO: accept root as a param
root := "/run/containerd/fifo"
fifos, err := NewFIFOSetInDir(root, id, streams.Terminal)
fifos, err := NewFIFOSetInDir(streams.FIFODir, id, streams.Terminal)
if err != nil {
return nil, err
}

View File

@ -47,7 +47,10 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
if fifos.Stdin != "" {
go func() {
io.Copy(pipes.Stdin, ioset.Stdin)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(pipes.Stdin, ioset.Stdin, *p)
pipes.Stdin.Close()
}()
}
@ -55,7 +58,10 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
var wg = &sync.WaitGroup{}
wg.Add(1)
go func() {
io.Copy(ioset.Stdout, pipes.Stdout)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(ioset.Stdout, pipes.Stdout, *p)
pipes.Stdout.Close()
wg.Done()
}()
@ -63,7 +69,10 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
if !fifos.Terminal {
wg.Add(1)
go func() {
io.Copy(ioset.Stderr, pipes.Stderr)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(ioset.Stderr, pipes.Stderr, *p)
pipes.Stderr.Close()
wg.Done()
}()

View File

@ -47,7 +47,11 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.Stdin)
return
}
io.Copy(c, ioset.Stdin)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(c, ioset.Stdin, *p)
c.Close()
l.Close()
}()
@ -73,7 +77,11 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Stdout)
return
}
io.Copy(ioset.Stdout, c)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(ioset.Stdout, c, *p)
c.Close()
l.Close()
}()
@ -99,7 +107,11 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Stderr)
return
}
io.Copy(ioset.Stderr, c)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(ioset.Stderr, c, *p)
c.Close()
l.Close()
}()

View File

@ -0,0 +1,19 @@
// +build !windows
package defaults
const (
// DefaultRootDir is the default location used by containerd to store
// persistent data
DefaultRootDir = "/var/lib/containerd"
// DefaultStateDir is the default location used by containerd to store
// transient data
DefaultStateDir = "/run/containerd"
// DefaultAddress is the default unix socket address
DefaultAddress = "/run/containerd/containerd.sock"
// DefaultDebugAddress is the default unix socket address for pprof data
DefaultDebugAddress = "/run/containerd/debug.sock"
// DefaultFIFODir is the default location used by client-side cio library
// to store FIFOs.
DefaultFIFODir = "/run/containerd/fifo"
)

View File

@ -0,0 +1,27 @@
// +build windows
package defaults
import (
"os"
"path/filepath"
)
var (
// DefaultRootDir is the default location used by containerd to store
// persistent data
DefaultRootDir = filepath.Join(os.Getenv("programfiles"), "containerd", "root")
// DefaultStateDir is the default location used by containerd to store
// transient data
DefaultStateDir = filepath.Join(os.Getenv("programfiles"), "containerd", "state")
)
const (
// DefaultAddress is the default winpipe address
DefaultAddress = `\\.\pipe\containerd-containerd`
// DefaultDebugAddress is the default winpipe address for pprof data
DefaultDebugAddress = `\\.\pipe\containerd-debug`
// DefaultFIFODir is the default location used by client-side cio library
// to store FIFOs. Unused on Windows.
DefaultFIFODir = ""
)

View File

@ -0,0 +1,3 @@
// Package defaults provides several common defaults for interacting wtih
// containerd. These can be used on the client-side or server-side.
package defaults

View File

@ -2,6 +2,7 @@ package oci
import (
"context"
"strings"
"github.com/containerd/containerd/containers"
specs "github.com/opencontainers/runtime-spec/specs-go"
@ -33,3 +34,59 @@ func WithHostname(name string) SpecOpts {
return nil
}
}
// WithEnv appends environment variables
func WithEnv(environmnetVariables []string) SpecOpts {
return func(_ context.Context, _ Client, _ *containers.Container, s *specs.Spec) error {
if len(environmnetVariables) > 0 {
s.Process.Env = replaceOrAppendEnvValues(s.Process.Env, environmnetVariables)
}
return nil
}
}
// WithMounts appends mounts
func WithMounts(mounts []specs.Mount) SpecOpts {
return func(_ context.Context, _ Client, _ *containers.Container, s *specs.Spec) error {
s.Mounts = append(s.Mounts, mounts...)
return nil
}
}
// replaceOrAppendEnvValues returns the defaults with the overrides either
// replaced by env key or appended to the list
func replaceOrAppendEnvValues(defaults, overrides []string) []string {
cache := make(map[string]int, len(defaults))
for i, e := range defaults {
parts := strings.SplitN(e, "=", 2)
cache[parts[0]] = i
}
for _, value := range overrides {
// Values w/o = means they want this env to be removed/unset.
if !strings.Contains(value, "=") {
if i, exists := cache[value]; exists {
defaults[i] = "" // Used to indicate it should be removed
}
continue
}
// Just do a normal set/update
parts := strings.SplitN(value, "=", 2)
if i, exists := cache[parts[0]]; exists {
defaults[i] = value
} else {
defaults = append(defaults, value)
}
}
// Now remove all entries that we want to "unset"
for i := 0; i < len(defaults); i++ {
if defaults[i] == "" {
defaults = append(defaults[:i], defaults[i+1:]...)
i--
}
}
return defaults
}

View File

@ -1,41 +0,0 @@
// +build linux
// Package sys provides access to the Get Child and Set Child prctl flags.
// See http://man7.org/linux/man-pages/man2/prctl.2.html
package sys
import (
"unsafe"
"golang.org/x/sys/unix"
)
// GetSubreaper returns the subreaper setting for the calling process
func GetSubreaper() (int, error) {
var i uintptr
// PR_GET_CHILD_SUBREAPER allows retrieving the current child
// subreaper.
// Returns the "child subreaper" setting of the caller, in the
// location pointed to by (int *) arg2.
if err := unix.Prctl(unix.PR_GET_CHILD_SUBREAPER, uintptr(unsafe.Pointer(&i)), 0, 0, 0); err != nil {
return -1, err
}
return int(i), nil
}
// SetSubreaper sets the value i as the subreaper setting for the calling process
func SetSubreaper(i int) error {
// PR_SET_CHILD_SUBREAPER allows setting the child subreaper.
// If arg2 is nonzero, set the "child subreaper" attribute of the
// calling process; if arg2 is zero, unset the attribute. When a
// process is marked as a child subreaper, all of the children
// that it creates, and their descendants, will be marked as
// having a subreaper. In effect, a subreaper fulfills the role
// of init(1) for its descendant processes. Upon termination of
// a process that is orphaned (i.e., its immediate parent has
// already terminated) and marked as having a subreaper, the
// nearest still living ancestor subreaper will receive a SIGCHLD
// signal and be able to wait(2) on the process to discover its
// termination status.
return unix.Prctl(unix.PR_SET_CHILD_SUBREAPER, uintptr(i), 0, 0, 0)
}

View File

@ -16,7 +16,7 @@ github.com/docker/go-units v0.3.1
github.com/gogo/protobuf v0.5
github.com/golang/protobuf 1643683e1b54a9e88ad26d98f81400c8c9d9f4f9
github.com/opencontainers/runtime-spec v1.0.1
github.com/opencontainers/runc 7f24b40cc5423969b4554ef04ba0b00e2b4ba010
github.com/opencontainers/runc 9f9c96235cc97674e935002fc3d78361b696a69e
github.com/sirupsen/logrus v1.0.0
github.com/containerd/btrfs cc52c4dea2ce11a44e6639e561bb5c2af9ada9e3
github.com/stretchr/testify v1.1.4

View File

@ -134,3 +134,14 @@ func RunningInUserNS() bool {
func SetSubreaper(i int) error {
return unix.Prctl(PR_SET_CHILD_SUBREAPER, uintptr(i), 0, 0, 0)
}
// GetSubreaper returns the subreaper setting for the calling process
func GetSubreaper() (int, error) {
var i uintptr
if err := unix.Prctl(unix.PR_GET_CHILD_SUBREAPER, uintptr(unsafe.Pointer(&i)), 0, 0, 0); err != nil {
return -1, err
}
return int(i), nil
}