Merge pull request #904 from crosbymichael/client

Add initial containerd client package
Derek McGowan 2017-05-25 16:21:57 -07:00 committed by GitHub
commit 199544ea80
13 changed files with 1300 additions and 0 deletions

1
.gitignore vendored

@@ -1,3 +1,4 @@
/bin/
**/coverage.txt
**/coverage.out
containerd.test

339
client.go Normal file

@@ -0,0 +1,339 @@
package containerd
import (
"context"
"encoding/json"
"io/ioutil"
"log"
"net/http"
"runtime"
"time"
"github.com/containerd/containerd/api/services/containers"
contentapi "github.com/containerd/containerd/api/services/content"
diffapi "github.com/containerd/containerd/api/services/diff"
"github.com/containerd/containerd/api/services/execution"
imagesapi "github.com/containerd/containerd/api/services/images"
snapshotapi "github.com/containerd/containerd/api/services/snapshot"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/rootfs"
contentservice "github.com/containerd/containerd/services/content"
"github.com/containerd/containerd/services/diff"
diffservice "github.com/containerd/containerd/services/diff"
imagesservice "github.com/containerd/containerd/services/images"
snapshotservice "github.com/containerd/containerd/services/snapshot"
"github.com/containerd/containerd/snapshot"
protobuf "github.com/gogo/protobuf/types"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
func init() {
// reset the grpc logger so that it does not write to the stdio of the calling process
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
}
type NewClientOpts func(c *Client) error
func WithNamespace(namespace string) NewClientOpts {
return func(c *Client) error {
c.namespace = namespace
return nil
}
}
// New returns a new containerd client that is connected to the containerd
// instance provided by address
func New(address string, opts ...NewClientOpts) (*Client, error) {
gopts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithTimeout(100 * time.Second),
grpc.WithDialer(dialer),
}
conn, err := grpc.Dial(dialAddress(address), gopts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
c := &Client{
conn: conn,
runtime: runtime.GOOS,
}
for _, o := range opts {
if err := o(c); err != nil {
return nil, err
}
}
return c, nil
}
// Client is the client to interact with containerd and its various services
// using a uniform interface
type Client struct {
conn *grpc.ClientConn
runtime string
namespace string
}
// Containers returns all containers created in containerd
func (c *Client) Containers(ctx context.Context) ([]Container, error) {
r, err := c.ContainerService().List(ctx, &containers.ListContainersRequest{})
if err != nil {
return nil, err
}
var out []Container
for _, container := range r.Containers {
out = append(out, containerFromProto(c, container))
}
return out, nil
}
type NewContainerOpts func(ctx context.Context, client *Client, c *containers.Container) error
// WithContainerLabels adds the provided labels to the container
func WithContainerLabels(labels map[string]string) NewContainerOpts {
return func(_ context.Context, _ *Client, c *containers.Container) error {
c.Labels = labels
return nil
}
}
// WithExistingRootFS uses an existing root filesystem for the container
func WithExistingRootFS(id string) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error {
// check that the snapshot exists, if not, fail on creation
if _, err := client.SnapshotService().Mounts(ctx, id); err != nil {
return err
}
c.RootFS = id
return nil
}
}
// WithNewRootFS allocates a new snapshot to be used by the container as the
// root filesystem in read-write mode
func WithNewRootFS(id string, i Image) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error {
diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore())
if err != nil {
return err
}
if _, err := client.SnapshotService().Prepare(ctx, id, identity.ChainID(diffIDs).String()); err != nil {
return err
}
c.RootFS = id
return nil
}
}
// WithNewReadonlyRootFS allocates a new snapshot to be used by the container as the
// root filesystem in read-only mode
func WithNewReadonlyRootFS(id string, i Image) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error {
diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore())
if err != nil {
return err
}
if _, err := client.SnapshotService().View(ctx, id, identity.ChainID(diffIDs).String()); err != nil {
return err
}
c.RootFS = id
return nil
}
}
func WithRuntime(name string) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error {
c.Runtime = name
return nil
}
}
func WithImage(i Image) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error {
c.Image = i.Name()
return nil
}
}
// NewContainer will create a new container in containerd with the provided id
// the id must be unique within the namespace
func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (Container, error) {
data, err := json.Marshal(spec)
if err != nil {
return nil, err
}
container := containers.Container{
ID: id,
Runtime: c.runtime,
Spec: &protobuf.Any{
TypeUrl: specs.Version,
Value: data,
},
}
for _, o := range opts {
if err := o(ctx, c, &container); err != nil {
return nil, err
}
}
r, err := c.ContainerService().Create(ctx, &containers.CreateContainerRequest{
Container: container,
})
if err != nil {
return nil, err
}
return containerFromProto(c, r.Container), nil
}
type PullOpts func(*Client, *PullContext) error
type PullContext struct {
Resolver remotes.Resolver
Unpacker Unpacker
}
func defaultPullContext() *PullContext {
return &PullContext{
Resolver: docker.NewResolver(docker.ResolverOptions{
Client: http.DefaultClient,
}),
}
}
func WithPullUnpack(client *Client, c *PullContext) error {
c.Unpacker = &snapshotUnpacker{
store: client.ContentStore(),
diff: client.DiffService(),
snapshotter: client.SnapshotService(),
}
return nil
}
type Unpacker interface {
Unpack(context.Context, images.Image) error
}
type snapshotUnpacker struct {
snapshotter snapshot.Snapshotter
store content.Store
diff diff.DiffService
}
func (s *snapshotUnpacker) Unpack(ctx context.Context, image images.Image) error {
layers, err := s.getLayers(ctx, image)
if err != nil {
return err
}
if _, err := rootfs.ApplyLayers(ctx, layers, s.snapshotter, s.diff); err != nil {
return err
}
return nil
}
func (s *snapshotUnpacker) getLayers(ctx context.Context, image images.Image) ([]rootfs.Layer, error) {
p, err := content.ReadBlob(ctx, s.store, image.Target.Digest)
if err != nil {
return nil, errors.Wrap(err, "failed to read manifest blob")
}
var manifest v1.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal manifest")
}
diffIDs, err := image.RootFS(ctx, s.store)
if err != nil {
return nil, errors.Wrap(err, "failed to resolve rootfs")
}
if len(diffIDs) != len(manifest.Layers) {
return nil, errors.Errorf("mismatched image rootfs and manifest layers")
}
layers := make([]rootfs.Layer, len(diffIDs))
for i := range diffIDs {
layers[i].Diff = v1.Descriptor{
// TODO: derive media type from compressed type
MediaType: v1.MediaTypeImageLayer,
Digest: diffIDs[i],
}
layers[i].Blob = manifest.Layers[i]
}
return layers, nil
}
func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (Image, error) {
pullCtx := defaultPullContext()
for _, o := range opts {
if err := o(c, pullCtx); err != nil {
return nil, err
}
}
store := c.ContentStore()
name, desc, err := pullCtx.Resolver.Resolve(ctx, ref)
if err != nil {
return nil, err
}
fetcher, err := pullCtx.Resolver.Fetcher(ctx, name)
if err != nil {
return nil, err
}
handlers := []images.Handler{
remotes.FetchHandler(store, fetcher),
images.ChildrenHandler(store),
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
return nil, err
}
is := c.ImageService()
if err := is.Put(ctx, name, desc); err != nil {
return nil, err
}
i, err := is.Get(ctx, name)
if err != nil {
return nil, err
}
if pullCtx.Unpacker != nil {
if err := pullCtx.Unpacker.Unpack(ctx, i); err != nil {
return nil, err
}
}
return &image{
client: c,
i: i,
}, nil
}
// Close closes the client's connection to containerd
func (c *Client) Close() error {
return c.conn.Close()
}
func (c *Client) ContainerService() containers.ContainersClient {
return containers.NewContainersClient(c.conn)
}
func (c *Client) ContentStore() content.Store {
return contentservice.NewStoreFromClient(contentapi.NewContentClient(c.conn))
}
func (c *Client) SnapshotService() snapshot.Snapshotter {
return snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(c.conn))
}
func (c *Client) TaskService() execution.TasksClient {
return execution.NewTasksClient(c.conn)
}
func (c *Client) ImageService() images.Store {
return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(c.conn))
}
func (c *Client) DiffService() diff.DiffService {
return diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn))
}
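Taken together, the pieces above give a small end-to-end flow: dial the daemon, pull and unpack an image, generate a spec, and create a container backed by a fresh snapshot. The sketch below is illustrative only; the socket path, image reference, snapshot ID, and container ID are placeholders, not values this package prescribes.

package main

import (
    "context"
    "log"

    "github.com/containerd/containerd"
)

func main() {
    // Dial the daemon; this is the address the integration tests assume.
    client, err := containerd.New("/run/containerd/containerd.sock")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    ctx := context.Background()

    // Pull the image and unpack its layers into the snapshotter so it can
    // back a root filesystem.
    image, err := client.Pull(ctx, "docker.io/library/alpine:latest", containerd.WithPullUnpack)
    if err != nil {
        log.Fatal(err)
    }

    // Build a default spec seeded from the image config, then create a
    // container whose rootfs is a new read-write snapshot.
    spec, err := containerd.GenerateSpec(containerd.WithImageConfig(ctx, image))
    if err != nil {
        log.Fatal(err)
    }
    container, err := client.NewContainer(ctx, "example", spec,
        containerd.WithImage(image),
        containerd.WithNewRootFS("example-rootfs", image),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer container.Delete(ctx)
    log.Println("created container", container.ID())
}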

48
client_test.go Normal file

@@ -0,0 +1,48 @@
package containerd
import (
"context"
"flag"
"testing"
)
func init() {
flag.StringVar(&address, "address", "/run/containerd/containerd.sock", "The address to the containerd socket for use in the tests")
flag.Parse()
}
var address string
func TestNewClient(t *testing.T) {
if testing.Short() {
t.Skip()
}
client, err := New(address)
if err != nil {
t.Fatal(err)
}
if client == nil {
t.Fatal("New() returned nil client")
}
if err := client.Close(); err != nil {
t.Errorf("client closed returned errror %v", err)
}
}
func TestImagePull(t *testing.T) {
if testing.Short() {
t.Skip()
}
client, err := New(address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
const ref = "docker.io/library/alpine:latest"
_, err = client.Pull(context.Background(), ref)
if err != nil {
t.Error(err)
return
}
}
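These are integration tests rather than unit tests: each one dials a live daemon, so they skip themselves under the -short flag. Assuming a daemon is listening on the default socket, a typical invocation would be something like go test -run TestImagePull -address /run/containerd/containerd.sock, where -address is the flag registered in the init function above.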

17
client_unix.go Normal file

@@ -0,0 +1,17 @@
package containerd
import (
"fmt"
"net"
"strings"
"time"
)
func dialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}
func dialAddress(address string) string {
return fmt.Sprintf("unix://%s", address)
}
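dialAddress wraps the socket path in the unix:// scheme that client.go passes to grpc.Dial, and dialer strips that scheme again before opening the socket. A minimal sketch of the round trip, written as a hypothetical in-package test (it would live in a _unix test file; the socket path is a placeholder and nothing needs to be listening for the first check):

package containerd

import (
    "testing"
    "time"
)

func TestDialAddressRoundTrip(t *testing.T) {
    // dialAddress prepends the unix:// scheme expected by gRPC.
    addr := dialAddress("/run/containerd/containerd.sock")
    if addr != "unix:///run/containerd/containerd.sock" {
        t.Fatalf("unexpected address %q", addr)
    }
    // gRPC hands the same string back to dialer, which trims the scheme and
    // dials the socket; without a running daemon this simply errors.
    if _, err := dialer(addr, 100*time.Millisecond); err != nil {
        t.Logf("no daemon listening: %v", err)
    }
}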

16
client_windows.go Normal file

@@ -0,0 +1,16 @@
package containerd
import (
"net"
"time"
winio "github.com/Microsoft/go-winio"
)
func dialer(address string, timeout time.Duration) (net.Conn, error) {
return winio.DialPipe(address, &timeout)
}
func dialAddress(address string) string {
return address
}

110
container.go Normal file

@@ -0,0 +1,110 @@
package containerd
import (
"context"
"encoding/json"
"github.com/containerd/containerd/api/services/containers"
"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/mount"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
type Container interface {
ID() string
Delete(context.Context) error
NewTask(context.Context, IOCreation) (Task, error)
Spec() (*specs.Spec, error)
Task() Task
}
func containerFromProto(client *Client, c containers.Container) *container {
return &container{
client: client,
c: c,
}
}
var _ = (Container)(&container{})
type container struct {
client *Client
c containers.Container
task *task
}
// ID returns the container's unique id
func (c *container) ID() string {
return c.c.ID
}
// Spec returns the current OCI specification for the container
func (c *container) Spec() (*specs.Spec, error) {
var s specs.Spec
if err := json.Unmarshal(c.c.Spec.Value, &s); err != nil {
return nil, err
}
return &s, nil
}
// Delete deletes an existing container
// an error is returned if the container has running tasks
func (c *container) Delete(ctx context.Context) (err error) {
// TODO: should the client be the one removing resources attached
// to the container at the moment before we have GC?
if c.c.RootFS != "" {
err = c.client.SnapshotService().Remove(ctx, c.c.RootFS)
}
if _, cerr := c.client.ContainerService().Delete(ctx, &containers.DeleteContainerRequest{
ID: c.c.ID,
}); err == nil {
err = cerr
}
return err
}
func (c *container) Task() Task {
return c.task
}
func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, error) {
i, err := ioCreate()
if err != nil {
return nil, err
}
request := &execution.CreateRequest{
ContainerID: c.c.ID,
Terminal: i.Terminal,
Stdin: i.Stdin,
Stdout: i.Stdout,
Stderr: i.Stderr,
}
if c.c.RootFS != "" {
// get the rootfs from the snapshotter and add it to the request
mounts, err := c.client.SnapshotService().Mounts(ctx, c.c.RootFS)
if err != nil {
return nil, err
}
for _, m := range mounts {
request.Rootfs = append(request.Rootfs, &mount.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
}
response, err := c.client.TaskService().Create(ctx, request)
if err != nil {
return nil, err
}
t := &task{
client: c.client,
io: i,
containerID: response.ContainerID,
pid: response.Pid,
}
c.task = t
return t, nil
}
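NewTask is where a container becomes something runnable: the IOCreation callback allocates the fifos, and when the container was created with a RootFS snapshot the snapshotter's mounts are forwarded in the create request. A sketch of wiring the current process's stdio to a new task, assuming a container created as in the client.go example earlier; the helper name is hypothetical:

package example

import (
    "context"

    "github.com/containerd/containerd"
)

// newStdioTask creates a task wired to this process's stdin/stdout/stderr
// (via the Stdio IOCreation in task.go) and starts it, cleaning up if the
// start fails.
func newStdioTask(ctx context.Context, container containerd.Container) (containerd.Task, error) {
    task, err := container.NewTask(ctx, containerd.Stdio)
    if err != nil {
        return nil, err
    }
    if err := task.Start(ctx); err != nil {
        task.Delete(ctx) // best-effort cleanup of the never-started task
        return nil, err
    }
    return task, nil
}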

60
container_test.go Normal file

@@ -0,0 +1,60 @@
package containerd
import (
"context"
"testing"
)
func TestContainerList(t *testing.T) {
if testing.Short() {
t.Skip()
}
client, err := New(address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
containers, err := client.Containers(context.Background())
if err != nil {
t.Errorf("container list returned error %v", err)
return
}
if len(containers) != 0 {
t.Errorf("expected 0 containers but received %d", len(containers))
}
}
func TestNewContainer(t *testing.T) {
if testing.Short() {
t.Skip()
}
client, err := New(address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
id := "test"
spec, err := GenerateSpec()
if err != nil {
t.Error(err)
return
}
container, err := client.NewContainer(context.Background(), id, spec)
if err != nil {
t.Error(err)
return
}
if container.ID() != id {
t.Errorf("expected container id %q but received %q", id, container.ID())
}
if spec, err = container.Spec(); err != nil {
t.Error(err)
return
}
if err := container.Delete(context.Background()); err != nil {
t.Error(err)
return
}
}

19
image.go Normal file

@@ -0,0 +1,19 @@
package containerd
import "github.com/containerd/containerd/images"
type Image interface {
Name() string
}
var _ = (Image)(&image{})
type image struct {
client *Client
i images.Image
}
func (i *image) Name() string {
return i.i.Name
}

27
spec.go Normal file

@@ -0,0 +1,27 @@
package containerd
import specs "github.com/opencontainers/runtime-spec/specs-go"
type SpecOpts func(s *specs.Spec) error
func WithProcessArgs(args ...string) SpecOpts {
return func(s *specs.Spec) error {
s.Process.Args = args
return nil
}
}
// GenerateSpec will generate a default spec, applying any provided options,
// for use as a containerd container
func GenerateSpec(opts ...SpecOpts) (*specs.Spec, error) {
s, err := createDefaultSpec()
if err != nil {
return nil, err
}
for _, o := range opts {
if err := o(s); err != nil {
return nil, err
}
}
return s, nil
}
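Because the options are applied in order, a later SpecOpts can override what an earlier one set; placing WithProcessArgs after WithImageConfig replaces the image's entrypoint/cmd while keeping its env, user, and working directory. A sketch, assuming image comes from an earlier client.Pull; the helper name is hypothetical and WithImageConfig is the platform option defined in spec_unix.go below:

package example

import (
    "context"

    "github.com/containerd/containerd"
    specs "github.com/opencontainers/runtime-spec/specs-go"
)

// buildSpec seeds a spec from the image config, then overrides just the
// process arguments; later options win because they run last.
func buildSpec(ctx context.Context, image containerd.Image) (*specs.Spec, error) {
    return containerd.GenerateSpec(
        containerd.WithImageConfig(ctx, image),
        containerd.WithProcessArgs("echo", "hello"),
    )
}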

255
spec_unix.go Normal file

@@ -0,0 +1,255 @@
package containerd
import (
"context"
"encoding/json"
"fmt"
"runtime"
"strconv"
"strings"
"github.com/containerd/containerd/images"
"github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
const (
rwm = "rwm"
defaultRootfsPath = "rootfs"
)
func defaultCaps() []string {
return []string{
"CAP_CHOWN",
"CAP_DAC_OVERRIDE",
"CAP_FSETID",
"CAP_FOWNER",
"CAP_MKNOD",
"CAP_NET_RAW",
"CAP_SETGID",
"CAP_SETUID",
"CAP_SETFCAP",
"CAP_SETPCAP",
"CAP_NET_BIND_SERVICE",
"CAP_SYS_CHROOT",
"CAP_KILL",
"CAP_AUDIT_WRITE",
}
}
func defaultNamespaces() []specs.LinuxNamespace {
return []specs.LinuxNamespace{
{
Type: specs.PIDNamespace,
},
{
Type: specs.IPCNamespace,
},
{
Type: specs.UTSNamespace,
},
{
Type: specs.MountNamespace,
},
{
Type: specs.NetworkNamespace,
},
}
}
func createDefaultSpec() (*specs.Spec, error) {
s := &specs.Spec{
Version: specs.Version,
Platform: specs.Platform{
OS: runtime.GOOS,
Arch: runtime.GOARCH,
},
Root: specs.Root{
Path: defaultRootfsPath,
},
Process: specs.Process{
Cwd: "/",
NoNewPrivileges: true,
User: specs.User{
UID: 0,
GID: 0,
},
Capabilities: &specs.LinuxCapabilities{
Bounding: defaultCaps(),
Permitted: defaultCaps(),
Inheritable: defaultCaps(),
Effective: defaultCaps(),
Ambient: defaultCaps(),
},
Rlimits: []specs.LinuxRlimit{
{
Type: "RLIMIT_NOFILE",
Hard: uint64(1024),
Soft: uint64(1024),
},
},
},
Mounts: []specs.Mount{
{
Destination: "/proc",
Type: "proc",
Source: "proc",
},
{
Destination: "/dev",
Type: "tmpfs",
Source: "tmpfs",
Options: []string{"nosuid", "strictatime", "mode=755", "size=65536k"},
},
{
Destination: "/dev/pts",
Type: "devpts",
Source: "devpts",
Options: []string{"nosuid", "noexec", "newinstance", "ptmxmode=0666", "mode=0620", "gid=5"},
},
{
Destination: "/dev/shm",
Type: "tmpfs",
Source: "shm",
Options: []string{"nosuid", "noexec", "nodev", "mode=1777", "size=65536k"},
},
{
Destination: "/dev/mqueue",
Type: "mqueue",
Source: "mqueue",
Options: []string{"nosuid", "noexec", "nodev"},
},
{
Destination: "/sys",
Type: "sysfs",
Source: "sysfs",
Options: []string{"nosuid", "noexec", "nodev", "ro"},
},
{
Destination: "/run",
Type: "tmpfs",
Source: "tmpfs",
Options: []string{"nosuid", "strictatime", "mode=755", "size=65536k"},
},
{
Destination: "/etc/resolv.conf",
Type: "bind",
Source: "/etc/resolv.conf",
Options: []string{"rbind", "ro"},
},
{
Destination: "/etc/hosts",
Type: "bind",
Source: "/etc/hosts",
Options: []string{"rbind", "ro"},
},
{
Destination: "/etc/localtime",
Type: "bind",
Source: "/etc/localtime",
Options: []string{"rbind", "ro"},
},
},
Linux: &specs.Linux{
Resources: &specs.LinuxResources{
Devices: []specs.LinuxDeviceCgroup{
{
Allow: false,
Access: rwm,
},
},
},
Namespaces: defaultNamespaces(),
},
}
return s, nil
}
func WithTTY(s *specs.Spec) error {
s.Process.Terminal = true
s.Process.Env = append(s.Process.Env, "TERM=xterm")
return nil
}
func WithHostNamespace(ns specs.LinuxNamespaceType) SpecOpts {
return func(s *specs.Spec) error {
for i, n := range s.Linux.Namespaces {
if n.Type == ns {
s.Linux.Namespaces = append(s.Linux.Namespaces[:i], s.Linux.Namespaces[i+1:]...)
return nil
}
}
return nil
}
}
func WithImageConfig(ctx context.Context, i Image) SpecOpts {
return func(s *specs.Spec) error {
var (
image = i.(*image)
store = image.client.ContentStore()
)
ic, err := image.i.Config(ctx, store)
if err != nil {
return err
}
var (
ociimage v1.Image
config v1.ImageConfig
)
switch ic.MediaType {
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
r, err := store.Reader(ctx, ic.Digest)
if err != nil {
return err
}
if err := json.NewDecoder(r).Decode(&ociimage); err != nil {
r.Close()
return err
}
r.Close()
config = ociimage.Config
default:
return fmt.Errorf("unknown image config media type %s", ic.MediaType)
}
env := []string{
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
}
s.Process.Env = append(env, config.Env...)
var (
uid, gid uint32
)
cmd := config.Cmd
s.Process.Args = append(config.Entrypoint, cmd...)
if config.User != "" {
parts := strings.Split(config.User, ":")
switch len(parts) {
case 1:
v, err := strconv.ParseUint(parts[0], 10, 32)
if err != nil {
return err
}
uid, gid = uint32(v), uint32(v)
case 2:
v, err := strconv.ParseUint(parts[0], 10, 32)
if err != nil {
return err
}
uid = uint32(v)
if v, err = strconv.ParseUint(parts[1], 10, 32); err != nil {
return err
}
gid = uint32(v)
default:
return fmt.Errorf("invalid USER value %s", config.User)
}
}
s.Process.User.UID, s.Process.User.GID = uid, gid
cwd := config.WorkingDir
if cwd == "" {
cwd = "/"
}
s.Process.Cwd = cwd
return nil
}
}
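WithHostNamespace removes the matching entry from the default namespace list, which amounts to sharing that namespace with the host. A sketch combining it with WithTTY (both defined above); specs.NetworkNamespace comes from the runtime-spec package and the program is illustrative only:

package main

import (
    "log"

    "github.com/containerd/containerd"
    specs "github.com/opencontainers/runtime-spec/specs-go"
)

func main() {
    // Drop the network namespace from the defaults (i.e. share the host's
    // network) and allocate a terminal for the process.
    spec, err := containerd.GenerateSpec(
        containerd.WithHostNamespace(specs.NetworkNamespace),
        containerd.WithTTY,
    )
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("spec has %d namespaces", len(spec.Linux.Namespaces))
}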

56
spec_unix_test.go Normal file

@@ -0,0 +1,56 @@
package containerd
import "testing"
func TestGenerateSpec(t *testing.T) {
s, err := GenerateSpec()
if err != nil {
t.Fatal(err)
}
if s == nil {
t.Fatal("GenerateSpec() returns a nil spec")
}
// check for matching caps
defaults := defaultCaps()
for _, cl := range [][]string{
s.Process.Capabilities.Ambient,
s.Process.Capabilities.Bounding,
s.Process.Capabilities.Permitted,
s.Process.Capabilities.Inheritable,
s.Process.Capabilities.Effective,
} {
for i := 0; i < len(defaults); i++ {
if cl[i] != defaults[i] {
t.Errorf("cap at %d does not match set %q != %q", i, defaults[i], cl[i])
}
}
}
// check default namespaces
defaultNS := defaultNamespaces()
for i, ns := range s.Linux.Namespaces {
if defaultNS[i] != ns {
t.Errorf("ns at %d does not match set %q != %q", i, defaultNS[i], ns)
}
}
// test that we don't have tty set
if s.Process.Terminal {
t.Error("terminal set on default process")
}
}
func TestSpecWithTTY(t *testing.T) {
s, err := GenerateSpec(WithTTY)
if err != nil {
t.Fatal(err)
}
if !s.Process.Terminal {
t.Error("terminal net set WithTTY()")
}
v := s.Process.Env[len(s.Process.Env)-1]
if v != "TERM=xterm" {
t.Errorf("xterm not set in env for TTY")
}
}

79
spec_windows.go Normal file

@@ -0,0 +1,79 @@
package containerd
import (
"context"
"encoding/json"
"fmt"
"runtime"
"github.com/containerd/containerd/images"
"github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
const pipeRoot = `\\.\pipe`
func createDefaultSpec() (*specs.Spec, error) {
return &specs.Spec{
Version: specs.Version,
Platform: specs.Platform{
OS: runtime.GOOS,
Arch: runtime.GOARCH,
},
Root: specs.Root{},
Process: specs.Process{
ConsoleSize: specs.Box{
Width: 80,
Height: 20,
},
},
}, nil
}
func WithImageConfig(ctx context.Context, i Image) SpecOpts {
return func(s *specs.Spec) error {
var (
image = i.(*image)
store = image.client.ContentStore()
)
ic, err := image.i.Config(ctx, store)
if err != nil {
return err
}
var (
ociimage v1.Image
config v1.ImageConfig
)
switch ic.MediaType {
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
r, err := store.Reader(ctx, ic.Digest)
if err != nil {
return err
}
if err := json.NewDecoder(r).Decode(&ociimage); err != nil {
r.Close()
return err
}
r.Close()
config = ociimage.Config
default:
return fmt.Errorf("unknown image config media type %s", ic.MediaType)
}
s.Process.Env = config.Env
s.Process.Args = append(config.Entrypoint, config.Cmd...)
s.Process.User = specs.User{
Username: config.User,
}
return nil
}
}
func WithTTY(width, height int) SpecOpts {
return func(s *specs.Spec) error {
s.Process.Terminal = true
s.Process.ConsoleSize.Width = uint(width)
s.Process.ConsoleSize.Height = uint(height)
return nil
}
}
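Note the platform difference: on Windows WithTTY is a constructor taking a console size and returning a SpecOpts, rather than being a SpecOpts itself as on Linux. A sketch with arbitrary dimensions:

package main

import (
    "log"

    "github.com/containerd/containerd"
)

func main() {
    // Request a 120x30 console for the process (the dimensions are arbitrary).
    spec, err := containerd.GenerateSpec(containerd.WithTTY(120, 30))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("console size %dx%d", spec.Process.ConsoleSize.Width, spec.Process.ConsoleSize.Height)
}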

273
task.go Normal file

@@ -0,0 +1,273 @@
package containerd
import (
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
"syscall"
"github.com/containerd/containerd/api/services/execution"
taskapi "github.com/containerd/containerd/api/types/task"
"github.com/containerd/fifo"
)
const UnknownExitStatus = 255
type IO struct {
Terminal bool
Stdin string
Stdout string
Stderr string
closer io.Closer
}
func (i *IO) Close() error {
if i.closer == nil {
return nil
}
return i.closer.Close()
}
type IOCreation func() (*IO, error)
// Stdio returns an IO implementation to be used for a task
// that outputs the container's IO as the current process's stdio
func Stdio() (*IO, error) {
paths, err := fifoPaths()
if err != nil {
return nil, err
}
set := &ioSet{
in: os.Stdin,
out: os.Stdout,
err: os.Stderr,
}
closer, err := copyIO(paths, set, false)
if err != nil {
return nil, err
}
return &IO{
Terminal: false,
Stdin: paths.in,
Stdout: paths.out,
Stderr: paths.err,
closer: closer,
}, nil
}
func fifoPaths() (*fifoSet, error) {
root := filepath.Join(os.TempDir(), "containerd")
if err := os.MkdirAll(root, 0700); err != nil {
return nil, err
}
dir, err := ioutil.TempDir(root, "")
if err != nil {
return nil, err
}
return &fifoSet{
dir: dir,
in: filepath.Join(dir, "stdin"),
out: filepath.Join(dir, "stdout"),
err: filepath.Join(dir, "stderr"),
}, nil
}
type fifoSet struct {
// dir is the directory holding the task fifos
dir string
in, out, err string
}
type ioSet struct {
in io.Reader
out, err io.Writer
}
func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) {
var (
ctx = context.Background()
wg = &sync.WaitGroup{}
)
f, err := fifo.OpenFifo(ctx, fifos.in, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
defer func(c io.Closer) {
if err != nil {
c.Close()
}
}(f)
go func(w io.WriteCloser) {
io.Copy(w, ioset.in)
w.Close()
}(f)
f, err = fifo.OpenFifo(ctx, fifos.out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
defer func(c io.Closer) {
if err != nil {
c.Close()
}
}(f)
wg.Add(1)
go func(r io.ReadCloser) {
io.Copy(ioset.out, r)
r.Close()
wg.Done()
}(f)
f, err = fifo.OpenFifo(ctx, fifos.err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
defer func(c io.Closer) {
if err != nil {
c.Close()
}
}(f)
if !tty {
wg.Add(1)
go func(r io.ReadCloser) {
io.Copy(ioset.err, r)
r.Close()
wg.Done()
}(f)
}
return &wgCloser{
wg: wg,
dir: fifos.dir,
}, nil
}
type wgCloser struct {
wg *sync.WaitGroup
dir string
}
func (g *wgCloser) Close() error {
g.wg.Wait()
if g.dir != "" {
return os.RemoveAll(g.dir)
}
return nil
}
type TaskStatus string
const (
Running TaskStatus = "running"
Created TaskStatus = "created"
Stopped TaskStatus = "stopped"
Paused TaskStatus = "paused"
Pausing TaskStatus = "pausing"
)
type Task interface {
Delete(context.Context) (uint32, error)
Kill(context.Context, syscall.Signal) error
Pause(context.Context) error
Resume(context.Context) error
Pid() uint32
Start(context.Context) error
Status(context.Context) (TaskStatus, error)
Wait(context.Context) (uint32, error)
}
var _ = (Task)(&task{})
type task struct {
client *Client
io *IO
containerID string
pid uint32
}
// Pid returns the process id of the task
func (t *task) Pid() uint32 {
return t.pid
}
func (t *task) Start(ctx context.Context) error {
_, err := t.client.TaskService().Start(ctx, &execution.StartRequest{
ContainerID: t.containerID,
})
return err
}
func (t *task) Kill(ctx context.Context, s syscall.Signal) error {
_, err := t.client.TaskService().Kill(ctx, &execution.KillRequest{
Signal: uint32(s),
ContainerID: t.containerID,
PidOrAll: &execution.KillRequest_All{
All: true,
},
})
return err
}
func (t *task) Pause(ctx context.Context) error {
_, err := t.client.TaskService().Pause(ctx, &execution.PauseRequest{
ContainerID: t.containerID,
})
return err
}
func (t *task) Resume(ctx context.Context) error {
_, err := t.client.TaskService().Resume(ctx, &execution.ResumeRequest{
ContainerID: t.containerID,
})
return err
}
func (t *task) Status(ctx context.Context) (TaskStatus, error) {
r, err := t.client.TaskService().Info(ctx, &execution.InfoRequest{
ContainerID: t.containerID,
})
if err != nil {
return "", err
}
return TaskStatus(r.Task.Status.String()), nil
}
// Wait is a blocking call that will wait for the task to exit and return the exit status
func (t *task) Wait(ctx context.Context) (uint32, error) {
events, err := t.client.TaskService().Events(ctx, &execution.EventsRequest{})
if err != nil {
return UnknownExitStatus, err
}
for {
e, err := events.Recv()
if err != nil {
return UnknownExitStatus, err
}
if e.Type != taskapi.Event_EXIT {
continue
}
if e.ID == t.containerID && e.Pid == t.pid {
return e.ExitStatus, nil
}
}
}
// Delete deletes the task and its runtime state
// it returns the exit status of the task and any errors that were encountered
// during cleanup
func (t *task) Delete(ctx context.Context) (uint32, error) {
cerr := t.io.Close()
r, err := t.client.TaskService().Delete(ctx, &execution.DeleteRequest{
ContainerID: t.containerID,
})
if err != nil {
return UnknownExitStatus, err
}
return r.ExitStatus, cerr
}
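Wait blocks on the daemon's event stream until the exit event for this task's pid arrives, and Delete closes the fifos and removes the runtime state while returning the exit status. A sketch of stopping a running task created as in container.go; the helper name and the choice of SIGTERM are illustrative:

package example

import (
    "context"
    "syscall"

    "github.com/containerd/containerd"
)

// stopTask signals the task, waits for its exit event, then deletes it and
// returns the exit status reported by the daemon.
func stopTask(ctx context.Context, task containerd.Task) (uint32, error) {
    if err := task.Kill(ctx, syscall.SIGTERM); err != nil {
        return containerd.UnknownExitStatus, err
    }
    status, err := task.Wait(ctx) // blocks until this pid's EXIT event arrives
    if err != nil {
        return containerd.UnknownExitStatus, err
    }
    _, derr := task.Delete(ctx) // closes the fifos and removes runtime state
    return status, derr
}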