Merge pull request #332 from Random-Liu/use-container-update

Use container update
This commit is contained in:
Lantao Liu 2017-10-06 13:20:30 -07:00 committed by GitHub
commit e77795b968
76 changed files with 905 additions and 410 deletions

View File

@ -1,5 +1,5 @@
RUNC_VERSION=593914b8bd5448a93f7c3e4902a03408b6d5c0ce
RUNC_VERSION=0351df1c5a66838d0c392b4ac4cf9450de844e2d
CNI_VERSION=v0.6.0
CONTAINERD_VERSION=v1.0.0-beta.1
CONTAINERD_VERSION=8558b98eb19aeb415d5da331e9c17c2513717671
CRITOOL_VERSION=v0.2
KUBERNETES_VERSION=c6a3f26988bce604c035dcd9024ce775ae4aafef

View File

@ -60,7 +60,7 @@ func TestUpdateContainerResources(t *testing.T) {
t.Logf("Check memory limit in container OCI spec")
container, err := containerdClient.LoadContainer(context.Background(), cn)
require.NoError(t, err)
spec, err := container.Spec()
spec, err := container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 2*1024*1024)
@ -71,9 +71,7 @@ func TestUpdateContainerResources(t *testing.T) {
require.NoError(t, err)
t.Logf("Check memory limit in container OCI spec")
container, err = containerdClient.LoadContainer(context.Background(), cn)
require.NoError(t, err)
spec, err = container.Spec()
spec, err = container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 4*1024*1024)
@ -96,9 +94,7 @@ func TestUpdateContainerResources(t *testing.T) {
require.NoError(t, err)
t.Logf("Check memory limit in container OCI spec")
container, err = containerdClient.LoadContainer(context.Background(), cn)
require.NoError(t, err)
spec, err = container.Spec()
spec, err = container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 8*1024*1024)

View File

@ -61,7 +61,7 @@ func (c *criContainerdService) attachContainer(ctx context.Context, id string, s
return fmt.Errorf("container is in %s state", criContainerStateToString(state))
}
task, err := cntr.Container.Get().Task(ctx, nil)
task, err := cntr.Container.Task(ctx, nil)
if err != nil {
return fmt.Errorf("failed to load task: %v", err)
}

View File

@ -87,8 +87,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state))
}
container := cntr.Container.Get()
spec, err := container.Spec()
container := cntr.Container
spec, err := container.Spec(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get container spec: %v", err)
}

View File

@ -75,7 +75,7 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R
}
// Delete containerd container.
if err := container.Container.Get().Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
if err := container.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
if !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err)
}

View File

@ -59,7 +59,7 @@ func (c *criContainerdService) startContainer(ctx context.Context,
status *containerstore.Status) (retErr error) {
id := cntr.ID
meta := cntr.Metadata
container := cntr.Container.Get()
container := cntr.Container
config := meta.Config
// Return error if container is not in created state.

View File

@ -88,7 +88,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont
}
}
glog.V(2).Infof("Stop container %q with signal %v", id, stopSignal)
task, err := container.Container.Get().Task(ctx, nil)
task, err := container.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err)
@ -111,7 +111,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont
glog.Errorf("Stop container %q timed out: %v", id, err)
}
task, err := container.Container.Get().Task(ctx, nil)
task, err := container.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err)

View File

@ -17,9 +17,11 @@ limitations under the License.
package server
import (
gocontext "context"
"fmt"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/typeurl"
"github.com/golang/glog"
@ -63,7 +65,7 @@ func (c *criContainerdService) updateContainerResources(ctx context.Context,
// spec makes sure that the resource limits are correct when start;
// if the container is already started, updating spec is still required,
// the spec will become our source of truth for resource limits.
oldSpec, err := cntr.Container.Get().Spec()
oldSpec, err := cntr.Container.Spec(ctx)
if err != nil {
return fmt.Errorf("failed to get container spec: %v", err)
}
@ -72,42 +74,15 @@ func (c *criContainerdService) updateContainerResources(ctx context.Context,
return fmt.Errorf("failed to update resource in spec: %v", err)
}
info := cntr.Container.Get().Info()
any, err := typeurl.MarshalAny(newSpec)
if err != nil {
return fmt.Errorf("failed to marshal spec %+v: %v", newSpec, err)
}
info.Spec = any
// TODO(random-liu): Add helper function in containerd to do the update.
if _, err := c.client.ContainerService().Update(ctx, info, "spec"); err != nil {
return fmt.Errorf("failed to update container spec: %v", err)
if err := updateContainerSpec(ctx, cntr.Container, newSpec); err != nil {
return err
}
defer func() {
if retErr != nil {
// Reset spec on error.
any, err := typeurl.MarshalAny(oldSpec)
if err != nil {
glog.Errorf("Failed to marshal spec %+v for container %q: %v", oldSpec, id, err)
return
if err := updateContainerSpec(ctx, cntr.Container, oldSpec); err != nil {
glog.Errorf("Failed to update spec %+v for container %q: %v", oldSpec, id, err)
}
info.Spec = any
if _, err := c.client.ContainerService().Update(ctx, info, "spec"); err != nil {
glog.Errorf("Failed to recover spec %+v for container %q: %v", oldSpec, id, err)
}
}
}()
container, err := c.client.LoadContainer(ctx, id)
if err != nil {
return fmt.Errorf("failed to load container: %v", err)
}
defer func() {
if retErr == nil {
// Update container client if no error is returned.
// NOTE(random-liu): By updating container client, we'll be able
// to get latest OCI spec from it, which includes the up-to-date
// container resource limits. This will be useful after the debug
// api is introduced.
cntr.Container.Set(container)
}
}()
@ -117,7 +92,7 @@ func (c *criContainerdService) updateContainerResources(ctx context.Context,
return nil
}
task, err := container.Task(ctx, nil)
task, err := cntr.Container.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
// Task exited already.
@ -136,6 +111,21 @@ func (c *criContainerdService) updateContainerResources(ctx context.Context,
return nil
}
// updateContainerSpec updates container spec.
func updateContainerSpec(ctx context.Context, cntr containerd.Container, spec *runtimespec.Spec) error {
any, err := typeurl.MarshalAny(spec)
if err != nil {
return fmt.Errorf("failed to marshal spec %+v: %v", spec, err)
}
if err := cntr.Update(ctx, func(ctx gocontext.Context, client *containerd.Client, c *containers.Container) error {
c.Spec = any
return nil
}); err != nil {
return fmt.Errorf("failed to update container spec: %v", err)
}
return nil
}
// updateOCILinuxResource updates container resource limit.
func updateOCILinuxResource(spec *runtimespec.Spec, new *runtime.LinuxContainerResources) (*runtimespec.Spec, error) {
// Copy to make sure old spec is not changed.

View File

@ -104,7 +104,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
return
}
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Get().Task(context.Background(),
task, err := cntr.Container.Task(context.Background(),
func(*containerd.FIFOSet) (containerd.IO, error) {
return cntr.IO, nil
},

View File

@ -127,7 +127,11 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir
id := cntr.ID()
var container containerstore.Container
// Load container metadata.
ext, ok := cntr.Extensions()[containerMetadataExtension]
exts, err := cntr.Extensions(ctx)
if err != nil {
return container, fmt.Errorf("failed to get container extensions: %v", err)
}
ext, ok := exts[containerMetadataExtension]
if !ok {
return container, fmt.Errorf("metadata extension %q not found", containerMetadataExtension)
}
@ -278,7 +282,11 @@ func unknownContainerStatus() containerstore.Status {
func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) {
var sandbox sandboxstore.Sandbox
// Load sandbox metadata.
ext, ok := cntr.Extensions()[sandboxMetadataExtension]
exts, err := cntr.Extensions(ctx)
if err != nil {
return sandbox, fmt.Errorf("failed to get sandbox container extensions: %v", err)
}
ext, ok := exts[sandboxMetadataExtension]
if !ok {
return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension)
}

View File

@ -22,6 +22,7 @@ import (
tasks "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
@ -55,7 +56,15 @@ func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.Li
state = runtime.PodSandboxState_SANDBOX_READY
}
createdAt := sandboxInStore.Container.Info().CreatedAt
info, err := sandboxInStore.Container.Info(ctx)
if err != nil {
// It's possible that container gets deleted during list.
if errdefs.IsNotFound(err) {
continue
}
return nil, fmt.Errorf("failed to get sandbox container %q info: %v", sandboxInStore.ID, err)
}
createdAt := info.CreatedAt
sandboxes = append(sandboxes, toCRISandbox(sandboxInStore.Metadata, state, createdAt))
}

View File

@ -34,15 +34,12 @@ import (
func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (*runtime.PodSandboxStatusResponse, error) {
sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId())
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find sandbox %q: %v",
r.GetPodSandboxId(), err)
return nil, fmt.Errorf("an error occurred when try to find sandbox: %v", err)
}
// Use the full sandbox id.
id := sandbox.ID
task, err := sandbox.Container.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err)
return nil, fmt.Errorf("failed to get sandbox container task: %v", err)
}
// Set sandbox state to NOTREADY by default.
@ -51,7 +48,7 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.
if task != nil {
taskStatus, err := task.Status(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get task status for sandbox container %q: %v", id, err)
return nil, fmt.Errorf("failed to get task status: %v", err)
}
if taskStatus.Status == containerd.Running {
@ -64,7 +61,11 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.
return nil, fmt.Errorf("failed to get sandbox ip: %v", err)
}
createdAt := sandbox.Container.Info().CreatedAt
info, err := sandbox.Container.Info(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox container info: %v", err)
}
createdAt := info.CreatedAt
status := toCRISandboxStatus(sandbox.Metadata, state, createdAt, ip)
return &runtime.PodSandboxStatusResponse{Status: status}, nil
}

View File

@ -1,45 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"sync"
"github.com/containerd/containerd"
)
// Client holds the containerd container client.
// containerd.Container is a pointer underlying. New assignment won't affect
// the previous pointer, so simply lock around is enough.
type Client struct {
lock sync.RWMutex
container containerd.Container
}
// Get containerd container client.
func (c *Client) Get() containerd.Container {
c.lock.RLock()
defer c.lock.RUnlock()
return c.container
}
// Set containerd container client.
func (c *Client) Set(container containerd.Container) {
c.lock.Lock()
defer c.lock.Unlock()
c.container = container
}

View File

@ -33,7 +33,7 @@ type Container struct {
// Status stores the status of the container.
Status StatusStorage
// Container is the containerd container client.
Container *Client
Container containerd.Container
// Container IO
IO *cio.ContainerIO
// TODO(random-liu): Add stop channel to get rid of stop poll waiting.
@ -45,7 +45,7 @@ type Opts func(*Container) error
// WithContainer adds the containerd Container to the internal data store.
func WithContainer(cntr containerd.Container) Opts {
return func(c *Container) error {
c.Container = &Client{container: cntr}
c.Container = cntr
return nil
}
}
@ -95,10 +95,6 @@ type Store struct {
// TODO(random-liu): Add trunc index.
}
// LoadStore loads containers from runtime.
// TODO(random-liu): Implement LoadStore.
func LoadStore() *Store { return nil }
// NewStore creates a container store.
func NewStore() *Store {
return &Store{containers: make(map[string]Container)}

View File

@ -51,10 +51,6 @@ type Store struct {
// TODO(random-liu): Add trunc index.
}
// LoadStore loads images from runtime.
// TODO(random-liu): Implement LoadStore.
func LoadStore() *Store { return nil }
// NewStore creates an image store.
func NewStore() *Store {
return &Store{images: make(map[string]Image)}

View File

@ -42,10 +42,6 @@ type Store struct {
// TODO(random-liu): Add trunc index.
}
// LoadStore loads sandboxes from runtime.
// TODO(random-liu): Implement LoadStore.
func LoadStore() *Store { return nil }
// NewStore creates a sandbox store.
func NewStore() *Store {
return &Store{sandboxes: make(map[string]Sandbox)}

View File

@ -2,7 +2,7 @@ github.com/blang/semver v3.1.0
github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/containerd/cgroups 5933ab4dc4f7caa3a73a1dc141bd11f42b5c9163
github.com/containerd/containerd v1.0.0-beta.1
github.com/containerd/containerd 8558b98eb19aeb415d5da331e9c17c2513717671
github.com/containerd/continuity cf279e6ac893682272b4479d4c67fd3abf878b4e
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/containerd/typeurl f6943554a7e7e88b3c14aad190bf05932da84788
@ -37,7 +37,7 @@ github.com/mailru/easyjson d5b7844b561a7bc640052f1b935f7b800330d7e0
github.com/Microsoft/go-winio v0.4.4
github.com/opencontainers/go-digest 21dfd564fd89c944783d00d069f33e3e7123c448
github.com/opencontainers/image-spec v1.0.0
github.com/opencontainers/runc 593914b8bd5448a93f7c3e4902a03408b6d5c0ce
github.com/opencontainers/runc 0351df1c5a66838d0c392b4ac4cf9450de844e2d
github.com/opencontainers/runtime-spec v1.0.0
github.com/opencontainers/runtime-tools 6073aff4ac61897f75895123f7e24135204a404d
github.com/opencontainers/selinux 4a2974bf1ee960774ffd517717f1f45325af0206

View File

@ -101,8 +101,6 @@ func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) {
// using a uniform interface
type Client struct {
conn *grpc.ClientConn
defaultns string
runtime string
}
@ -178,6 +176,9 @@ type RemoteContext struct {
// Snapshotter used for unpacking
Snapshotter string
// Labels to be applied to the created image
Labels map[string]string
// BaseHandlers are a set of handlers which get are called on dispatch.
// These handlers always get called before any operation specific
// handlers.
@ -199,7 +200,7 @@ func defaultRemoteContext() *RemoteContext {
}
// Pull downloads the provided content into containerd's content store
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Image, error) {
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) {
pullCtx := defaultRemoteContext()
for _, o := range opts {
if err := o(c, pullCtx); err != nil {
@ -244,6 +245,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Imag
imgrec := images.Image{
Name: name,
Target: desc,
Labels: pullCtx.Labels,
}
is := c.ImageService()
@ -275,7 +277,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Imag
}
// Push uploads the provided content to a remote resource
func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpts) error {
func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpt) error {
pushCtx := defaultRemoteContext()
for _, o := range opts {
if err := o(c, pushCtx); err != nil {
@ -298,7 +300,7 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor,
m.Lock()
manifestStack = append(manifestStack, desc)
m.Unlock()
return nil, images.StopHandler
return nil, images.ErrStopHandler
default:
return nil, nil
}
@ -340,8 +342,8 @@ func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
}
// ListImages returns all existing images
func (c *Client) ListImages(ctx context.Context) ([]Image, error) {
imgs, err := c.ImageService().List(ctx)
func (c *Client) ListImages(ctx context.Context, filters ...string) ([]Image, error) {
imgs, err := c.ImageService().List(ctx, filters...)
if err != nil {
return nil, err
}
@ -406,42 +408,52 @@ func (c *Client) Close() error {
return c.conn.Close()
}
// NamespaceService returns the underlying NamespacesClient
func (c *Client) NamespaceService() namespacesapi.NamespacesClient {
return namespacesapi.NewNamespacesClient(c.conn)
}
// ContainerService returns the underlying container Store
func (c *Client) ContainerService() containers.Store {
return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))
}
// ContentStore returns the underlying content Store
func (c *Client) ContentStore() content.Store {
return contentservice.NewStoreFromClient(contentapi.NewContentClient(c.conn))
}
// SnapshotService returns the underlying snapshotter for the provided snapshotter name
func (c *Client) SnapshotService(snapshotterName string) snapshot.Snapshotter {
return snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotsClient(c.conn), snapshotterName)
}
// TaskService returns the underlying TasksClient
func (c *Client) TaskService() tasks.TasksClient {
return tasks.NewTasksClient(c.conn)
}
// ImageService returns the underlying image Store
func (c *Client) ImageService() images.Store {
return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(c.conn))
}
// DiffService returns the underlying DiffService
func (c *Client) DiffService() diff.DiffService {
return diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn))
}
// HealthService returns the underlying GRPC HealthClient
func (c *Client) HealthService() grpc_health_v1.HealthClient {
return grpc_health_v1.NewHealthClient(c.conn)
}
// EventService returns the underlying EventsClient
func (c *Client) EventService() eventsapi.EventsClient {
return eventsapi.NewEventsClient(c.conn)
}
// VersionService returns the underlying VersionClient
func (c *Client) VersionService() versionservice.VersionClient {
return versionservice.NewVersionClient(c.conn)
}
@ -475,11 +487,38 @@ const (
type importOpts struct {
format imageFormat
refObject string
labels map[string]string
}
// ImportOpt allows the caller to specify import specific options
type ImportOpt func(c *importOpts) error
// WithImportLabel sets a label to be associated with an imported image
func WithImportLabel(key, value string) ImportOpt {
return func(opts *importOpts) error {
if opts.labels == nil {
opts.labels = make(map[string]string)
}
opts.labels[key] = value
return nil
}
}
// WithImportLabels associates a set of labels to an imported image
func WithImportLabels(labels map[string]string) ImportOpt {
return func(opts *importOpts) error {
if opts.labels == nil {
opts.labels = make(map[string]string)
}
for k, v := range labels {
opts.labels[k] = v
}
return nil
}
}
// WithOCIImportFormat sets the import format for an OCI image format
func WithOCIImportFormat() ImportOpt {
return func(c *importOpts) error {

View File

@ -33,25 +33,51 @@ func WithDialOpts(opts []grpc.DialOption) ClientOpt {
}
}
// RemoteOpts allows the caller to set distribution options for a remote
type RemoteOpts func(*Client, *RemoteContext) error
// RemoteOpt allows the caller to set distribution options for a remote
type RemoteOpt func(*Client, *RemoteContext) error
// WithPullUnpack is used to unpack an image after pull. This
// uses the snapshotter, content store, and diff service
// configured for the client.
func WithPullUnpack(client *Client, c *RemoteContext) error {
func WithPullUnpack(_ *Client, c *RemoteContext) error {
c.Unpack = true
return nil
}
// WithPullSnapshotter specifies snapshotter name used for unpacking
func WithPullSnapshotter(snapshotterName string) RemoteOpts {
return func(client *Client, c *RemoteContext) error {
func WithPullSnapshotter(snapshotterName string) RemoteOpt {
return func(_ *Client, c *RemoteContext) error {
c.Snapshotter = snapshotterName
return nil
}
}
// WithPullLabel sets a label to be associated with a pulled reference
func WithPullLabel(key, value string) RemoteOpt {
return func(_ *Client, rc *RemoteContext) error {
if rc.Labels == nil {
rc.Labels = make(map[string]string)
}
rc.Labels[key] = value
return nil
}
}
// WithPullLabels associates a set of labels to a pulled reference
func WithPullLabels(labels map[string]string) RemoteOpt {
return func(_ *Client, rc *RemoteContext) error {
if rc.Labels == nil {
rc.Labels = make(map[string]string)
}
for k, v := range labels {
rc.Labels[k] = v
}
return nil
}
}
// WithSchema1Conversion is used to convert Docker registry schema 1
// manifests to oci manifests on pull. Without this option schema 1
// manifests will return a not supported error.
@ -61,7 +87,7 @@ func WithSchema1Conversion(client *Client, c *RemoteContext) error {
}
// WithResolver specifies the resolver to use.
func WithResolver(resolver remotes.Resolver) RemoteOpts {
func WithResolver(resolver remotes.Resolver) RemoteOpt {
return func(client *Client, c *RemoteContext) error {
c.Resolver = resolver
return nil
@ -69,7 +95,7 @@ func WithResolver(resolver remotes.Resolver) RemoteOpts {
}
// WithImageHandler adds a base handler to be called on dispatch.
func WithImageHandler(h images.Handler) RemoteOpts {
func WithImageHandler(h images.Handler) RemoteOpt {
return func(client *Client, c *RemoteContext) error {
c.BaseHandlers = append(c.BaseHandlers, h)
return nil

View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"path/filepath"
"strings"
"sync"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
@ -17,21 +16,18 @@ import (
"github.com/pkg/errors"
)
// DeleteOpts allows the caller to set options for the deletion of a container
type DeleteOpts func(context.Context, *Client, containers.Container) error
// Container is a metadata object for container resources and task creation
type Container interface {
// ID identifies the container
ID() string
// Info returns the underlying container record type
Info() containers.Container
Info(context.Context) (containers.Container, error)
// Delete removes the container
Delete(context.Context, ...DeleteOpts) error
// NewTask creates a new task based on the container metadata
NewTask(context.Context, IOCreation, ...NewTaskOpts) (Task, error)
// Spec returns the OCI runtime specification
Spec() (*specs.Spec, error)
Spec(context.Context) (*specs.Spec, error)
// Task returns the current task for the container
//
// If IOAttach options are passed the client will reattach to the IO for the running
@ -44,53 +40,53 @@ type Container interface {
// SetLabels sets the provided labels for the container and returns the final label set
SetLabels(context.Context, map[string]string) (map[string]string, error)
// Extensions returns the extensions set on the container
Extensions() map[string]prototypes.Any
Extensions(context.Context) (map[string]prototypes.Any, error)
// Update a container
Update(context.Context, ...UpdateContainerOpts) error
}
func containerFromRecord(client *Client, c containers.Container) *container {
return &container{
client: client,
c: c,
id: c.ID,
}
}
var _ = (Container)(&container{})
type container struct {
mu sync.Mutex
client *Client
c containers.Container
id string
}
// ID returns the container's unique id
func (c *container) ID() string {
return c.c.ID
return c.id
}
func (c *container) Info() containers.Container {
return c.c
func (c *container) Info(ctx context.Context) (containers.Container, error) {
return c.get(ctx)
}
func (c *container) Labels(ctx context.Context) (map[string]string, error) {
r, err := c.client.ContainerService().Get(ctx, c.ID())
func (c *container) Extensions(ctx context.Context) (map[string]prototypes.Any, error) {
r, err := c.get(ctx)
if err != nil {
return nil, err
}
return r.Extensions, nil
}
c.c = r
m := make(map[string]string, len(r.Labels))
for k, v := range c.c.Labels {
m[k] = v
func (c *container) Labels(ctx context.Context) (map[string]string, error) {
r, err := c.get(ctx)
if err != nil {
return nil, err
}
return m, nil
return r.Labels, nil
}
func (c *container) SetLabels(ctx context.Context, labels map[string]string) (map[string]string, error) {
container := containers.Container{
ID: c.ID(),
ID: c.id,
Labels: labels,
}
@ -105,20 +101,17 @@ func (c *container) SetLabels(ctx context.Context, labels map[string]string) (ma
if err != nil {
return nil, err
}
c.c = r // update our local container
m := make(map[string]string, len(r.Labels))
for k, v := range c.c.Labels {
m[k] = v
}
return m, nil
return r.Labels, nil
}
// Spec returns the current OCI specification for the container
func (c *container) Spec() (*specs.Spec, error) {
func (c *container) Spec(ctx context.Context) (*specs.Spec, error) {
r, err := c.get(ctx)
if err != nil {
return nil, err
}
var s specs.Spec
if err := json.Unmarshal(c.c.Spec.Value, &s); err != nil {
if err := json.Unmarshal(r.Spec.Value, &s); err != nil {
return nil, err
}
return &s, nil
@ -126,20 +119,20 @@ func (c *container) Spec() (*specs.Spec, error) {
// Delete deletes an existing container
// an error is returned if the container has running tasks
func (c *container) Delete(ctx context.Context, opts ...DeleteOpts) (err error) {
if _, err := c.Task(ctx, nil); err == nil {
return errors.Wrapf(errdefs.ErrFailedPrecondition, "cannot delete running task %v", c.ID())
func (c *container) Delete(ctx context.Context, opts ...DeleteOpts) error {
if _, err := c.loadTask(ctx, nil); err == nil {
return errors.Wrapf(errdefs.ErrFailedPrecondition, "cannot delete running task %v", c.id)
}
r, err := c.get(ctx)
if err != nil {
return err
}
for _, o := range opts {
if err := o(ctx, c.client, c.c); err != nil {
if err := o(ctx, c.client, r); err != nil {
return err
}
}
if cerr := c.client.ContainerService().Delete(ctx, c.ID()); err == nil {
err = cerr
}
return err
return c.client.ContainerService().Delete(ctx, c.id)
}
func (c *container) Task(ctx context.Context, attach IOAttach) (Task, error) {
@ -148,12 +141,16 @@ func (c *container) Task(ctx context.Context, attach IOAttach) (Task, error) {
// Image returns the image that the container is based on
func (c *container) Image(ctx context.Context) (Image, error) {
if c.c.Image == "" {
return nil, errors.Wrapf(errdefs.ErrNotFound, "container not created from an image")
}
i, err := c.client.ImageService().Get(ctx, c.c.Image)
r, err := c.get(ctx)
if err != nil {
return nil, errors.Wrapf(err, "failed to get image for container")
return nil, err
}
if r.Image == "" {
return nil, errors.Wrap(errdefs.ErrNotFound, "container not created from an image")
}
i, err := c.client.ImageService().Get(ctx, r.Image)
if err != nil {
return nil, errors.Wrapf(err, "failed to get image %s for container", r.Image)
}
return &image{
client: c.client,
@ -161,34 +158,30 @@ func (c *container) Image(ctx context.Context) (Image, error) {
}, nil
}
func (c *container) Extensions() map[string]prototypes.Any {
c.mu.Lock()
defer c.mu.Unlock()
return c.c.Extensions
}
func (c *container) NewTask(ctx context.Context, ioCreate IOCreation, opts ...NewTaskOpts) (Task, error) {
c.mu.Lock()
defer c.mu.Unlock()
i, err := ioCreate(c.c.ID)
i, err := ioCreate(c.id)
if err != nil {
return nil, err
}
cfg := i.Config()
request := &tasks.CreateTaskRequest{
ContainerID: c.c.ID,
ContainerID: c.id,
Terminal: cfg.Terminal,
Stdin: cfg.Stdin,
Stdout: cfg.Stdout,
Stderr: cfg.Stderr,
}
if c.c.SnapshotKey != "" {
if c.c.Snapshotter == "" {
r, err := c.get(ctx)
if err != nil {
return nil, err
}
if r.SnapshotKey != "" {
if r.Snapshotter == "" {
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "unable to resolve rootfs mounts without snapshotter on container")
}
// get the rootfs from the snapshotter and add it to the request
mounts, err := c.client.SnapshotService(c.c.Snapshotter).Mounts(ctx, c.c.SnapshotKey)
mounts, err := c.client.SnapshotService(r.Snapshotter).Mounts(ctx, r.SnapshotKey)
if err != nil {
return nil, err
}
@ -225,7 +218,7 @@ func (c *container) NewTask(ctx context.Context, ioCreate IOCreation, opts ...Ne
t := &task{
client: c.client,
io: i,
id: c.ID(),
id: c.id,
}
if info.Checkpoint != nil {
request.Checkpoint = info.Checkpoint
@ -238,9 +231,26 @@ func (c *container) NewTask(ctx context.Context, ioCreate IOCreation, opts ...Ne
return t, nil
}
func (c *container) Update(ctx context.Context, opts ...UpdateContainerOpts) error {
// fetch the current container config before updating it
r, err := c.get(ctx)
if err != nil {
return err
}
for _, o := range opts {
if err := o(ctx, c.client, &r); err != nil {
return err
}
}
if _, err := c.client.ContainerService().Update(ctx, r); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, error) {
response, err := c.client.TaskService().Get(ctx, &tasks.GetRequest{
ContainerID: c.c.ID,
ContainerID: c.id,
})
if err != nil {
err = errdefs.FromGRPC(err)
@ -264,6 +274,10 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro
return t, nil
}
func (c *container) get(ctx context.Context) (containers.Container, error) {
return c.client.ContainerService().Get(ctx, c.id)
}
func attachExistingIO(response *tasks.GetResponse, ioAttach IOAttach) (IO, error) {
// get the existing fifo paths from the task information stored by the daemon
paths := &FIFOSet{

View File

@ -12,9 +12,15 @@ import (
"github.com/pkg/errors"
)
// DeleteOpts allows the caller to set options for the deletion of a container
type DeleteOpts func(ctx context.Context, client *Client, c containers.Container) error
// NewContainerOpts allows the caller to set additional options when creating a container
type NewContainerOpts func(ctx context.Context, client *Client, c *containers.Container) error
// UpdateContainerOpts allows the caller to set additional options when updating a container
type UpdateContainerOpts func(ctx context.Context, client *Client, c *containers.Container) error
// WithRuntime allows a user to specify the runtime name and additional options that should
// be used to create tasks for the container
func WithRuntime(name string, options interface{}) NewContainerOpts {

View File

@ -62,11 +62,13 @@ type Container struct {
Extensions map[string]types.Any
}
// RuntimeInfo holds runtime specific information
type RuntimeInfo struct {
Name string
Options *types.Any
}
// Store interacts with the underlying container storage
type Store interface {
Get(ctx context.Context, id string) (Container, error)

View File

@ -15,6 +15,7 @@ type remoteContainers struct {
var _ containers.Store = &remoteContainers{}
// NewRemoteContainerStore returns the container Store connected with the provided client
func NewRemoteContainerStore(client containersapi.ContainersClient) containers.Store {
return &remoteContainers{
client: client,

View File

@ -8,20 +8,25 @@ import (
"github.com/opencontainers/go-digest"
)
// ReaderAt extends the standard io.ReaderAt interface with reporting of Size and io.Closer
type ReaderAt interface {
io.ReaderAt
io.Closer
Size() int64
}
// Provider provides a reader interface for specific content
type Provider interface {
ReaderAt(ctx context.Context, dgst digest.Digest) (ReaderAt, error)
}
// Ingester writes content
type Ingester interface {
Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error)
}
// Info holds content specific information
//
// TODO(stevvooe): Consider a very different name for this struct. Info is way
// to general. It also reads very weird in certain context, like pluralization.
type Info struct {
@ -32,6 +37,7 @@ type Info struct {
Labels map[string]string
}
// Status of a content operation
type Status struct {
Ref string
Offset int64
@ -81,6 +87,7 @@ type IngestManager interface {
Abort(ctx context.Context, ref string) error
}
// Writer handles the write of content into a content store
type Writer interface {
// Close is expected to be called after Commit() when commission is needed.
io.WriteCloser
@ -111,6 +118,7 @@ type Store interface {
// Opt is used to alter the mutable properties of content
type Opt func(*Info) error
// WithLabels allows labels to be set on content
func WithLabels(labels map[string]string) Opt {
return func(info *Info) error {
info.Labels = labels

View File

@ -19,6 +19,7 @@ var (
}
)
// NewReader returns a io.Reader from a ReaderAt
func NewReader(ra ReaderAt) io.Reader {
rd := io.NewSectionReader(ra, 0, ra.Size())
return rd

View File

@ -2,7 +2,6 @@ package containerd
import (
"net"
"strings"
"time"
"github.com/pkg/errors"
@ -13,12 +12,12 @@ type dialResult struct {
err error
}
// Dialer returns a GRPC net.Conn connected to the provided address
func Dialer(address string, timeout time.Duration) (net.Conn, error) {
var (
stopC = make(chan struct{})
synC = make(chan *dialResult)
)
address = strings.TrimPrefix(address, "unix://")
go func() {
defer close(synC)
for {
@ -47,6 +46,6 @@ func Dialer(address string, timeout time.Duration) (net.Conn, error) {
dr.c.Close()
}
}()
return nil, errors.Errorf("dial %s: no such file or directory", address)
return nil, errors.Errorf("dial %s: timeout", address)
}
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"net"
"os"
"strings"
"syscall"
"time"
)
@ -24,9 +25,12 @@ func isNoent(err error) bool {
}
func dialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}
// DialAddress returns the address with unix:// prepended to the
// provided address
func DialAddress(address string) string {
return fmt.Sprintf("unix://%s", address)
}

View File

@ -29,6 +29,7 @@ var (
ErrNotImplemented = errors.New("not implemented") // represents not supported and unimplemented
)
// IsInvalidArgument returns true if the error is due to an invalid argument
func IsInvalidArgument(err error) bool {
return errors.Cause(err) == ErrInvalidArgument
}
@ -45,15 +46,17 @@ func IsAlreadyExists(err error) bool {
}
// IsFailedPrecondition returns true if an operation could not proceed to the
// lack of a particular condition.
// lack of a particular condition
func IsFailedPrecondition(err error) bool {
return errors.Cause(err) == ErrFailedPrecondition
}
// IsUnavailable returns true if the error is due to a resource being unavailable
func IsUnavailable(err error) bool {
return errors.Cause(err) == ErrUnavailable
}
// IsNotImplemented returns true if the error is due to not being implemented
func IsNotImplemented(err error) bool {
return errors.Cause(err) == ErrNotImplemented
}

View File

@ -53,6 +53,7 @@ func ToGRPCf(err error, format string, args ...interface{}) error {
return ToGRPC(errors.Wrapf(err, format, args...))
}
// FromGRPC returns the underlying error from a grpc service based on the grpc error code
func FromGRPC(err error) error {
if err == nil {
return nil

View File

@ -6,6 +6,7 @@ import (
events "github.com/containerd/containerd/api/services/events/v1"
)
// Event is a generic interface for any type of event
type Event interface{}
// Publisher posts the event.
@ -13,6 +14,7 @@ type Publisher interface {
Publish(ctx context.Context, topic string, event Event) error
}
// Forwarder forwards an event to the underlying event bus
type Forwarder interface {
Forward(ctx context.Context, envelope *events.Envelope) error
}
@ -23,6 +25,7 @@ func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event)
return fn(ctx, topic, event)
}
// Subscriber allows callers to subscribe to events
type Subscriber interface {
Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error)
}

View File

@ -18,10 +18,12 @@ import (
"github.com/sirupsen/logrus"
)
// Exchange broadcasts events
type Exchange struct {
broadcaster *goevents.Broadcaster
}
// NewExchange returns a new event Exchange
func NewExchange() *Exchange {
return &Exchange{
broadcaster: goevents.NewBroadcaster(),

View File

@ -8,8 +8,10 @@ type Adaptor interface {
Field(fieldpath []string) (value string, present bool)
}
// AdapterFunc allows implementation specific matching of fieldpaths
type AdapterFunc func(fieldpath []string) (string, bool)
// Field returns the field name and true if it exists
func (fn AdapterFunc) Field(fieldpath []string) (string, bool) {
return fn(fieldpath)
}

View File

@ -58,22 +58,28 @@ import (
"github.com/containerd/containerd/log"
)
// Filter matches specific resources based the provided filter
type Filter interface {
Match(adaptor Adaptor) bool
}
// FilterFunc is a function that handles matching with an adaptor
type FilterFunc func(Adaptor) bool
// Match matches the FilterFunc returning true if the object matches the filter
func (fn FilterFunc) Match(adaptor Adaptor) bool {
return fn(adaptor)
}
// Always is a filter that always returns true for any type of object
var Always FilterFunc = func(adaptor Adaptor) bool {
return true
}
// Any allows multiple filters to be matched aginst the object
type Any []Filter
// Match returns true if any of the provided filters are true
func (m Any) Match(adaptor Adaptor) bool {
for _, m := range m {
if m.Match(adaptor) {
@ -84,8 +90,10 @@ func (m Any) Match(adaptor Adaptor) bool {
return false
}
// All allows multiple filters to be matched aginst the object
type All []Filter
// Match only returns true if all filters match the object
func (m All) Match(adaptor Adaptor) bool {
for _, m := range m {
if !m.Match(adaptor) {

View File

@ -67,9 +67,8 @@ func (s *scanner) next() rune {
if r == utf8.RuneError {
if w > 0 {
return tokenIllegal
} else {
return tokenEOF
}
return tokenEOF
}
if r == 0 {

View File

@ -1,5 +1,6 @@
package fs
// Usage of disk information
type Usage struct {
Inodes int64
Size int64

View File

@ -2,7 +2,7 @@ package fs
import "os"
// GetLinkID returns an identifier representing the node a hardlink is pointing
// GetLinkInfo returns an identifier representing the node a hardlink is pointing
// to. If the file is not hard linked then 0 will be returned.
func GetLinkInfo(fi os.FileInfo) (uint64, bool) {
return getLinkInfo(fi)

View File

@ -80,10 +80,10 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
if err != nil {
return err
}
if info.Labels["containerd.io/uncompressed"] != layer.Diff.Digest.String() {
if info.Labels == nil {
info.Labels = map[string]string{}
}
if info.Labels["containerd.io/uncompressed"] != layer.Diff.Digest.String() {
info.Labels["containerd.io/uncompressed"] = layer.Diff.Digest.String()
if _, err := cs.Update(ctx, info, "labels.containerd.io/uncompressed"); err != nil {
return err

View File

@ -2,49 +2,49 @@ package images
import (
"context"
"encoding/json"
"fmt"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
var (
// SkipDesc is used to skip processing of a descriptor and
// ErrSkipDesc is used to skip processing of a descriptor and
// its descendants.
SkipDesc = fmt.Errorf("skip descriptor")
ErrSkipDesc = fmt.Errorf("skip descriptor")
// StopHandler is used to signify that the descriptor
// ErrStopHandler is used to signify that the descriptor
// has been handled and should not be handled further.
// This applies only to a single descriptor in a handler
// chain and does not apply to descendant descriptors.
StopHandler = fmt.Errorf("stop handler")
ErrStopHandler = fmt.Errorf("stop handler")
)
// Handler handles image manifests
type Handler interface {
Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error)
}
// HandlerFunc function implementing the Handler interface
type HandlerFunc func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error)
// Handle image manifests
func (fn HandlerFunc) Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
return fn(ctx, desc)
}
// Handlers returns a handler that will run the handlers in sequence.
//
// A handler may return `StopHandler` to stop calling additional handlers
// A handler may return `ErrStopHandler` to stop calling additional handlers
func Handlers(handlers ...Handler) HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
var children []ocispec.Descriptor
for _, handler := range handlers {
ch, err := handler.Handle(ctx, desc)
if err != nil {
if errors.Cause(err) == StopHandler {
if errors.Cause(err) == ErrStopHandler {
break
}
return nil, err
@ -67,7 +67,7 @@ func Walk(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) err
children, err := handler.Handle(ctx, desc)
if err != nil {
if errors.Cause(err) == SkipDesc {
if errors.Cause(err) == ErrSkipDesc {
continue // don't traverse the children.
}
return err
@ -87,7 +87,7 @@ func Walk(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) err
// If the handler decode subresources, they will be visited, as well.
//
// Handlers for siblings are run in parallel on the provided descriptors. A
// handler may return `SkipDesc` to signal to the dispatcher to not traverse
// handler may return `ErrSkipDesc` to signal to the dispatcher to not traverse
// any children.
//
// Typically, this function will be used with `FetchHandler`, often composed
@ -104,7 +104,7 @@ func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor)
children, err := handler.Handle(ctx, desc)
if err != nil {
if errors.Cause(err) == SkipDesc {
if errors.Cause(err) == ErrSkipDesc {
return nil // don't traverse the children.
}
return err
@ -121,7 +121,7 @@ func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor)
return eg.Wait()
}
// ChildrenHandler decodes well-known manifests types and returns their children.
// ChildrenHandler decodes well-known manifest types and returns their children.
//
// This is useful for supporting recursive fetch and other use cases where you
// want to do a full walk of resources.
@ -130,60 +130,6 @@ func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor)
// arbitrary types.
func ChildrenHandler(provider content.Provider, platform string) HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
var descs []ocispec.Descriptor
switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
p, err := content.ReadBlob(ctx, provider, desc.Digest)
if err != nil {
return nil, err
}
// TODO(stevvooe): We just assume oci manifest, for now. There may be
// subtle differences from the docker version.
var manifest ocispec.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return nil, err
}
descs = append(descs, manifest.Config)
descs = append(descs, manifest.Layers...)
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
p, err := content.ReadBlob(ctx, provider, desc.Digest)
if err != nil {
return nil, err
}
var index ocispec.Index
if err := json.Unmarshal(p, &index); err != nil {
return nil, err
}
if platform != "" {
matcher, err := platforms.Parse(platform)
if err != nil {
return nil, err
}
for _, d := range index.Manifests {
if d.Platform == nil || matcher.Match(*d.Platform) {
descs = append(descs, d)
}
}
} else {
descs = append(descs, index.Manifests...)
}
case MediaTypeDockerSchema2Layer, MediaTypeDockerSchema2LayerGzip,
MediaTypeDockerSchema2LayerForeign, MediaTypeDockerSchema2LayerForeignGzip,
MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig,
ocispec.MediaTypeImageLayer, ocispec.MediaTypeImageLayerGzip,
ocispec.MediaTypeImageLayerNonDistributable, ocispec.MediaTypeImageLayerNonDistributableGzip:
// childless data types.
return nil, nil
default:
log.G(ctx).Warnf("encountered unknown type %v; children may not be fetched", desc.MediaType)
}
return descs, nil
return Children(ctx, provider, desc, platform)
}
}

View File

@ -7,6 +7,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -15,12 +16,29 @@ import (
// Image provides the model for how containerd views container images.
type Image struct {
// Name of the image.
//
// To be pulled, it must be a reference compatible with resolvers.
//
// This field is required.
Name string
// Labels provide runtime decoration for the image record.
//
// There is no default behavior for how these labels are propagated. They
// only decorate the static metadata object.
//
// This field is optional.
Labels map[string]string
// Target describes the root content for this image. Typically, this is
// a manifest, index or manifest list.
Target ocispec.Descriptor
CreatedAt, UpdatedAt time.Time
}
// Store and interact with images
type Store interface {
Get(ctx context.Context, name string) (Image, error)
List(ctx context.Context, filters ...string) ([]Image, error)
@ -69,6 +87,12 @@ func (image *Image) Size(ctx context.Context, provider content.Provider, platfor
}), ChildrenHandler(provider, platform)), image.Target)
}
// Manifest resolves a manifest from the image for the given platform.
//
// TODO(stevvooe): This violates the current platform agnostic approach to this
// package by returning a specific manifest type. We'll need to refactor this
// to return a manifest descriptor or decide that we want to bring the API in
// this direction because this abstraction is not needed.`
func Manifest(ctx context.Context, provider content.Provider, image ocispec.Descriptor, platform string) (ocispec.Manifest, error) {
var (
matcher platforms.Matcher
@ -146,7 +170,7 @@ func Manifest(ctx context.Context, provider content.Provider, image ocispec.Desc
return descs, nil
}
return nil, errors.New("could not resolve manifest")
return nil, errors.Wrap(errdefs.ErrNotFound, "could not resolve manifest")
}), image); err != nil {
return ocispec.Manifest{}, err
}
@ -177,7 +201,7 @@ func Platforms(ctx context.Context, provider content.Provider, image ocispec.Des
return platformSpecs, Walk(ctx, Handlers(HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
if desc.Platform != nil {
platformSpecs = append(platformSpecs, *desc.Platform)
return nil, SkipDesc
return nil, ErrSkipDesc
}
switch desc.MediaType {
@ -199,6 +223,108 @@ func Platforms(ctx context.Context, provider content.Provider, image ocispec.Des
}), ChildrenHandler(provider, "")), image)
}
// Check returns nil if the all components of an image are available in the
// provider for the specified platform.
//
// If available is true, the caller can assume that required represents the
// complete set of content required for the image.
//
// missing will have the components that are part of required but not avaiiable
// in the provider.
//
// If there is a problem resolving content, an error will be returned.
func Check(ctx context.Context, provider content.Provider, image ocispec.Descriptor, platform string) (available bool, required, present, missing []ocispec.Descriptor, err error) {
mfst, err := Manifest(ctx, provider, image, platform)
if err != nil {
if errdefs.IsNotFound(err) {
return false, []ocispec.Descriptor{image}, nil, []ocispec.Descriptor{image}, nil
}
return false, nil, nil, nil, errors.Wrap(err, "image check failed")
}
// TODO(stevvooe): It is possible that referenced conponents could have
// children, but this is rare. For now, we ignore this and only verify
// that manfiest components are present.
required = append([]ocispec.Descriptor{mfst.Config}, mfst.Layers...)
for _, desc := range required {
ra, err := provider.ReaderAt(ctx, desc.Digest)
if err != nil {
if errdefs.IsNotFound(err) {
missing = append(missing, desc)
continue
} else {
return false, nil, nil, nil, err
}
}
ra.Close()
present = append(present, desc)
}
return true, required, present, missing, nil
}
// Children returns the immediate children of content described by the descriptor.
func Children(ctx context.Context, provider content.Provider, desc ocispec.Descriptor, platform string) ([]ocispec.Descriptor, error) {
var descs []ocispec.Descriptor
switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
p, err := content.ReadBlob(ctx, provider, desc.Digest)
if err != nil {
return nil, err
}
// TODO(stevvooe): We just assume oci manifest, for now. There may be
// subtle differences from the docker version.
var manifest ocispec.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return nil, err
}
descs = append(descs, manifest.Config)
descs = append(descs, manifest.Layers...)
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
p, err := content.ReadBlob(ctx, provider, desc.Digest)
if err != nil {
return nil, err
}
var index ocispec.Index
if err := json.Unmarshal(p, &index); err != nil {
return nil, err
}
if platform != "" {
matcher, err := platforms.Parse(platform)
if err != nil {
return nil, err
}
for _, d := range index.Manifests {
if d.Platform == nil || matcher.Match(*d.Platform) {
descs = append(descs, d)
}
}
} else {
descs = append(descs, index.Manifests...)
}
case MediaTypeDockerSchema2Layer, MediaTypeDockerSchema2LayerGzip,
MediaTypeDockerSchema2LayerForeign, MediaTypeDockerSchema2LayerForeignGzip,
MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig,
ocispec.MediaTypeImageLayer, ocispec.MediaTypeImageLayerGzip,
ocispec.MediaTypeImageLayerNonDistributable, ocispec.MediaTypeImageLayerNonDistributableGzip:
// childless data types.
return nil, nil
default:
log.G(ctx).Warnf("encountered unknown type %v; children may not be fetched", desc.MediaType)
}
return descs, nil
}
// RootFS returns the unpacked diffids that make up and images rootfs.
//
// These are used to verify that a set of layers unpacked to the expected

View File

@ -68,6 +68,7 @@ func (c *Client) importFromOCITar(ctx context.Context, ref string, reader io.Rea
imgrec := images.Image{
Name: ref,
Target: *desc,
Labels: iopts.labels,
}
is := c.ImageService()
if updated, err := is.Update(ctx, imgrec, "target"); err != nil {

View File

@ -119,6 +119,7 @@ func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
return f, nil
}
// DirectIO allows task IO to be handled externally by the caller
type DirectIO struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
@ -128,14 +129,17 @@ type DirectIO struct {
terminal bool
}
// IOCreate returns IO avaliable for use with task creation
func (f *DirectIO) IOCreate(id string) (IO, error) {
return f, nil
}
// IOAttach returns IO avaliable for use with task attachment
func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
return f, nil
}
// Config returns the IOConfig
func (f *DirectIO) Config() IOConfig {
return IOConfig{
Terminal: f.terminal,
@ -145,14 +149,21 @@ func (f *DirectIO) Config() IOConfig {
}
}
// Cancel stops any IO copy operations
//
// Not applicable for DirectIO
func (f *DirectIO) Cancel() {
// nothing to cancel as all operations are handled externally
}
// Wait on any IO copy operations
//
// Not applicable for DirectIO
func (f *DirectIO) Wait() {
// nothing to wait on as all operations are handled externally
}
// Close closes all open fds
func (f *DirectIO) Close() error {
err := f.Stdin.Close()
if err2 := f.Stdout.Close(); err == nil {
@ -164,6 +175,7 @@ func (f *DirectIO) Close() error {
return err
}
// Delete removes the underlying directory containing fifos
func (f *DirectIO) Delete() error {
if f.set.Dir == "" {
return nil

View File

@ -9,15 +9,13 @@ const (
maxSize = 4096
)
// Validate a label's key and value are under 4096 bytes
func Validate(k, v string) error {
// A label key and value should be under 4096 bytes
if (len(k) + len(v)) > maxSize {
if len(k) > 10 {
k = k[:10]
}
return errors.Wrapf(errdefs.ErrInvalidArgument, "label key and value greater than maximum size (%d bytes), key: %s", maxSize, k)
}
return nil
}

View File

@ -17,9 +17,14 @@ func WithTransactionContext(ctx context.Context, tx *bolt.Tx) context.Context {
return context.WithValue(ctx, transactionKey{}, tx)
}
type transactor interface {
View(fn func(*bolt.Tx) error) error
Update(fn func(*bolt.Tx) error) error
}
// view gets a bolt db transaction either from the context
// or starts a new one with the provided bolt database.
func view(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error {
func view(ctx context.Context, db transactor, fn func(*bolt.Tx) error) error {
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
if !ok {
return db.View(fn)
@ -29,7 +34,7 @@ func view(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error {
// update gets a writable bolt db transaction either from the context
// or starts a new one with the provided bolt database.
func update(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error {
func update(ctx context.Context, db transactor, fn func(*bolt.Tx) error) error {
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
if !ok {
return db.Update(fn)

View File

@ -28,7 +28,8 @@ import (
// key: object-specific key identifying the storage bucket for the objects
// contents.
var (
bucketKeyVersion = []byte("v1")
bucketKeyVersion = []byte(schemaVersion)
bucketKeyDBVersion = []byte("version") // stores the version of the schema
bucketKeyObjectLabels = []byte("labels") // stores the labels for a namespace.
bucketKeyObjectIndexes = []byte("indexes") // reserved
bucketKeyObjectImages = []byte("images") // stores image objects
@ -45,6 +46,7 @@ var (
bucketKeyRuntime = []byte("runtime")
bucketKeyName = []byte("name")
bucketKeyParent = []byte("parent")
bucketKeyChildren = []byte("children")
bucketKeyOptions = []byte("options")
bucketKeySpec = []byte("spec")
bucketKeySnapshotKey = []byte("snapshotKey")

View File

@ -22,6 +22,7 @@ type containerStore struct {
tx *bolt.Tx
}
// NewContainerStore returns a Store backed by an underlying bolt DB
func NewContainerStore(tx *bolt.Tx) containers.Store {
return &containerStore{
tx: tx,

View File

@ -19,12 +19,12 @@ import (
type contentStore struct {
content.Store
db *bolt.DB
db transactor
}
// NewContentStore returns a namespaced content store using an existing
// newContentStore returns a namespaced content store using an existing
// content store interface.
func NewContentStore(db *bolt.DB, cs content.Store) content.Store {
func newContentStore(db transactor, cs content.Store) content.Store {
return &contentStore{
Store: cs,
db: db,
@ -353,7 +353,7 @@ type namespacedWriter struct {
content.Writer
ref string
namespace string
db *bolt.DB
db transactor
}
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
@ -406,7 +406,7 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
commitTime := time.Now().UTC()
sizeEncoded, err := encodeSize(size)
sizeEncoded, err := encodeInt(size)
if err != nil {
return err
}
@ -417,11 +417,7 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
if err := boltutil.WriteLabels(bkt, base.Labels); err != nil {
return err
}
if err := bkt.Put(bucketKeySize, sizeEncoded); err != nil {
return err
}
return nil
return bkt.Put(bucketKeySize, sizeEncoded)
}
func (nw *namespacedWriter) Status() (content.Status, error) {
@ -492,14 +488,10 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
}
// Write size
sizeEncoded, err := encodeSize(info.Size)
sizeEncoded, err := encodeInt(info.Size)
if err != nil {
return err
}
if err := bkt.Put(bucketKeySize, sizeEncoded); err != nil {
return err
}
return nil
return bkt.Put(bucketKeySize, sizeEncoded)
}

140
vendor/github.com/containerd/containerd/metadata/db.go generated vendored Normal file
View File

@ -0,0 +1,140 @@
package metadata
import (
"context"
"encoding/binary"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshot"
"github.com/pkg/errors"
)
const (
// schemaVersion represents the schema version of
// the database. This schema version represents the
// structure of the data in the database. The schema
// can envolve at any time but any backwards
// incompatible changes or structural changes require
// bumping the schema version.
schemaVersion = "v1"
// dbVersion represents updates to the schema
// version which are additions and compatible with
// prior version of the same schema.
dbVersion = 1
)
type DB struct {
db *bolt.DB
ss map[string]snapshot.Snapshotter
cs content.Store
}
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB {
return &DB{
db: db,
ss: ss,
cs: cs,
}
}
func (m *DB) Init(ctx context.Context) error {
// errSkip is used when no migration or version needs to be written
// to the database and the transaction can be immediately rolled
// back rather than performing a much slower and unnecessary commit.
var errSkip = errors.New("skip update")
err := m.db.Update(func(tx *bolt.Tx) error {
var (
// current schema and version
schema = "v0"
version = 0
)
i := len(migrations)
for ; i > 0; i-- {
migration := migrations[i-1]
bkt := tx.Bucket([]byte(migration.schema))
if bkt == nil {
// Hasn't encountered another schema, go to next migration
if schema == "v0" {
continue
}
break
}
if schema == "v0" {
schema = migration.schema
vb := bkt.Get(bucketKeyDBVersion)
if vb != nil {
v, _ := binary.Varint(vb)
version = int(v)
}
}
if version >= migration.version {
break
}
}
// Previous version fo database found
if schema != "v0" {
updates := migrations[i:]
// No migration updates, return immediately
if len(updates) == 0 {
return errSkip
}
for _, m := range updates {
t0 := time.Now()
if err := m.migrate(tx); err != nil {
return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version)
}
log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("database migration to %s.%d finished", m.schema, m.version)
}
}
bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
if err != nil {
return err
}
versionEncoded, err := encodeInt(dbVersion)
if err != nil {
return err
}
return bkt.Put(bucketKeyDBVersion, versionEncoded)
})
if err == errSkip {
err = nil
}
return err
}
func (m *DB) ContentStore() content.Store {
if m.cs == nil {
return nil
}
return newContentStore(m, m.cs)
}
func (m *DB) Snapshotter(name string) snapshot.Snapshotter {
sn, ok := m.ss[name]
if !ok {
return nil
}
return newSnapshotter(m, name, sn)
}
func (m *DB) View(fn func(*bolt.Tx) error) error {
return m.db.View(fn)
}
func (m *DB) Update(fn func(*bolt.Tx) error) error {
return m.db.Update(fn)
}

View File

@ -15,6 +15,7 @@ import (
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
@ -22,6 +23,7 @@ type imageStore struct {
tx *bolt.Tx
}
// NewImageStore returns a store backed by a bolt DB
func NewImageStore(tx *bolt.Tx) images.Store {
return &imageStore{tx: tx}
}
@ -201,12 +203,34 @@ func (s *imageStore) Delete(ctx context.Context, name string) error {
}
func validateImage(image *images.Image) error {
if image.Name == "" {
return errors.Wrapf(errdefs.ErrInvalidArgument, "image name must not be empty")
}
for k, v := range image.Labels {
if err := labels.Validate(k, v); err != nil {
return errors.Wrapf(err, "image.Labels")
}
}
return validateTarget(&image.Target)
}
func validateTarget(target *ocispec.Descriptor) error {
// NOTE(stevvooe): Only validate fields we actually store.
if err := target.Digest.Validate(); err != nil {
return errors.Wrapf(errdefs.ErrInvalidArgument, "Target.Digest %q invalid: %v", target.Digest, err)
}
if target.Size <= 0 {
return errors.Wrapf(errdefs.ErrInvalidArgument, "Target.Size must be greater than zero")
}
if target.MediaType == "" {
return errors.Wrapf(errdefs.ErrInvalidArgument, "Target.MediaType must be set")
}
return nil
}
@ -260,7 +284,7 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
return err
}
sizeEncoded, err := encodeSize(image.Target.Size)
sizeEncoded, err := encodeInt(image.Target.Size)
if err != nil {
return err
}
@ -278,15 +302,15 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
return nil
}
func encodeSize(size int64) ([]byte, error) {
func encodeInt(i int64) ([]byte, error) {
var (
buf [binary.MaxVarintLen64]byte
sizeEncoded []byte = buf[:]
iEncoded = buf[:]
)
sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, size)]
iEncoded = iEncoded[:binary.PutVarint(iEncoded, i)]
if len(sizeEncoded) == 0 {
return nil, fmt.Errorf("failed encoding size = %v", size)
if len(iEncoded) == 0 {
return nil, fmt.Errorf("failed encoding integer = %v", i)
}
return sizeEncoded, nil
return iEncoded, nil
}

View File

@ -0,0 +1,75 @@
package metadata
import "github.com/boltdb/bolt"
type migration struct {
schema string
version int
migrate func(*bolt.Tx) error
}
var migrations = []migration{
{
schema: "v1",
version: 1,
migrate: addChildLinks,
},
}
// addChildLinks Adds children key to the snapshotters to enforce snapshot
// entries cannot be removed which have children
func addChildLinks(tx *bolt.Tx) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
}
// iterate through each namespace
v1c := v1bkt.Cursor()
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
if v != nil {
continue
}
nbkt := v1bkt.Bucket(k)
sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
if sbkt != nil {
// Iterate through each snapshotter
if err := sbkt.ForEach(func(sk, sv []byte) error {
if sv != nil {
return nil
}
snbkt := sbkt.Bucket(sk)
// Iterate through each snapshot
return snbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
parent := snbkt.Bucket(k).Get(bucketKeyParent)
if len(parent) > 0 {
pbkt := snbkt.Bucket(parent)
if pbkt == nil {
// Not enforcing consistency during migration, skip
return nil
}
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
if err != nil {
return err
}
if err := cbkt.Put(k, nil); err != nil {
return err
}
}
return nil
})
}); err != nil {
return err
}
}
}
return nil
}

View File

@ -14,6 +14,7 @@ type namespaceStore struct {
tx *bolt.Tx
}
// NewNamespaceStore returns a store backed by a bolt DB
func NewNamespaceStore(tx *bolt.Tx) namespaces.Store {
return &namespaceStore{tx: tx}
}

View File

@ -19,12 +19,12 @@ import (
type snapshotter struct {
snapshot.Snapshotter
name string
db *bolt.DB
db transactor
}
// NewSnapshotter returns a new Snapshotter which namespaces the given snapshot
// using the provided name and metadata store.
func NewSnapshotter(db *bolt.DB, name string, sn snapshot.Snapshotter) snapshot.Snapshotter {
// newSnapshotter returns a new Snapshotter which namespaces the given snapshot
// using the provided name and database.
func newSnapshotter(db transactor, name string, sn snapshot.Snapshotter) snapshot.Snapshotter {
return &snapshotter{
Snapshotter: sn,
name: name,
@ -283,10 +283,18 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
if parent != "" {
pbkt := bkt.Bucket([]byte(parent))
if pbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", parent)
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", parent)
}
bparent = string(pbkt.Get(bucketKeyName))
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
if err != nil {
return err
}
if err := cbkt.Put([]byte(key), nil); err != nil {
return err
}
if err := bbkt.Put(bucketKeyParent, []byte(parent)); err != nil {
return err
}
@ -360,7 +368,6 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
}
bkey := string(obkt.Get(bucketKeyName))
parent := string(obkt.Get(bucketKeyParent))
sid, err := bkt.NextSequence()
if err != nil {
@ -372,9 +379,29 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
if err := bbkt.Put(bucketKeyName, []byte(nameKey)); err != nil {
return err
}
if err := bbkt.Put(bucketKeyParent, []byte(parent)); err != nil {
parent := obkt.Get(bucketKeyParent)
if len(parent) > 0 {
pbkt := bkt.Bucket(parent)
if pbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", string(parent))
}
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
if err != nil {
return err
}
if err := cbkt.Delete([]byte(key)); err != nil {
return err
}
if err := cbkt.Put([]byte(name), nil); err != nil {
return err
}
if err := bbkt.Put(bucketKeyParent, parent); err != nil {
return err
}
}
ts := time.Now().UTC()
if err := boltutil.WriteTimestamps(bbkt, ts, ts); err != nil {
return err
@ -400,23 +427,37 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
}
return update(ctx, s.db, func(tx *bolt.Tx) error {
var bkey string
var sbkt *bolt.Bucket
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt != nil {
sbkt := bkt.Bucket([]byte(key))
if sbkt != nil {
bkey = string(sbkt.Get(bucketKeyName))
sbkt = bkt.Bucket([]byte(key))
}
}
if bkey == "" {
if sbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
}
if err := bkt.DeleteBucket([]byte(key)); err != nil {
return err
cbkt := sbkt.Bucket(bucketKeyChildren)
if cbkt != nil {
if child, _ := cbkt.Cursor().First(); child != nil {
return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot remove snapshot with child")
}
}
return s.Snapshotter.Remove(ctx, bkey)
parent := sbkt.Get(bucketKeyParent)
if len(parent) > 0 {
pbkt := bkt.Bucket(parent)
if pbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", string(parent))
}
cbkt := pbkt.Bucket(bucketKeyChildren)
if cbkt != nil {
if err := cbkt.Delete([]byte(key)); err != nil {
return errors.Wrap(err, "failed to remove child link")
}
}
}
return bkt.DeleteBucket([]byte(key))
})
}

View File

@ -13,8 +13,8 @@ type Mount struct {
Options []string
}
// MountAll mounts all the provided mounts to the provided target
func MountAll(mounts []Mount, target string) error {
// All mounts all the provided mounts to the provided target
func All(mounts []Mount, target string) error {
for _, m := range mounts {
if err := m.Mount(target); err != nil {
return err

View File

@ -6,6 +6,7 @@ import (
"golang.org/x/sys/unix"
)
// Mount to the provided target path
func (m *Mount) Mount(target string) error {
flags, data := parseMountOptions(m.Options)
@ -40,6 +41,7 @@ func (m *Mount) Mount(target string) error {
return nil
}
// Unmount the provided mount path with the flags
func Unmount(mount string, flags int) error {
return unix.Unmount(mount, flags)
}

View File

@ -9,7 +9,9 @@ import (
)
const (
// NamespaceEnvVar is the environment variable key name
NamespaceEnvVar = "CONTAINERD_NAMESPACE"
// Default is the name of the default namespace
Default = "default"
)

View File

@ -9,7 +9,8 @@ import (
"github.com/containerd/containerd/log"
)
func NewContext(ctx context.Context, plugins map[PluginType]map[string]interface{}, root, state, id string) *InitContext {
// NewContext returns a new plugin InitContext
func NewContext(ctx context.Context, plugins map[Type]map[string]interface{}, root, state, id string) *InitContext {
return &InitContext{
plugins: plugins,
Root: filepath.Join(root, id),
@ -18,6 +19,7 @@ func NewContext(ctx context.Context, plugins map[PluginType]map[string]interface
}
}
// InitContext is used for plugin inititalization
type InitContext struct {
Root string
State string
@ -26,17 +28,19 @@ type InitContext struct {
Config interface{}
Events *events.Exchange
plugins map[PluginType]map[string]interface{}
plugins map[Type]map[string]interface{}
}
func (i *InitContext) Get(t PluginType) (interface{}, error) {
// Get returns the first plugin by its type
func (i *InitContext) Get(t Type) (interface{}, error) {
for _, v := range i.plugins[t] {
return v, nil
}
return nil, fmt.Errorf("no plugins registered for %s", t)
}
func (i *InitContext) GetAll(t PluginType) (map[string]interface{}, error) {
// GetAll returns all plugins with the specific type
func (i *InitContext) GetAll(t Type) (map[string]interface{}, error) {
p, ok := i.plugins[t]
if !ok {
return nil, fmt.Errorf("no plugins registered for %s", t)

View File

@ -6,6 +6,7 @@ import (
"golang.org/x/net/context"
)
// Differ allows the apply and creation of filesystem diffs between mounts
type Differ interface {
Apply(ctx context.Context, desc ocispec.Descriptor, mount []mount.Mount) (ocispec.Descriptor, error)
DiffMounts(ctx context.Context, lower, upper []mount.Mount, media, ref string) (ocispec.Descriptor, error)

View File

@ -9,49 +9,62 @@ import (
)
var (
ErrNoPluginType = errors.New("plugin: no type")
// ErrNoType is returned when no type is specified
ErrNoType = errors.New("plugin: no type")
// ErrNoPluginID is returned when no id is specified
ErrNoPluginID = errors.New("plugin: no id")
// SkipPlugin is used when a plugin is not initialized and should not be loaded,
// ErrSkipPlugin is used when a plugin is not initialized and should not be loaded,
// this allows the plugin loader differentiate between a plugin which is configured
// not to load and one that fails to load.
SkipPlugin = errors.New("skip plugin")
ErrSkipPlugin = errors.New("skip plugin")
)
// IsSkipPlugin returns true if the error is skipping the plugin
func IsSkipPlugin(err error) bool {
if errors.Cause(err) == SkipPlugin {
if errors.Cause(err) == ErrSkipPlugin {
return true
}
return false
}
type PluginType string
// Type is the type of the plugin
type Type string
const (
RuntimePlugin PluginType = "io.containerd.runtime.v1"
GRPCPlugin PluginType = "io.containerd.grpc.v1"
SnapshotPlugin PluginType = "io.containerd.snapshotter.v1"
TaskMonitorPlugin PluginType = "io.containerd.monitor.v1"
DiffPlugin PluginType = "io.containerd.differ.v1"
MetadataPlugin PluginType = "io.containerd.metadata.v1"
ContentPlugin PluginType = "io.containerd.content.v1"
// RuntimePlugin implements a runtime
RuntimePlugin Type = "io.containerd.runtime.v1"
// GRPCPlugin implements a grpc service
GRPCPlugin Type = "io.containerd.grpc.v1"
// SnapshotPlugin implements a snapshotter
SnapshotPlugin Type = "io.containerd.snapshotter.v1"
// TaskMonitorPlugin implements a task monitor
TaskMonitorPlugin Type = "io.containerd.monitor.v1"
// DiffPlugin implements a differ
DiffPlugin Type = "io.containerd.differ.v1"
// MetadataPlugin implements a metadata store
MetadataPlugin Type = "io.containerd.metadata.v1"
// ContentPlugin implements a content store
ContentPlugin Type = "io.containerd.content.v1"
)
// Registration contains information for registering a plugin
type Registration struct {
Type PluginType
Type Type
ID string
Config interface{}
Requires []PluginType
Requires []Type
Init func(*InitContext) (interface{}, error)
added bool
}
// URI returns the full plugin URI
func (r *Registration) URI() string {
return fmt.Sprintf("%s.%s", r.Type, r.ID)
}
// Service allows GRPC services to be registered with the underlying server
type Service interface {
Register(*grpc.Server) error
}
@ -75,11 +88,12 @@ func Load(path string) (err error) {
return loadPlugins(path)
}
// Register allows plugins to register
func Register(r *Registration) {
register.Lock()
defer register.Unlock()
if r.Type == "" {
panic(ErrNoPluginType)
panic(ErrNoType)
}
if r.ID == "" {
panic(ErrNoPluginID)
@ -87,6 +101,7 @@ func Register(r *Registration) {
register.r = append(register.r, r)
}
// Graph returns an ordered list of registered plugins for initialization
func Graph() (ordered []*Registration) {
for _, r := range register.r {
children(r.Requires, &ordered)
@ -98,7 +113,7 @@ func Graph() (ordered []*Registration) {
return ordered
}
func children(types []PluginType, ordered *[]*Registration) {
func children(types []Type, ordered *[]*Registration) {
for _, t := range types {
for _, r := range register.r {
if r.Type == t {

View File

@ -5,6 +5,7 @@ import (
"github.com/gogo/protobuf/protoc-gen-gogo/descriptor"
)
// FieldpathEnabled returns true if E_Fieldpath is enabled
func FieldpathEnabled(file *descriptor.FileDescriptorProto, message *descriptor.DescriptorProto) bool {
return proto.GetBoolExtension(message.Options, E_Fieldpath, proto.GetBoolExtension(file.Options, E_FieldpathAll, false))
}

View File

@ -12,8 +12,11 @@ import (
)
var (
// ErrInvalid is returned when there is an invalid reference
ErrInvalid = errors.New("invalid reference")
// ErrObjectRequired is returned when the object is required
ErrObjectRequired = errors.New("object required")
// ErrHostnameRequired is returned when the hostname is required
ErrHostnameRequired = errors.New("hostname required")
)
@ -138,7 +141,6 @@ func SplitObject(obj string) (tag string, dgst digest.Digest) {
parts := strings.SplitAfterN(obj, "@", 2)
if len(parts) < 2 {
return parts[0], ""
} else {
return parts[0], digest.Digest(parts[1])
}
return parts[0], digest.Digest(parts[1])
}

View File

@ -47,6 +47,7 @@ type Converter struct {
layerBlobs map[digest.Digest]ocispec.Descriptor
}
// NewConverter returns a new converter
func NewConverter(contentStore content.Store, fetcher remotes.Fetcher) *Converter {
return &Converter{
contentStore: contentStore,
@ -56,6 +57,7 @@ func NewConverter(contentStore content.Store, fetcher remotes.Fetcher) *Converte
}
}
// Handle fetching descriptors for a docker media type
func (c *Converter) Handle(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
switch desc.MediaType {
case images.MediaTypeDockerSchema1Manifest:
@ -101,6 +103,7 @@ func (c *Converter) Handle(ctx context.Context, desc ocispec.Descriptor) ([]ocis
}
}
// Convert a docker manifest to an OCI descriptor
func (c *Converter) Convert(ctx context.Context) (ocispec.Descriptor, error) {
history, diffIDs, err := c.schema1ManifestHistory()
if err != nil {

View File

@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
)
// Status of a content operation
type Status struct {
content.Status
@ -15,6 +16,7 @@ type Status struct {
UploadUUID string
}
// StatusTracker to track status of operations
type StatusTracker interface {
GetStatus(string) (Status, error)
SetStatus(string, Status)
@ -25,6 +27,7 @@ type memoryStatusTracker struct {
m sync.Mutex
}
// NewInMemoryTracker returns a StatusTracker that tracks content status in-memory
func NewInMemoryTracker() StatusTracker {
return &memoryStatusTracker{
statuses: map[string]Status{},

View File

@ -14,7 +14,7 @@ import (
"github.com/sirupsen/logrus"
)
// MakeRef returns a unique reference for the descriptor. This reference can be
// MakeRefKey returns a unique reference for the descriptor. This reference can be
// used to lookup ongoing processes related to the descriptor. This function
// may look to the context to namespace the reference appropriately.
func MakeRefKey(ctx context.Context, desc ocispec.Descriptor) string {

View File

@ -32,11 +32,13 @@ type Resolver interface {
Pusher(ctx context.Context, ref string) (Pusher, error)
}
// Fetcher fetches content
type Fetcher interface {
// Fetch the resource identified by the descriptor.
Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
}
// Pusher pushes content
type Pusher interface {
// Push returns a content writer for the given resource identified
// by the descriptor.
@ -47,6 +49,7 @@ type Pusher interface {
// function.
type FetcherFunc func(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
// Fetch content
func (fn FetcherFunc) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
return fn(ctx, desc)
}
@ -55,6 +58,7 @@ func (fn FetcherFunc) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.Re
// function.
type PusherFunc func(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error
func (fn PusherFunc) Pusher(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error {
// Push content
func (fn PusherFunc) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error {
return fn(ctx, desc, r)
}

View File

@ -19,11 +19,13 @@ var (
type initializerFunc func(string) error
// Mounter handles mount and unmount
type Mounter interface {
Mount(target string, mounts ...mount.Mount) error
Unmount(target string) error
}
// InitRootFS initializes the snapshot for use as a rootfs
func InitRootFS(ctx context.Context, name string, parent digest.Digest, readonly bool, snapshotter snapshot.Snapshotter, mounter Mounter) ([]mount.Mount, error) {
_, err := snapshotter.Stat(ctx, name)
if err == nil {

View File

@ -4,7 +4,6 @@ import (
"io"
"sync"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/content/v1"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/content"
@ -39,8 +38,7 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "content",
Requires: []plugin.PluginType{
plugin.ContentPlugin,
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Init: NewService,
@ -48,17 +46,13 @@ func init() {
}
func NewService(ic *plugin.InitContext) (interface{}, error) {
c, err := ic.Get(plugin.ContentPlugin)
if err != nil {
return nil, err
}
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store))
return &Service{
store: cs,
store: m.(*metadata.DB).ContentStore(),
publisher: ic.Events,
}, nil
}

View File

@ -26,7 +26,7 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "diff",
Requires: []plugin.PluginType{
Requires: []plugin.Type{
plugin.DiffPlugin,
},
Config: &config{

View File

@ -20,7 +20,7 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "images",
Requires: []plugin.PluginType{
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
@ -28,17 +28,17 @@ func init() {
if err != nil {
return nil, err
}
return NewService(m.(*bolt.DB), ic.Events), nil
return NewService(m.(*metadata.DB), ic.Events), nil
},
})
}
type Service struct {
db *bolt.DB
db *metadata.DB
publisher events.Publisher
}
func NewService(db *bolt.DB, publisher events.Publisher) imagesapi.ImagesServer {
func NewService(db *metadata.DB, publisher events.Publisher) imagesapi.ImagesServer {
return &Service{
db: db,
publisher: publisher,

View File

@ -3,7 +3,6 @@ package snapshot
import (
gocontext "context"
"github.com/boltdb/bolt"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
"github.com/containerd/containerd/api/types"
@ -15,7 +14,6 @@ import (
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshot"
protoempty "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@ -24,8 +22,7 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "snapshots",
Requires: []plugin.PluginType{
plugin.SnapshotPlugin,
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Init: newService,
@ -35,30 +32,18 @@ func init() {
var empty = &protoempty.Empty{}
type service struct {
snapshotters map[string]snapshot.Snapshotter
db *metadata.DB
publisher events.Publisher
}
func newService(ic *plugin.InitContext) (interface{}, error) {
rawSnapshotters, err := ic.GetAll(plugin.SnapshotPlugin)
if err != nil {
return nil, err
}
md, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
snapshotters := make(map[string]snapshot.Snapshotter)
for name, sn := range rawSnapshotters {
snapshotters[name] = metadata.NewSnapshotter(md.(*bolt.DB), name, sn.(snapshot.Snapshotter))
}
if len(snapshotters) == 0 {
return nil, errors.Errorf("failed to create snapshotter service: no snapshotters loaded")
}
return &service{
snapshotters: snapshotters,
db: md.(*metadata.DB),
publisher: ic.Events,
}, nil
}
@ -68,8 +53,8 @@ func (s *service) getSnapshotter(name string) (snapshot.Snapshotter, error) {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter argument missing")
}
sn, ok := s.snapshotters[name]
if !ok {
sn := s.db.Snapshotter(name)
if sn == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter not loaded: %s", name)
}
return sn, nil

View File

@ -146,7 +146,7 @@ func (u *Usage) Add(other Usage) {
// the active snapshot. Mount this to the temporary location with the
// following:
//
// if err := containerd.MountAll(mounts, tmpDir); err != nil { ... }
// if err := mount.All(mounts, tmpDir); err != nil { ... }
//
// Once the mounts are performed, our temporary location is ready to capture
// a diff. In practice, this works similar to a filesystem transaction. The

View File

@ -19,6 +19,14 @@ func WithProcessArgs(args ...string) SpecOpts {
}
}
// WithProcessCwd replaces the current working directory on the generated spec
func WithProcessCwd(cwd string) SpecOpts {
return func(_ context.Context, _ *Client, _ *containers.Container, s *specs.Spec) error {
s.Process.Cwd = cwd
return nil
}
}
// WithHostname sets the container's hostname
func WithHostname(name string) SpecOpts {
return func(_ context.Context, _ *Client, _ *containers.Container, s *specs.Spec) error {

View File

@ -138,17 +138,28 @@ func WithImageConfig(i Image) SpecOpts {
}
// WithRootFSPath specifies unmanaged rootfs path.
func WithRootFSPath(path string, readonly bool) SpecOpts {
func WithRootFSPath(path string) SpecOpts {
return func(_ context.Context, _ *Client, _ *containers.Container, s *specs.Spec) error {
s.Root = &specs.Root{
Path: path,
Readonly: readonly,
if s.Root == nil {
s.Root = &specs.Root{}
}
s.Root.Path = path
// Entrypoint is not set here (it's up to caller)
return nil
}
}
// WithRootFSReadonly sets specs.Root.Readonly to true
func WithRootFSReadonly() SpecOpts {
return func(_ context.Context, _ *Client, _ *containers.Container, s *specs.Spec) error {
if s.Root == nil {
s.Root = &specs.Root{}
}
s.Root.Readonly = true
return nil
}
}
// WithResources sets the provided resources on the spec for task updates
func WithResources(resources *specs.LinuxResources) UpdateTaskOpts {
return func(ctx context.Context, client *Client, r *UpdateTaskInfo) error {
@ -302,8 +313,8 @@ func WithNamespacedCgroup() SpecOpts {
}
}
// WithUidGid allows the UID and GID for the Process to be set
func WithUidGid(uid, gid uint32) SpecOpts {
// WithUIDGID allows the UID and GID for the Process to be set
func WithUIDGID(uid, gid uint32) SpecOpts {
return func(_ context.Context, _ *Client, _ *containers.Container, s *specs.Spec) error {
s.Process.User.UID = uid
s.Process.User.GID = gid

View File

@ -136,6 +136,24 @@ func createDefaultSpec() (*specs.Spec, error) {
},
},
Linux: &specs.Linux{
// TODO (AkihiroSuda): unmask /sys/firmware on Windows daemon for LCOW support?
// https://github.com/moby/moby/pull/33241/files#diff-a1f5051ce84e711a2ee688ab9ded5e74R215
MaskedPaths: []string{
"/proc/kcore",
"/proc/latency_stats",
"/proc/timer_list",
"/proc/timer_stats",
"/proc/sched_debug",
"/sys/firmware",
},
ReadonlyPaths: []string{
"/proc/asound",
"/proc/bus",
"/proc/fs",
"/proc/irq",
"/proc/sys",
"/proc/sysrq-trigger",
},
// TODO (@crosbymichael) make sure we don't have have two containers in the same cgroup
Resources: &specs.LinuxResources{
Devices: []specs.LinuxDeviceCgroup{

View File

@ -41,6 +41,7 @@ type Status struct {
ExitTime time.Time
}
// ProcessStatus returns a human readable status for the Process representing its current status
type ProcessStatus string
const (
@ -435,6 +436,15 @@ func (t *task) Metrics(ctx context.Context) (*types.Metric, error) {
if err != nil {
return nil, errdefs.FromGRPC(err)
}
if response.Metrics == nil {
_, err := t.Status(ctx)
if err != nil && errdefs.IsNotFound(err) {
return nil, err
}
return nil, errors.New("no metrics received")
}
return response.Metrics[0], nil
}

View File

@ -52,12 +52,14 @@ func WithProcessKill(ctx context.Context, p Process) error {
return nil
}
// KillInfo contains information on how to process a Kill action
type KillInfo struct {
// All kills all processes inside the task
// only valid on tasks, ignored on processes
All bool
}
// KillOpts allows options to be set for the killing of a process
type KillOpts func(context.Context, Process, *KillInfo) error
// WithKillAll kills all processes for a task

View File

@ -1,6 +1,6 @@
github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6
github.com/containerd/go-runc b3c048c028ddd789c6f9510c597f8b9c62f25359
github.com/containerd/console b28c739c79ce69d017e3691ad3664568d68e95c6
github.com/containerd/console 84eeaae905fa414d03e07bcd6c8d3f19e7cf180e
github.com/containerd/cgroups 5933ab4dc4f7caa3a73a1dc141bd11f42b5c9163
github.com/containerd/typeurl f6943554a7e7e88b3c14aad190bf05932da84788
github.com/docker/go-metrics 8fd5772bf1584597834c6f7961a530f06cbfbb87
@ -16,7 +16,7 @@ github.com/docker/go-units v0.3.1
github.com/gogo/protobuf d2e1ade2d719b78fe5b061b4c18a9f7111b5bdc8
github.com/golang/protobuf 5a0f697c9ed9d68fef0116532c6e05cfeae00e55
github.com/opencontainers/runtime-spec v1.0.0
github.com/opencontainers/runc 593914b8bd5448a93f7c3e4902a03408b6d5c0ce
github.com/opencontainers/runc 0351df1c5a66838d0c392b4ac4cf9450de844e2d
github.com/sirupsen/logrus v1.0.0
github.com/containerd/btrfs cc52c4dea2ce11a44e6639e561bb5c2af9ada9e3
github.com/stretchr/testify v1.1.4
@ -28,7 +28,7 @@ golang.org/x/net 7dcfb8076726a3fdd9353b6b8a1f1b6be6811bd6
google.golang.org/grpc v1.3.0
github.com/pkg/errors v0.8.0
github.com/opencontainers/go-digest 21dfd564fd89c944783d00d069f33e3e7123c448
golang.org/x/sys 7ddbeae9ae08c6a06a59597f0c9edbc5ff2444ce https://github.com/golang/sys
golang.org/x/sys 314a259e304ff91bd6985da2a7149bbf91237993 https://github.com/golang/sys
github.com/opencontainers/image-spec v1.0.0
github.com/containerd/continuity cf279e6ac893682272b4479d4c67fd3abf878b4e
golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c
@ -38,7 +38,6 @@ github.com/Microsoft/go-winio v0.4.4
github.com/Microsoft/hcsshim v0.6.3
github.com/Microsoft/opengcs v0.3.2
github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
github.com/Azure/go-ansiterm 19f72df4d05d31cbe1c56bfc8045c96babff6c7e
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4
github.com/dmcgowan/go-tar 2e2c51242e8993c50445dab7c03c8e7febddd0cf

View File

@ -18,9 +18,8 @@ github.com/golang/protobuf 18c9bb3261723cd5401db4d0c9fbc5c3b6c70fe8
github.com/docker/docker 0f5c9d301b9b1cca66b3ea0f9dec3b5317d3686d
github.com/docker/go-units v0.2.0
github.com/urfave/cli d53eb991652b1d438abdd34ce4bfa3ef1539108e
golang.org/x/sys 0e0164865330d5cf1c00247be08330bf96e2f87c https://github.com/golang/sys
golang.org/x/sys 7ddbeae9ae08c6a06a59597f0c9edbc5ff2444ce https://github.com/golang/sys
# console dependencies
github.com/containerd/console 2ce1c681f3c3c0dfa7d0af289428d36567c9a6bc
github.com/Azure/go-ansiterm fa152c58bc15761d0200cb75fe958b89a9d4888e
github.com/containerd/console 84eeaae905fa414d03e07bcd6c8d3f19e7cf180e
github.com/pkg/errors v0.8.0