Update containerd to 59a625defb

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2019-09-03 00:31:40 -07:00
parent 3a6510756e
commit 59b6ed641f
372 changed files with 17135 additions and 2887 deletions

View File

@@ -19,9 +19,7 @@ package archive
import (
"archive/tar"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime"
@@ -91,11 +89,6 @@ const (
// archives.
whiteoutMetaPrefix = whiteoutPrefix + whiteoutPrefix
// whiteoutLinkDir is a directory AUFS uses for storing hardlink links to other
// layers. Normally these should not go into exported archives and all changed
// hardlinks should be copied to the top layer.
whiteoutLinkDir = whiteoutMetaPrefix + "plnk"
// whiteoutOpaqueDir file means directory has been made opaque - meaning
// readdir calls to this directory do not follow to lower layers.
whiteoutOpaqueDir = whiteoutMetaPrefix + ".opq"
@@ -117,11 +110,15 @@ func Apply(ctx context.Context, root string, r io.Reader, opts ...ApplyOpt) (int
if options.Filter == nil {
options.Filter = all
}
if options.applyFunc == nil {
options.applyFunc = applyNaive
}
return apply(ctx, root, tar.NewReader(r), options)
return options.applyFunc(ctx, root, tar.NewReader(r), options)
}
// applyNaive applies a tar stream of an OCI style diff tar.
// applyNaive applies a tar stream of an OCI style diff tar to a directory
// applying each file as either a whole file or whiteout.
// See https://github.com/opencontainers/image-spec/blob/master/layer.md#applying-changesets
func applyNaive(ctx context.Context, root string, tr *tar.Reader, options ApplyOptions) (size int64, err error) {
var (
@@ -131,11 +128,49 @@ func applyNaive(ctx context.Context, root string, tr *tar.Reader, options ApplyO
// may occur out of order
unpackedPaths = make(map[string]struct{})
// Used for aufs plink directory
aufsTempdir = ""
aufsHardlinks = make(map[string]*tar.Header)
convertWhiteout = options.ConvertWhiteout
)
if convertWhiteout == nil {
// handle whiteouts by removing the target files
convertWhiteout = func(hdr *tar.Header, path string) (bool, error) {
base := filepath.Base(path)
dir := filepath.Dir(path)
if base == whiteoutOpaqueDir {
_, err := os.Lstat(dir)
if err != nil {
return false, err
}
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
if os.IsNotExist(err) {
err = nil // parent was deleted
}
return err
}
if path == dir {
return nil
}
if _, exists := unpackedPaths[path]; !exists {
err := os.RemoveAll(path)
return err
}
return nil
})
return false, err
}
if strings.HasPrefix(base, whiteoutPrefix) {
originalBase := base[len(whiteoutPrefix):]
originalPath := filepath.Join(dir, originalBase)
return false, os.RemoveAll(originalPath)
}
return true, nil
}
}
// Iterate through the files in the archive.
for {
select {
@@ -193,85 +228,21 @@ func applyNaive(ctx context.Context, root string, tr *tar.Reader, options ApplyO
if base == "" {
parentPath = filepath.Dir(path)
}
if _, err := os.Lstat(parentPath); err != nil && os.IsNotExist(err) {
err = mkdirAll(parentPath, 0755)
if err != nil {
return 0, err
}
}
}
// Skip AUFS metadata dirs
if strings.HasPrefix(hdr.Name, whiteoutMetaPrefix) {
// Regular files inside /.wh..wh.plnk can be used as hardlink targets
// We don't want this directory, but we need the files in them so that
// such hardlinks can be resolved.
if strings.HasPrefix(hdr.Name, whiteoutLinkDir) && hdr.Typeflag == tar.TypeReg {
basename := filepath.Base(hdr.Name)
aufsHardlinks[basename] = hdr
if aufsTempdir == "" {
if aufsTempdir, err = ioutil.TempDir(os.Getenv("XDG_RUNTIME_DIR"), "dockerplnk"); err != nil {
return 0, err
}
defer os.RemoveAll(aufsTempdir)
}
p, err := fs.RootPath(aufsTempdir, basename)
if err != nil {
return 0, err
}
if err := createTarFile(ctx, p, root, hdr, tr); err != nil {
return 0, err
}
}
if hdr.Name != whiteoutOpaqueDir {
continue
}
}
if strings.HasPrefix(base, whiteoutPrefix) {
dir := filepath.Dir(path)
if base == whiteoutOpaqueDir {
_, err := os.Lstat(dir)
if err != nil {
return 0, err
}
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
if os.IsNotExist(err) {
err = nil // parent was deleted
}
return err
}
if path == dir {
return nil
}
if _, exists := unpackedPaths[path]; !exists {
err := os.RemoveAll(path)
return err
}
return nil
})
if err != nil {
return 0, err
}
continue
}
originalBase := base[len(whiteoutPrefix):]
originalPath := filepath.Join(dir, originalBase)
// Ensure originalPath is under dir
if dir[len(dir)-1] != filepath.Separator {
dir += string(filepath.Separator)
}
if !strings.HasPrefix(originalPath, dir) {
return 0, errors.Wrapf(errInvalidArchive, "invalid whiteout name: %v", base)
}
if err := os.RemoveAll(originalPath); err != nil {
if err := mkparent(ctx, parentPath, root, options.Parents); err != nil {
return 0, err
}
}
// Naive whiteout convert function which handles whiteout files by
// removing the target files.
if err := validateWhiteout(path); err != nil {
return 0, err
}
writeFile, err := convertWhiteout(hdr, path)
if err != nil {
return 0, errors.Wrapf(err, "failed to convert whiteout file %q", hdr.Name)
}
if !writeFile {
continue
}
// If path exits we almost always just want to remove and replace it.
@@ -289,26 +260,6 @@ func applyNaive(ctx context.Context, root string, tr *tar.Reader, options ApplyO
srcData := io.Reader(tr)
srcHdr := hdr
// Hard links into /.wh..wh.plnk don't work, as we don't extract that directory, so
// we manually retarget these into the temporary files we extracted them into
if hdr.Typeflag == tar.TypeLink && strings.HasPrefix(filepath.Clean(hdr.Linkname), whiteoutLinkDir) {
linkBasename := filepath.Base(hdr.Linkname)
srcHdr = aufsHardlinks[linkBasename]
if srcHdr == nil {
return 0, fmt.Errorf("invalid aufs hardlink")
}
p, err := fs.RootPath(aufsTempdir, linkBasename)
if err != nil {
return 0, err
}
tmpFile, err := os.Open(p)
if err != nil {
return 0, err
}
defer tmpFile.Close()
srcData = tmpFile
}
if err := createTarFile(ctx, path, root, srcHdr, srcData); err != nil {
return 0, err
}
@@ -428,6 +379,66 @@ func createTarFile(ctx context.Context, path, extractDir string, hdr *tar.Header
return chtimes(path, boundTime(latestTime(hdr.AccessTime, hdr.ModTime)), boundTime(hdr.ModTime))
}
func mkparent(ctx context.Context, path, root string, parents []string) error {
if dir, err := os.Lstat(path); err == nil {
if dir.IsDir() {
return nil
}
return &os.PathError{
Op: "mkparent",
Path: path,
Err: syscall.ENOTDIR,
}
} else if !os.IsNotExist(err) {
return err
}
i := len(path)
for i > len(root) && !os.IsPathSeparator(path[i-1]) {
i--
}
if i > len(root)+1 {
if err := mkparent(ctx, path[:i-1], root, parents); err != nil {
return err
}
}
if err := mkdir(path, 0755); err != nil {
// Check that still doesn't exist
dir, err1 := os.Lstat(path)
if err1 == nil && dir.IsDir() {
return nil
}
return err
}
for _, p := range parents {
ppath, err := fs.RootPath(p, path[len(root):])
if err != nil {
return err
}
dir, err := os.Lstat(ppath)
if err == nil {
if !dir.IsDir() {
// Replaced, do not copy attributes
break
}
if err := copyDirInfo(dir, path); err != nil {
return err
}
return copyUpXAttrs(path, ppath)
} else if !os.IsNotExist(err) {
return err
}
}
log.G(ctx).Debugf("parent directory %q not found: default permissions(0755) used", path)
return nil
}
type changeWriter struct {
tw *tar.Writer
source string
@@ -493,6 +504,12 @@ func (cw *changeWriter) HandleChange(k fs.ChangeKind, p string, f os.FileInfo, e
hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode)))
// truncate timestamp for compatibility. without PAX stdlib rounds timestamps instead
hdr.Format = tar.FormatPAX
hdr.ModTime = hdr.ModTime.Truncate(time.Second)
hdr.AccessTime = time.Time{}
hdr.ChangeTime = time.Time{}
name := p
if strings.HasPrefix(name, string(filepath.Separator)) {
name, err = filepath.Rel(string(filepath.Separator), name)
@@ -598,6 +615,9 @@ func (cw *changeWriter) Close() error {
}
func (cw *changeWriter) includeParents(hdr *tar.Header) error {
if cw.addedDirs == nil {
return nil
}
name := strings.TrimRight(hdr.Name, "/")
fname := filepath.Join(cw.source, name)
parent := filepath.Dir(name)
@@ -684,3 +704,26 @@ func hardlinkRootPath(root, linkname string) (string, error) {
}
return targetPath, nil
}
func validateWhiteout(path string) error {
base := filepath.Base(path)
dir := filepath.Dir(path)
if base == whiteoutOpaqueDir {
return nil
}
if strings.HasPrefix(base, whiteoutPrefix) {
originalBase := base[len(whiteoutPrefix):]
originalPath := filepath.Join(dir, originalBase)
// Ensure originalPath is under dir
if dir[len(dir)-1] != filepath.Separator {
dir += string(filepath.Separator)
}
if !strings.HasPrefix(originalPath, dir) {
return errors.Wrapf(errInvalidArchive, "invalid whiteout name: %v", base)
}
}
return nil
}

View File

@@ -16,7 +16,19 @@
package archive
import "archive/tar"
import (
"archive/tar"
"context"
)
// ApplyOptions provides additional options for an Apply operation
type ApplyOptions struct {
Filter Filter // Filter tar headers
ConvertWhiteout ConvertWhiteout // Convert whiteout files
Parents []string // Parent directories to handle inherited attributes without CoW
applyFunc func(context.Context, string, *tar.Reader, ApplyOptions) (int64, error)
}
// ApplyOpt allows setting mutable archive apply properties on creation
type ApplyOpt func(options *ApplyOptions) error
@@ -24,6 +36,9 @@ type ApplyOpt func(options *ApplyOptions) error
// Filter specific files from the archive
type Filter func(*tar.Header) (bool, error)
// ConvertWhiteout converts whiteout files from the archive
type ConvertWhiteout func(*tar.Header, string) (bool, error)
// all allows all files
func all(_ *tar.Header) (bool, error) {
return true, nil
@@ -36,3 +51,24 @@ func WithFilter(f Filter) ApplyOpt {
return nil
}
}
// WithConvertWhiteout uses the convert function to convert the whiteout files.
func WithConvertWhiteout(c ConvertWhiteout) ApplyOpt {
return func(options *ApplyOptions) error {
options.ConvertWhiteout = c
return nil
}
}
// WithParents provides parent directories for resolving inherited attributes
// directory from the filesystem.
// Inherited attributes are searched from first to last, making the first
// element in the list the most immediate parent directory.
// NOTE: When applying to a filesystem which supports CoW, file attributes
// should be inherited by the filesystem.
func WithParents(p []string) ApplyOpt {
return func(options *ApplyOptions) error {
options.Parents = p
return nil
}
}

View File

@@ -0,0 +1,59 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package archive
import (
"archive/tar"
"os"
"path/filepath"
"strings"
"golang.org/x/sys/unix"
)
// AufsConvertWhiteout converts whiteout files for aufs.
func AufsConvertWhiteout(_ *tar.Header, _ string) (bool, error) {
return true, nil
}
// OverlayConvertWhiteout converts whiteout files for overlay.
func OverlayConvertWhiteout(hdr *tar.Header, path string) (bool, error) {
base := filepath.Base(path)
dir := filepath.Dir(path)
// if a directory is marked as opaque, we need to translate that to overlay
if base == whiteoutOpaqueDir {
// don't write the file itself
return false, unix.Setxattr(dir, "trusted.overlay.opaque", []byte{'y'}, 0)
}
// if a file was deleted and we are using overlay, we need to create a character device
if strings.HasPrefix(base, whiteoutPrefix) {
originalBase := base[len(whiteoutPrefix):]
originalPath := filepath.Join(dir, originalBase)
if err := unix.Mknod(originalPath, unix.S_IFCHR, 0); err != nil {
return false, err
}
// don't write the file itself
return false, os.Chown(originalPath, hdr.Uid, hdr.Gid)
}
return true, nil
}

View File

@@ -18,28 +18,12 @@
package archive
// ApplyOptions provides additional options for an Apply operation
type ApplyOptions struct {
ParentLayerPaths []string // Parent layer paths used for Windows layer apply
IsWindowsContainerLayer bool // True if the tar stream to be applied is a Windows Container Layer
Filter Filter // Filter tar headers
}
// WithParentLayers adds parent layers to the apply process this is required
// for all Windows layers except the base layer.
func WithParentLayers(parentPaths []string) ApplyOpt {
return func(options *ApplyOptions) error {
options.ParentLayerPaths = parentPaths
return nil
}
}
// AsWindowsContainerLayer indicates that the tar stream to apply is that of
// a Windows Container Layer. The caller must be holding SeBackupPrivilege and
// SeRestorePrivilege.
func AsWindowsContainerLayer() ApplyOpt {
return func(options *ApplyOptions) error {
options.IsWindowsContainerLayer = true
options.applyFunc = applyWindowsLayer
return nil
}
}

View File

@@ -20,11 +20,12 @@ package archive
import (
"archive/tar"
"context"
"os"
"strings"
"sync"
"syscall"
"github.com/containerd/continuity/fs"
"github.com/containerd/continuity/sysx"
"github.com/opencontainers/runc/libcontainer/system"
"github.com/pkg/errors"
@@ -74,10 +75,6 @@ func openFile(name string, flag int, perm os.FileMode) (*os.File, error) {
return f, err
}
func mkdirAll(path string, perm os.FileMode) error {
return os.MkdirAll(path, perm)
}
func mkdir(path string, perm os.FileMode) error {
if err := os.Mkdir(path, perm); err != nil {
return err
@@ -149,11 +146,71 @@ func getxattr(path, attr string) ([]byte, error) {
}
func setxattr(path, key, value string) error {
return sysx.LSetxattr(path, key, []byte(value), 0)
// Do not set trusted attributes
if strings.HasPrefix(key, "trusted.") {
return errors.Wrap(unix.ENOTSUP, "admin attributes from archive not supported")
}
return unix.Lsetxattr(path, key, []byte(value), 0)
}
// apply applies a tar stream of an OCI style diff tar.
// See https://github.com/opencontainers/image-spec/blob/master/layer.md#applying-changesets
func apply(ctx context.Context, root string, tr *tar.Reader, options ApplyOptions) (size int64, err error) {
return applyNaive(ctx, root, tr, options)
func copyDirInfo(fi os.FileInfo, path string) error {
st := fi.Sys().(*syscall.Stat_t)
if err := os.Lchown(path, int(st.Uid), int(st.Gid)); err != nil {
if os.IsPermission(err) {
// Normally if uid/gid are the same this would be a no-op, but some
// filesystems may still return EPERM... for instance NFS does this.
// In such a case, this is not an error.
if dstStat, err2 := os.Lstat(path); err2 == nil {
st2 := dstStat.Sys().(*syscall.Stat_t)
if st.Uid == st2.Uid && st.Gid == st2.Gid {
err = nil
}
}
}
if err != nil {
return errors.Wrapf(err, "failed to chown %s", path)
}
}
if err := os.Chmod(path, fi.Mode()); err != nil {
return errors.Wrapf(err, "failed to chmod %s", path)
}
timespec := []unix.Timespec{unix.Timespec(fs.StatAtime(st)), unix.Timespec(fs.StatMtime(st))}
if err := unix.UtimesNanoAt(unix.AT_FDCWD, path, timespec, unix.AT_SYMLINK_NOFOLLOW); err != nil {
return errors.Wrapf(err, "failed to utime %s", path)
}
return nil
}
func copyUpXAttrs(dst, src string) error {
xattrKeys, err := sysx.LListxattr(src)
if err != nil {
if err == unix.ENOTSUP || err == sysx.ENODATA {
return nil
}
return errors.Wrapf(err, "failed to list xattrs on %s", src)
}
for _, xattr := range xattrKeys {
// Do not copy up trusted attributes
if strings.HasPrefix(xattr, "trusted.") {
continue
}
data, err := sysx.LGetxattr(src, xattr)
if err != nil {
if err == unix.ENOTSUP || err == sysx.ENODATA {
continue
}
return errors.Wrapf(err, "failed to get xattr %q on %s", xattr, src)
}
if err := unix.Lsetxattr(dst, xattr, data, unix.XATTR_CREATE); err != nil {
if err == unix.ENOTSUP || err == unix.ENODATA || err == unix.EEXIST {
continue
}
return errors.Wrapf(err, "failed to set xattr %q on %s", xattr, dst)
}
}
return nil
}

View File

@@ -23,7 +23,6 @@ import (
"bufio"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"os"
@@ -36,6 +35,7 @@ import (
"github.com/Microsoft/go-winio"
"github.com/Microsoft/hcsshim"
"github.com/containerd/containerd/sys"
"github.com/pkg/errors"
)
const (
@@ -107,10 +107,6 @@ func openFile(name string, flag int, perm os.FileMode) (*os.File, error) {
return sys.OpenFileSequential(name, flag, perm)
}
func mkdirAll(path string, perm os.FileMode) error {
return sys.MkdirAll(path, perm)
}
func mkdir(path string, perm os.FileMode) error {
return os.Mkdir(path, perm)
}
@@ -153,16 +149,8 @@ func setxattr(path, key, value string) error {
return errors.New("xattrs not supported on Windows")
}
// apply applies a tar stream of an OCI style diff tar of a Windows layer.
// See https://github.com/opencontainers/image-spec/blob/master/layer.md#applying-changesets
func apply(ctx context.Context, root string, tr *tar.Reader, options ApplyOptions) (size int64, err error) {
if options.IsWindowsContainerLayer {
return applyWindowsLayer(ctx, root, tr, options)
}
return applyNaive(ctx, root, tr, options)
}
// applyWindowsLayer applies a tar stream of an OCI style diff tar of a Windows layer.
// applyWindowsLayer applies a tar stream of an OCI style diff tar of a Windows
// layer using the hcsshim layer writer and backup streams.
// See https://github.com/opencontainers/image-spec/blob/master/layer.md#applying-changesets
func applyWindowsLayer(ctx context.Context, root string, tr *tar.Reader, options ApplyOptions) (size int64, err error) {
home, id := filepath.Split(root)
@@ -170,7 +158,7 @@ func applyWindowsLayer(ctx context.Context, root string, tr *tar.Reader, options
HomeDir: home,
}
w, err := hcsshim.NewLayerWriter(info, id, options.ParentLayerPaths)
w, err := hcsshim.NewLayerWriter(info, id, options.Parents)
if err != nil {
return 0, err
}
@@ -443,3 +431,14 @@ func writeBackupStreamFromTarFile(w io.Writer, t *tar.Reader, hdr *tar.Header) (
}
}
}
func copyDirInfo(fi os.FileInfo, path string) error {
if err := os.Chmod(path, fi.Mode()); err != nil {
return errors.Wrapf(err, "failed to chmod %s", path)
}
return nil
}
func copyUpXAttrs(dst, src string) error {
return nil
}

View File

@@ -72,17 +72,19 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
}
var wg = &sync.WaitGroup{}
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if fifos.Stdout != "" {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(ioset.Stdout, pipes.Stdout, *p)
pipes.Stdout.Close()
wg.Done()
}()
io.CopyBuffer(ioset.Stdout, pipes.Stdout, *p)
pipes.Stdout.Close()
wg.Done()
}()
}
if !fifos.Terminal {
if !fifos.Terminal && fifos.Stderr != "" {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)

View File

@@ -99,6 +99,12 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
c.runtime = defaults.DefaultRuntime
}
if copts.defaultPlatform != nil {
c.platform = copts.defaultPlatform
} else {
c.platform = platforms.Default()
}
if copts.services != nil {
c.services = *copts.services
}
@@ -193,6 +199,7 @@ type Client struct {
conn *grpc.ClientConn
runtime string
defaultns string
platform platforms.MatchComparer
connector func() (*grpc.ClientConn, error)
}
@@ -294,10 +301,14 @@ type RemoteContext struct {
PlatformMatcher platforms.MatchComparer
// Unpack is done after an image is pulled to extract into a snapshotter.
// It is done simultaneously for schema 2 images when they are pulled.
// If an image is not unpacked on pull, it can be unpacked any time
// afterwards. Unpacking is required to run an image.
Unpack bool
// UnpackOpts handles options to the unpack call.
UnpackOpts []UnpackOpt
// Snapshotter used for unpacking
Snapshotter string
@@ -329,9 +340,8 @@ type RemoteContext struct {
// MaxConcurrentDownloads is the max concurrent content downloads for each pull.
MaxConcurrentDownloads int
// AppendDistributionSourceLabel allows fetcher to add distribute source
// label for each blob content, which doesn't work for legacy schema1.
AppendDistributionSourceLabel bool
// AllMetadata downloads all manifests and known-configuration files
AllMetadata bool
}
func defaultRemoteContext() *RemoteContext {

View File

@@ -26,11 +26,12 @@ import (
)
type clientOpts struct {
defaultns string
defaultRuntime string
services *services
dialOptions []grpc.DialOption
timeout time.Duration
defaultns string
defaultRuntime string
defaultPlatform platforms.MatchComparer
services *services
dialOptions []grpc.DialOption
timeout time.Duration
}
// ClientOpt allows callers to set options on the containerd client
@@ -55,6 +56,14 @@ func WithDefaultRuntime(rt string) ClientOpt {
}
}
// WithDefaultPlatform sets the default platform matcher on the client
func WithDefaultPlatform(platform platforms.MatchComparer) ClientOpt {
return func(c *clientOpts) error {
c.defaultPlatform = platform
return nil
}
}
// WithDialOpts allows grpc.DialOptions to be set on the connection
func WithDialOpts(opts []grpc.DialOption) ClientOpt {
return func(c *clientOpts) error {
@@ -195,11 +204,10 @@ func WithMaxConcurrentDownloads(max int) RemoteOpt {
}
}
// WithAppendDistributionSourceLabel allows fetcher to add distribute source
// label for each blob content, which doesn't work for legacy schema1.
func WithAppendDistributionSourceLabel() RemoteOpt {
// WithAllMetadata downloads all manifests and known-configuration files
func WithAllMetadata() RemoteOpt {
return func(_ *Client, c *RemoteContext) error {
c.AppendDistributionSourceLabel = true
c.AllMetadata = true
return nil
}
}

View File

@@ -22,6 +22,7 @@ import (
"os"
"github.com/BurntSushi/toml"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/services/server"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/urfave/cli"
@@ -39,6 +40,50 @@ func (c *Config) WriteTo(w io.Writer) (int64, error) {
return 0, toml.NewEncoder(w).Encode(c)
}
func outputConfig(cfg *srvconfig.Config) error {
config := &Config{
Config: cfg,
}
plugins, err := server.LoadPlugins(gocontext.Background(), config.Config)
if err != nil {
return err
}
if len(plugins) != 0 {
config.Plugins = make(map[string]interface{})
for _, p := range plugins {
if p.Config == nil {
continue
}
pc, err := config.Decode(p)
if err != nil {
return err
}
config.Plugins[p.URI()] = pc
}
}
timeouts := timeout.All()
config.Timeouts = make(map[string]string)
for k, v := range timeouts {
config.Timeouts[k] = v.String()
}
// for the time being, keep the defaultConfig's version set at 1 so that
// when a config without a version is loaded from disk and has no version
// set, we assume it's a v1 config. But when generating new configs via
// this command, generate the v2 config
config.Config.Version = 2
// remove overridden Plugins type to avoid duplication in output
config.Config.Plugins = nil
_, err = config.WriteTo(os.Stdout)
return err
}
var configCommand = cli.Command{
Name: "config",
Usage: "information on the containerd config",
@@ -47,29 +92,19 @@ var configCommand = cli.Command{
Name: "default",
Usage: "see the output of the default config",
Action: func(context *cli.Context) error {
config := &Config{
Config: defaultConfig(),
}
// for the time being, keep the defaultConfig's version set at 1 so that
// when a config without a version is loaded from disk and has no version
// set, we assume it's a v1 config. But when generating new configs via
// this command, generate the v2 config
config.Config.Version = 2
plugins, err := server.LoadPlugins(gocontext.Background(), config.Config)
if err != nil {
return outputConfig(defaultConfig())
},
},
{
Name: "dump",
Usage: "see the output of the final main config with imported in subconfig files",
Action: func(context *cli.Context) error {
config := defaultConfig()
if err := srvconfig.LoadConfig(context.GlobalString("config"), config); err != nil && !os.IsNotExist(err) {
return err
}
if len(plugins) != 0 {
config.Plugins = make(map[string]interface{})
for _, p := range plugins {
if p.Config == nil {
continue
}
config.Plugins[p.URI()] = p.Config
}
}
_, err = config.WriteTo(os.Stdout)
return err
return outputConfig(config)
},
},
},

View File

@@ -148,13 +148,16 @@ func App() *cli.App {
for _, w := range warnings {
log.G(ctx).WithError(w).Warn("cleanup temp mount")
}
var (
address = config.GRPC.Address
ttrpcAddress = fmt.Sprintf("%s.ttrpc", config.GRPC.Address)
)
if address == "" {
if config.GRPC.Address == "" {
return errors.Wrap(errdefs.ErrInvalidArgument, "grpc address cannot be empty")
}
if config.TTRPC.Address == "" {
// If TTRPC was not explicitly configured, use defaults based on GRPC.
config.TTRPC.Address = fmt.Sprintf("%s.ttrpc", config.GRPC.Address)
config.TTRPC.UID = config.GRPC.UID
config.TTRPC.GID = config.GRPC.GID
}
log.G(ctx).WithFields(logrus.Fields{
"version": version.Version,
"revision": version.Revision,
@@ -193,7 +196,7 @@ func App() *cli.App {
serve(ctx, l, server.ServeMetrics)
}
// setup the ttrpc endpoint
tl, err := sys.GetLocalListener(ttrpcAddress, config.GRPC.UID, config.GRPC.GID)
tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
if err != nil {
return errors.Wrapf(err, "failed to get listener for main ttrpc endpoint")
}
@@ -207,7 +210,7 @@ func App() *cli.App {
serve(ctx, l, server.ServeTCP)
}
// setup the main grpc endpoint
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)
if err != nil {
return errors.Wrapf(err, "failed to get listener for main endpoint")
}

View File

@@ -58,6 +58,7 @@ func handleSignals(ctx context.Context, signals chan os.Signal, serverC chan *se
}
server.Stop()
close(done)
return
}
}
}

View File

@@ -17,7 +17,6 @@
package command
import (
"bytes"
"fmt"
"io/ioutil"
"log"
@@ -35,7 +34,6 @@ import (
"golang.org/x/sys/windows"
"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/debug"
"golang.org/x/sys/windows/svc/eventlog"
"golang.org/x/sys/windows/svc/mgr"
)
@@ -54,18 +52,6 @@ var (
service *handler
)
const (
// These should match the values in event_messages.mc.
eventInfo = 1
eventWarn = 1
eventError = 1
eventDebug = 2
eventPanic = 3
eventFatal = 4
eventExtraOffset = 10 // Add this to any event to get a string that supports extended data
)
// serviceFlags returns an array of flags for configuring containerd to run
// as a Windows service under control of SCM.
func serviceFlags() []cli.Flag {
@@ -124,93 +110,6 @@ type handler struct {
done chan struct{} // Indicates back to app main to quit
}
type etwHook struct {
log *eventlog.Log
}
func (h *etwHook) Levels() []logrus.Level {
return []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel,
logrus.DebugLevel,
}
}
func (h *etwHook) Fire(e *logrus.Entry) error {
var (
etype uint16
eid uint32
)
switch e.Level {
case logrus.PanicLevel:
etype = windows.EVENTLOG_ERROR_TYPE
eid = eventPanic
case logrus.FatalLevel:
etype = windows.EVENTLOG_ERROR_TYPE
eid = eventFatal
case logrus.ErrorLevel:
etype = windows.EVENTLOG_ERROR_TYPE
eid = eventError
case logrus.WarnLevel:
etype = windows.EVENTLOG_WARNING_TYPE
eid = eventWarn
case logrus.InfoLevel:
etype = windows.EVENTLOG_INFORMATION_TYPE
eid = eventInfo
case logrus.DebugLevel:
etype = windows.EVENTLOG_INFORMATION_TYPE
eid = eventDebug
default:
return errors.Wrap(errdefs.ErrInvalidArgument, "unknown level")
}
// If there is additional data, include it as a second string.
exts := ""
if len(e.Data) > 0 {
fs := bytes.Buffer{}
for k, v := range e.Data {
fs.WriteString(k)
fs.WriteByte('=')
fmt.Fprint(&fs, v)
fs.WriteByte(' ')
}
exts = fs.String()[:fs.Len()-1]
eid += eventExtraOffset
}
if h.log == nil {
fmt.Fprintf(os.Stderr, "%s [%s]\n", e.Message, exts)
return nil
}
var (
ss [2]*uint16
err error
)
ss[0], err = windows.UTF16PtrFromString(e.Message)
if err != nil {
return err
}
count := uint16(1)
if exts != "" {
ss[1], err = windows.UTF16PtrFromString(exts)
if err != nil {
return err
}
count++
}
return windows.ReportEvent(h.log.Handle, etype, 0, eid, 0, count, 0, &ss[0], nil)
}
func getServicePath() (string, error) {
p, err := exec.LookPath(os.Args[0])
if err != nil {
@@ -283,7 +182,7 @@ func registerService() error {
return err
}
return eventlog.Install(serviceNameFlag, p, false, eventlog.Info|eventlog.Warning|eventlog.Error)
return nil
}
func unregisterService() error {
@@ -299,7 +198,6 @@ func unregisterService() error {
}
defer s.Close()
eventlog.Remove(serviceNameFlag)
err = s.Delete()
if err != nil {
return err
@@ -345,20 +243,6 @@ func registerUnregisterService(root string) (bool, error) {
return true, err
}
interactive, err := svc.IsAnInteractiveSession()
if err != nil {
return true, err
}
var log *eventlog.Log
if !interactive {
log, err = eventlog.Open(serviceNameFlag)
if err != nil {
return true, err
}
}
logrus.AddHook(&etwHook{log})
logrus.SetOutput(ioutil.Discard)
}
return false, nil

View File

@@ -25,6 +25,7 @@ import (
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
tasktypes "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
@@ -382,7 +383,9 @@ func (c *container) loadTask(ctx context.Context, ioAttach cio.Attach) (Task, er
return nil, err
}
var i cio.IO
if ioAttach != nil {
if ioAttach != nil && response.Process.Status != tasktypes.StatusUnknown {
// Do not attach IO for task in unknown state, because there
// are no fifo paths anyway.
if i, err = attachExistingIO(response, ioAttach); err != nil {
return nil, err
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
@@ -78,6 +77,14 @@ func WithImage(i Image) NewContainerOpts {
}
}
// WithImageName allows setting the image name as the base for the container
func WithImageName(n string) NewContainerOpts {
return func(ctx context.Context, _ *Client, c *containers.Container) error {
c.Image = n
return nil
}
}
// WithContainerLabels adds the provided labels to the container
func WithContainerLabels(labels map[string]string) NewContainerOpts {
return func(_ context.Context, _ *Client, c *containers.Container) error {
@@ -182,7 +189,7 @@ func WithSnapshotCleanup(ctx context.Context, client *Client, c containers.Conta
// root filesystem in read-only mode
func WithNewSnapshotView(id string, i Image, opts ...snapshots.Opt) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error {
diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore(), platforms.Default())
diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore(), client.platform)
if err != nil {
return err
}

View File

@@ -28,7 +28,6 @@ import (
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/platforms"
"github.com/opencontainers/image-spec/identity"
)
@@ -45,7 +44,7 @@ func WithRemappedSnapshotView(id string, i Image, uid, gid uint32) NewContainerO
func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error {
diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore(), platforms.Default())
diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore(), client.platform)
if err != nil {
return err
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/gogo/protobuf/proto"
ptypes "github.com/gogo/protobuf/types"
"github.com/opencontainers/image-spec/identity"
@@ -58,7 +57,7 @@ func WithRestoreImage(ctx context.Context, id string, client *Client, checkpoint
return err
}
diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore(), platforms.Default())
diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore(), client.platform)
if err != nil {
return err
}

View File

@@ -49,7 +49,7 @@ type Container struct {
// This property is required and immutable.
Runtime RuntimeInfo
// Spec should carry the the runtime specification used to implement the
// Spec should carry the runtime specification used to implement the
// container.
//
// This field is required but mutable.

View File

@@ -1,5 +1,3 @@
// +build linux
/*
Copyright The containerd Authors.

View File

@@ -312,6 +312,7 @@ func DefaultProfile(sp *specs.Spec) *specs.LinuxSeccomp {
"sigaltstack",
"signalfd",
"signalfd4",
"sigprocmask",
"sigreturn",
"socket",
"socketcall",

View File

@@ -1,4 +1,4 @@
// +build !windows
// +build !linux
/*
Copyright The containerd Authors.
@@ -16,9 +16,11 @@
limitations under the License.
*/
package archive
package seccomp
// ApplyOptions provides additional options for an Apply operation
type ApplyOptions struct {
Filter Filter // Filter tar headers
import specs "github.com/opencontainers/runtime-spec/specs-go"
// DefaultProfile defines the whitelist for the default seccomp profile.
func DefaultProfile(sp *specs.Spec) *specs.LinuxSeccomp {
return &specs.LinuxSeccomp{}
}

View File

@@ -22,7 +22,6 @@ import (
"io/ioutil"
"time"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/log"
@@ -94,15 +93,13 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
rc := &readCounter{
r: io.TeeReader(processor, digester.Hash()),
}
if err := mount.WithTempMount(ctx, mounts, func(root string) error {
if _, err := archive.Apply(ctx, root, rc); err != nil {
return err
}
// Read any trailing data
_, err := io.Copy(ioutil.Discard, rc)
return err
}); err != nil {
if err := apply(ctx, mounts, rc); err != nil {
return emptyDesc, err
}
// Read any trailing data
if _, err := io.Copy(ioutil.Discard, rc); err != nil {
return emptyDesc, err
}

View File

@@ -0,0 +1,128 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apply
import (
"context"
"io"
"strings"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/pkg/errors"
)
func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error {
switch {
case len(mounts) == 1 && mounts[0].Type == "overlay":
path, parents, err := getOverlayPath(mounts[0].Options)
if err != nil {
if errdefs.IsInvalidArgument(err) {
break
}
return err
}
opts := []archive.ApplyOpt{
archive.WithConvertWhiteout(archive.OverlayConvertWhiteout),
}
if len(parents) > 0 {
opts = append(opts, archive.WithParents(parents))
}
_, err = archive.Apply(ctx, path, r, opts...)
return err
case len(mounts) == 1 && mounts[0].Type == "aufs":
path, parents, err := getAufsPath(mounts[0].Options)
if err != nil {
if errdefs.IsInvalidArgument(err) {
break
}
return err
}
opts := []archive.ApplyOpt{
archive.WithConvertWhiteout(archive.AufsConvertWhiteout),
}
if len(parents) > 0 {
opts = append(opts, archive.WithParents(parents))
}
_, err = archive.Apply(ctx, path, r, opts...)
return err
}
return mount.WithTempMount(ctx, mounts, func(root string) error {
_, err := archive.Apply(ctx, root, r)
return err
})
}
func getOverlayPath(options []string) (upper string, lower []string, err error) {
const upperdirPrefix = "upperdir="
const lowerdirPrefix = "lowerdir="
for _, o := range options {
if strings.HasPrefix(o, upperdirPrefix) {
upper = strings.TrimPrefix(o, upperdirPrefix)
} else if strings.HasPrefix(o, lowerdirPrefix) {
lower = strings.Split(strings.TrimPrefix(o, lowerdirPrefix), ":")
}
}
if upper == "" {
return "", nil, errors.Wrap(errdefs.ErrInvalidArgument, "upperdir not found")
}
return
}
// getAufsPath handles options as given by the containerd aufs package only,
// formatted as "br:<upper>=rw[:<lower>=ro+wh]*"
func getAufsPath(options []string) (upper string, lower []string, err error) {
const (
sep = ":"
brPrefix = "br:"
rwSuffix = "=rw"
roSuffix = "=ro+wh"
)
for _, o := range options {
if strings.HasPrefix(o, brPrefix) {
o = strings.TrimPrefix(o, brPrefix)
} else {
continue
}
for _, b := range strings.Split(o, sep) {
if strings.HasSuffix(b, rwSuffix) {
if upper != "" {
return "", nil, errors.Wrap(errdefs.ErrInvalidArgument, "multiple rw branch found")
}
upper = strings.TrimSuffix(b, rwSuffix)
} else if strings.HasSuffix(b, roSuffix) {
if upper == "" {
return "", nil, errors.Wrap(errdefs.ErrInvalidArgument, "rw branch be first")
}
lower = append(lower, strings.TrimSuffix(b, roSuffix))
} else {
return "", nil, errors.Wrap(errdefs.ErrInvalidArgument, "unhandled aufs suffix")
}
}
}
if upper == "" {
return "", nil, errors.Wrap(errdefs.ErrInvalidArgument, "rw branch not found")
}
return
}

View File

@@ -1,3 +1,5 @@
// +build !linux
/*
Copyright The containerd Authors.
@@ -14,23 +16,19 @@
limitations under the License.
*/
package containerd
package apply
import (
"context"
"io"
"github.com/containerd/cgroups"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/mount"
)
// WithNamespaceCgroupDeletion removes the cgroup directory that was created for the namespace
func WithNamespaceCgroupDeletion(ctx context.Context, i *namespaces.DeleteInfo) error {
cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(i.Name))
if err != nil {
if err == cgroups.ErrCgroupDeleted {
return nil
}
func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error {
return mount.WithTempMount(ctx, mounts, func(root string) error {
_, err := archive.Apply(ctx, root, r)
return err
}
return cg.Delete()
})
}

View File

@@ -139,7 +139,7 @@ func (s windowsDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
return emptyDesc, err
}
if _, err := archive.Apply(ctx, layer, rc, archive.WithParentLayers(parentLayerPaths), archive.AsWindowsContainerLayer()); err != nil {
if _, err := archive.Apply(ctx, layer, rc, archive.WithParents(parentLayerPaths), archive.AsWindowsContainerLayer()); err != nil {
return emptyDesc, err
}

View File

@@ -50,7 +50,7 @@ var _ events.Publisher = &Exchange{}
var _ events.Forwarder = &Exchange{}
var _ events.Subscriber = &Exchange{}
// Forward accepts an envelope to be direcly distributed on the exchange.
// Forward accepts an envelope to be directly distributed on the exchange.
//
// This is useful when an event is forwarded on behalf of another namespace or
// when the event is propagated on behalf of another publisher.

View File

@@ -19,6 +19,8 @@ package containerd
import (
"context"
"fmt"
"strings"
"sync/atomic"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
@@ -31,6 +33,7 @@ import (
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)
// Image describes an image used by containers
@@ -47,6 +50,8 @@ type Image interface {
RootFS(ctx context.Context) ([]digest.Digest, error)
// Size returns the total size of the image's packed resources.
Size(ctx context.Context) (int64, error)
// Usage returns a usage calculation for the image.
Usage(context.Context, ...UsageOpt) (int64, error)
// Config descriptor for the image.
Config(ctx context.Context) (ocispec.Descriptor, error)
// IsUnpacked returns whether or not an image is unpacked.
@@ -55,6 +60,49 @@ type Image interface {
ContentStore() content.Store
}
type usageOptions struct {
manifestLimit *int
manifestOnly bool
snapshots bool
}
// UsageOpt is used to configure the usage calculation
type UsageOpt func(*usageOptions) error
// WithUsageManifestLimit sets the limit to the number of manifests which will
// be walked for usage. Setting this value to 0 will require all manifests to
// be walked, returning ErrNotFound if manifests are missing.
// NOTE: By default all manifests which exist will be walked
// and any non-existent manifests and their subobjects will be ignored.
func WithUsageManifestLimit(i int) UsageOpt {
// If 0 then don't filter any manifests
// By default limits to current platform
return func(o *usageOptions) error {
o.manifestLimit = &i
return nil
}
}
// WithSnapshotUsage will check for referenced snapshots from the image objects
// and include the snapshot size in the total usage.
func WithSnapshotUsage() UsageOpt {
return func(o *usageOptions) error {
o.snapshots = true
return nil
}
}
// WithManifestUsage is used to get the usage for an image based on what is
// reported by the manifests rather than what exists in the content store.
// NOTE: This function is best used with the manifest limit set to get a
// consistent value, otherwise non-existent manifests will be excluded.
func WithManifestUsage() UsageOpt {
return func(o *usageOptions) error {
o.manifestOnly = true
return nil
}
}
var _ = (Image)(&image{})
// NewImage returns a client image object from the metadata image
@@ -62,7 +110,7 @@ func NewImage(client *Client, i images.Image) Image {
return &image{
client: client,
i: i,
platform: platforms.Default(),
platform: client.platform,
}
}
@@ -100,8 +148,95 @@ func (i *image) RootFS(ctx context.Context) ([]digest.Digest, error) {
}
func (i *image) Size(ctx context.Context) (int64, error) {
provider := i.client.ContentStore()
return i.i.Size(ctx, provider, i.platform)
return i.Usage(ctx, WithUsageManifestLimit(1), WithManifestUsage())
}
func (i *image) Usage(ctx context.Context, opts ...UsageOpt) (int64, error) {
var config usageOptions
for _, opt := range opts {
if err := opt(&config); err != nil {
return 0, err
}
}
var (
provider = i.client.ContentStore()
handler = images.ChildrenHandler(provider)
size int64
mustExist bool
)
if config.manifestLimit != nil {
handler = images.LimitManifests(handler, i.platform, *config.manifestLimit)
mustExist = true
}
var wh images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
var usage int64
children, err := handler(ctx, desc)
if err != nil {
if !errdefs.IsNotFound(err) || mustExist {
return nil, err
}
if !config.manifestOnly {
// Do not count size of non-existent objects
desc.Size = 0
}
} else if config.snapshots || !config.manifestOnly {
info, err := provider.Info(ctx, desc.Digest)
if err != nil {
if !errdefs.IsNotFound(err) {
return nil, err
}
if !config.manifestOnly {
// Do not count size of non-existent objects
desc.Size = 0
}
} else if info.Size > desc.Size {
// Count actual usage, Size may be unset or -1
desc.Size = info.Size
}
for k, v := range info.Labels {
const prefix = "containerd.io/gc.ref.snapshot."
if !strings.HasPrefix(k, prefix) {
continue
}
sn := i.client.SnapshotService(k[len(prefix):])
if sn == nil {
continue
}
u, err := sn.Usage(ctx, v)
if err != nil {
if !errdefs.IsNotFound(err) && !errdefs.IsInvalidArgument(err) {
return nil, err
}
} else {
usage += u.Size
}
}
}
// Ignore unknown sizes. Generally unknown sizes should
// never be set in manifests, however, the usage
// calculation does not need to enforce this.
if desc.Size >= 0 {
usage += desc.Size
}
atomic.AddInt64(&size, usage)
return children, nil
}
l := semaphore.NewWeighted(3)
if err := images.Dispatch(ctx, wh, l, i.i.Target); err != nil {
return 0, err
}
return size, nil
}
func (i *image) Config(ctx context.Context) (ocispec.Descriptor, error) {

View File

@@ -117,7 +117,7 @@ func Walk(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) err
//
// If any handler returns an error, the dispatch session will be canceled.
func Dispatch(ctx context.Context, handler Handler, limiter *semaphore.Weighted, descs ...ocispec.Descriptor) error {
eg, ctx := errgroup.WithContext(ctx)
eg, ctx2 := errgroup.WithContext(ctx)
for _, desc := range descs {
desc := desc
@@ -126,10 +126,11 @@ func Dispatch(ctx context.Context, handler Handler, limiter *semaphore.Weighted,
return err
}
}
eg.Go(func() error {
desc := desc
children, err := handler.Handle(ctx, desc)
children, err := handler.Handle(ctx2, desc)
if limiter != nil {
limiter.Release(1)
}
@@ -141,7 +142,7 @@ func Dispatch(ctx context.Context, handler Handler, limiter *semaphore.Weighted,
}
if len(children) > 0 {
return Dispatch(ctx, handler, limiter, children...)
return Dispatch(ctx2, handler, limiter, children...)
}
return nil

View File

@@ -119,7 +119,7 @@ func (image *Image) Size(ctx context.Context, provider content.Provider, platfor
}
size += desc.Size
return nil, nil
}), FilterPlatforms(ChildrenHandler(provider), platform)), image.Target)
}), LimitManifests(FilterPlatforms(ChildrenHandler(provider), platform), platform, 1)), image.Target)
}
type platformManifest struct {

View File

@@ -86,7 +86,7 @@ func WithImportCompression() ImportOpt {
// Import imports an image from a Tar stream using reader.
// Caller needs to specify importer. Future version may use oci.v1 as the default.
// Note that unreferrenced blobs may be imported to the content store as well.
// Note that unreferenced blobs may be imported to the content store as well.
func (c *Client) Import(ctx context.Context, reader io.Reader, opts ...ImportOpt) ([]images.Image, error) {
var iopts importOpts
for _, o := range opts {
@@ -125,7 +125,7 @@ func (c *Client) Import(ctx context.Context, reader io.Reader, opts ...ImportOpt
}
var platformMatcher = platforms.All
if !iopts.allPlatforms {
platformMatcher = platforms.Default()
platformMatcher = c.platform
}
var handler images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {

View File

@@ -27,7 +27,6 @@ import (
"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/pkg/errors"
)
@@ -43,7 +42,7 @@ func (c *Client) Install(ctx context.Context, image Image, opts ...InstallOpts)
}
var (
cs = image.ContentStore()
platform = platforms.Default()
platform = c.platform
)
manifest, err := images.Manifest(ctx, cs, image.Target(), platform)
if err != nil {

View File

@@ -30,7 +30,7 @@ var (
// messages.
G = GetLogger
// L is an alias for the the standard logger.
// L is an alias for the standard logger.
L = logrus.NewEntry(logrus.StandardLogger())
)

View File

@@ -32,7 +32,7 @@ import (
bolt "go.etcd.io/bbolt"
)
// LeaseManager manages the create/delete lifecyle of leases
// LeaseManager manages the create/delete lifecycle of leases
// and also returns existing leases
type LeaseManager struct {
tx *bolt.Tx
@@ -95,7 +95,7 @@ func (lm *LeaseManager) Create(ctx context.Context, opts ...leases.Opt) (leases.
return l, nil
}
// Delete delets the lease with the provided lease ID
// Delete deletes the lease with the provided lease ID
func (lm *LeaseManager) Delete(ctx context.Context, lease leases.Lease, _ ...leases.DeleteOpt) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {

View File

@@ -68,7 +68,7 @@ var blkioMetrics = []*metric{
},
{
name: "blkio_io_service_time_recursive",
help: "The blkio io servie time recursive",
help: "The blkio io service time recursive",
unit: metrics.Total,
vt: prometheus.GaugeValue,
labels: []string{"op", "device", "major", "minor"},
@@ -81,7 +81,7 @@ var blkioMetrics = []*metric{
},
{
name: "blkio_io_serviced_recursive",
help: "The blkio io servied recursive",
help: "The blkio io serviced recursive",
unit: metrics.Total,
vt: prometheus.GaugeValue,
labels: []string{"op", "device", "major", "minor"},

View File

@@ -141,7 +141,6 @@ func populateDefaultUnixSpec(ctx context.Context, s *Spec, id string) error {
Path: defaultRootfsPath,
},
Process: &specs.Process{
Env: defaultUnixEnv,
Cwd: "/",
NoNewPrivileges: true,
User: specs.User{

View File

@@ -118,7 +118,7 @@ func WithDefaultSpecForPlatform(platform string) SpecOpts {
}
}
// WithSpecFromBytes loads the the spec from the provided byte slice.
// WithSpecFromBytes loads the spec from the provided byte slice.
func WithSpecFromBytes(p []byte) SpecOpts {
return func(_ context.Context, _ Client, _ *containers.Container, s *Spec) error {
*s = Spec{} // make sure spec is cleared.
@@ -151,6 +151,13 @@ func WithEnv(environmentVariables []string) SpecOpts {
}
}
// WithDefaultPathEnv sets the $PATH environment variable to the
// default PATH defined in this package.
func WithDefaultPathEnv(_ context.Context, _ Client, _ *containers.Container, s *Spec) error {
s.Process.Env = replaceOrAppendEnvValues(s.Process.Env, defaultUnixEnv)
return nil
}
// replaceOrAppendEnvValues returns the defaults with the overrides either
// replaced by env key or appended to the list
func replaceOrAppendEnvValues(defaults, overrides []string) []string {
@@ -326,7 +333,11 @@ func WithImageConfigArgs(image Image, args []string) SpecOpts {
setProcess(s)
if s.Linux != nil {
s.Process.Env = replaceOrAppendEnvValues(s.Process.Env, config.Env)
defaults := config.Env
if len(defaults) == 0 {
defaults = defaultUnixEnv
}
s.Process.Env = replaceOrAppendEnvValues(defaults, s.Process.Env)
cmd := config.Cmd
if len(args) > 0 {
cmd = args
@@ -348,7 +359,7 @@ func WithImageConfigArgs(image Image, args []string) SpecOpts {
// even if there is no specified user in the image config
return WithAdditionalGIDs("root")(ctx, client, c, s)
} else if s.Windows != nil {
s.Process.Env = replaceOrAppendEnvValues(s.Process.Env, config.Env)
s.Process.Env = replaceOrAppendEnvValues(config.Env, s.Process.Env)
cmd := config.Cmd
if len(args) > 0 {
cmd = args
@@ -621,7 +632,7 @@ func WithUserID(uid uint32) SpecOpts {
}
// WithUsername sets the correct UID and GID for the container
// based on the the image's /etc/passwd contents. If /etc/passwd
// based on the image's /etc/passwd contents. If /etc/passwd
// does not exist, or the username is not found in /etc/passwd,
// it returns error.
func WithUsername(username string) SpecOpts {

View File

@@ -0,0 +1,66 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package timeout
import (
"context"
"sync"
"time"
)
var (
mu sync.Mutex
timeouts = make(map[string]time.Duration)
// DefaultTimeout of the timeout package
DefaultTimeout = 1 * time.Second
)
// Set the timeout for the key
func Set(key string, t time.Duration) {
mu.Lock()
timeouts[key] = t
mu.Unlock()
}
// Get returns the timeout for the provided key
func Get(key string) time.Duration {
mu.Lock()
t, ok := timeouts[key]
mu.Unlock()
if !ok {
t = DefaultTimeout
}
return t
}
// WithContext returns a context with the specified timeout for the provided key
func WithContext(ctx context.Context, key string) (context.Context, func()) {
t := Get(key)
return context.WithTimeout(ctx, t)
}
// All returns all keys and their timeouts
func All() map[string]time.Duration {
out := make(map[string]time.Duration)
mu.Lock()
defer mu.Unlock()
for k, v := range timeouts {
out[k] = v
}
return out
}

View File

@@ -130,7 +130,7 @@ type Matcher interface {
// specification. The returned matcher only looks for equality based on os,
// architecture and variant.
//
// One may implement their own matcher if this doesn't provide the the required
// One may implement their own matcher if this doesn't provide the required
// functionality.
//
// Applications should opt to use `Match` over directly parsing specifiers.

View File

@@ -28,12 +28,13 @@ import (
// InitContext is used for plugin inititalization
type InitContext struct {
Context context.Context
Root string
State string
Config interface{}
Address string
Events *exchange.Exchange
Context context.Context
Root string
State string
Config interface{}
Address string
TTRPCAddress string
Events *exchange.Exchange
Meta *Meta // plugins can fill in metadata at init.

View File

@@ -44,7 +44,7 @@ type Process interface {
Wait(context.Context) (<-chan ExitStatus, error)
// CloseIO allows various pipes to be closed on the process
CloseIO(context.Context, ...IOCloserOpts) error
// Resize changes the width and heigh of the process's terminal
// Resize changes the width and height of the process's terminal
Resize(ctx context.Context, w, h uint32) error
// IO returns the io set for the process
IO() cio.IO

View File

@@ -32,7 +32,7 @@ import (
// Pull downloads the provided content into containerd's content store
// and returns a platform specific image object
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) {
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) {
pullCtx := defaultRemoteContext()
for _, o := range opts {
if err := o(c, pullCtx); err != nil {
@@ -44,7 +44,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
if len(pullCtx.Platforms) > 1 {
return nil, errors.New("cannot pull multiplatform image locally, try Fetch")
} else if len(pullCtx.Platforms) == 0 {
pullCtx.PlatformMatcher = platforms.Default()
pullCtx.PlatformMatcher = c.platform
} else {
p, err := platforms.Parse(pullCtx.Platforms[0])
if err != nil {
@@ -61,6 +61,30 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
}
defer done(ctx)
var unpacks int32
if pullCtx.Unpack {
// unpacker only supports schema 2 image, for schema 1 this is noop.
u, err := c.newUnpacker(ctx, pullCtx)
if err != nil {
return nil, errors.Wrap(err, "create unpacker")
}
unpackWrapper, eg := u.handlerWrapper(ctx, &unpacks)
defer func() {
if err := eg.Wait(); err != nil {
if retErr == nil {
retErr = errors.Wrap(err, "unpack")
}
}
}()
wrapper := pullCtx.HandlerWrapper
pullCtx.HandlerWrapper = func(h images.Handler) images.Handler {
if wrapper == nil {
return unpackWrapper(h)
}
return wrapper(unpackWrapper(h))
}
}
img, err := c.fetch(ctx, pullCtx, ref, 1)
if err != nil {
return nil, err
@@ -69,8 +93,12 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
if pullCtx.Unpack {
if err := i.Unpack(ctx, pullCtx.Snapshotter); err != nil {
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
if unpacks == 0 {
// Try to unpack is none is done previously.
// This is at least required for schema 1 image.
if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil {
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
}
}
}
@@ -112,9 +140,14 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
childrenHandler := images.ChildrenHandler(store)
// Set any children labels for that content
childrenHandler = images.SetChildrenLabels(store, childrenHandler)
// Filter manifests by platforms but allow to handle manifest
// and configuration for not-target platforms
childrenHandler = remotes.FilterManifestByPlatformHandler(childrenHandler, rCtx.PlatformMatcher)
if rCtx.AllMetadata {
// Filter manifests by platforms but allow to handle manifest
// and configuration for not-target platforms
childrenHandler = remotes.FilterManifestByPlatformHandler(childrenHandler, rCtx.PlatformMatcher)
} else {
// Filter children by platforms if specified.
childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher)
}
// Sort and limit manifests if a finite number is needed
if limit > 0 {
childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
@@ -131,22 +164,18 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
},
)
appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, ref)
if err != nil {
return images.Image{}, err
}
handlers := append(rCtx.BaseHandlers,
remotes.FetchHandler(store, fetcher),
convertibleHandler,
childrenHandler,
appendDistSrcLabelHandler,
)
// append distribution source label to blob data
if rCtx.AppendDistributionSourceLabel {
appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, ref)
if err != nil {
return images.Image{}, err
}
handlers = append(handlers, appendDistSrcLabelHandler)
}
handler = images.Handlers(handlers...)
converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) {

View File

@@ -137,7 +137,6 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
// for the private repo, we should remove mount-from
// query and send the request again.
resp, err = preq.do(pctx)
//resp, err = p.doRequest(pctx, req)
if err != nil {
return nil, err
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshots"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
@@ -31,6 +32,13 @@ import (
// the content creation and the provided snapshotter and mount differ are used
// for calculating the diff. The descriptor for the layer diff is returned.
func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter, d diff.Comparer, opts ...diff.Opt) (ocispec.Descriptor, error) {
// dctx is used to handle cleanup things just in case the param ctx
// has been canceled, which causes that the defer cleanup fails.
dctx := context.Background()
if ns, ok := namespaces.Namespace(ctx); ok {
dctx = namespaces.WithNamespace(dctx, ns)
}
info, err := sn.Stat(ctx, snapshotID)
if err != nil {
return ocispec.Descriptor{}, err
@@ -41,7 +49,7 @@ func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter
if err != nil {
return ocispec.Descriptor{}, err
}
defer sn.Remove(ctx, lowerKey)
defer sn.Remove(dctx, lowerKey)
var upper []mount.Mount
if info.Kind == snapshots.KindActive {
@@ -55,7 +63,7 @@ func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter
if err != nil {
return ocispec.Descriptor{}, err
}
defer sn.Remove(ctx, upperKey)
defer sn.Remove(dctx, upperKey)
}
return d.Compare(ctx, lower, upper, opts...)

View File

@@ -1,109 +0,0 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"os/exec"
"sync"
"time"
"github.com/containerd/containerd/sys"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
)
// ErrNoSuchProcess is returned when the process no longer exists
var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 2048
// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() error {
now := time.Now()
exits, err := sys.Reap(false)
Default.Lock()
for c := range Default.subscribers {
for _, e := range exits {
c <- runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
}
}
}
Default.Unlock()
return err
}
// Default is the default monitor initialized for the package
var Default = &Monitor{
subscribers: make(map[chan runc.Exit]struct{}),
}
// Monitor monitors the underlying system for process status changes
type Monitor struct {
sync.Mutex
subscribers map[chan runc.Exit]struct{}
}
// Start starts the command a registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
ec := m.Subscribe()
if err := c.Start(); err != nil {
m.Unsubscribe(ec)
return nil, err
}
return ec, nil
}
// Wait blocks until a process is signal as dead.
// User should rely on the value of the exit status to determine if the
// command was successful or not.
func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
for e := range ec {
if e.Pid == c.Process.Pid {
// make sure we flush all IO
c.Wait()
m.Unsubscribe(ec)
return e.Status, nil
}
}
// return no such process if the ec channel is closed and no more exit
// events will be sent
return -1, ErrNoSuchProcess
}
// Subscribe to process exit changes
func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize)
m.Lock()
m.subscribers[c] = struct{}{}
m.Unlock()
return c
}
// Unsubscribe to process exit changes
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
m.Lock()
delete(m.subscribers, c)
close(c)
m.Unlock()
}

View File

@@ -40,6 +40,7 @@ import (
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/containerd/sys/reaper"
runc "github.com/containerd/go-runc"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
@@ -86,7 +87,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) {
context: ctx,
processes: make(map[string]process.Process),
events: make(chan interface{}, 128),
ec: Default.Subscribe(),
ec: reaper.Default.Subscribe(),
}
go s.processExits()
if err := s.initPlatform(); err != nil {
@@ -514,33 +515,35 @@ func (s *Service) allProcesses() []process.Process {
}
func (s *Service) checkProcesses(e runc.Exit) {
shouldKillAll, err := shouldKillAllOnExit(s.bundle)
if err != nil {
log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
}
for _, p := range s.allProcesses() {
if p.Pid() == e.Pid {
if p.Pid() != e.Pid {
continue
}
if ip, ok := p.(*process.Init); ok {
shouldKillAll, err := shouldKillAllOnExit(s.bundle)
if err != nil {
log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
}
// Ensure all children are killed
if shouldKillAll {
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
p.SetExited(e.Status)
s.events <- &eventstypes.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(),
}
return
}
p.SetExited(e.Status)
s.events <- &eventstypes.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(),
}
return
}
}

View File

@@ -183,7 +183,7 @@ Current supported schemes for logging are:
* file - Linux & Windows
* npipe - Windows
Binary logging has the abilty to forward a container's STDIO to an external binary for consumption.
Binary logging has the ability to forward a container's STDIO to an external binary for consumption.
A sample logging driver that forwards the container's STDOUT and STDERR to `journald` is:
```go

View File

@@ -35,22 +35,24 @@ import (
"github.com/sirupsen/logrus"
)
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
return &binary{
bundle: bundle,
runtime: runtime,
containerdAddress: containerdAddress,
events: events,
rtTasks: rt,
bundle: bundle,
runtime: runtime,
containerdAddress: containerdAddress,
containerdTTRPCAddress: containerdTTRPCAddress,
events: events,
rtTasks: rt,
}
}
type binary struct {
runtime string
containerdAddress string
bundle *Bundle
events *exchange.Exchange
rtTasks *runtime.TaskList
runtime string
containerdAddress string
containerdTTRPCAddress string
bundle *Bundle
events *exchange.Exchange
rtTasks *runtime.TaskList
}
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
@@ -64,6 +66,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
ctx,
b.runtime,
b.containerdAddress,
b.containerdTTRPCAddress,
b.bundle.Path,
opts,
args...,
@@ -85,13 +88,10 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
// copy the shim's logs to containerd's output
go func() {
defer f.Close()
if _, err := io.Copy(os.Stderr, f); err != nil {
// When using a multi-container shim the 2nd to Nth container in the
// shim will not have a separate log pipe. Ignore the failure log
// message here when the shim connect times out.
if !os.IsNotExist(errors.Cause(err)) {
log.G(ctx).WithError(err).Error("copy shim log")
}
_, err := io.Copy(os.Stderr, f)
err = checkCopyShimLogError(ctx, err)
if err != nil {
log.G(ctx).WithError(err).Error("copy shim log")
}
}()
out, err := cmd.CombinedOutput()
@@ -127,6 +127,7 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
cmd, err := client.Command(ctx,
b.runtime,
b.containerdAddress,
b.containerdTTRPCAddress,
bundlePath,
nil,
"-id", b.bundle.ID,

View File

@@ -69,25 +69,26 @@ func init() {
if err != nil {
return nil, err
}
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.Events, m.(*metadata.DB))
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, m.(*metadata.DB))
},
})
}
// New task manager for v2 shims
func New(ctx context.Context, root, state, containerdAddress string, events *exchange.Exchange, db *metadata.DB) (*TaskManager, error) {
func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, db *metadata.DB) (*TaskManager, error) {
for _, d := range []string{root, state} {
if err := os.MkdirAll(d, 0711); err != nil {
return nil, err
}
}
m := &TaskManager{
root: root,
state: state,
containerdAddress: containerdAddress,
tasks: runtime.NewTaskList(),
events: events,
db: db,
root: root,
state: state,
containerdAddress: containerdAddress,
containerdTTRPCAddress: containerdTTRPCAddress,
tasks: runtime.NewTaskList(),
events: events,
db: db,
}
if err := m.loadExistingTasks(ctx); err != nil {
return nil, err
@@ -97,9 +98,10 @@ func New(ctx context.Context, root, state, containerdAddress string, events *exc
// TaskManager manages v2 shim's and their tasks
type TaskManager struct {
root string
state string
containerdAddress string
root string
state string
containerdAddress string
containerdTTRPCAddress string
tasks *runtime.TaskList
events *exchange.Exchange
@@ -131,7 +133,7 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
topts = opts.RuntimeOptions
}
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.events, m.tasks)
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
shim, err := b.Start(ctx, topts, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
_, err := m.tasks.Get(ctx, id)
@@ -254,7 +256,7 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
bundle.Delete()
continue
}
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.events, m.tasks)
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
_, err := m.tasks.Get(ctx, id)

View File

@@ -32,6 +32,7 @@ import (
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/runtime"
client "github.com/containerd/containerd/runtime/v2/shim"
"github.com/containerd/containerd/runtime/v2/task"
@@ -41,6 +42,18 @@ import (
"github.com/sirupsen/logrus"
)
const (
loadTimeout = "io.containerd.timeout.shim.load"
cleanupTimeout = "io.containerd.timeout.shim.cleanup"
shutdownTimeout = "io.containerd.timeout.shim.shutdown"
)
func init() {
timeout.Set(loadTimeout, 5*time.Second)
timeout.Set(cleanupTimeout, 5*time.Second)
timeout.Set(shutdownTimeout, 3*time.Second)
}
func loadAddress(path string) (string, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
@@ -100,7 +113,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
events: events,
rtTasks: rt,
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
defer cancel()
if err := s.Connect(ctx); err != nil {
return nil, err
@@ -110,7 +123,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) {
ctx = namespaces.WithNamespace(ctx, ns)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := timeout.WithContext(ctx, cleanupTimeout)
defer cancel()
log.G(ctx).WithFields(logrus.Fields{
@@ -185,7 +198,7 @@ func (s *shim) Shutdown(ctx context.Context) error {
}
func (s *shim) waitShutdown(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
ctx, cancel := timeout.WithContext(ctx, shutdownTimeout)
defer cancel()
return s.Shutdown(ctx)
}

View File

@@ -41,13 +41,13 @@ type item struct {
count int
}
func newPublisher(address string) (*remoteEventsPublisher, error) {
func NewPublisher(address string) (*RemoteEventsPublisher, error) {
client, err := ttrpcutil.NewClient(address)
if err != nil {
return nil, err
}
l := &remoteEventsPublisher{
l := &RemoteEventsPublisher{
client: client,
closed: make(chan struct{}),
requeue: make(chan *item, queueSize),
@@ -57,18 +57,18 @@ func newPublisher(address string) (*remoteEventsPublisher, error) {
return l, nil
}
type remoteEventsPublisher struct {
type RemoteEventsPublisher struct {
client *ttrpcutil.Client
closed chan struct{}
closer sync.Once
requeue chan *item
}
func (l *remoteEventsPublisher) Done() <-chan struct{} {
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
return l.closed
}
func (l *remoteEventsPublisher) Close() (err error) {
func (l *RemoteEventsPublisher) Close() (err error) {
err = l.client.Close()
l.closer.Do(func() {
close(l.closed)
@@ -76,7 +76,7 @@ func (l *remoteEventsPublisher) Close() (err error) {
return err
}
func (l *remoteEventsPublisher) processQueue() {
func (l *RemoteEventsPublisher) processQueue() {
for i := range l.requeue {
if i.count > maxRequeue {
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
@@ -91,7 +91,7 @@ func (l *remoteEventsPublisher) processQueue() {
}
}
func (l *remoteEventsPublisher) queue(i *item) {
func (l *RemoteEventsPublisher) queue(i *item) {
go func() {
i.count++
// re-queue after a short delay
@@ -100,7 +100,7 @@ func (l *remoteEventsPublisher) queue(i *item) {
}()
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
@@ -127,7 +127,7 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
return nil
}
func (l *remoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
_, err := l.client.EventsService().Forward(ctx, req)
if err == nil {
return nil

View File

@@ -57,7 +57,7 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error)
type Shim interface {
shimapi.TaskService
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error)
StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error)
}
// OptsKey is the context key for the Opts value.
@@ -93,6 +93,10 @@ var (
action string
)
const (
ttrpcAddressEnv = "TTRPC_ADDRESS"
)
func parseFlags() {
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
@@ -163,8 +167,9 @@ func run(id string, initFunc Init, config Config) error {
}
}
address := fmt.Sprintf("%s.ttrpc", addressFlag)
publisher, err := newPublisher(address)
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
publisher, err := NewPublisher(ttrpcAddress)
if err != nil {
return err
}
@@ -203,7 +208,7 @@ func run(id string, initFunc Init, config Config) error {
}
return nil
case "start":
address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag)
address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag, ttrpcAddress)
if err != nil {
return err
}

View File

@@ -26,6 +26,7 @@ import (
"os/signal"
"syscall"
"github.com/containerd/containerd/sys/reaper"
"github.com/containerd/fifo"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -79,7 +80,7 @@ func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Si
case s := <-signals:
switch s {
case unix.SIGCHLD:
if err := Reap(); err != nil {
if err := reaper.Reap(); err != nil {
logger.WithError(err).Error("reap exit status")
}
case unix.SIGPIPE:

View File

@@ -38,7 +38,7 @@ import (
var runtimePaths sync.Map
// Command returns the shim command with the provided args and configuration
func Command(ctx context.Context, runtime, containerdAddress, path string, opts *types.Any, cmdArgs ...string) (*exec.Cmd, error) {
func Command(ctx context.Context, runtime, containerdAddress, containerdTTRPCAddress, path string, opts *types.Any, cmdArgs ...string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
@@ -95,7 +95,11 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, opts
cmd := exec.Command(cmdPath, args...)
cmd.Dir = path
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
cmd.Env = append(
os.Environ(),
"GOMAXPROCS=2",
fmt.Sprintf("%s=%s", ttrpcAddressEnv, containerdTTRPCAddress),
)
cmd.SysProcAttr = getSysProcAttr()
if opts != nil {
d, err := proto.Marshal(opts)

View File

@@ -30,3 +30,17 @@ import (
func openShimLog(ctx context.Context, bundle *Bundle) (io.ReadCloser, error) {
return fifo.OpenFifo(ctx, filepath.Join(bundle.Path, "log"), unix.O_RDONLY|unix.O_CREAT|unix.O_NONBLOCK, 0700)
}
func checkCopyShimLogError(ctx context.Context, err error) error {
// When using a multi-container shim, the fifo of the 2nd to Nth
// container will not be opened when the ctx is done. This will
// cause an ErrReadClosed that can be ignored.
select {
case <-ctx.Done():
if err == fifo.ErrReadClosed {
return nil
}
default:
}
return err
}

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"net"
"os"
"sync"
"time"
@@ -85,3 +86,13 @@ func openShimLog(ctx context.Context, bundle *Bundle) (io.ReadCloser, error) {
}()
return dpc, nil
}
func checkCopyShimLogError(ctx context.Context, err error) error {
// When using a multi-container shim the 2nd to Nth container in the
// shim will not have a separate log pipe. Ignore the failure log
// message here when the shim connect times out.
if os.IsNotExist(errors.Cause(err)) {
return nil
}
return err
}

View File

@@ -17,12 +17,15 @@
package config
import (
"path/filepath"
"strings"
"github.com/BurntSushi/toml"
"github.com/imdario/mergo"
"github.com/pkg/errors"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/plugin"
"github.com/pkg/errors"
)
// Config provides containerd configuration data for the server
@@ -37,6 +40,8 @@ type Config struct {
PluginDir string `toml:"plugin_dir"`
// GRPC configuration settings
GRPC GRPCConfig `toml:"grpc"`
// TTRPC configuration settings
TTRPC TTRPCConfig `toml:"ttrpc"`
// Debug and profiling settings
Debug Debug `toml:"debug"`
// Metrics and monitoring settings
@@ -55,16 +60,16 @@ type Config struct {
Cgroup CgroupConfig `toml:"cgroup"`
// ProxyPlugins configures plugins which are communicated to over GRPC
ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"`
// Timeouts specified as a duration
Timeouts map[string]string `toml:"timeouts"`
// Imports are additional file path list to config files that can overwrite main config file fields
Imports []string `toml:"imports"`
StreamProcessors []StreamProcessor `toml:"stream_processors"`
md toml.MetaData
StreamProcessors map[string]StreamProcessor `toml:"stream_processors"`
}
// StreamProcessor provides configuration for diff content processors
type StreamProcessor struct {
// ID of the processor, also used to fetch the specific payload
ID string `toml:"id"`
// Accepts specific media-types
Accepts []string `toml:"accepts"`
// Returns the media-type
@@ -103,11 +108,6 @@ func (c *Config) ValidateV2() error {
return errors.Errorf("invalid plugin key URI %q expect io.containerd.x.vx", p)
}
}
for p := range c.ProxyPlugins {
if len(strings.Split(p, ".")) < 4 {
return errors.Errorf("invalid proxy plugin key URI %q expect io.containerd.x.vx", p)
}
}
return nil
}
@@ -123,6 +123,13 @@ type GRPCConfig struct {
MaxSendMsgSize int `toml:"max_send_message_size"`
}
// TTRPCConfig provides TTRPC configuration for the socket
type TTRPCConfig struct {
Address string `toml:"address"`
UID int `toml:"uid"`
GID int `toml:"gid"`
}
// Debug provides debug configuration
type Debug struct {
Address string `toml:"address"`
@@ -196,23 +203,125 @@ func (c *Config) Decode(p *plugin.Registration) (interface{}, error) {
if !ok {
return p.Config, nil
}
if err := c.md.PrimitiveDecode(data, p.Config); err != nil {
if err := toml.PrimitiveDecode(data, p.Config); err != nil {
return nil, err
}
return p.Config, nil
}
// LoadConfig loads the containerd server config from the provided path
func LoadConfig(path string, v *Config) error {
if v == nil {
return errors.Wrapf(errdefs.ErrInvalidArgument, "argument v must not be nil")
func LoadConfig(path string, out *Config) error {
if out == nil {
return errors.Wrapf(errdefs.ErrInvalidArgument, "argument out must not be nil")
}
md, err := toml.DecodeFile(path, v)
var (
loaded = map[string]bool{}
pending = []string{path}
)
for len(pending) > 0 {
path, pending = pending[0], pending[1:]
// Check if a file at the given path already loaded to prevent circular imports
if _, ok := loaded[path]; ok {
continue
}
config, err := loadConfigFile(path)
if err != nil {
return err
}
if err := mergeConfig(out, config); err != nil {
return err
}
imports, err := resolveImports(path, config.Imports)
if err != nil {
return err
}
loaded[path] = true
pending = append(pending, imports...)
}
// Fix up the list of config files loaded
out.Imports = []string{}
for path := range loaded {
out.Imports = append(out.Imports, path)
}
return out.ValidateV2()
}
// loadConfigFile decodes a TOML file at the given path
func loadConfigFile(path string) (*Config, error) {
config := &Config{}
_, err := toml.DecodeFile(path, &config)
if err != nil {
return nil, err
}
return config, nil
}
// resolveImports resolves import strings list to absolute paths list:
// - If path contains *, glob pattern matching applied
// - Non abs path is relative to parent config file directory
// - Abs paths returned as is
func resolveImports(parent string, imports []string) ([]string, error) {
var out []string
for _, path := range imports {
if strings.Contains(path, "*") {
matches, err := filepath.Glob(path)
if err != nil {
return nil, err
}
out = append(out, matches...)
} else {
path = filepath.Clean(path)
if !filepath.IsAbs(path) {
path = filepath.Join(filepath.Dir(parent), path)
}
out = append(out, path)
}
}
return out, nil
}
// mergeConfig merges Config structs with the following rules:
// 'to' 'from' 'result'
// "" "value" "value"
// "value" "" "value"
// 1 0 1
// 0 1 1
// []{"1"} []{"2"} []{"1","2"}
// []{"1"} []{} []{"1"}
// Maps merged by keys, but values are replaced entirely.
func mergeConfig(to, from *Config) error {
err := mergo.Merge(to, from, mergo.WithOverride, mergo.WithAppendSlice)
if err != nil {
return err
}
v.md = md
return v.ValidateV2()
// Replace entire sections instead of merging map's values.
for k, v := range from.Plugins {
to.Plugins[k] = v
}
for k, v := range from.StreamProcessors {
to.StreamProcessors[k] = v
}
for k, v := range from.ProxyPlugins {
to.ProxyPlugins[k] = v
}
return nil
}
// V1DisabledFilter matches based on ID

View File

@@ -40,6 +40,7 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/pkg/dialer"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/plugin"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/containerd/containerd/snapshots"
@@ -77,12 +78,19 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
if err := apply(ctx, config); err != nil {
return nil, err
}
for key, sec := range config.Timeouts {
d, err := time.ParseDuration(sec)
if err != nil {
return nil, errors.Errorf("unable to parse %s into a time duration", sec)
}
timeout.Set(key, d)
}
plugins, err := LoadPlugins(ctx, config)
if err != nil {
return nil, err
}
for _, p := range config.StreamProcessors {
diff.RegisterProcessor(diff.BinaryHandler(p.ID, p.Returns, p.Accepts, p.Path, p.Args))
for id, p := range config.StreamProcessors {
diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args))
}
serverOpts := []grpc.ServerOption{
@@ -146,6 +154,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
)
initContext.Events = s.events
initContext.Address = config.GRPC.Address
initContext.TTRPCAddress = config.TTRPC.Address
// load the plugin specific configuration if it is provided
if p.Config != nil {

View File

@@ -40,6 +40,7 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
@@ -61,6 +62,10 @@ var (
empty = &ptypes.Empty{}
)
const (
stateTimeout = "io.containerd.timeout.task.state"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
@@ -68,6 +73,8 @@ func init() {
Requires: tasksServiceRequires,
InitFn: initFunc,
})
timeout.Set(stateTimeout, 2*time.Second)
}
func initFunc(ic *plugin.InitContext) (interface{}, error) {
@@ -266,7 +273,7 @@ func (l *local) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest,
}
func getProcessState(ctx context.Context, p runtime.Process) (*task.Process, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
ctx, cancel := timeout.WithContext(ctx, stateTimeout)
defer cancel()
state, err := p.State(ctx)

View File

@@ -16,7 +16,7 @@
limitations under the License.
*/
package shim
package reaper
import (
"os/exec"
@@ -31,37 +31,61 @@ import (
// ErrNoSuchProcess is returned when the process no longer exists
var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 2048
const bufferSize = 32
type subscriber struct {
sync.Mutex
c chan runc.Exit
closed bool
}
func (s *subscriber) close() {
s.Lock()
if s.closed {
s.Unlock()
return
}
close(s.c)
s.closed = true
s.Unlock()
}
func (s *subscriber) do(fn func()) {
s.Lock()
fn()
s.Unlock()
}
// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() error {
now := time.Now()
exits, err := sys.Reap(false)
Default.Lock()
for c := range Default.subscribers {
for _, e := range exits {
c <- runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
}
for _, e := range exits {
done := Default.notify(runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
})
select {
case <-done:
case <-time.After(1 * time.Second):
}
}
Default.Unlock()
return err
}
// Default is the default monitor initialized for the package
var Default = &Monitor{
subscribers: make(map[chan runc.Exit]struct{}),
subscribers: make(map[chan runc.Exit]*subscriber),
}
// Monitor monitors the underlying system for process status changes
type Monitor struct {
sync.Mutex
subscribers map[chan runc.Exit]struct{}
subscribers map[chan runc.Exit]*subscriber
}
// Start starts the command a registers the process with the reaper
@@ -95,7 +119,9 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize)
m.Lock()
m.subscribers[c] = struct{}{}
m.subscribers[c] = &subscriber{
c: c,
}
m.Unlock()
return c
}
@@ -103,7 +129,74 @@ func (m *Monitor) Subscribe() chan runc.Exit {
// Unsubscribe to process exit changes
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
m.Lock()
s, ok := m.subscribers[c]
if !ok {
m.Unlock()
return
}
s.close()
delete(m.subscribers, c)
close(c)
m.Unlock()
}
func (m *Monitor) getSubscribers() map[chan runc.Exit]*subscriber {
out := make(map[chan runc.Exit]*subscriber)
m.Lock()
for k, v := range m.subscribers {
out[k] = v
}
m.Unlock()
return out
}
func (m *Monitor) notify(e runc.Exit) chan struct{} {
const timeout = 1 * time.Millisecond
var (
done = make(chan struct{}, 1)
timer = time.NewTimer(timeout)
success = make(map[chan runc.Exit]struct{})
)
stop(timer, true)
go func() {
defer close(done)
for {
var (
failed int
subscribers = m.getSubscribers()
)
for _, s := range subscribers {
s.do(func() {
if s.closed {
return
}
if _, ok := success[s.c]; ok {
return
}
timer.Reset(timeout)
recv := true
select {
case s.c <- e:
success[s.c] = struct{}{}
case <-timer.C:
recv = false
failed++
}
stop(timer, recv)
})
}
// all subscribers received the message
if failed == 0 {
return
}
}
}()
return done
}
func stop(timer *time.Timer, recv bool) {
if !timer.Stop() && recv {
<-timer.C
}
}

247
vendor/github.com/containerd/containerd/unpacker.go generated vendored Normal file
View File

@@ -0,0 +1,247 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containerd
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/rootfs"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
type layerState struct {
layer rootfs.Layer
downloaded bool
unpacked bool
}
type unpacker struct {
updateCh chan ocispec.Descriptor
snapshotter string
config UnpackConfig
c *Client
}
func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) {
snapshotter, err := c.resolveSnapshotterName(ctx, rCtx.Snapshotter)
if err != nil {
return nil, err
}
var config UnpackConfig
for _, o := range rCtx.UnpackOpts {
if err := o(ctx, &config); err != nil {
return nil, err
}
}
return &unpacker{
updateCh: make(chan ocispec.Descriptor, 128),
snapshotter: snapshotter,
config: config,
c: c,
}, nil
}
func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers []ocispec.Descriptor) error {
p, err := content.ReadBlob(ctx, u.c.ContentStore(), config)
if err != nil {
return err
}
var i ocispec.Image
if err := json.Unmarshal(p, &i); err != nil {
return errors.Wrap(err, "unmarshal image config")
}
diffIDs := i.RootFS.DiffIDs
if len(layers) != len(diffIDs) {
return errors.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs))
}
var (
sn = u.c.SnapshotService(u.snapshotter)
a = u.c.DiffService()
cs = u.c.ContentStore()
states []layerState
chain []digest.Digest
)
for i, desc := range layers {
states = append(states, layerState{
layer: rootfs.Layer{
Blob: desc,
Diff: ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayer,
Digest: diffIDs[i],
},
},
})
}
for {
var layer ocispec.Descriptor
select {
case layer = <-u.updateCh:
case <-ctx.Done():
return ctx.Err()
}
log.G(ctx).WithField("desc", layer).Debug("layer downloaded")
for i := range states {
if states[i].layer.Blob.Digest != layer.Digest {
continue
}
// Different layers may have the same digest. When that
// happens, we should continue marking the next layer
// as downloaded.
if states[i].downloaded {
continue
}
states[i].downloaded = true
break
}
for i := range states {
if !states[i].downloaded {
break
}
if states[i].unpacked {
continue
}
log.G(ctx).WithFields(logrus.Fields{
"desc": states[i].layer.Blob,
"diff": states[i].layer.Diff,
}).Debug("unpack layer")
unpacked, err := rootfs.ApplyLayerWithOpts(ctx, states[i].layer, chain, sn, a,
u.config.SnapshotOpts, u.config.ApplyOpts)
if err != nil {
return err
}
if unpacked {
// Set the uncompressed label after the uncompressed
// digest has been verified through apply.
cinfo := content.Info{
Digest: states[i].layer.Blob.Digest,
Labels: map[string]string{
"containerd.io/uncompressed": states[i].layer.Diff.Digest.String(),
},
}
if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil {
return err
}
}
chain = append(chain, states[i].layer.Diff.Digest)
states[i].unpacked = true
log.G(ctx).WithFields(logrus.Fields{
"desc": states[i].layer.Blob,
"diff": states[i].layer.Diff,
}).Debug("layer unpacked")
}
// Check whether all layers are unpacked.
if states[len(states)-1].unpacked {
break
}
}
chainID := identity.ChainID(chain).String()
cinfo := content.Info{
Digest: config.Digest,
Labels: map[string]string{
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", u.snapshotter): chainID,
},
}
_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", u.snapshotter))
if err != nil {
return err
}
log.G(ctx).WithFields(logrus.Fields{
"config": config.Digest,
"chainID": chainID,
}).Debug("image unpacked")
return nil
}
func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errgroup.Group) {
eg, uctx := errgroup.WithContext(uctx)
return func(f images.Handler) images.Handler {
var (
lock sync.Mutex
layers []ocispec.Descriptor
schema1 bool
)
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
children, err := f.Handle(ctx, desc)
if err != nil {
return children, err
}
// `Pull` only supports one platform, so there is only
// one manifest to handle, and manifest list can be
// safely skipped.
// TODO: support multi-platform unpack.
switch desc.MediaType {
case images.MediaTypeDockerSchema1Manifest:
lock.Lock()
schema1 = true
lock.Unlock()
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
lock.Lock()
for _, child := range children {
if child.MediaType == images.MediaTypeDockerSchema2Config ||
child.MediaType == ocispec.MediaTypeImageConfig {
continue
}
layers = append(layers, child)
}
lock.Unlock()
case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
lock.Lock()
l := append([]ocispec.Descriptor{}, layers...)
lock.Unlock()
if len(l) > 0 {
atomic.AddInt32(unpacks, 1)
eg.Go(func() error {
return u.unpack(uctx, desc, l)
})
}
case images.MediaTypeDockerSchema2Layer, images.MediaTypeDockerSchema2LayerGzip,
images.MediaTypeDockerSchema2LayerForeign, images.MediaTypeDockerSchema2LayerForeignGzip,
ocispec.MediaTypeImageLayer, ocispec.MediaTypeImageLayerGzip,
ocispec.MediaTypeImageLayerNonDistributable, ocispec.MediaTypeImageLayerNonDistributableGzip,
images.MediaTypeDockerSchema2LayerEnc, images.MediaTypeDockerSchema2LayerGzipEnc:
lock.Lock()
update := !schema1
lock.Unlock()
if update {
u.updateCh <- desc
}
}
return children, nil
})
}, eg
}

View File

@@ -2,9 +2,9 @@ github.com/containerd/go-runc 9007c2405372fe28918845901a3276c0915689a1
github.com/containerd/console 0650fd9eeb50bab4fc99dceb9f2e14cf58f36e7f
github.com/containerd/cgroups c4b9ac5c7601384c965b9646fc515884e091ebb9
github.com/containerd/typeurl a93fcdb778cd272c6e9b3028b2f42d813e785d40
github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c
github.com/containerd/fifo bda0ff6ed73c67bfb5e62bc9c697f146b7fd7f13
github.com/containerd/btrfs af5082808c833de0e79c1e72eea9fea239364877
github.com/containerd/continuity bd77b46c8352f74eb12c85bdc01f4b90f69d66b4
github.com/containerd/continuity f2a389ac0a02ce21c09edd7344677a601970f41c
github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6
github.com/docker/go-metrics 4ea375f7759c82740c893fc030bc37088d2ec098
github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9
@@ -25,37 +25,40 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1
github.com/sirupsen/logrus v1.4.1
github.com/urfave/cli 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c
golang.org/x/net f3200d17e092c607f615320ecaad13d87ad9a2b3
google.golang.org/grpc 25c4f928eaa6d96443009bd842389fb4fa48664e # v1.20.1
google.golang.org/grpc 6eaf6f47437a6b4e2153a190160ef39a92c7eceb # v1.23.0
github.com/pkg/errors v0.8.1
github.com/opencontainers/go-digest c9281466c8b2f606084ac71339773efd177436e7
golang.org/x/sys 4c4f7f33c9ed00de01c4c741d2177abfcfe19307 https://github.com/golang/sys
golang.org/x/sys 9eafafc0a87e0fd0aeeba439a4573537970c44c7 https://github.com/golang/sys
github.com/opencontainers/image-spec v1.0.1
golang.org/x/sync 42b317875d0fa942474b76e1b46a6060d720ae6e
github.com/BurntSushi/toml v0.3.1
github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0
github.com/Microsoft/go-winio v0.4.14
github.com/Microsoft/hcsshim 8abdbb8205e4192c68b5f84c31197156f31be517
github.com/Microsoft/hcsshim 9e921883ac929bbe515b39793ece99ce3a9d7706
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4
github.com/containerd/ttrpc 1fb3814edf44a76e0ccf503decf726d994919a9a
github.com/containerd/ttrpc 92c8520ef9f86600c650dd540266a007bf03670f
github.com/syndtr/gocapability d98352740cb2c55f81556b63d4a1ec64c5a319c2
gotest.tools v2.3.0
github.com/google/go-cmp v0.2.0
go.etcd.io/bbolt 2eb7227adea1d5cf85f0bc2a82b7059b13c2fa68
go.etcd.io/bbolt v1.3.3
github.com/hashicorp/errwrap v1.0.0
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/golang-lru v0.5.3
go.opencensus.io v0.22.0
github.com/imdario/mergo v0.3.7
# cri dependencies
github.com/containerd/cri b213648c5bd0a1d2ee42709c10dff63fbfee3ad7 # master
github.com/containerd/go-cni 22460c018b64cf8bf4151b3ff9c4d077e6a88cbf
github.com/containernetworking/cni v0.6.0
github.com/containernetworking/plugins v0.7.0
github.com/davecgh/go-spew v1.1.0
github.com/containerd/cri 0165d516161e25e52b4ab52a404a00823f8f0ef6 # master
github.com/containerd/go-cni 49fbd9b210f3c8ee3b7fd3cd797aabaf364627c1
github.com/containernetworking/cni v0.7.1
github.com/containernetworking/plugins v0.7.6
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution 0d3efadf0154c2b8a4e7b6621fff9809655cc580
github.com/docker/docker 86f080cff0914e9694068ed78d503701667c4c00
github.com/docker/spdystream 449fdfce4d962303d702fec724ef0ad181c92528
github.com/emicklei/go-restful v2.2.1
github.com/google/gofuzz 44d81051d367757e1c7c6a5a86423ece9afcf63c
github.com/hashicorp/errwrap 7554cd9344cec97297fa6649b055a8c98c2a1e55
github.com/hashicorp/go-multierror ed905158d87462226a13fe39ddf685ea65f1c11f
github.com/google/gofuzz 24818f796faf91cd76ec7bddd72458fbced7a6c1
github.com/json-iterator/go 1.1.5
github.com/modern-go/reflect2 1.0.1
github.com/modern-go/concurrent 1.0.3
@@ -63,9 +66,9 @@ github.com/opencontainers/selinux v1.2.2
github.com/seccomp/libseccomp-golang v0.9.1
github.com/tchap/go-patricia v2.2.6
golang.org/x/crypto 88737f569e3a9c7ab309cdc09a07fe7fc87233c3
golang.org/x/oauth2 a6bd8cefa1811bd24b86f8902872e4e8225f74c4
golang.org/x/oauth2 9f3314589c9a9136388751d9adae6b0ed400978a
golang.org/x/time f51c12702a4d776e4c1fa9b0fabab841babae631
gopkg.in/inf.v0 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
gopkg.in/inf.v0 v0.9.0
gopkg.in/yaml.v2 v2.2.1
k8s.io/api kubernetes-1.15.0
k8s.io/apimachinery kubernetes-1.15.0
@@ -78,14 +81,9 @@ k8s.io/utils c2654d5206da6b7b6ace12841e8f359bb89b443c
sigs.k8s.io/yaml v1.1.0
# zfs dependencies
github.com/containerd/zfs 31af176f2ae84fe142ef2655bf7bb2aa618b3b1f
github.com/containerd/zfs 2ceb2dbb8154202ed1b8fd32e4ea25b491d7b251
github.com/mistifyio/go-zfs f784269be439d704d3dfa1906f45dd848fed2beb
github.com/google/uuid v1.1.1
# aufs dependencies
github.com/containerd/aufs f894a800659b6e11c1a13084abd1712f346e349c
# image encryption dependencies
gopkg.in/square/go-jose.v2 8254d6c783765f38c8675fae4427a1fe73fbd09d https://github.com/square/go-jose.git
github.com/fullsailor/pkcs7 8306686428a5fe132eac8cb7c4848af725098bd4
github.com/miscreant/miscreant-go 325cbd69228b6af571a635f7502586a920a2749a https://github.com/miscreant/miscreant.go

View File

@@ -1,6 +1,7 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
https://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
@@ -175,28 +176,16 @@
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Copyright The containerd Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -72,3 +72,13 @@ If you change the proto file you will need to rebuild the generated Go with `go
```console
$ go generate ./proto
```
## Project details
continuity is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE).
As a containerd sub-project, you will find the:
* [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md),
* [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS),
* and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md)
information in our [`containerd/project`](https://github.com/containerd/project) repository.

View File

@@ -32,14 +32,49 @@ var bufferPool = &sync.Pool{
},
}
// CopyDir copies the directory from src to dst.
// Most efficient copy of files is attempted.
func CopyDir(dst, src string) error {
inodes := map[uint64]string{}
return copyDirectory(dst, src, inodes)
// XAttrErrorHandlers transform a non-nil xattr error.
// Return nil to ignore an error.
// xattrKey can be empty for listxattr operation.
type XAttrErrorHandler func(dst, src, xattrKey string, err error) error
type copyDirOpts struct {
xeh XAttrErrorHandler
}
func copyDirectory(dst, src string, inodes map[uint64]string) error {
type CopyDirOpt func(*copyDirOpts) error
// WithXAttrErrorHandler allows specifying XAttrErrorHandler
// If nil XAttrErrorHandler is specified (default), CopyDir stops
// on a non-nil xattr error.
func WithXAttrErrorHandler(xeh XAttrErrorHandler) CopyDirOpt {
return func(o *copyDirOpts) error {
o.xeh = xeh
return nil
}
}
// WithAllowXAttrErrors allows ignoring xattr errors.
func WithAllowXAttrErrors() CopyDirOpt {
xeh := func(dst, src, xattrKey string, err error) error {
return nil
}
return WithXAttrErrorHandler(xeh)
}
// CopyDir copies the directory from src to dst.
// Most efficient copy of files is attempted.
func CopyDir(dst, src string, opts ...CopyDirOpt) error {
var o copyDirOpts
for _, opt := range opts {
if err := opt(&o); err != nil {
return err
}
}
inodes := map[uint64]string{}
return copyDirectory(dst, src, inodes, &o)
}
func copyDirectory(dst, src string, inodes map[uint64]string, o *copyDirOpts) error {
stat, err := os.Stat(src)
if err != nil {
return errors.Wrapf(err, "failed to stat %s", src)
@@ -75,7 +110,7 @@ func copyDirectory(dst, src string, inodes map[uint64]string) error {
switch {
case fi.IsDir():
if err := copyDirectory(target, source, inodes); err != nil {
if err := copyDirectory(target, source, inodes, o); err != nil {
return err
}
continue
@@ -111,7 +146,7 @@ func copyDirectory(dst, src string, inodes map[uint64]string) error {
return errors.Wrap(err, "failed to copy file info")
}
if err := copyXAttrs(target, source); err != nil {
if err := copyXAttrs(target, source, o.xeh); err != nil {
return errors.Wrap(err, "failed to copy xattrs")
}
}

View File

@@ -59,6 +59,8 @@ func copyFileInfo(fi os.FileInfo, name string) error {
return nil
}
const maxSSizeT = int64(^uint(0) >> 1)
func copyFileContent(dst, src *os.File) error {
st, err := src.Stat()
if err != nil {
@@ -71,7 +73,16 @@ func copyFileContent(dst, src *os.File) error {
dstFd := int(dst.Fd())
for size > 0 {
n, err := unix.CopyFileRange(srcFd, nil, dstFd, nil, int(size), 0)
// Ensure that we are never trying to copy more than SSIZE_MAX at a
// time and at the same time avoids overflows when the file is larger
// than 4GB on 32-bit systems.
var copySize int
if size > maxSSizeT {
copySize = int(maxSSizeT)
} else {
copySize = int(size)
}
n, err := unix.CopyFileRange(srcFd, nil, dstFd, nil, copySize, 0)
if err != nil {
if (err != unix.ENOSYS && err != unix.EXDEV) || !first {
return errors.Wrap(err, "copy file range failed")
@@ -90,18 +101,34 @@ func copyFileContent(dst, src *os.File) error {
return nil
}
func copyXAttrs(dst, src string) error {
func copyXAttrs(dst, src string, xeh XAttrErrorHandler) error {
xattrKeys, err := sysx.LListxattr(src)
if err != nil {
return errors.Wrapf(err, "failed to list xattrs on %s", src)
e := errors.Wrapf(err, "failed to list xattrs on %s", src)
if xeh != nil {
e = xeh(dst, src, "", e)
}
return e
}
for _, xattr := range xattrKeys {
data, err := sysx.LGetxattr(src, xattr)
if err != nil {
return errors.Wrapf(err, "failed to get xattr %q on %s", xattr, src)
e := errors.Wrapf(err, "failed to get xattr %q on %s", xattr, src)
if xeh != nil {
if e = xeh(dst, src, xattr, e); e == nil {
continue
}
}
return e
}
if err := sysx.LSetxattr(dst, xattr, data, 0); err != nil {
return errors.Wrapf(err, "failed to set xattr %q on %s", xattr, dst)
e := errors.Wrapf(err, "failed to set xattr %q on %s", xattr, dst)
if xeh != nil {
if e = xeh(dst, src, xattr, e); e == nil {
continue
}
}
return e
}
}

View File

@@ -69,18 +69,34 @@ func copyFileContent(dst, src *os.File) error {
return err
}
func copyXAttrs(dst, src string) error {
func copyXAttrs(dst, src string, xeh XAttrErrorHandler) error {
xattrKeys, err := sysx.LListxattr(src)
if err != nil {
return errors.Wrapf(err, "failed to list xattrs on %s", src)
e := errors.Wrapf(err, "failed to list xattrs on %s", src)
if xeh != nil {
e = xeh(dst, src, "", e)
}
return e
}
for _, xattr := range xattrKeys {
data, err := sysx.LGetxattr(src, xattr)
if err != nil {
return errors.Wrapf(err, "failed to get xattr %q on %s", xattr, src)
e := errors.Wrapf(err, "failed to get xattr %q on %s", xattr, src)
if xeh != nil {
if e = xeh(dst, src, xattr, e); e == nil {
continue
}
}
return e
}
if err := sysx.LSetxattr(dst, xattr, data, 0); err != nil {
return errors.Wrapf(err, "failed to set xattr %q on %s", xattr, dst)
e := errors.Wrapf(err, "failed to set xattr %q on %s", xattr, dst)
if xeh != nil {
if e = xeh(dst, src, xattr, e); e == nil {
continue
}
}
return e
}
}

View File

@@ -40,7 +40,7 @@ func copyFileContent(dst, src *os.File) error {
return err
}
func copyXAttrs(dst, src string) error {
func copyXAttrs(dst, src string, xeh XAttrErrorHandler) error {
return nil
}

View File

@@ -22,7 +22,6 @@ import (
"io"
"os"
"path/filepath"
"strings"
"github.com/pkg/errors"
)
@@ -47,9 +46,8 @@ func pathChange(lower, upper *currentPath) (ChangeKind, string) {
if upper == nil {
return ChangeKindDelete, lower.path
}
// TODO: compare by directory
switch i := strings.Compare(lower.path, upper.path); {
switch i := directoryCompare(lower.path, upper.path); {
case i < 0:
// File in lower that is not in upper
return ChangeKindDelete, lower.path
@@ -61,6 +59,35 @@ func pathChange(lower, upper *currentPath) (ChangeKind, string) {
}
}
func directoryCompare(a, b string) int {
l := len(a)
if len(b) < l {
l = len(b)
}
for i := 0; i < l; i++ {
c1, c2 := a[i], b[i]
if c1 == filepath.Separator {
c1 = byte(0)
}
if c2 == filepath.Separator {
c2 = byte(0)
}
if c1 < c2 {
return -1
}
if c1 > c2 {
return +1
}
}
if len(a) < len(b) {
return -1
}
if len(a) > len(b) {
return +1
}
return 0
}
func sameFile(f1, f2 *currentPath) (bool, error) {
if os.SameFile(f1.f, f2.f) {
return true, nil

30
vendor/github.com/containerd/fifo/errors.go generated vendored Normal file
View File

@@ -0,0 +1,30 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fifo
import (
"errors"
)
var (
ErrClosed = errors.New("fifo closed")
ErrCtrlClosed = errors.New("control of closed fifo")
ErrRdFrmWRONLY = errors.New("reading from write-only fifo")
ErrReadClosed = errors.New("reading from a closed fifo")
ErrWrToRDONLY = errors.New("writing to read-only fifo")
ErrWriteClosed = errors.New("writing to a closed fifo")
)

View File

@@ -147,7 +147,7 @@ func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.Re
// Read from a fifo to a byte array.
func (f *fifo) Read(b []byte) (int, error) {
if f.flag&syscall.O_WRONLY > 0 {
return 0, errors.New("reading from write-only fifo")
return 0, ErrRdFrmWRONLY
}
select {
case <-f.opened:
@@ -158,14 +158,14 @@ func (f *fifo) Read(b []byte) (int, error) {
case <-f.opened:
return f.file.Read(b)
case <-f.closed:
return 0, errors.New("reading from a closed fifo")
return 0, ErrReadClosed
}
}
// Write from byte array to a fifo.
func (f *fifo) Write(b []byte) (int, error) {
if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 {
return 0, errors.New("writing to read-only fifo")
return 0, ErrWrToRDONLY
}
select {
case <-f.opened:
@@ -176,7 +176,7 @@ func (f *fifo) Write(b []byte) (int, error) {
case <-f.opened:
return f.file.Write(b)
case <-f.closed:
return 0, errors.New("writing to a closed fifo")
return 0, ErrWriteClosed
}
}

114
vendor/github.com/containerd/fifo/raw.go generated vendored Normal file
View File

@@ -0,0 +1,114 @@
// +build go1.12
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fifo
import (
"syscall"
)
// SyscallConn provides raw access to the fifo's underlying filedescrptor.
// See syscall.Conn for guarentees provided by this interface.
func (f *fifo) SyscallConn() (syscall.RawConn, error) {
// deterministic check for closed
select {
case <-f.closed:
return nil, ErrClosed
default:
}
select {
case <-f.closed:
return nil, ErrClosed
case <-f.opened:
return f.file.SyscallConn()
default:
}
// Not opened and not closed, this means open is non-blocking AND it's not open yet
// Use rawConn to deal with non-blocking open.
rc := &rawConn{f: f, ready: make(chan struct{})}
go func() {
select {
case <-f.closed:
return
case <-f.opened:
rc.raw, rc.err = f.file.SyscallConn()
close(rc.ready)
}
}()
return rc, nil
}
type rawConn struct {
f *fifo
ready chan struct{}
raw syscall.RawConn
err error
}
func (r *rawConn) Control(f func(fd uintptr)) error {
select {
case <-r.f.closed:
return ErrCtrlClosed
case <-r.ready:
}
if r.err != nil {
return r.err
}
return r.raw.Control(f)
}
func (r *rawConn) Read(f func(fd uintptr) (done bool)) error {
if r.f.flag&syscall.O_WRONLY > 0 {
return ErrRdFrmWRONLY
}
select {
case <-r.f.closed:
return ErrReadClosed
case <-r.ready:
}
if r.err != nil {
return r.err
}
return r.raw.Read(f)
}
func (r *rawConn) Write(f func(fd uintptr) (done bool)) error {
if r.f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 {
return ErrWrToRDONLY
}
select {
case <-r.f.closed:
return ErrWriteClosed
case <-r.ready:
}
if r.err != nil {
return r.err
}
return r.raw.Write(f)
}

View File

@@ -1,6 +1,7 @@
### fifo
[![Build Status](https://travis-ci.org/containerd/fifo.svg?branch=master)](https://travis-ci.org/containerd/fifo)
[![codecov](https://codecov.io/gh/containerd/fifo/branch/master/graph/badge.svg)](https://codecov.io/gh/containerd/fifo)
Go package for handling fifos in a sane way.
@@ -30,3 +31,14 @@ func (f *fifo) Write(b []byte) (int, error)
// before open(2) has returned and fifo was never opened.
func (f *fifo) Close() error
```
## Project details
The fifo is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE).
As a containerd sub-project, you will find the:
* [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md),
* [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS),
* and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md)
information in our [`containerd/project`](https://github.com/containerd/project) repository.

View File

@@ -29,6 +29,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -134,11 +135,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
return err
}
if cresp.Status == nil {
return errors.New("no status provided on response")
if cresp.Status != nil && cresp.Status.Code != int32(codes.OK) {
return status.ErrorProto(cresp.Status)
}
return status.ErrorProto(cresp.Status)
return nil
}
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {

View File

@@ -152,5 +152,5 @@ func convertCode(err error) codes.Code {
}
func fullPath(service, method string) string {
return "/" + path.Join("/", service, method)
return "/" + path.Join(service, method)
}