Update cri to v1.11.0.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2018-06-28 18:29:10 +00:00
parent e51ddf43e9
commit f530a3c267
267 changed files with 23066 additions and 15230 deletions

View File

@@ -96,6 +96,10 @@ type PluginConfig struct {
SystemdCgroup bool `toml:"systemd_cgroup" json:"systemdCgroup"`
// EnableTLSStreaming indicates to enable the TLS streaming support.
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
// MaxContainerLogLineSize is the maximum log line size in bytes for a container.
// Log line longer than the limit will be split into multiple lines. Non-positive
// value means no limit.
MaxContainerLogLineSize int `toml:"max_container_log_line_size" json:"maxContainerLogSize"`
}
// Config contains all configurations for cri server.
@@ -129,13 +133,14 @@ func DefaultConfig() PluginConfig {
Root: "",
},
},
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
MaxContainerLogLineSize: 16 * 1024,
Registry: Registry{
Mirrors: map[string]Mirror{
"docker.io": {

View File

@@ -53,7 +53,6 @@ func WithNewSnapshot(id string, i containerd.Image) containerd.NewContainerOpts
// WithVolumes copies ownership of volume in rootfs to its corresponding host path.
// It doesn't update runtime spec.
// The passed in map is a host path to container path map for all volumes.
// TODO(random-liu): Figure out whether we need to copy volume content.
func WithVolumes(volumeMounts map[string]string) containerd.NewContainerOpts {
return func(ctx context.Context, client *containerd.Client, c *containers.Container) (err error) {
if c.Snapshotter == "" {
@@ -108,14 +107,6 @@ func WithVolumes(volumeMounts map[string]string) containerd.NewContainerOpts {
// copyExistingContents copies from the source to the destination and
// ensures the ownership is appropriately set.
func copyExistingContents(source, destination string) error {
srcList, err := ioutil.ReadDir(source)
if err != nil {
return err
}
if len(srcList) == 0 {
// Skip copying if source directory is empty.
return nil
}
dstList, err := ioutil.ReadDir(destination)
if err != nil {
return err

View File

@@ -16,7 +16,10 @@ limitations under the License.
package ioutil
import "io"
import (
"io"
"sync"
)
// writeCloseInformer wraps passed in write closer with a close channel.
// Caller could wait on the close channel for the write closer to be
@@ -66,3 +69,34 @@ func (n *nopWriteCloser) Write(p []byte) (int, error) {
func (n *nopWriteCloser) Close() error {
return nil
}
// serialWriteCloser wraps a write closer and makes sure all writes
// are done in serial.
// Parallel write won't intersect with each other. Use case:
// 1) Pipe: Write content longer than PIPE_BUF.
// See http://man7.org/linux/man-pages/man7/pipe.7.html
// 2) <3.14 Linux Kernel: write is not atomic
// See http://man7.org/linux/man-pages/man2/write.2.html
type serialWriteCloser struct {
mu sync.Mutex
wc io.WriteCloser
}
// NewSerialWriteCloser creates a SerialWriteCloser from a write closer.
func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser {
return &serialWriteCloser{wc: wc}
}
// Write writes a group of byte arrays in order atomically.
func (s *serialWriteCloser) Write(data []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Write(data)
}
// Close closes the write closer.
func (s *serialWriteCloser) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Close()
}

View File

@@ -372,6 +372,11 @@ func (c *criService) generateContainerSpec(id string, sandboxID string, sandboxP
securityContext.GetCapabilities())
}
}
// Clear all ambient capabilities. The implication of non-root + caps
// is not clearly defined in Kubernetes.
// See https://github.com/kubernetes/kubernetes/issues/56374
// Keep docker's behavior for now.
g.Config.Process.Capabilities.Ambient = []string{}
g.SetProcessSelinuxLabel(processLabel)
g.SetLinuxMountLabel(mountLabel)
@@ -402,7 +407,7 @@ func (c *criService) generateContainerSpec(id string, sandboxID string, sandboxP
g.AddAnnotation(annotations.ContainerType, annotations.ContainerTypeContainer)
g.AddAnnotation(annotations.SandboxID, sandboxID)
return g.Spec(), nil
return g.Config, nil
}
// generateVolumeMounts sets up image volumes for container. Rely on the removal of container
@@ -528,7 +533,7 @@ func clearReadOnly(m *runtimespec.Mount) {
// addDevices set device mapping without privilege.
func (c *criService) addOCIDevices(g *generate.Generator, devs []*runtime.Device) error {
spec := g.Spec()
spec := g.Config
for _, device := range devs {
path, err := c.os.ResolveSymbolicLink(device.HostPath)
if err != nil {
@@ -560,7 +565,7 @@ func (c *criService) addOCIDevices(g *generate.Generator, devs []*runtime.Device
// addDevices set device mapping with privilege.
func setOCIDevicesPrivileged(g *generate.Generator) error {
spec := g.Spec()
spec := g.Config
hostDevices, err := devices.HostDevices()
if err != nil {
return err
@@ -592,7 +597,12 @@ func setOCIDevicesPrivileged(g *generate.Generator) error {
// addOCIBindMounts adds bind mounts.
func (c *criService) addOCIBindMounts(g *generate.Generator, mounts []*runtime.Mount, mountLabel string) error {
// Mount cgroup into the container as readonly, which inherits docker's behavior.
g.AddCgroupsMount("ro") // nolint: errcheck
g.AddMount(runtimespec.Mount{
Source: "cgroup",
Destination: "/sys/fs/cgroup",
Type: "cgroup",
Options: []string{"nosuid", "noexec", "nodev", "relatime", "ro"},
})
for _, mount := range mounts {
dst := mount.GetContainerPath()
src := mount.GetHostPath()
@@ -630,8 +640,8 @@ func (c *criService) addOCIBindMounts(g *generate.Generator, mounts []*runtime.M
return err
}
options = append(options, "rslave")
if g.Spec().Linux.RootfsPropagation != "rshared" &&
g.Spec().Linux.RootfsPropagation != "rslave" {
if g.Config.Linux.RootfsPropagation != "rshared" &&
g.Config.Linux.RootfsPropagation != "rslave" {
g.SetLinuxRootPropagation("rslave") // nolint: errcheck
}
default:
@@ -652,14 +662,19 @@ func (c *criService) addOCIBindMounts(g *generate.Generator, mounts []*runtime.M
return errors.Wrapf(err, "relabel %q with %q failed", src, mountLabel)
}
}
g.AddBindMount(src, dst, options)
g.AddMount(runtimespec.Mount{
Source: src,
Destination: dst,
Type: "bind",
Options: options,
})
}
return nil
}
func setOCIBindMountsPrivileged(g *generate.Generator) {
spec := g.Spec()
spec := g.Config
// clear readonly for /sys and cgroup
for i, m := range spec.Mounts {
if spec.Mounts[i].Destination == "/sys" {
@@ -699,6 +714,40 @@ func getOCICapabilitiesList() []string {
return caps
}
// Adds capabilities to all sets relevant to root (bounding, permitted, effective, inheritable)
func addProcessRootCapability(g *generate.Generator, c string) error {
if err := g.AddProcessCapabilityBounding(c); err != nil {
return err
}
if err := g.AddProcessCapabilityPermitted(c); err != nil {
return err
}
if err := g.AddProcessCapabilityEffective(c); err != nil {
return err
}
if err := g.AddProcessCapabilityInheritable(c); err != nil {
return err
}
return nil
}
// Drops capabilities to all sets relevant to root (bounding, permitted, effective, inheritable)
func dropProcessRootCapability(g *generate.Generator, c string) error {
if err := g.DropProcessCapabilityBounding(c); err != nil {
return err
}
if err := g.DropProcessCapabilityPermitted(c); err != nil {
return err
}
if err := g.DropProcessCapabilityEffective(c); err != nil {
return err
}
if err := g.DropProcessCapabilityInheritable(c); err != nil {
return err
}
return nil
}
// setOCICapabilities adds/drops process capabilities.
func setOCICapabilities(g *generate.Generator, capabilities *runtime.Capability) error {
if capabilities == nil {
@@ -711,14 +760,14 @@ func setOCICapabilities(g *generate.Generator, capabilities *runtime.Capability)
// will be all capabilities without `CAP_CHOWN`.
if util.InStringSlice(capabilities.GetAddCapabilities(), "ALL") {
for _, c := range getOCICapabilitiesList() {
if err := g.AddProcessCapability(c); err != nil {
if err := addProcessRootCapability(g, c); err != nil {
return err
}
}
}
if util.InStringSlice(capabilities.GetDropCapabilities(), "ALL") {
for _, c := range getOCICapabilitiesList() {
if err := g.DropProcessCapability(c); err != nil {
if err := dropProcessRootCapability(g, c); err != nil {
return err
}
}
@@ -729,7 +778,7 @@ func setOCICapabilities(g *generate.Generator, capabilities *runtime.Capability)
continue
}
// Capabilities in CRI doesn't have `CAP_` prefix, so add it.
if err := g.AddProcessCapability("CAP_" + strings.ToUpper(c)); err != nil {
if err := addProcessRootCapability(g, "CAP_"+strings.ToUpper(c)); err != nil {
return err
}
}
@@ -738,7 +787,7 @@ func setOCICapabilities(g *generate.Generator, capabilities *runtime.Capability)
if strings.ToUpper(c) == "ALL" {
continue
}
if err := g.DropProcessCapability("CAP_" + strings.ToUpper(c)); err != nil {
if err := dropProcessRootCapability(g, "CAP_"+strings.ToUpper(c)); err != nil {
return err
}
}
@@ -772,6 +821,10 @@ func defaultRuntimeSpec(id string) (*runtimespec.Spec, error) {
if mount.Destination == "/run" {
continue
}
// CRI plugin handles `/dev/shm` itself.
if mount.Destination == "/dev/shm" {
continue
}
mounts = append(mounts, mount)
}
spec.Mounts = mounts

View File

@@ -102,7 +102,7 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
if opts.tty {
g := newSpecGenerator(spec)
g.AddProcessEnv("TERM", "xterm")
spec = g.Spec()
spec = g.Config
}
pspec := spec.Process
pspec.Args = opts.cmd

View File

@@ -36,7 +36,7 @@ func (c *criService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenCo
}
// Create new container logger and replace the existing ones.
stdoutWC, stderrWC, err := createContainerLoggers(container.LogPath, container.Config.GetTty())
stdoutWC, stderrWC, err := c.createContainerLoggers(container.LogPath, container.Config.GetTty())
if err != nil {
return nil, err
}

View File

@@ -18,6 +18,7 @@ package server
import (
"io"
"os"
"time"
"github.com/containerd/containerd"
@@ -29,6 +30,7 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
cioutil "github.com/containerd/cri/pkg/ioutil"
cio "github.com/containerd/cri/pkg/server/io"
containerstore "github.com/containerd/cri/pkg/store/container"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
@@ -97,20 +99,10 @@ func (c *criService) startContainer(ctx context.Context,
}
ioCreation := func(id string) (_ containerdio.IO, err error) {
stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, config.GetTty())
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty())
if err != nil {
return nil, errors.Wrap(err, "failed to create container loggers")
}
defer func() {
if err != nil {
if stdoutWC != nil {
stdoutWC.Close()
}
if stderrWC != nil {
stderrWC.Close()
}
}
}()
cntr.IO.AddOutput("log", stdoutWC, stderrWC)
cntr.IO.Pipe()
return cntr.IO, nil
@@ -142,24 +134,36 @@ func (c *criService) startContainer(ctx context.Context,
return nil
}
// Create container loggers and return write closer for stdout and stderr.
func createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) {
// createContainerLoggers creates container loggers and return write closer for stdout and stderr.
func (c *criService) createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) {
if logPath != "" {
// Only generate container log when log path is specified.
if stdout, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil {
return nil, nil, errors.Wrap(err, "failed to start container stdout logger")
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create and open log file")
}
defer func() {
if err != nil {
stdout.Close()
f.Close()
}
}()
var stdoutCh, stderrCh <-chan struct{}
wc := cioutil.NewSerialWriteCloser(f)
stdout, stdoutCh = cio.NewCRILogger(logPath, wc, cio.Stdout, c.config.MaxContainerLogLineSize)
// Only redirect stderr when there is no tty.
if !tty {
if stderr, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil {
return nil, nil, errors.Wrap(err, "failed to start container stderr logger")
}
stderr, stderrCh = cio.NewCRILogger(logPath, wc, cio.Stderr, c.config.MaxContainerLogLineSize)
}
go func() {
if stdoutCh != nil {
<-stdoutCh
}
if stderrCh != nil {
<-stderrCh
}
logrus.Debugf("Finish redirecting log file %q, closing it", logPath)
f.Close()
}()
} else {
stdout = cio.NewDiscardLogger()
stderr = cio.NewDiscardLogger()

View File

@@ -157,5 +157,5 @@ func updateOCILinuxResource(spec *runtimespec.Spec, new *runtime.LinuxContainerR
g.SetLinuxResourcesCPUMems(new.GetCpusetMems())
}
return g.Spec(), nil
return g.Config, nil
}

View File

@@ -186,6 +186,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
// TODO(random-liu): [P2] Handle containerd-shim exit.
case *eventtypes.TaskExit:
e := any.(*eventtypes.TaskExit)
logrus.Infof("TaskExit event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err == nil {
if err := handleContainerExit(ctx, e, cntr); err != nil {

View File

@@ -21,10 +21,8 @@ import (
"bytes"
"io"
"io/ioutil"
"os"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
@@ -38,11 +36,8 @@ const (
eol = '\n'
// timestampFormat is the timestamp format used in CRI logging format.
timestampFormat = time.RFC3339Nano
// pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes.
// POSIX.1 says that write less than PIPE_BUF is atmoic.
pipeBufSize = 4096
// bufSize is the size of the read buffer.
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - len(runtime.LogTagPartial) - 3 /*3 delimiter*/ - 1 /*eol*/
// defaultBufSize is the default size of the read buffer in bytes.
defaultBufSize = 4096
)
// NewDiscardLogger creates logger which discards all the input.
@@ -51,46 +46,91 @@ func NewDiscardLogger() io.WriteCloser {
}
// NewCRILogger returns a write closer which redirect container log into
// log file, and decorate the log line into CRI defined format.
func NewCRILogger(path string, stream StreamType) (io.WriteCloser, error) {
logrus.Debugf("Start writing log file %q", path)
// log file, and decorate the log line into CRI defined format. It also
// returns a channel which indicates whether the logger is stopped.
// maxLen is the max length limit of a line. A line longer than the
// limit will be cut into multiple lines.
func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.WriteCloser, <-chan struct{}) {
logrus.Debugf("Start writing stream %q to log file %q", stream, path)
prc, pwc := io.Pipe()
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return nil, errors.Wrap(err, "failed to open log file")
}
go redirectLogs(path, prc, f, stream)
return pwc, nil
stop := make(chan struct{})
go func() {
redirectLogs(path, prc, w, stream, maxLen)
close(stop)
}()
return pwc, stop
}
func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream StreamType) {
func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) {
defer rc.Close()
defer wc.Close()
streamBytes := []byte(stream)
delimiterBytes := []byte{delimiter}
partialBytes := []byte(runtime.LogTagPartial)
fullBytes := []byte(runtime.LogTagFull)
r := bufio.NewReaderSize(rc, bufSize)
for {
lineBytes, isPrefix, err := r.ReadLine()
if err == io.EOF {
logrus.Debugf("Finish redirecting log file %q", path)
return
}
if err != nil {
logrus.WithError(err).Errorf("An error occurred when redirecting log file %q", path)
return
}
tagBytes := fullBytes
if isPrefix {
tagBytes = partialBytes
}
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes)
data = append(data, eol)
if _, err := wc.Write(data); err != nil {
logrus.WithError(err).Errorf("Fail to write %q log to log file %q", stream, path)
}
// Continue on write error to drain the input.
var (
stream = []byte(s)
delimiter = []byte{delimiter}
partial = []byte(runtime.LogTagPartial)
full = []byte(runtime.LogTagFull)
buf [][]byte
length int
bufSize = defaultBufSize
)
// Make sure bufSize <= maxLen
if maxLen > 0 && maxLen < bufSize {
bufSize = maxLen
}
r := bufio.NewReaderSize(rc, bufSize)
writeLine := func(tag, line []byte) {
timestamp := time.Now().AppendFormat(nil, timestampFormat)
data := bytes.Join([][]byte{timestamp, stream, tag, line}, delimiter)
data = append(data, eol)
if _, err := w.Write(data); err != nil {
logrus.WithError(err).Errorf("Fail to write %q log to log file %q", s, path)
// Continue on write error to drain the container output.
}
}
for {
var stop bool
newLine, isPrefix, err := r.ReadLine()
if err != nil {
if err == io.EOF {
logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path)
} else {
logrus.WithError(err).Errorf("An error occurred when redirecting stream %q to log file %q", s, path)
}
if length == 0 {
// No content left to write, break.
break
}
// Stop after writing the content left in buffer.
stop = true
} else {
// Buffer returned by ReadLine will change after
// next read, copy it.
l := make([]byte, len(newLine))
copy(l, newLine)
buf = append(buf, l)
length += len(l)
}
if maxLen > 0 && length > maxLen {
exceedLen := length - maxLen
last := buf[len(buf)-1]
if exceedLen > len(last) {
// exceedLen must <= len(last), or else the buffer
// should have be written in the previous iteration.
panic("exceed length should <= last buffer size")
}
buf[len(buf)-1] = last[:len(last)-exceedLen]
writeLine(partial, bytes.Join(buf, nil))
buf = [][]byte{last[len(last)-exceedLen:]}
length = exceedLen
}
if isPrefix {
continue
}
writeLine(full, bytes.Join(buf, nil))
buf = nil
length = 0
if stop {
break
}
}
logrus.Debugf("Finish redirecting stream %q to log file %q", s, path)
}

View File

@@ -78,9 +78,7 @@ func (c *criService) recover(ctx context.Context) error {
return errors.Wrap(err, "failed to list containers")
}
for _, container := range containers {
containerDir := c.getContainerRootDir(container.ID())
volatileContainerDir := c.getVolatileContainerRootDir(container.ID())
cntr, err := loadContainer(ctx, container, containerDir, volatileContainerDir)
cntr, err := c.loadContainer(ctx, container)
if err != nil {
logrus.WithError(err).Errorf("Failed to load container %q", container.ID())
continue
@@ -149,8 +147,10 @@ func (c *criService) recover(ctx context.Context) error {
}
// loadContainer loads container from containerd and status checkpoint.
func loadContainer(ctx context.Context, cntr containerd.Container, containerDir, volatileContainerDir string) (containerstore.Container, error) {
func (c *criService) loadContainer(ctx context.Context, cntr containerd.Container) (containerstore.Container, error) {
id := cntr.ID()
containerDir := c.getContainerRootDir(id)
volatileContainerDir := c.getVolatileContainerRootDir(id)
var container containerstore.Container
// Load container metadata.
exts, err := cntr.Extensions(ctx)
@@ -176,11 +176,21 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir,
// Load up-to-date status from containerd.
var containerIO *cio.ContainerIO
t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (containerdio.IO, error) {
stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, meta.Config.GetTty())
t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (_ containerdio.IO, err error) {
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty())
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if stdoutWC != nil {
stdoutWC.Close()
}
if stderrWC != nil {
stderrWC.Close()
}
}
}()
containerIO, err = cio.NewContainerIO(id,
cio.WithFIFOs(fifos),
)

View File

@@ -344,9 +344,9 @@ func (c *criService) generateSandboxContainerSpec(id string, config *runtime.Pod
g.SetProcessCwd(imageConfig.WorkingDir)
}
if len(imageConfig.Entrypoint) == 0 {
// Pause image must have entrypoint.
return nil, errors.Errorf("invalid empty entrypoint in image config %+v", imageConfig)
if len(imageConfig.Entrypoint) == 0 && len(imageConfig.Cmd) == 0 {
// Pause image must have entrypoint or cmd.
return nil, errors.Errorf("invalid empty entrypoint and cmd in image config %+v", imageConfig)
}
// Set process commands.
g.SetProcessArgs(append(imageConfig.Entrypoint, imageConfig.Cmd...))
@@ -388,6 +388,19 @@ func (c *criService) generateSandboxContainerSpec(id string, config *runtime.Pod
g.RemoveLinuxNamespace(string(runtimespec.IPCNamespace)) // nolint: errcheck
}
// It's fine to generate the spec before the sandbox /dev/shm
// is actually created.
sandboxDevShm := c.getSandboxDevShm(id)
if nsOptions.GetIpc() == runtime.NamespaceMode_NODE {
sandboxDevShm = devShm
}
g.AddMount(runtimespec.Mount{
Source: sandboxDevShm,
Destination: devShm,
Type: "bind",
Options: []string{"rbind", "ro"},
})
selinuxOpt := securityContext.GetSelinuxOptions()
processLabel, mountLabel, err := initSelinuxOpts(selinuxOpt)
if err != nil {
@@ -415,7 +428,7 @@ func (c *criService) generateSandboxContainerSpec(id string, config *runtime.Pod
g.AddAnnotation(annotations.ContainerType, annotations.ContainerTypeSandbox)
g.AddAnnotation(annotations.SandboxID, id)
return g.Spec(), nil
return g.Config, nil
}
// setupSandboxFiles sets up necessary sandbox files including /dev/shm, /etc/hosts
@@ -524,7 +537,7 @@ func (c *criService) setupPod(id string, path string, config *runtime.PodSandbox
}
// Check if the default interface has IP config
if configs, ok := result.Interfaces[defaultIfName]; ok && len(configs.IPConfigs) > 0 {
return configs.IPConfigs[0].IP.String(), nil
return selectPodIP(configs.IPConfigs), nil
}
// If it comes here then the result was invalid so destroy the pod network and return error
if err := c.teardownPod(id, path, config); err != nil {
@@ -550,6 +563,16 @@ func toCNIPortMappings(criPortMappings []*runtime.PortMapping) []cni.PortMapping
return portMappings
}
// selectPodIP select an ip from the ip list. It prefers ipv4 more than ipv6.
func selectPodIP(ipConfigs []*cni.IPConfig) string {
for _, c := range ipConfigs {
if c.IP.To4() != nil {
return c.IP.String()
}
}
return ipConfigs[0].IP.String()
}
// untrustedWorkload returns true if the sandbox contains untrusted workload.
func untrustedWorkload(config *runtime.PodSandboxConfig) bool {
return config.GetAnnotations()[annotations.UntrustedWorkload] == "true"

View File

@@ -128,6 +128,10 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
selinux.SetDisabled()
}
if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
return nil, errors.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter)
}
c.imageFSPath = imageFSPath(config.ContainerdRootDir, config.ContainerdConfig.Snapshotter)
logrus.Infof("Get image filesystem path %q", c.imageFSPath)
@@ -144,7 +148,7 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
// Try to load the config if it exists. Just log the error if load fails
// This is not disruptive for containerd to panic
if err := c.netPlugin.Load(cni.WithLoNetwork(), cni.WithDefaultConf()); err != nil {
if err := c.netPlugin.Load(cni.WithLoNetwork, cni.WithDefaultConf); err != nil {
logrus.WithError(err).Error("Failed to load cni during init, please check CRI plugin status before setting up network for pods")
}
// prepare streaming server

View File

@@ -22,6 +22,7 @@ import (
goruntime "runtime"
cni "github.com/containerd/go-cni"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
@@ -41,14 +42,16 @@ func (c *criService) Status(ctx context.Context, r *runtime.StatusRequest) (*run
Type: runtime.NetworkReady,
Status: true,
}
// Load the latest cni configuration to be in sync with the latest network configuration
if err := c.netPlugin.Load(cni.WithLoNetwork, cni.WithDefaultConf); err != nil {
logrus.WithError(err).Errorf("Failed to load cni configuration")
}
// Check the status of the cni initialization
if err := c.netPlugin.Status(); err != nil {
// If it is not initialized, then load the config and retry
if err = c.netPlugin.Load(cni.WithLoNetwork(), cni.WithDefaultConf()); err != nil {
networkCondition.Status = false
networkCondition.Reason = networkNotReadyReason
networkCondition.Message = fmt.Sprintf("Network plugin returns error: %v", err)
}
networkCondition.Status = false
networkCondition.Reason = networkNotReadyReason
networkCondition.Message = fmt.Sprintf("Network plugin returns error: %v", err)
}
resp := &runtime.StatusResponse{

View File

@@ -52,7 +52,7 @@ func (c *criService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateR
if err := c.netPlugin.Status(); err == nil {
logrus.Infof("Network plugin is ready, skip generating cni config from template %q", confTemplate)
return &runtime.UpdateRuntimeConfigResponse{}, nil
} else if err := c.netPlugin.Load(cni.WithLoNetwork(), cni.WithDefaultConf()); err == nil {
} else if err := c.netPlugin.Load(cni.WithLoNetwork, cni.WithDefaultConf); err == nil {
logrus.Infof("CNI config is successfully loaded, skip generating cni config from template %q", confTemplate)
return &runtime.UpdateRuntimeConfigResponse{}, nil
}

View File

@@ -3,11 +3,11 @@ github.com/blang/semver v3.1.0
github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/containerd/cgroups fe281dd265766145e943a034aa41086474ea6130
github.com/containerd/console cb7008ab3d8359b78c5f464cb7cf160107ad5925
github.com/containerd/console 9290d21dc56074581f619579c43d970b4514bc08
github.com/containerd/containerd 84bebdd91d347c99069d1705b7d4e6d6f746160c
github.com/containerd/continuity d3c23511c1bf5851696cba83143d9cbcd666869b
github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c
github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7
github.com/containerd/go-cni 5882530828ecf62032409b298a3e8b19e08b6534
github.com/containerd/go-runc f271fa2021de855d4d918dbef83c5fe19db1bdd5
github.com/containerd/typeurl f6943554a7e7e88b3c14aad190bf05932da84788
github.com/containernetworking/cni v0.6.0
@@ -31,15 +31,17 @@ github.com/google/gofuzz 44d81051d367757e1c7c6a5a86423ece9afcf63c
github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0
github.com/hashicorp/errwrap 7554cd9344cec97297fa6649b055a8c98c2a1e55
github.com/hashicorp/go-multierror ed905158d87462226a13fe39ddf685ea65f1c11f
github.com/json-iterator/go 1.0.4
github.com/json-iterator/go f2b4162afba35581b6d4a50d3b8f34e33c144682
github.com/matttproud/golang_protobuf_extensions v1.0.0
github.com/Microsoft/go-winio v0.4.7
github.com/Microsoft/hcsshim v0.6.11
github.com/modern-go/reflect2 05fbef0ca5da472bbf96c9322b84a53edc03c9fd
github.com/modern-go/concurrent 1.0.3
github.com/opencontainers/go-digest c9281466c8b2f606084ac71339773efd177436e7
github.com/opencontainers/image-spec v1.0.1
github.com/opencontainers/runc 69663f0bd4b60df09991c08812a60108003fa340
github.com/opencontainers/runtime-spec v1.0.1
github.com/opencontainers/runtime-tools 6073aff4ac61897f75895123f7e24135204a404d
github.com/opencontainers/runtime-tools v0.6.0
github.com/opencontainers/selinux 4a2974bf1ee960774ffd517717f1f45325af0206
github.com/pkg/errors v0.8.0
github.com/pmezard/go-difflib v1.0.0
@@ -49,12 +51,14 @@ github.com/prometheus/common 89604d197083d4781071d3c65855d24ecfb0a563
github.com/prometheus/procfs cb4147076ac75738c9a7d279075a253c0cc5acbd
github.com/seccomp/libseccomp-golang 32f571b70023028bd57d9288c20efbcb237f3ce0
github.com/sirupsen/logrus v1.0.0
github.com/spf13/pflag v1.0.0
github.com/stevvooe/ttrpc d4528379866b0ce7e9d71f3eb96f0582fc374577
github.com/stretchr/testify v1.1.4
github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16
github.com/tchap/go-patricia 5ad6cdb7538b0097d5598c7e57f0a24072adf7dc
github.com/urfave/cli 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c
github.com/xeipuuv/gojsonpointer 4e3ac2762d5f479393488629ee9370b50873b3a6
github.com/xeipuuv/gojsonreference bd5ef7bd5415a7ac448318e64f11a24cd21e594b
github.com/xeipuuv/gojsonschema 1d523034197ff1f222f6429836dd36a2457a1874
golang.org/x/crypto 49796115aa4b964c318aad4f3084fdb41e9aa067
golang.org/x/net b3756b4b77d7b13260a0a2ec658753cf48922eac
golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c
@@ -65,9 +69,9 @@ google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
google.golang.org/grpc v1.12.0
gopkg.in/inf.v0 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
gopkg.in/yaml.v2 53feefa2559fb8dfa8d81baad31be332c97d6c77
k8s.io/api 7e796de92438aede7cb5d6bcf6c10f4fa65db560
k8s.io/apimachinery fcb9a12f7875d01f8390b28faedc37dcf2e713b9
k8s.io/apiserver 4a8377c547bbff4576a35b5b5bf4026d9b5aa763
k8s.io/client-go b9a0cf870f239c4a4ecfd3feb075a50e7cbe1473
k8s.io/kubernetes v1.10.0
k8s.io/utils 258e2a2fa64568210fbd6267cf1d8fd87c3cb86e
k8s.io/api 9e5ffd1f1320950b238cfce291b926411f0af722
k8s.io/apimachinery ed135c5b96450fd24e5e981c708114fbbd950697
k8s.io/apiserver a90e3a95c2e91b944bfca8225c4e0d12e42a9eb5
k8s.io/client-go 03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb
k8s.io/kubernetes v1.11.0
k8s.io/utils 733eca437aa39379e4bcc25e726439dfca40fcff

View File

@@ -1,7 +1,10 @@
[![Build Status](https://travis-ci.org/containerd/go-cni.svg?branch=master)](https://travis-ci.org/containerd/go-cni)
# go-cni
A generic CNI library to provide APIs for CNI plugin interactions. The library provides APIs to:
- Load CNI network config from different sources
- Setup networks for container namespace
- Remove networks from container namespace
- Query status of CNI network plugin initialization
@@ -16,11 +19,17 @@ func main() {
defaultIfName := "eth0"
// Initialize library
l = gocni.New(gocni.WithMinNetworkCount(2),
gocni.WithLoNetwork(),
gocni.WithPluginConfDir("/etc/mycni/net.d"),
gocni.WithPluginDir([]string{"/opt/mycni/bin", "/opt/cni/bin"}),
gocni.WithDefaultIfName(defaultIfName))
// Load the cni configuration
err:= l.Load(gocni.WithLoNetwork,gocni.WithDefaultConf)
if err != nil{
log.Errorf("failed to load cni configuration: %v", err)
return
}
// Setup network for namespace.
labels := map[string]string{
"K8S_POD_NAMESPACE": "namespace1",
@@ -29,16 +38,10 @@ func main() {
}
result, err := l.Setup(id, netns, gocni.WithLabels(labels))
if err != nil {
return nil, fmt.Errorf("failed to setup network for namespace %q: %v", id, err)
log.Errorf("failed to setup network for namespace %q: %v",id, err)
return
}
defer func() {
if retErr != nil {
// Teardown network if an error is returned.
if err := l.Remove(id, netns, gocni.WithLabels(labels)); err != nil {
fmt.Errorf("Failed to destroy network for namespace %q", id)
}
}
}()
// Get IP of the default interface
IP := result.Interfaces[defaultIfName].IPConfigs[0].IP.String()
fmt.Printf("IP of the default interface %s:%s", defaultIfName, IP)

View File

@@ -31,7 +31,7 @@ type CNI interface {
// Remove tears down the network of the namespace.
Remove(id string, path string, opts ...NamespaceOpts) error
// Load loads the cni network config
Load(opts ...LoadOption) error
Load(opts ...CNIOpt) error
// Status checks the status of the cni initialization
Status() error
}
@@ -59,7 +59,7 @@ func defaultCNIConfig() *libcni {
}
}
func New(config ...ConfigOption) (CNI, error) {
func New(config ...CNIOpt) (CNI, error) {
cni := defaultCNIConfig()
var err error
for _, c := range config {
@@ -70,8 +70,10 @@ func New(config ...ConfigOption) (CNI, error) {
return cni, nil
}
func (c *libcni) Load(opts ...LoadOption) error {
func (c *libcni) Load(opts ...CNIOpt) error {
var err error
c.Lock()
defer c.Unlock()
// Reset the networks on a load operation to ensure
// config happens on a clean slate
c.reset()
@@ -81,30 +83,27 @@ func (c *libcni) Load(opts ...LoadOption) error {
return errors.Wrapf(ErrLoad, fmt.Sprintf("cni config load failed: %v", err))
}
}
return c.Status()
return nil
}
func (c *libcni) Status() error {
c.RLock()
defer c.RUnlock()
if len(c.networks) < c.networkCount {
return ErrCNINotInitialized
}
return nil
return c.status()
}
// Setup setups the network in the namespace
func (c *libcni) Setup(id string, path string, opts ...NamespaceOpts) (*CNIResult, error) {
if err:=c.Status();err!=nil{
return nil,err
c.RLock()
defer c.RUnlock()
if err := c.status(); err != nil {
return nil, err
}
ns, err := newNamespace(id, path, opts...)
if err != nil {
return nil, err
}
var results []*current.Result
c.RLock()
defer c.RUnlock()
for _, network := range c.networks {
r, err := network.Attach(ns)
if err != nil {
@@ -117,15 +116,15 @@ func (c *libcni) Setup(id string, path string, opts ...NamespaceOpts) (*CNIResul
// Remove removes the network config from the namespace
func (c *libcni) Remove(id string, path string, opts ...NamespaceOpts) error {
if err:=c.Status();err!=nil{
return err
}
c.RLock()
defer c.RUnlock()
if err := c.status(); err != nil {
return err
}
ns, err := newNamespace(id, path, opts...)
if err != nil {
return err
}
c.RLock()
defer c.RUnlock()
for _, network := range c.networks {
if err := network.Remove(ns); err != nil {
return err
@@ -135,7 +134,12 @@ func (c *libcni) Remove(id string, path string, opts ...NamespaceOpts) error {
}
func (c *libcni) reset() {
c.Lock()
defer c.Unlock()
c.networks = nil
}
func (c *libcni) status() error {
if len(c.networks) < c.networkCount {
return ErrCNINotInitialized
}
return nil
}

View File

@@ -24,11 +24,11 @@ import (
"github.com/pkg/errors"
)
type ConfigOption func(c *libcni) error
type CNIOpt func(c *libcni) error
// WithInterfacePrefix sets the prefix for network interfaces
// e.g. eth or wlan
func WithInterfacePrefix(prefix string) ConfigOption {
func WithInterfacePrefix(prefix string) CNIOpt {
return func(c *libcni) error {
c.prefix = prefix
return nil
@@ -37,7 +37,7 @@ func WithInterfacePrefix(prefix string) ConfigOption {
// WithPluginDir can be used to set the locations of
// the cni plugin binaries
func WithPluginDir(dirs []string) ConfigOption {
func WithPluginDir(dirs []string) CNIOpt {
return func(c *libcni) error {
c.pluginDirs = dirs
c.cniConfig = &cnilibrary.CNIConfig{Path: dirs}
@@ -47,7 +47,7 @@ func WithPluginDir(dirs []string) ConfigOption {
// WithPluginConfDir can be used to configure the
// cni configuration directory.
func WithPluginConfDir(dir string) ConfigOption {
func WithPluginConfDir(dir string) CNIOpt {
return func(c *libcni) error {
c.pluginConfDir = dir
return nil
@@ -57,23 +57,17 @@ func WithPluginConfDir(dir string) ConfigOption {
// WithMinNetworkCount can be used to configure the
// minimum networks to be configured and initalized
// for the status to report success. By default its 1.
func WithMinNetworkCount(count int) ConfigOption {
func WithMinNetworkCount(count int) CNIOpt {
return func(c *libcni) error {
c.networkCount = count
return nil
}
}
// LoadOption can be used with Load API
// to load network configuration from different
// sources.
type LoadOption func(c *libcni) error
// WithLoNetwork can be used to load the loopback
// network config.
func WithLoNetwork() LoadOption {
return func(c *libcni) error {
loConfig, _ := cnilibrary.ConfListFromBytes([]byte(`{
func WithLoNetwork(c *libcni) error {
loConfig, _ := cnilibrary.ConfListFromBytes([]byte(`{
"cniVersion": "0.3.1",
"name": "cni-loopback",
"plugins": [{
@@ -81,20 +75,17 @@ func WithLoNetwork() LoadOption {
}]
}`))
c.Lock()
defer c.Unlock()
c.networks = append(c.networks,&Network{
cni: c.cniConfig,
config: loConfig,
ifName: "lo",
})
return nil
}
c.networks = append(c.networks, &Network{
cni: c.cniConfig,
config: loConfig,
ifName: "lo",
})
return nil
}
// WithConf can be used to load config directly
// from byte.
func WithConf(bytes []byte) LoadOption {
func WithConf(bytes []byte) CNIOpt {
return func(c *libcni) error {
conf, err := cnilibrary.ConfFromBytes(bytes)
if err != nil {
@@ -104,8 +95,6 @@ func WithConf(bytes []byte) LoadOption {
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.networks = append(c.networks, &Network{
cni: c.cniConfig,
config: confList,
@@ -118,7 +107,7 @@ func WithConf(bytes []byte) LoadOption {
// WithConfFile can be used to load network config
// from an .conf file. Supported with absolute fileName
// with path only.
func WithConfFile(fileName string) LoadOption {
func WithConfFile(fileName string) CNIOpt {
return func(c *libcni) error {
conf, err := cnilibrary.ConfFromFile(fileName)
if err != nil {
@@ -129,8 +118,6 @@ func WithConfFile(fileName string) LoadOption {
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.networks = append(c.networks, &Network{
cni: c.cniConfig,
config: confList,
@@ -143,15 +130,13 @@ func WithConfFile(fileName string) LoadOption {
// WithConfListFile can be used to load network config
// from an .conflist file. Supported with absolute fileName
// with path only.
func WithConfListFile(fileName string) LoadOption {
func WithConfListFile(fileName string) CNIOpt {
return func(c *libcni) error {
confList, err := cnilibrary.ConfListFromFile(fileName)
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.networks = append(c.networks,&Network{
c.networks = append(c.networks, &Network{
cni: c.cniConfig,
config: confList,
ifName: getIfName(c.prefix, 0),
@@ -163,64 +148,60 @@ func WithConfListFile(fileName string) LoadOption {
// WithDefaultConf can be used to detect network config
// files from the configured cni config directory and load
// them.
func WithDefaultConf() LoadOption {
return func(c *libcni) error {
files, err := cnilibrary.ConfFiles(c.pluginConfDir, []string{".conf", ".conflist", ".json"})
switch {
case err != nil:
return errors.Wrapf(ErrRead, "failed to read config file: %v", err)
case len(files) == 0:
return errors.Wrapf(ErrCNINotInitialized, "no network config found in %s", c.pluginConfDir)
}
// files contains the network config files associated with cni network.
// Use lexicographical way as a defined order for network config files.
sort.Strings(files)
// Since the CNI spec does not specify a way to detect default networks,
// the convention chosen is - the first network configuration in the sorted
// list of network conf files as the default network and choose the default
// interface provided during init as the network interface for this default
// network. For every other network use a generated interface id.
i := 0
c.Lock()
defer c.Unlock()
for _, confFile := range files {
var confList *cnilibrary.NetworkConfigList
if strings.HasSuffix(confFile, ".conflist") {
confList, err = cnilibrary.ConfListFromFile(confFile)
if err != nil {
return errors.Wrapf(ErrInvalidConfig, "failed to load CNI config list file %s: %v", confFile, err)
}
} else {
conf, err := cnilibrary.ConfFromFile(confFile)
if err != nil {
return errors.Wrapf(ErrInvalidConfig, "failed to load CNI config file %s: %v", confFile, err)
}
// Ensure the config has a "type" so we know what plugin to run.
// Also catches the case where somebody put a conflist into a conf file.
if conf.Network.Type == "" {
return errors.Wrapf(ErrInvalidConfig, "network type not found in %s", confFile)
}
confList, err = cnilibrary.ConfListFromConf(conf)
if err != nil {
return errors.Wrapf(ErrInvalidConfig, "failed to convert CNI config file %s to list: %v", confFile, err)
}
}
if len(confList.Plugins) == 0 {
return errors.Wrapf(ErrInvalidConfig, "CNI config list %s has no networks, skipping", confFile)
}
c.networks = append(c.networks, &Network{
cni: c.cniConfig,
config: confList,
ifName: getIfName(c.prefix, i),
})
i++
}
if len(c.networks) == 0 {
return errors.Wrapf(ErrCNINotInitialized, "no valid networks found in %s", c.pluginDirs)
}
return nil
func WithDefaultConf(c *libcni) error {
files, err := cnilibrary.ConfFiles(c.pluginConfDir, []string{".conf", ".conflist", ".json"})
switch {
case err != nil:
return errors.Wrapf(ErrRead, "failed to read config file: %v", err)
case len(files) == 0:
return errors.Wrapf(ErrCNINotInitialized, "no network config found in %s", c.pluginConfDir)
}
// files contains the network config files associated with cni network.
// Use lexicographical way as a defined order for network config files.
sort.Strings(files)
// Since the CNI spec does not specify a way to detect default networks,
// the convention chosen is - the first network configuration in the sorted
// list of network conf files as the default network and choose the default
// interface provided during init as the network interface for this default
// network. For every other network use a generated interface id.
i := 0
for _, confFile := range files {
var confList *cnilibrary.NetworkConfigList
if strings.HasSuffix(confFile, ".conflist") {
confList, err = cnilibrary.ConfListFromFile(confFile)
if err != nil {
return errors.Wrapf(ErrInvalidConfig, "failed to load CNI config list file %s: %v", confFile, err)
}
} else {
conf, err := cnilibrary.ConfFromFile(confFile)
if err != nil {
return errors.Wrapf(ErrInvalidConfig, "failed to load CNI config file %s: %v", confFile, err)
}
// Ensure the config has a "type" so we know what plugin to run.
// Also catches the case where somebody put a conflist into a conf file.
if conf.Network.Type == "" {
return errors.Wrapf(ErrInvalidConfig, "network type not found in %s", confFile)
}
confList, err = cnilibrary.ConfListFromConf(conf)
if err != nil {
return errors.Wrapf(ErrInvalidConfig, "failed to convert CNI config file %s to list: %v", confFile, err)
}
}
if len(confList.Plugins) == 0 {
return errors.Wrapf(ErrInvalidConfig, "CNI config list %s has no networks, skipping", confFile)
}
c.networks = append(c.networks, &Network{
cni: c.cniConfig,
config: confList,
ifName: getIfName(c.prefix, i),
})
i++
}
if len(c.networks) == 0 {
return errors.Wrapf(ErrCNINotInitialized, "no valid networks found in %s", c.pluginDirs)
}
return nil
}