Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2019-04-26 10:54:51 -07:00
parent 3a3f0aac88
commit 5d17ed2302
20 changed files with 435 additions and 301 deletions

View File

@ -44,8 +44,8 @@ github.com/google/go-cmp v0.2.0
go.etcd.io/bbolt v1.3.2 go.etcd.io/bbolt v1.3.2
# cri dependencies # cri dependencies
github.com/containerd/cri 6d353571e64417d80c9478ffaea793714dd539d0 # master github.com/containerd/cri 2fc62db8146ce66f27b37306ad5fda34207835f3 # master
github.com/containerd/go-cni 40bcf8ec8acd7372be1d77031d585d5d8e561c90 github.com/containerd/go-cni 891c2a41e18144b2d7921f971d6c9789a68046b2
github.com/containernetworking/cni v0.6.0 github.com/containernetworking/cni v0.6.0
github.com/containernetworking/plugins v0.7.0 github.com/containernetworking/plugins v0.7.0
github.com/davecgh/go-spew v1.1.0 github.com/davecgh/go-spew v1.1.0

View File

@ -17,6 +17,7 @@ limitations under the License.
package cri package cri
import ( import (
"context"
"flag" "flag"
"path/filepath" "path/filepath"
"time" "time"
@ -73,7 +74,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
} }
log.G(ctx).Infof("Start cri plugin with config %+v", c) log.G(ctx).Infof("Start cri plugin with config %+v", c)
if err := validateConfig(&c); err != nil { if err := validateConfig(ctx, &c); err != nil {
return nil, errors.Wrap(err, "invalid config") return nil, errors.Wrap(err, "invalid config")
} }
@ -111,14 +112,36 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
} }
// validateConfig validates the given configuration. // validateConfig validates the given configuration.
func validateConfig(c *criconfig.Config) error { func validateConfig(ctx context.Context, c *criconfig.Config) error {
// It is an error to provide both an UntrustedWorkloadRuntime & define an 'untrusted' runtime. if c.ContainerdConfig.Runtimes == nil {
if _, ok := c.ContainerdConfig.Runtimes[criconfig.RuntimeUntrusted]; ok { c.ContainerdConfig.Runtimes = make(map[string]criconfig.Runtime)
if c.ContainerdConfig.UntrustedWorkloadRuntime.Type != "" {
return errors.New("conflicting definitions: configuration includes untrusted_workload_runtime and runtimes['untrusted']")
}
} }
// Validation for deprecated untrusted_workload_runtime.
if c.ContainerdConfig.UntrustedWorkloadRuntime.Type != "" {
log.G(ctx).Warning("`untrusted_workload_runtime` is deprecated, please use `untrusted` runtime in `runtimes` instead")
if _, ok := c.ContainerdConfig.Runtimes[criconfig.RuntimeUntrusted]; ok {
return errors.Errorf("conflicting definitions: configuration includes both `untrusted_workload_runtime` and `runtimes[%q]`", criconfig.RuntimeUntrusted)
}
c.ContainerdConfig.Runtimes[criconfig.RuntimeUntrusted] = c.ContainerdConfig.UntrustedWorkloadRuntime
}
// Validation for deprecated default_runtime field.
if c.ContainerdConfig.DefaultRuntime.Type != "" {
log.G(ctx).Warning("`default_runtime` is deprecated, please use `default_runtime_name` to reference the default configuration you have defined in `runtimes`")
c.ContainerdConfig.DefaultRuntimeName = criconfig.RuntimeDefault
c.ContainerdConfig.Runtimes[criconfig.RuntimeDefault] = c.ContainerdConfig.DefaultRuntime
}
// Validation for default_runtime_name
if c.ContainerdConfig.DefaultRuntimeName == "" {
return errors.New("`default_runtime_name` is empty")
}
if _, ok := c.ContainerdConfig.Runtimes[c.ContainerdConfig.DefaultRuntimeName]; !ok {
return errors.New("no corresponding runtime configured in `runtimes` for `default_runtime_name`")
}
// Validation for stream_idle_timeout
if c.StreamIdleTimeout != "" { if c.StreamIdleTimeout != "" {
if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil { if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil {
return errors.Wrap(err, "invalid stream idle timeout") return errors.Wrap(err, "invalid stream idle timeout")

View File

@ -47,14 +47,14 @@ type Runtime struct {
type ContainerdConfig struct { type ContainerdConfig struct {
// Snapshotter is the snapshotter used by containerd. // Snapshotter is the snapshotter used by containerd.
Snapshotter string `toml:"snapshotter" json:"snapshotter"` Snapshotter string `toml:"snapshotter" json:"snapshotter"`
// DefaultRuntimeName is the default runtime name to use from the runtimes table.
DefaultRuntimeName string `toml:"default_runtime_name" json:"defaultRuntimeName"`
// DefaultRuntime is the default runtime to use in containerd. // DefaultRuntime is the default runtime to use in containerd.
// This runtime is used when no runtime handler (or the empty string) is provided. // This runtime is used when no runtime handler (or the empty string) is provided.
// DEPRECATED: use DefaultRuntimeName instead. Remove in containerd 1.4.
DefaultRuntime Runtime `toml:"default_runtime" json:"defaultRuntime"` DefaultRuntime Runtime `toml:"default_runtime" json:"defaultRuntime"`
// UntrustedWorkloadRuntime is a runtime to run untrusted workloads on it. // UntrustedWorkloadRuntime is a runtime to run untrusted workloads on it.
// DEPRECATED: use Runtimes instead. If provided, this runtime is mapped to the runtime handler // DEPRECATED: use `untrusted` runtime in Runtimes instead. Remove in containerd 1.4.
// named 'untrusted'. It is a configuration error to provide both the (now deprecated)
// UntrustedWorkloadRuntime and a handler in the Runtimes handler map (below) for 'untrusted'
// workloads at the same time. Please provide one or the other.
UntrustedWorkloadRuntime Runtime `toml:"untrusted_workload_runtime" json:"untrustedWorkloadRuntime"` UntrustedWorkloadRuntime Runtime `toml:"untrusted_workload_runtime" json:"untrustedWorkloadRuntime"`
// Runtimes is a map from CRI RuntimeHandler strings, which specify types of runtime // Runtimes is a map from CRI RuntimeHandler strings, which specify types of runtime
// configurations, to the matching configurations. // configurations, to the matching configurations.
@ -196,12 +196,13 @@ func DefaultConfig() PluginConfig {
}, },
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
Snapshotter: containerd.DefaultSnapshotter, Snapshotter: containerd.DefaultSnapshotter,
DefaultRuntime: Runtime{ DefaultRuntimeName: "runc",
Type: "io.containerd.runtime.v1.linux",
Engine: "",
Root: "",
},
NoPivot: false, NoPivot: false,
Runtimes: map[string]Runtime{
"runc": {
Type: "io.containerd.runc.v1",
},
},
}, },
StreamServerAddress: "127.0.0.1", StreamServerAddress: "127.0.0.1",
StreamServerPort: "0", StreamServerPort: "0",
@ -229,4 +230,6 @@ func DefaultConfig() PluginConfig {
const ( const (
// RuntimeUntrusted is the implicit runtime defined for ContainerdConfig.UntrustedWorkloadRuntime // RuntimeUntrusted is the implicit runtime defined for ContainerdConfig.UntrustedWorkloadRuntime
RuntimeUntrusted = "untrusted" RuntimeUntrusted = "untrusted"
// RuntimeDefault is the implicit runtime defined for ContainerdConfig.DefaultRuntime
RuntimeDefault = "default"
) )

View File

@ -105,6 +105,9 @@ func setContainerRemoving(container containerstore.Container) error {
if status.State() == runtime.ContainerState_CONTAINER_UNKNOWN { if status.State() == runtime.ContainerState_CONTAINER_UNKNOWN {
return status, errors.New("container state is unknown, to stop first") return status, errors.New("container state is unknown, to stop first")
} }
if status.Starting {
return status, errors.New("container is in starting state, can't be removed")
}
if status.Removing { if status.Removing {
return status, errors.New("container is already in removing state") return status, errors.New("container is already in removing state")
} }

View File

@ -38,64 +38,48 @@ import (
// StartContainer starts the container. // StartContainer starts the container.
func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) { func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) {
container, err := c.containerStore.Get(r.GetContainerId()) cntr, err := c.containerStore.Get(r.GetContainerId())
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "an error occurred when try to find container %q", r.GetContainerId()) return nil, errors.Wrapf(err, "an error occurred when try to find container %q", r.GetContainerId())
} }
var startErr error
// update container status in one transaction to avoid race with event monitor.
if err := container.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
// Always apply status change no matter startContainer fails or not. Because startContainer
// may change container state no matter it fails or succeeds.
startErr = c.startContainer(ctx, container, &status)
return status, nil
}); startErr != nil {
return nil, startErr
} else if err != nil {
return nil, errors.Wrapf(err, "failed to update container %q metadata", container.ID)
}
return &runtime.StartContainerResponse{}, nil
}
// startContainer actually starts the container. The function needs to be run in one transaction. Any updates
// to the status passed in will be applied no matter the function returns error or not.
func (c *criService) startContainer(ctx context.Context,
cntr containerstore.Container,
status *containerstore.Status) (retErr error) {
id := cntr.ID id := cntr.ID
meta := cntr.Metadata meta := cntr.Metadata
container := cntr.Container container := cntr.Container
config := meta.Config config := meta.Config
// Return error if container is not in created state. // Set starting state to prevent other start/remove operations against this container
if status.State() != runtime.ContainerState_CONTAINER_CREATED { // while it's being started.
return errors.Errorf("container %q is in %s state", id, criContainerStateToString(status.State())) if err := setContainerStarting(cntr); err != nil {
return nil, errors.Wrapf(err, "failed to set starting state for container %q", id)
} }
// Do not start the container when there is a removal in progress.
if status.Removing {
return errors.Errorf("container %q is in removing state", id)
}
defer func() { defer func() {
if retErr != nil { if retErr != nil {
// Set container to exited if fail to start. // Set container to exited if fail to start.
if err := cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
status.Pid = 0 status.Pid = 0
status.FinishedAt = time.Now().UnixNano() status.FinishedAt = time.Now().UnixNano()
status.ExitCode = errorStartExitCode status.ExitCode = errorStartExitCode
status.Reason = errorStartReason status.Reason = errorStartReason
status.Message = retErr.Error() status.Message = retErr.Error()
return status, nil
}); err != nil {
logrus.WithError(err).Errorf("failed to set start failure state for container %q", id)
}
}
if err := resetContainerStarting(cntr); err != nil {
logrus.WithError(err).Errorf("failed to reset starting state for container %q", id)
} }
}() }()
// Get sandbox config from sandbox store. // Get sandbox config from sandbox store.
sandbox, err := c.sandboxStore.Get(meta.SandboxID) sandbox, err := c.sandboxStore.Get(meta.SandboxID)
if err != nil { if err != nil {
return errors.Wrapf(err, "sandbox %q not found", meta.SandboxID) return nil, errors.Wrapf(err, "sandbox %q not found", meta.SandboxID)
} }
sandboxID := meta.SandboxID sandboxID := meta.SandboxID
if sandbox.Status.Get().State != sandboxstore.StateReady { if sandbox.Status.Get().State != sandboxstore.StateReady {
return errors.Errorf("sandbox container %q is not running", sandboxID) return nil, errors.Errorf("sandbox container %q is not running", sandboxID)
} }
ioCreation := func(id string) (_ containerdio.IO, err error) { ioCreation := func(id string) (_ containerdio.IO, err error) {
@ -110,7 +94,7 @@ func (c *criService) startContainer(ctx context.Context,
ctrInfo, err := container.Info(ctx) ctrInfo, err := container.Info(ctx)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to get container info") return nil, errors.Wrap(err, "failed to get container info")
} }
var taskOpts []containerd.NewTaskOpts var taskOpts []containerd.NewTaskOpts
@ -120,7 +104,7 @@ func (c *criService) startContainer(ctx context.Context,
} }
task, err := container.NewTask(ctx, ioCreation, taskOpts...) task, err := container.NewTask(ctx, ioCreation, taskOpts...)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to create containerd task") return nil, errors.Wrap(err, "failed to create containerd task")
} }
defer func() { defer func() {
if retErr != nil { if retErr != nil {
@ -133,15 +117,61 @@ func (c *criService) startContainer(ctx context.Context,
} }
}() }()
// wait is a long running background request, no timeout needed.
exitCh, err := task.Wait(ctrdutil.NamespacedContext())
if err != nil {
return nil, errors.Wrap(err, "failed to wait for containerd task")
}
// Start containerd task. // Start containerd task.
if err := task.Start(ctx); err != nil { if err := task.Start(ctx); err != nil {
return errors.Wrapf(err, "failed to start containerd task %q", id) return nil, errors.Wrapf(err, "failed to start containerd task %q", id)
} }
// Update container start timestamp. // Update container start timestamp.
if err := cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
status.Pid = task.Pid() status.Pid = task.Pid()
status.StartedAt = time.Now().UnixNano() status.StartedAt = time.Now().UnixNano()
return nil return status, nil
}); err != nil {
return nil, errors.Wrapf(err, "failed to update container %q state", id)
}
// start the monitor after updating container state, this ensures that
// event monitor receives the TaskExit event and update container state
// after this.
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
return &runtime.StartContainerResponse{}, nil
}
// setContainerStarting sets the container into starting state. In starting state, the
// container will not be removed or started again.
func setContainerStarting(container containerstore.Container) error {
return container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) {
// Return error if container is not in created state.
if status.State() != runtime.ContainerState_CONTAINER_CREATED {
return status, errors.Errorf("container is in %s state", criContainerStateToString(status.State()))
}
// Do not start the container when there is a removal in progress.
if status.Removing {
return status, errors.New("container is in removing state, can't be started")
}
if status.Starting {
return status, errors.New("container is already in starting state")
}
status.Starting = true
return status, nil
})
}
// resetContainerStarting resets the container starting state on start failure. So
// that we could remove the container later.
func resetContainerStarting(container containerstore.Container) error {
return container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) {
status.Starting = false
return status, nil
})
} }
// createContainerLoggers creates container loggers and return write closer for stdout and stderr. // createContainerLoggers creates container loggers and return write closer for stdout and stderr.

View File

@ -28,6 +28,7 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/store" "github.com/containerd/cri/pkg/store"
containerstore "github.com/containerd/cri/pkg/store/container" containerstore "github.com/containerd/cri/pkg/store/container"
) )
@ -74,36 +75,34 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
return errors.Wrapf(err, "failed to get task for container %q", id) return errors.Wrapf(err, "failed to get task for container %q", id)
} }
// Don't return for unknown state, some cleanup needs to be done. // Don't return for unknown state, some cleanup needs to be done.
if state != runtime.ContainerState_CONTAINER_UNKNOWN { if state == runtime.ContainerState_CONTAINER_UNKNOWN {
return nil return cleanupUnknownContainer(ctx, id, container)
} }
// Task is an interface, explicitly set it to nil just in case. return nil
task = nil
} }
// Handle unknown state. // Handle unknown state.
if state == runtime.ContainerState_CONTAINER_UNKNOWN { if state == runtime.ContainerState_CONTAINER_UNKNOWN {
status, err := getTaskStatus(ctx, task) // Start an exit handler for containers in unknown state.
waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext())
defer waitCancel()
exitCh, err := task.Wait(waitCtx)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to get task status for %q", id) if !errdefs.IsNotFound(err) {
return errors.Wrapf(err, "failed to wait for task for %q", id)
} }
switch status.Status { return cleanupUnknownContainer(ctx, id, container)
case containerd.Running, containerd.Created:
// The task is still running, continue stopping the task.
case containerd.Stopped:
// The task has exited. If the task exited after containerd
// started, the event monitor will receive its exit event; if it
// exited before containerd started, the event monitor will never
// receive its exit event.
// However, we can't tell that because the task state was not
// successfully loaded during containerd start (container is
// in UNKNOWN state).
// So always do cleanup here, just in case that we've missed the
// exit event.
return cleanupUnknownContainer(ctx, id, status, container)
default:
return errors.Wrapf(err, "unsupported task status %q", status.Status)
} }
exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
defer func() {
exitCancel()
// This ensures that exit monitor is stopped before
// `Wait` is cancelled, so no exit event is generated
// because of the `Wait` cancellation.
<-stopCh
}()
} }
// We only need to kill the task. The event handler will Delete the // We only need to kill the task. The event handler will Delete the
@ -176,19 +175,13 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
} }
// cleanupUnknownContainer cleanup stopped container in unknown state. // cleanupUnknownContainer cleanup stopped container in unknown state.
func cleanupUnknownContainer(ctx context.Context, id string, status containerd.Status, func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
cntr containerstore.Container) error {
// Reuse handleContainerExit to do the cleanup. // Reuse handleContainerExit to do the cleanup.
// NOTE(random-liu): If the task did exit after containerd started, both
// the event monitor and the cleanup function would update the container
// state. The final container state will be whatever being updated first.
// There is no way to completely avoid this race condition, and for best
// effort unknown state container cleanup, this seems acceptable.
return handleContainerExit(ctx, &eventtypes.TaskExit{ return handleContainerExit(ctx, &eventtypes.TaskExit{
ContainerID: id, ContainerID: id,
ID: id, ID: id,
Pid: 0, Pid: 0,
ExitStatus: status.ExitStatus, ExitStatus: unknownExitCode,
ExitedAt: status.ExitTime, ExitedAt: time.Now(),
}, cntr) }, cntr)
} }

View File

@ -50,6 +50,8 @@ const (
// Add a timeout for each event handling, events that timeout will be requeued and // Add a timeout for each event handling, events that timeout will be requeued and
// handled again in the future. // handled again in the future.
handleEventTimeout = 10 * time.Second handleEventTimeout = 10 * time.Second
exitChannelSize = 1024
) )
// eventMonitor monitors containerd event and updates internal state correspondingly. // eventMonitor monitors containerd event and updates internal state correspondingly.
@ -57,6 +59,8 @@ const (
type eventMonitor struct { type eventMonitor struct {
c *criService c *criService
ch <-chan *events.Envelope ch <-chan *events.Envelope
// exitCh receives container/sandbox exit events from exit monitors.
exitCh chan *eventtypes.TaskExit
errCh <-chan error errCh <-chan error
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -89,6 +93,7 @@ func newEventMonitor(c *criService) *eventMonitor {
c: c, c: c,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
exitCh: make(chan *eventtypes.TaskExit, exitChannelSize),
backOff: newBackOff(), backOff: newBackOff(),
} }
} }
@ -98,13 +103,38 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
// note: filters are any match, if you want any match but not in namespace foo // note: filters are any match, if you want any match but not in namespace foo
// then you have to manually filter namespace foo // then you have to manually filter namespace foo
filters := []string{ filters := []string{
`topic=="/tasks/exit"`,
`topic=="/tasks/oom"`, `topic=="/tasks/oom"`,
`topic~="/images/"`, `topic~="/images/"`,
} }
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...) em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
} }
// startExitMonitor starts an exit monitor for a given container/sandbox.
func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
stopCh := make(chan struct{})
go func() {
defer close(stopCh)
select {
case exitRes := <-exitCh:
exitStatus, exitedAt, err := exitRes.Result()
if err != nil {
logrus.WithError(err).Errorf("Failed to get task exit status for %q", id)
exitStatus = unknownExitCode
exitedAt = time.Now()
}
em.exitCh <- &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: pid,
ExitStatus: exitStatus,
ExitedAt: exitedAt,
}
case <-ctx.Done():
}
}()
return stopCh
}
func convertEvent(e *gogotypes.Any) (string, interface{}, error) { func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
id := "" id := ""
evt, err := typeurl.UnmarshalAny(e) evt, err := typeurl.UnmarshalAny(e)
@ -113,8 +143,6 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
} }
switch e := evt.(type) { switch e := evt.(type) {
case *eventtypes.TaskExit:
id = e.ContainerID
case *eventtypes.TaskOOM: case *eventtypes.TaskOOM:
id = e.ContainerID id = e.ContainerID
case *eventtypes.ImageCreate: case *eventtypes.ImageCreate:
@ -142,6 +170,18 @@ func (em *eventMonitor) start() <-chan error {
defer close(errCh) defer close(errCh)
for { for {
select { select {
case e := <-em.exitCh:
logrus.Debugf("Received exit event %+v", e)
id := e.ID
if em.backOff.isInBackOff(id) {
logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, e)
em.backOff.enBackOff(id, e)
break
}
if err := em.handleEvent(e); err != nil {
logrus.WithError(err).Errorf("Failed to handle exit event %+v for %s", e, id)
em.backOff.enBackOff(id, e)
}
case e := <-em.ch: case e := <-em.ch:
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic) logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
if e.Namespace != constants.K8sContainerdNamespace { if e.Namespace != constants.K8sContainerdNamespace {
@ -213,8 +253,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
} else if err != store.ErrNotExist { } else if err != store.ErrNotExist {
return errors.Wrap(err, "can't find container for TaskExit event") return errors.Wrap(err, "can't find container for TaskExit event")
} }
// Use GetAll to include sandbox in init state. sb, err := em.c.sandboxStore.Get(e.ID)
sb, err := em.c.sandboxStore.GetAll(e.ID)
if err == nil { if err == nil {
if err := handleSandboxExit(ctx, e, sb); err != nil { if err := handleSandboxExit(ctx, e, sb); err != nil {
return errors.Wrap(err, "failed to handle sandbox TaskExit event") return errors.Wrap(err, "failed to handle sandbox TaskExit event")
@ -322,15 +361,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
} }
} }
err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
// NOTE(random-liu): We SHOULD NOT change INIT state here.
// If sandbox state is INIT when event monitor receives an TaskExit event,
// it means that sandbox start has failed. In that case, `RunPodSandbox` will
// cleanup everything immediately.
// Once sandbox state goes out of INIT, it becomes visable to the user, which
// is not what we want.
if status.State != sandboxstore.StateInit {
status.State = sandboxstore.StateNotReady status.State = sandboxstore.StateNotReady
}
status.Pid = 0 status.Pid = 0
return status, nil return status, nil
}) })

View File

@ -23,12 +23,9 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers" "github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/linux/runctypes"
runcoptions "github.com/containerd/containerd/runtime/v2/runc/options" runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
@ -477,31 +474,6 @@ func unknownSandboxStatus() sandboxstore.Status {
} }
} }
// unknownExitStatus generates containerd.Status for container exited with unknown exit code.
func unknownExitStatus() containerd.Status {
return containerd.Status{
Status: containerd.Stopped,
ExitStatus: unknownExitCode,
ExitTime: time.Now(),
}
}
// getTaskStatus returns status for a given task. It returns unknown exit status if
// the task is nil or not found.
func getTaskStatus(ctx context.Context, task containerd.Task) (containerd.Status, error) {
if task == nil {
return unknownExitStatus(), nil
}
status, err := task.Status(ctx)
if err != nil {
if !errdefs.IsNotFound(err) {
return containerd.Status{}, err
}
return unknownExitStatus(), nil
}
return status, nil
}
// getPassthroughAnnotations filters requested pod annotations by comparing // getPassthroughAnnotations filters requested pod annotations by comparing
// against permitted annotations for the given runtime. // against permitted annotations for the given runtime.
func getPassthroughAnnotations(podAnnotations map[string]string, func getPassthroughAnnotations(podAnnotations map[string]string,

View File

@ -34,6 +34,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/netns" "github.com/containerd/cri/pkg/netns"
cio "github.com/containerd/cri/pkg/server/io" cio "github.com/containerd/cri/pkg/server/io"
containerstore "github.com/containerd/cri/pkg/store/container" containerstore "github.com/containerd/cri/pkg/store/container"
@ -57,7 +58,7 @@ func (c *criService) recover(ctx context.Context) error {
return errors.Wrap(err, "failed to list sandbox containers") return errors.Wrap(err, "failed to list sandbox containers")
} }
for _, sandbox := range sandboxes { for _, sandbox := range sandboxes {
sb, err := loadSandbox(ctx, sandbox) sb, err := c.loadSandbox(ctx, sandbox)
if err != nil { if err != nil {
logrus.WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID()) logrus.WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID())
continue continue
@ -275,6 +276,22 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
status.StartedAt = time.Now().UnixNano() status.StartedAt = time.Now().UnixNano()
status.Pid = t.Pid() status.Pid = t.Pid()
} }
// Wait for the task for exit monitor.
// wait is a long running background request, no timeout needed.
exitCh, err := t.Wait(ctrdutil.NamespacedContext())
if err != nil {
if !errdefs.IsNotFound(err) {
return errors.Wrap(err, "failed to wait for task")
}
// Container was in running state, but its task has been deleted,
// set unknown exited state.
status.FinishedAt = time.Now().UnixNano()
status.ExitCode = unknownExitCode
status.Reason = unknownExitReason
} else {
// Start exit monitor.
c.eventMonitor.startExitMonitor(context.Background(), id, status.Pid, exitCh)
}
case containerd.Stopped: case containerd.Stopped:
// Task is stopped. Updata status and delete the task. // Task is stopped. Updata status and delete the task.
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
@ -304,7 +321,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
} }
// loadSandbox loads sandbox from containerd. // loadSandbox loads sandbox from containerd.
func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) { func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) {
ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout) ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout)
defer cancel() defer cancel()
var sandbox sandboxstore.Sandbox var sandbox sandboxstore.Sandbox
@ -358,9 +375,20 @@ func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.S
status.State = sandboxstore.StateNotReady status.State = sandboxstore.StateNotReady
} else { } else {
if taskStatus.Status == containerd.Running { if taskStatus.Status == containerd.Running {
// Wait for the task for sandbox monitor.
// wait is a long running background request, no timeout needed.
exitCh, err := t.Wait(ctrdutil.NamespacedContext())
if err != nil {
if !errdefs.IsNotFound(err) {
return status, errors.Wrap(err, "failed to wait for task")
}
status.State = sandboxstore.StateNotReady
} else {
// Task is running, set sandbox state as READY. // Task is running, set sandbox state as READY.
status.State = sandboxstore.StateReady status.State = sandboxstore.StateReady
status.Pid = t.Pid() status.Pid = t.Pid()
c.eventMonitor.startExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
}
} else { } else {
// Task is not running. Delete the task and set sandbox state as NOTREADY. // Task is not running. Delete the task and set sandbox state as NOTREADY.
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {

View File

@ -87,7 +87,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
RuntimeHandler: r.GetRuntimeHandler(), RuntimeHandler: r.GetRuntimeHandler(),
}, },
sandboxstore.Status{ sandboxstore.Status{
State: sandboxstore.StateInit, State: sandboxstore.StateUnknown,
}, },
) )
@ -253,50 +253,6 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to get sandbox container info") return nil, errors.Wrap(err, "failed to get sandbox container info")
} }
if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.CreatedAt = info.CreatedAt
return status, nil
}); err != nil {
return nil, errors.Wrap(err, "failed to update sandbox created timestamp")
}
// Add sandbox into sandbox store in INIT state.
sandbox.Container = container
if err := c.sandboxStore.Add(sandbox); err != nil {
return nil, errors.Wrapf(err, "failed to add sandbox %+v into store", sandbox)
}
defer func() {
// Delete sandbox from sandbox store if there is an error.
if retErr != nil {
c.sandboxStore.Delete(id)
}
}()
// NOTE(random-liu): Sandbox state only stay in INIT state after this point
// and before the end of this function.
// * If `Update` succeeds, sandbox state will become READY in one transaction.
// * If `Update` fails, sandbox will be removed from the store in the defer above.
// * If containerd stops at any point before `Update` finishes, because sandbox
// state is not checkpointed, it will be recovered from corresponding containerd task
// status during restart:
// * If the task is running, sandbox state will be READY,
// * Or else, sandbox state will be NOTREADY.
//
// In any case, sandbox will leave INIT state, so it's safe to ignore sandbox
// in INIT state in other functions.
// Start sandbox container in one transaction to avoid race condition with
// event monitor.
if err := sandbox.Status.Update(func(status sandboxstore.Status) (_ sandboxstore.Status, retErr error) {
// NOTE(random-liu): We should not change the sandbox state to NOTREADY
// if `Update` fails.
//
// If `Update` fails, the sandbox will be cleaned up by all the defers
// above. We should not let user see this sandbox, or else they will
// see the sandbox disappear after the defer clean up, which may confuse
// them.
//
// Given so, we should keep the sandbox in INIT state if `Update` fails,
// and ignore sandbox in INIT state in all the inspection functions.
// Create sandbox task in containerd. // Create sandbox task in containerd.
log.Tracef("Create sandbox container (id=%q, name=%q).", log.Tracef("Create sandbox container (id=%q, name=%q).",
@ -310,32 +266,53 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
// We don't need stdio for sandbox container. // We don't need stdio for sandbox container.
task, err := container.NewTask(ctx, containerdio.NullIO, taskOpts...) task, err := container.NewTask(ctx, containerdio.NullIO, taskOpts...)
if err != nil { if err != nil {
return status, errors.Wrap(err, "failed to create containerd task") return nil, errors.Wrap(err, "failed to create containerd task")
} }
defer func() { defer func() {
if retErr != nil { if retErr != nil {
deferCtx, deferCancel := ctrdutil.DeferContext() deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel() defer deferCancel()
// Cleanup the sandbox container if an error is returned. // Cleanup the sandbox container if an error is returned.
// It's possible that task is deleted by event monitor.
if _, err := task.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { if _, err := task.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("Failed to delete sandbox container %q", id) logrus.WithError(err).Errorf("Failed to delete sandbox container %q", id)
} }
} }
}() }()
if err := task.Start(ctx); err != nil { // wait is a long running background request, no timeout needed.
return status, errors.Wrapf(err, "failed to start sandbox container task %q", id) exitCh, err := task.Wait(ctrdutil.NamespacedContext())
if err != nil {
return nil, errors.Wrap(err, "failed to wait for sandbox container task")
} }
if err := task.Start(ctx); err != nil {
return nil, errors.Wrapf(err, "failed to start sandbox container task %q", id)
}
if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
// Set the pod sandbox as ready after successfully start sandbox container. // Set the pod sandbox as ready after successfully start sandbox container.
status.Pid = task.Pid() status.Pid = task.Pid()
status.State = sandboxstore.StateReady status.State = sandboxstore.StateReady
status.CreatedAt = info.CreatedAt
return status, nil return status, nil
}); err != nil { }); err != nil {
return nil, errors.Wrap(err, "failed to start sandbox container") return nil, errors.Wrap(err, "failed to update sandbox status")
} }
// Add sandbox into sandbox store in INIT state.
sandbox.Container = container
if err := c.sandboxStore.Add(sandbox); err != nil {
return nil, errors.Wrapf(err, "failed to add sandbox %+v into store", sandbox)
}
// start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event.
//
// TaskOOM from containerd may come before sandbox is added to store,
// but we don't care about sandbox TaskOOM right now, so it is fine.
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
} }
@ -651,16 +628,11 @@ func (c *criService) getSandboxRuntime(config *runtime.PodSandboxConfig, runtime
return criconfig.Runtime{}, errors.New("untrusted workload with host access is not allowed") return criconfig.Runtime{}, errors.New("untrusted workload with host access is not allowed")
} }
// Handle the deprecated UntrustedWorkloadRuntime.
if c.config.ContainerdConfig.UntrustedWorkloadRuntime.Type != "" {
return c.config.ContainerdConfig.UntrustedWorkloadRuntime, nil
}
runtimeHandler = criconfig.RuntimeUntrusted runtimeHandler = criconfig.RuntimeUntrusted
} }
if runtimeHandler == "" { if runtimeHandler == "" {
return c.config.ContainerdConfig.DefaultRuntime, nil runtimeHandler = c.config.ContainerdConfig.DefaultRuntimeName
} }
handler, ok := c.config.ContainerdConfig.Runtimes[runtimeHandler] handler, ok := c.config.ContainerdConfig.Runtimes[runtimeHandler]

View File

@ -19,7 +19,6 @@ package server
import ( import (
"time" "time"
"github.com/containerd/containerd"
eventtypes "github.com/containerd/containerd/api/events" eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
cni "github.com/containerd/go-cni" cni "github.com/containerd/go-cni"
@ -29,6 +28,7 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox" sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
) )
@ -97,6 +97,7 @@ func (c *criService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandb
// `task.Delete` is not called here because it will be called when // `task.Delete` is not called here because it will be called when
// the event monitor handles the `TaskExit` event. // the event monitor handles the `TaskExit` event.
func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error { func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error {
id := sandbox.ID
container := sandbox.Container container := sandbox.Container
state := sandbox.Status.Get().State state := sandbox.Status.Get().State
task, err := container.Task(ctx, nil) task, err := container.Task(ctx, nil)
@ -105,29 +106,35 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
return errors.Wrap(err, "failed to get sandbox container") return errors.Wrap(err, "failed to get sandbox container")
} }
// Don't return for unknown state, some cleanup needs to be done. // Don't return for unknown state, some cleanup needs to be done.
if state != sandboxstore.StateUnknown { if state == sandboxstore.StateUnknown {
return nil return cleanupUnknownSandbox(ctx, id, sandbox)
} }
// Task is an interface, explicitly set it to nil just in case. return nil
task = nil
} }
// Handle unknown state. // Handle unknown state.
// The cleanup logic is the same with container unknown state. // The cleanup logic is the same with container unknown state.
if state == sandboxstore.StateUnknown { if state == sandboxstore.StateUnknown {
status, err := getTaskStatus(ctx, task) // Start an exit handler for containers in unknown state.
waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext())
defer waitCancel()
exitCh, err := task.Wait(waitCtx)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to get task status for %q", sandbox.ID) if !errdefs.IsNotFound(err) {
return errors.Wrap(err, "failed to wait for task")
} }
switch status.Status { return cleanupUnknownSandbox(ctx, id, sandbox)
case containerd.Running, containerd.Created:
// The task is still running, continue stopping the task.
case containerd.Stopped:
// The task has exited, explicitly cleanup.
return cleanupUnknownSandbox(ctx, sandbox.ID, status, sandbox)
default:
return errors.Wrapf(err, "unsupported task status %q", status.Status)
} }
exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
defer func() {
exitCancel()
// This ensures that exit monitor is stopped before
// `Wait` is cancelled, so no exit event is generated
// because of the `Wait` cancellation.
<-stopCh
}()
} }
// Kill the sandbox container. // Kill the sandbox container.
@ -166,14 +173,13 @@ func (c *criService) teardownPod(id string, path string, config *runtime.PodSand
} }
// cleanupUnknownSandbox cleanup stopped sandbox in unknown state. // cleanupUnknownSandbox cleanup stopped sandbox in unknown state.
func cleanupUnknownSandbox(ctx context.Context, id string, status containerd.Status, func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error {
sandbox sandboxstore.Sandbox) error {
// Reuse handleSandboxExit to do the cleanup. // Reuse handleSandboxExit to do the cleanup.
return handleSandboxExit(ctx, &eventtypes.TaskExit{ return handleSandboxExit(ctx, &eventtypes.TaskExit{
ContainerID: id, ContainerID: id,
ID: id, ID: id,
Pid: 0, Pid: 0,
ExitStatus: status.ExitStatus, ExitStatus: unknownExitCode,
ExitedAt: status.ExitTime, ExitedAt: time.Now(),
}, sandbox) }, sandbox)
} }

View File

@ -88,6 +88,10 @@ type Status struct {
// Human-readable message indicating details about why container is in its // Human-readable message indicating details about why container is in its
// current state. // current state.
Message string Message string
// Starting indicates that the container is in starting state.
// This field doesn't need to be checkpointed.
// TODO(now): Add unit test.
Starting bool `json:"-"`
// Removing indicates that the container is in removing state. // Removing indicates that the container is in removing state.
// This field doesn't need to be checkpointed. // This field doesn't need to be checkpointed.
Removing bool `json:"-"` Removing bool `json:"-"`

View File

@ -86,22 +86,9 @@ func (s *Store) Add(sb Sandbox) error {
return nil return nil
} }
// Get returns the sandbox with specified id. Returns store.ErrNotExist // Get returns the sandbox with specified id.
// if the sandbox doesn't exist. // Returns store.ErrNotExist if the sandbox doesn't exist.
func (s *Store) Get(id string) (Sandbox, error) { func (s *Store) Get(id string) (Sandbox, error) {
sb, err := s.GetAll(id)
if err != nil {
return sb, err
}
if sb.Status.Get().State == StateInit {
return Sandbox{}, store.ErrNotExist
}
return sb, nil
}
// GetAll returns the sandbox with specified id, including sandbox in unknown
// state. Returns store.ErrNotExist if the sandbox doesn't exist.
func (s *Store) GetAll(id string) (Sandbox, error) {
s.lock.RLock() s.lock.RLock()
defer s.lock.RUnlock() defer s.lock.RUnlock()
id, err := s.idIndex.Get(id) id, err := s.idIndex.Get(id)
@ -123,9 +110,6 @@ func (s *Store) List() []Sandbox {
defer s.lock.RUnlock() defer s.lock.RUnlock()
var sandboxes []Sandbox var sandboxes []Sandbox
for _, sb := range s.sandboxes { for _, sb := range s.sandboxes {
if sb.Status.Get().State == StateInit {
continue
}
sandboxes = append(sandboxes, sb) sandboxes = append(sandboxes, sb)
} }
return sandboxes return sandboxes

View File

@ -26,11 +26,11 @@ import (
// | | // | |
// | Create(Run) | Load // | Create(Run) | Load
// | | // | |
// Start +----v----+ | // Start | |
// (failed) | | | // (failed) | |
// +-------------+ INIT | +-----------+ // +------------------+ +-----------+
// | | | | | // | | | |
// | +----+----+ | | // | | | |
// | | | | // | | | |
// | | Start(Run) | | // | | Start(Run) | |
// | | | | // | | | |
@ -53,23 +53,17 @@ import (
// +-------------> DELETED // +-------------> DELETED
// State is the sandbox state we use in containerd/cri. // State is the sandbox state we use in containerd/cri.
// It includes init and unknown, which are internal states not defined in CRI. // It includes unknown, which is internal states not defined in CRI.
// The state mapping from internal states to CRI states: // The state mapping from internal states to CRI states:
// * ready -> ready // * ready -> ready
// * not ready -> not ready // * not ready -> not ready
// * init -> not exist
// * unknown -> not ready // * unknown -> not ready
type State uint32 type State uint32
const ( const (
// StateInit is init state of sandbox. Sandbox
// is in init state before its corresponding sandbox container
// is created. Sandbox in init state should be ignored by most
// functions, unless the caller needs to update sandbox state.
StateInit State = iota
// StateReady is ready state, it means sandbox container // StateReady is ready state, it means sandbox container
// is running. // is running.
StateReady StateReady = iota
// StateNotReady is notready state, it ONLY means sandbox // StateNotReady is notready state, it ONLY means sandbox
// container is not running. // container is not running.
// StopPodSandbox should still be called for NOTREADY sandbox to // StopPodSandbox should still be called for NOTREADY sandbox to

View File

@ -1,11 +1,11 @@
github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895 github.com/BurntSushi/toml v0.3.1
github.com/containerd/cgroups 1152b960fcee041f50df15cdc67c29dbccf801ef github.com/containerd/cgroups 4994991857f9b0ae8dc439551e8bebdbb4bf66c1
github.com/containerd/console c12b1e7919c14469339a5d38f2f8ed9b64a9de23 github.com/containerd/console c12b1e7919c14469339a5d38f2f8ed9b64a9de23
github.com/containerd/containerd f2a20ead833f8caf3ffc12be058d6ce668b4ebed github.com/containerd/containerd 32e788a8be3ab4418265693d9e742c30495fdd4c
github.com/containerd/continuity bd77b46c8352f74eb12c85bdc01f4b90f69d66b4 github.com/containerd/continuity bd77b46c8352f74eb12c85bdc01f4b90f69d66b4
github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c
github.com/containerd/go-cni 40bcf8ec8acd7372be1d77031d585d5d8e561c90 github.com/containerd/go-cni 891c2a41e18144b2d7921f971d6c9789a68046b2
github.com/containerd/go-runc 5a6d9f37cfa36b15efba46dc7ea349fa9b7143c3 github.com/containerd/go-runc 5a6d9f37cfa36b15efba46dc7ea349fa9b7143c3
github.com/containerd/ttrpc f02858b1457c5ca3aaec3a0803eb0d59f96e41d6 github.com/containerd/ttrpc f02858b1457c5ca3aaec3a0803eb0d59f96e41d6
github.com/containerd/typeurl a93fcdb778cd272c6e9b3028b2f42d813e785d40 github.com/containerd/typeurl a93fcdb778cd272c6e9b3028b2f42d813e785d40
@ -21,35 +21,35 @@ github.com/docker/go-units v0.3.1
github.com/docker/spdystream 449fdfce4d962303d702fec724ef0ad181c92528 github.com/docker/spdystream 449fdfce4d962303d702fec724ef0ad181c92528
github.com/emicklei/go-restful v2.2.1 github.com/emicklei/go-restful v2.2.1
github.com/godbus/dbus v3 github.com/godbus/dbus v3
github.com/gogo/googleapis 08a7655d27152912db7aaf4f983275eaf8d128ef github.com/gogo/googleapis v1.2.0
github.com/gogo/protobuf v1.0.0 github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.1.0 github.com/golang/protobuf v1.2.0
github.com/google/gofuzz 44d81051d367757e1c7c6a5a86423ece9afcf63c github.com/google/gofuzz 44d81051d367757e1c7c6a5a86423ece9afcf63c
github.com/grpc-ecosystem/go-grpc-prometheus v1.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.1
github.com/json-iterator/go 1.1.5 github.com/json-iterator/go 1.1.5
github.com/matttproud/golang_protobuf_extensions v1.0.0 github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/Microsoft/go-winio c599b533b43b1363d7d7c6cfda5ede70ed73ff13 github.com/Microsoft/go-winio c599b533b43b1363d7d7c6cfda5ede70ed73ff13
github.com/Microsoft/hcsshim 8abdbb8205e4192c68b5f84c31197156f31be517 github.com/Microsoft/hcsshim 8abdbb8205e4192c68b5f84c31197156f31be517
github.com/modern-go/concurrent 1.0.3 github.com/modern-go/concurrent 1.0.3
github.com/modern-go/reflect2 1.0.1 github.com/modern-go/reflect2 1.0.1
github.com/opencontainers/go-digest c9281466c8b2f606084ac71339773efd177436e7 github.com/opencontainers/go-digest c9281466c8b2f606084ac71339773efd177436e7
github.com/opencontainers/image-spec v1.0.1 github.com/opencontainers/image-spec v1.0.1
github.com/opencontainers/runc 12f6a991201fdb8f82579582d5e00e28fba06d0a github.com/opencontainers/runc 029124da7af7360afa781a0234d1b083550f797c
github.com/opencontainers/runtime-spec eba862dc2470385a233c7507392675cbeadf7353 github.com/opencontainers/runtime-spec 29686dbc5559d93fb1ef402eeda3e35c38d75af4
github.com/opencontainers/selinux v1.2.1 github.com/opencontainers/selinux v1.2.1
github.com/pkg/errors v0.8.0 github.com/pkg/errors v0.8.1
github.com/pmezard/go-difflib v1.0.0 github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/client_golang f4fb1b73fb099f396a7f0036bf86aa8def4ed823 github.com/prometheus/client_golang f4fb1b73fb099f396a7f0036bf86aa8def4ed823
github.com/prometheus/client_model 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c github.com/prometheus/client_model 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c
github.com/prometheus/common 89604d197083d4781071d3c65855d24ecfb0a563 github.com/prometheus/common 89604d197083d4781071d3c65855d24ecfb0a563
github.com/prometheus/procfs cb4147076ac75738c9a7d279075a253c0cc5acbd github.com/prometheus/procfs cb4147076ac75738c9a7d279075a253c0cc5acbd
github.com/seccomp/libseccomp-golang 32f571b70023028bd57d9288c20efbcb237f3ce0 github.com/seccomp/libseccomp-golang 32f571b70023028bd57d9288c20efbcb237f3ce0
github.com/sirupsen/logrus v1.0.3 github.com/sirupsen/logrus v1.4.1
github.com/stretchr/testify v1.1.4 github.com/stretchr/testify v1.1.4
github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16 github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16
github.com/tchap/go-patricia v2.2.6 github.com/tchap/go-patricia v2.2.6
github.com/urfave/cli 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c github.com/urfave/cli 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c
go.etcd.io/bbolt v1.3.1-etcd.8 go.etcd.io/bbolt v1.3.2
golang.org/x/crypto 49796115aa4b964c318aad4f3084fdb41e9aa067 golang.org/x/crypto 49796115aa4b964c318aad4f3084fdb41e9aa067
golang.org/x/net b3756b4b77d7b13260a0a2ec658753cf48922eac golang.org/x/net b3756b4b77d7b13260a0a2ec658753cf48922eac
golang.org/x/oauth2 a6bd8cefa1811bd24b86f8902872e4e8225f74c4 golang.org/x/oauth2 a6bd8cefa1811bd24b86f8902872e4e8225f74c4

View File

@ -47,3 +47,14 @@ func main() {
fmt.Printf("IP of the default interface %s:%s", defaultIfName, IP) fmt.Printf("IP of the default interface %s:%s", defaultIfName, IP)
} }
``` ```
## Project details
The go-cni is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE).
As a containerd sub-project, you will find the:
* [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md),
* [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS),
* and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md)
information in our [`containerd/project`](https://github.com/containerd/project) repository.

View File

@ -22,6 +22,7 @@ import (
"sync" "sync"
cnilibrary "github.com/containernetworking/cni/libcni" cnilibrary "github.com/containernetworking/cni/libcni"
"github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current" "github.com/containernetworking/cni/pkg/types/current"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -35,6 +36,34 @@ type CNI interface {
Load(opts ...CNIOpt) error Load(opts ...CNIOpt) error
// Status checks the status of the cni initialization // Status checks the status of the cni initialization
Status() error Status() error
// GetConfig returns a copy of the CNI plugin configurations as parsed by CNI
GetConfig() *ConfigResult
}
type ConfigResult struct {
PluginDirs []string
PluginConfDir string
Prefix string
Networks []*ConfNetwork
}
type ConfNetwork struct {
Config *NetworkConfList
IFName string
}
// NetworkConfList is a source bytes to string version of cnilibrary.NetworkConfigList
type NetworkConfList struct {
Name string
CNIVersion string
Plugins []*NetworkConf
Source string
}
// NetworkConf is a source bytes to string conversion of cnilibrary.NetworkConfig
type NetworkConf struct {
Network *types.NetConf
Source string
} }
type libcni struct { type libcni struct {
@ -60,6 +89,7 @@ func defaultCNIConfig() *libcni {
} }
} }
// New creates a new libcni instance.
func New(config ...CNIOpt) (CNI, error) { func New(config ...CNIOpt) (CNI, error) {
cni := defaultCNIConfig() cni := defaultCNIConfig()
var err error var err error
@ -71,6 +101,7 @@ func New(config ...CNIOpt) (CNI, error) {
return cni, nil return cni, nil
} }
// Load loads the latest config from cni config files.
func (c *libcni) Load(opts ...CNIOpt) error { func (c *libcni) Load(opts ...CNIOpt) error {
var err error var err error
c.Lock() c.Lock()
@ -87,17 +118,27 @@ func (c *libcni) Load(opts ...CNIOpt) error {
return nil return nil
} }
// Status returns the status of CNI initialization.
func (c *libcni) Status() error { func (c *libcni) Status() error {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
return c.status() if len(c.networks) < c.networkCount {
return ErrCNINotInitialized
}
return nil
}
// Networks returns all the configured networks.
// NOTE: Caller MUST NOT modify anything in the returned array.
func (c *libcni) Networks() []*Network {
c.RLock()
defer c.RUnlock()
return append([]*Network{}, c.networks...)
} }
// Setup setups the network in the namespace // Setup setups the network in the namespace
func (c *libcni) Setup(id string, path string, opts ...NamespaceOpts) (*CNIResult, error) { func (c *libcni) Setup(id string, path string, opts ...NamespaceOpts) (*CNIResult, error) {
c.RLock() if err := c.Status(); err != nil {
defer c.RUnlock()
if err := c.status(); err != nil {
return nil, err return nil, err
} }
ns, err := newNamespace(id, path, opts...) ns, err := newNamespace(id, path, opts...)
@ -105,7 +146,7 @@ func (c *libcni) Setup(id string, path string, opts ...NamespaceOpts) (*CNIResul
return nil, err return nil, err
} }
var results []*current.Result var results []*current.Result
for _, network := range c.networks { for _, network := range c.Networks() {
r, err := network.Attach(ns) r, err := network.Attach(ns)
if err != nil { if err != nil {
return nil, err return nil, err
@ -117,16 +158,14 @@ func (c *libcni) Setup(id string, path string, opts ...NamespaceOpts) (*CNIResul
// Remove removes the network config from the namespace // Remove removes the network config from the namespace
func (c *libcni) Remove(id string, path string, opts ...NamespaceOpts) error { func (c *libcni) Remove(id string, path string, opts ...NamespaceOpts) error {
c.RLock() if err := c.Status(); err != nil {
defer c.RUnlock()
if err := c.status(); err != nil {
return err return err
} }
ns, err := newNamespace(id, path, opts...) ns, err := newNamespace(id, path, opts...)
if err != nil { if err != nil {
return err return err
} }
for _, network := range c.networks { for _, network := range c.Networks() {
if err := network.Remove(ns); err != nil { if err := network.Remove(ns); err != nil {
// Based on CNI spec v0.7.0, empty network namespace is allowed to // Based on CNI spec v0.7.0, empty network namespace is allowed to
// do best effort cleanup. However, it is not handled consistently // do best effort cleanup. However, it is not handled consistently
@ -143,13 +182,35 @@ func (c *libcni) Remove(id string, path string, opts ...NamespaceOpts) error {
return nil return nil
} }
// GetConfig returns a copy of the CNI plugin configurations as parsed by CNI
func (c *libcni) GetConfig() *ConfigResult {
c.RLock()
defer c.RUnlock()
r := &ConfigResult{
PluginDirs: c.config.pluginDirs,
PluginConfDir: c.config.pluginConfDir,
Prefix: c.config.prefix,
}
for _, network := range c.networks {
conf := &NetworkConfList{
Name: network.config.Name,
CNIVersion: network.config.CNIVersion,
Source: string(network.config.Bytes),
}
for _, plugin := range network.config.Plugins {
conf.Plugins = append(conf.Plugins, &NetworkConf{
Network: plugin.Network,
Source: string(plugin.Bytes),
})
}
r.Networks = append(r.Networks, &ConfNetwork{
Config: conf,
IFName: network.ifName,
})
}
return r
}
func (c *libcni) reset() { func (c *libcni) reset() {
c.networks = nil c.networks = nil
} }
func (c *libcni) status() error {
if len(c.networks) < c.networkCount {
return ErrCNINotInitialized
}
return nil
}

View File

@ -1,3 +1,19 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cni package cni
import ( import (
@ -13,7 +29,7 @@ var (
ErrLoad = errors.New("failed to load cni config") ErrLoad = errors.New("failed to load cni config")
) )
// IsCNINotInitialized returns true if the error is due cni config not being intialized // IsCNINotInitialized returns true if the error is due to cni config not being initialized
func IsCNINotInitialized(err error) bool { func IsCNINotInitialized(err error) bool {
return errors.Cause(err) == ErrCNINotInitialized return errors.Cause(err) == ErrCNINotInitialized
} }

View File

@ -55,7 +55,7 @@ func WithPluginConfDir(dir string) CNIOpt {
} }
// WithMinNetworkCount can be used to configure the // WithMinNetworkCount can be used to configure the
// minimum networks to be configured and initalized // minimum networks to be configured and initialized
// for the status to report success. By default its 1. // for the status to report success. By default its 1.
func WithMinNetworkCount(count int) CNIOpt { func WithMinNetworkCount(count int) CNIOpt {
return func(c *libcni) error { return func(c *libcni) error {

View File

@ -54,6 +54,9 @@ type Config struct {
// c) DNS information. Dictionary that includes DNS information for nameservers, // c) DNS information. Dictionary that includes DNS information for nameservers,
// domain, search domains and options. // domain, search domains and options.
func (c *libcni) GetCNIResultFromResults(results []*current.Result) (*CNIResult, error) { func (c *libcni) GetCNIResultFromResults(results []*current.Result) (*CNIResult, error) {
c.RLock()
defer c.RUnlock()
r := &CNIResult{ r := &CNIResult{
Interfaces: make(map[string]*Config), Interfaces: make(map[string]*Config),
} }