diff --git a/.gitignore b/.gitignore index 45bd181cb..d3397d696 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /bin/ **/coverage.txt **/coverage.out +containerd.test diff --git a/client.go b/client.go new file mode 100644 index 000000000..f9160ab82 --- /dev/null +++ b/client.go @@ -0,0 +1,339 @@ +package containerd + +import ( + "context" + "encoding/json" + "io/ioutil" + "log" + "net/http" + "runtime" + "time" + + "github.com/containerd/containerd/api/services/containers" + contentapi "github.com/containerd/containerd/api/services/content" + diffapi "github.com/containerd/containerd/api/services/diff" + "github.com/containerd/containerd/api/services/execution" + imagesapi "github.com/containerd/containerd/api/services/images" + snapshotapi "github.com/containerd/containerd/api/services/snapshot" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/containerd/rootfs" + contentservice "github.com/containerd/containerd/services/content" + "github.com/containerd/containerd/services/diff" + diffservice "github.com/containerd/containerd/services/diff" + imagesservice "github.com/containerd/containerd/services/images" + snapshotservice "github.com/containerd/containerd/services/snapshot" + "github.com/containerd/containerd/snapshot" + protobuf "github.com/gogo/protobuf/types" + "github.com/opencontainers/image-spec/identity" + "github.com/opencontainers/image-spec/specs-go/v1" + specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" +) + +func init() { + // reset the grpc logger so that it does not output in the STDIO of the calling process + grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) +} + +type NewClientOpts func(c *Client) error + +func WithNamespace(namespace string) NewClientOpts { + return func(c *Client) error { + c.namespace = namespace + return nil + } +} + +// New returns a new containerd client that is connected to the containerd +// instance provided by address +func New(address string, opts ...NewClientOpts) (*Client, error) { + gopts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithTimeout(100 * time.Second), + grpc.WithDialer(dialer), + } + conn, err := grpc.Dial(dialAddress(address), gopts...) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", address) + } + c := &Client{ + conn: conn, + runtime: runtime.GOOS, + } + for _, o := range opts { + if err := o(c); err != nil { + return nil, err + } + } + return c, nil +} + +// Client is the client to interact with containerd and its various services +// using a uniform interface +type Client struct { + conn *grpc.ClientConn + + runtime string + namespace string +} + +// Containers returns all containers created in containerd +func (c *Client) Containers(ctx context.Context) ([]Container, error) { + r, err := c.ContainerService().List(ctx, &containers.ListContainersRequest{}) + if err != nil { + return nil, err + } + var out []Container + for _, container := range r.Containers { + out = append(out, containerFromProto(c, container)) + } + return out, nil +} + +type NewContainerOpts func(ctx context.Context, client *Client, c *containers.Container) error + +// WithContainerLabels adds the provided labels to the container +func WithContainerLabels(labels map[string]string) NewContainerOpts { + return func(_ context.Context, _ *Client, c *containers.Container) error { + c.Labels = labels + return nil + } +} + +// WithExistingRootFS uses an existing root filesystem for the container +func WithExistingRootFS(id string) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + // check that the snapshot exists, if not, fail on creation + if _, err := client.SnapshotService().Mounts(ctx, id); err != nil { + return err + } + c.RootFS = id + return nil + } +} + +// WithNewRootFS allocates a new snapshot to be used by the container as the +// root filesystem in read-write mode +func WithNewRootFS(id string, i Image) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore()) + if err != nil { + return err + } + if _, err := client.SnapshotService().Prepare(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + return err + } + c.RootFS = id + return nil + } +} + +// WithNewReadonlyRootFS allocates a new snapshot to be used by the container as the +// root filesystem in read-only mode +func WithNewReadonlyRootFS(id string, i Image) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore()) + if err != nil { + return err + } + if _, err := client.SnapshotService().View(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + return err + } + c.RootFS = id + return nil + } +} + +func WithRuntime(name string) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + c.Runtime = name + return nil + } +} + +func WithImage(i Image) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + c.Image = i.Name() + return nil + } +} + +// NewContainer will create a new container in container with the provided id +// the id must be unique within the namespace +func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (Container, error) { + data, err := json.Marshal(spec) + if err != nil { + return nil, err + } + container := containers.Container{ + ID: id, + Runtime: c.runtime, + Spec: &protobuf.Any{ + TypeUrl: specs.Version, + Value: data, + }, + } + for _, o := range opts { + if err := o(ctx, c, &container); err != nil { + return nil, err + } + } + r, err := c.ContainerService().Create(ctx, &containers.CreateContainerRequest{ + Container: container, + }) + if err != nil { + return nil, err + } + return containerFromProto(c, r.Container), nil +} + +type PullOpts func(*Client, *PullContext) error + +type PullContext struct { + Resolver remotes.Resolver + Unpacker Unpacker +} + +func defaultPullContext() *PullContext { + return &PullContext{ + Resolver: docker.NewResolver(docker.ResolverOptions{ + Client: http.DefaultClient, + }), + } +} + +func WithPullUnpack(client *Client, c *PullContext) error { + c.Unpacker = &snapshotUnpacker{ + store: client.ContentStore(), + diff: client.DiffService(), + snapshotter: client.SnapshotService(), + } + return nil +} + +type Unpacker interface { + Unpack(context.Context, images.Image) error +} + +type snapshotUnpacker struct { + snapshotter snapshot.Snapshotter + store content.Store + diff diff.DiffService +} + +func (s *snapshotUnpacker) Unpack(ctx context.Context, image images.Image) error { + layers, err := s.getLayers(ctx, image) + if err != nil { + return err + } + if _, err := rootfs.ApplyLayers(ctx, layers, s.snapshotter, s.diff); err != nil { + return err + } + return nil +} + +func (s *snapshotUnpacker) getLayers(ctx context.Context, image images.Image) ([]rootfs.Layer, error) { + p, err := content.ReadBlob(ctx, s.store, image.Target.Digest) + if err != nil { + return nil, errors.Wrapf(err, "failed to read manifest blob") + } + var manifest v1.Manifest + if err := json.Unmarshal(p, &manifest); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal manifest") + } + diffIDs, err := image.RootFS(ctx, s.store) + if err != nil { + return nil, errors.Wrap(err, "failed to resolve rootfs") + } + if len(diffIDs) != len(manifest.Layers) { + return nil, errors.Errorf("mismatched image rootfs and manifest layers") + } + layers := make([]rootfs.Layer, len(diffIDs)) + for i := range diffIDs { + layers[i].Diff = v1.Descriptor{ + // TODO: derive media type from compressed type + MediaType: v1.MediaTypeImageLayer, + Digest: diffIDs[i], + } + layers[i].Blob = manifest.Layers[i] + } + return layers, nil +} + +func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (Image, error) { + pullCtx := defaultPullContext() + for _, o := range opts { + if err := o(c, pullCtx); err != nil { + return nil, err + } + } + store := c.ContentStore() + + name, desc, err := pullCtx.Resolver.Resolve(ctx, ref) + if err != nil { + return nil, err + } + fetcher, err := pullCtx.Resolver.Fetcher(ctx, name) + if err != nil { + return nil, err + } + + handlers := []images.Handler{ + remotes.FetchHandler(store, fetcher), + images.ChildrenHandler(store), + } + if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil { + return nil, err + } + is := c.ImageService() + if err := is.Put(ctx, name, desc); err != nil { + return nil, err + } + i, err := is.Get(ctx, name) + if err != nil { + return nil, err + } + if pullCtx.Unpacker != nil { + if err := pullCtx.Unpacker.Unpack(ctx, i); err != nil { + return nil, err + } + } + return &image{ + client: c, + i: i, + }, nil +} + +// Close closes the clients connection to containerd +func (c *Client) Close() error { + return c.conn.Close() +} + +func (c *Client) ContainerService() containers.ContainersClient { + return containers.NewContainersClient(c.conn) +} + +func (c *Client) ContentStore() content.Store { + return contentservice.NewStoreFromClient(contentapi.NewContentClient(c.conn)) +} + +func (c *Client) SnapshotService() snapshot.Snapshotter { + return snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(c.conn)) +} + +func (c *Client) TaskService() execution.TasksClient { + return execution.NewTasksClient(c.conn) +} + +func (c *Client) ImageService() images.Store { + return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(c.conn)) +} + +func (c *Client) DiffService() diff.DiffService { + return diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn)) +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 000000000..871b3167f --- /dev/null +++ b/client_test.go @@ -0,0 +1,48 @@ +package containerd + +import ( + "context" + "flag" + "testing" +) + +func init() { + flag.StringVar(&address, "address", "/run/containerd/containerd.sock", "The address to the containerd socket for use in the tests") + flag.Parse() +} + +var address string + +func TestNewClient(t *testing.T) { + if testing.Short() { + t.Skip() + } + client, err := New(address) + if err != nil { + t.Fatal(err) + } + if client == nil { + t.Fatal("New() returned nil client") + } + if err := client.Close(); err != nil { + t.Errorf("client closed returned errror %v", err) + } +} + +func TestImagePull(t *testing.T) { + if testing.Short() { + t.Skip() + } + client, err := New(address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + const ref = "docker.io/library/alpine:latest" + _, err = client.Pull(context.Background(), ref) + if err != nil { + t.Error(err) + return + } +} diff --git a/client_unix.go b/client_unix.go new file mode 100644 index 000000000..b2ba75270 --- /dev/null +++ b/client_unix.go @@ -0,0 +1,17 @@ +package containerd + +import ( + "fmt" + "net" + "strings" + "time" +) + +func dialer(address string, timeout time.Duration) (net.Conn, error) { + address = strings.TrimPrefix(address, "unix://") + return net.DialTimeout("unix", address, timeout) +} + +func dialAddress(address string) string { + return fmt.Sprintf("unix://%s", address) +} diff --git a/client_windows.go b/client_windows.go new file mode 100644 index 000000000..548024e5b --- /dev/null +++ b/client_windows.go @@ -0,0 +1,16 @@ +package containerd + +import ( + "net" + "time" + + winio "github.com/Microsoft/go-winio" +) + +func dialer(address string, timeout time.Duration) (net.Conn, error) { + return winio.DialPipe(address, &timeout) +} + +func dialAddress(address string) string { + return address +} diff --git a/container.go b/container.go new file mode 100644 index 000000000..7b9bffd8d --- /dev/null +++ b/container.go @@ -0,0 +1,110 @@ +package containerd + +import ( + "context" + "encoding/json" + + "github.com/containerd/containerd/api/services/containers" + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/mount" + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +type Container interface { + ID() string + Delete(context.Context) error + NewTask(context.Context, IOCreation) (Task, error) + Spec() (*specs.Spec, error) + Task() Task +} + +func containerFromProto(client *Client, c containers.Container) *container { + return &container{ + client: client, + c: c, + } +} + +var _ = (Container)(&container{}) + +type container struct { + client *Client + + c containers.Container + task *task +} + +// ID returns the container's unique id +func (c *container) ID() string { + return c.c.ID +} + +// Spec returns the current OCI specification for the container +func (c *container) Spec() (*specs.Spec, error) { + var s specs.Spec + if err := json.Unmarshal(c.c.Spec.Value, &s); err != nil { + return nil, err + } + return &s, nil +} + +// Delete deletes an existing container +// an error is returned if the container has running tasks +func (c *container) Delete(ctx context.Context) (err error) { + // TODO: should the client be the one removing resources attached + // to the container at the moment before we have GC? + if c.c.RootFS != "" { + err = c.client.SnapshotService().Remove(ctx, c.c.RootFS) + } + + if _, cerr := c.client.ContainerService().Delete(ctx, &containers.DeleteContainerRequest{ + ID: c.c.ID, + }); err == nil { + err = cerr + } + return err +} + +func (c *container) Task() Task { + return c.task +} + +func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, error) { + i, err := ioCreate() + if err != nil { + return nil, err + } + request := &execution.CreateRequest{ + ContainerID: c.c.ID, + Terminal: i.Terminal, + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + } + if c.c.RootFS != "" { + // get the rootfs from the snapshotter and add it to the request + mounts, err := c.client.SnapshotService().Mounts(ctx, c.c.RootFS) + if err != nil { + return nil, err + } + for _, m := range mounts { + request.Rootfs = append(request.Rootfs, &mount.Mount{ + Type: m.Type, + Source: m.Source, + Options: m.Options, + }) + } + } + response, err := c.client.TaskService().Create(ctx, request) + if err != nil { + return nil, err + } + t := &task{ + client: c.client, + io: i, + containerID: response.ContainerID, + pid: response.Pid, + } + c.task = t + return t, nil +} diff --git a/container_test.go b/container_test.go new file mode 100644 index 000000000..7f55a94c1 --- /dev/null +++ b/container_test.go @@ -0,0 +1,60 @@ +package containerd + +import ( + "context" + "testing" +) + +func TestContainerList(t *testing.T) { + if testing.Short() { + t.Skip() + } + client, err := New(address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + containers, err := client.Containers(context.Background()) + if err != nil { + t.Errorf("container list returned error %v", err) + return + } + if len(containers) != 0 { + t.Errorf("expected 0 containers but received %d", len(containers)) + } +} + +func TestNewContainer(t *testing.T) { + if testing.Short() { + t.Skip() + } + client, err := New(address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + id := "test" + spec, err := GenerateSpec() + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(context.Background(), id, spec) + if err != nil { + t.Error(err) + return + } + if container.ID() != id { + t.Errorf("expected container id %q but received %q", id, container.ID()) + } + if spec, err = container.Spec(); err != nil { + t.Error(err) + return + } + if err := container.Delete(context.Background()); err != nil { + t.Error(err) + return + } +} diff --git a/image.go b/image.go new file mode 100644 index 000000000..972778daf --- /dev/null +++ b/image.go @@ -0,0 +1,19 @@ +package containerd + +import "github.com/containerd/containerd/images" + +type Image interface { + Name() string +} + +var _ = (Image)(&image{}) + +type image struct { + client *Client + + i images.Image +} + +func (i *image) Name() string { + return i.i.Name +} diff --git a/spec.go b/spec.go new file mode 100644 index 000000000..53e1fd5f5 --- /dev/null +++ b/spec.go @@ -0,0 +1,27 @@ +package containerd + +import specs "github.com/opencontainers/runtime-spec/specs-go" + +type SpecOpts func(s *specs.Spec) error + +func WithProcessArgs(args ...string) SpecOpts { + return func(s *specs.Spec) error { + s.Process.Args = args + return nil + } +} + +// GenerateSpec will generate a default spec from the provided image +// for use as a containerd container +func GenerateSpec(opts ...SpecOpts) (*specs.Spec, error) { + s, err := createDefaultSpec() + if err != nil { + return nil, err + } + for _, o := range opts { + if err := o(s); err != nil { + return nil, err + } + } + return s, nil +} diff --git a/spec_unix.go b/spec_unix.go new file mode 100644 index 000000000..ee9c26b1c --- /dev/null +++ b/spec_unix.go @@ -0,0 +1,255 @@ +package containerd + +import ( + "context" + "encoding/json" + "fmt" + "runtime" + "strconv" + "strings" + + "github.com/containerd/containerd/images" + "github.com/opencontainers/image-spec/specs-go/v1" + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +const ( + rwm = "rwm" + defaultRootfsPath = "rootfs" +) + +func defaltCaps() []string { + return []string{ + "CAP_CHOWN", + "CAP_DAC_OVERRIDE", + "CAP_FSETID", + "CAP_FOWNER", + "CAP_MKNOD", + "CAP_NET_RAW", + "CAP_SETGID", + "CAP_SETUID", + "CAP_SETFCAP", + "CAP_SETPCAP", + "CAP_NET_BIND_SERVICE", + "CAP_SYS_CHROOT", + "CAP_KILL", + "CAP_AUDIT_WRITE", + } +} + +func defaultNamespaces() []specs.LinuxNamespace { + return []specs.LinuxNamespace{ + { + Type: specs.PIDNamespace, + }, + { + Type: specs.IPCNamespace, + }, + { + Type: specs.UTSNamespace, + }, + { + Type: specs.MountNamespace, + }, + { + Type: specs.NetworkNamespace, + }, + } +} + +func createDefaultSpec() (*specs.Spec, error) { + s := &specs.Spec{ + Version: specs.Version, + Platform: specs.Platform{ + OS: runtime.GOOS, + Arch: runtime.GOARCH, + }, + Root: specs.Root{ + Path: defaultRootfsPath, + }, + Process: specs.Process{ + Cwd: "/", + NoNewPrivileges: true, + User: specs.User{ + UID: 0, + GID: 0, + }, + Capabilities: &specs.LinuxCapabilities{ + Bounding: defaltCaps(), + Permitted: defaltCaps(), + Inheritable: defaltCaps(), + Effective: defaltCaps(), + Ambient: defaltCaps(), + }, + Rlimits: []specs.LinuxRlimit{ + { + Type: "RLIMIT_NOFILE", + Hard: uint64(1024), + Soft: uint64(1024), + }, + }, + }, + Mounts: []specs.Mount{ + { + Destination: "/proc", + Type: "proc", + Source: "proc", + }, + { + Destination: "/dev", + Type: "tmpfs", + Source: "tmpfs", + Options: []string{"nosuid", "strictatime", "mode=755", "size=65536k"}, + }, + { + Destination: "/dev/pts", + Type: "devpts", + Source: "devpts", + Options: []string{"nosuid", "noexec", "newinstance", "ptmxmode=0666", "mode=0620", "gid=5"}, + }, + { + Destination: "/dev/shm", + Type: "tmpfs", + Source: "shm", + Options: []string{"nosuid", "noexec", "nodev", "mode=1777", "size=65536k"}, + }, + { + Destination: "/dev/mqueue", + Type: "mqueue", + Source: "mqueue", + Options: []string{"nosuid", "noexec", "nodev"}, + }, + { + Destination: "/sys", + Type: "sysfs", + Source: "sysfs", + Options: []string{"nosuid", "noexec", "nodev", "ro"}, + }, + { + Destination: "/run", + Type: "tmpfs", + Source: "tmpfs", + Options: []string{"nosuid", "strictatime", "mode=755", "size=65536k"}, + }, + { + Destination: "/etc/resolv.conf", + Type: "bind", + Source: "/etc/resolv.conf", + Options: []string{"rbind", "ro"}, + }, + { + Destination: "/etc/hosts", + Type: "bind", + Source: "/etc/hosts", + Options: []string{"rbind", "ro"}, + }, + { + Destination: "/etc/localtime", + Type: "bind", + Source: "/etc/localtime", + Options: []string{"rbind", "ro"}, + }, + }, + Linux: &specs.Linux{ + Resources: &specs.LinuxResources{ + Devices: []specs.LinuxDeviceCgroup{ + { + Allow: false, + Access: rwm, + }, + }, + }, + Namespaces: defaultNamespaces(), + }, + } + return s, nil +} + +func WithTTY(s *specs.Spec) error { + s.Process.Terminal = true + s.Process.Env = append(s.Process.Env, "TERM=xterm") + return nil +} + +func WithHostNamespace(ns specs.LinuxNamespaceType) SpecOpts { + return func(s *specs.Spec) error { + for i, n := range s.Linux.Namespaces { + if n.Type == ns { + s.Linux.Namespaces = append(s.Linux.Namespaces[:i], s.Linux.Namespaces[i+1:]...) + return nil + } + } + return nil + } +} + +func WithImageConfig(ctx context.Context, i Image) SpecOpts { + return func(s *specs.Spec) error { + var ( + image = i.(*image) + store = image.client.ContentStore() + ) + ic, err := image.i.Config(ctx, store) + if err != nil { + return err + } + var ( + ociimage v1.Image + config v1.ImageConfig + ) + switch ic.MediaType { + case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: + r, err := store.Reader(ctx, ic.Digest) + if err != nil { + return err + } + if err := json.NewDecoder(r).Decode(&ociimage); err != nil { + r.Close() + return err + } + r.Close() + config = ociimage.Config + default: + return fmt.Errorf("unknown image config media type %s", ic.MediaType) + } + env := []string{ + "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + } + s.Process.Env = append(env, config.Env...) + var ( + uid, gid uint32 + ) + cmd := config.Cmd + s.Process.Args = append(config.Entrypoint, cmd...) + if config.User != "" { + parts := strings.Split(config.User, ":") + switch len(parts) { + case 1: + v, err := strconv.ParseUint(parts[0], 0, 10) + if err != nil { + return err + } + uid, gid = uint32(v), uint32(v) + case 2: + v, err := strconv.ParseUint(parts[0], 0, 10) + if err != nil { + return err + } + uid = uint32(v) + if v, err = strconv.ParseUint(parts[1], 0, 10); err != nil { + return err + } + gid = uint32(v) + default: + return fmt.Errorf("invalid USER value %s", config.User) + } + } + s.Process.User.UID, s.Process.User.GID = uid, gid + cwd := config.WorkingDir + if cwd == "" { + cwd = "/" + } + s.Process.Cwd = cwd + return nil + } +} diff --git a/spec_unix_test.go b/spec_unix_test.go new file mode 100644 index 000000000..e98f9ae8b --- /dev/null +++ b/spec_unix_test.go @@ -0,0 +1,56 @@ +package containerd + +import "testing" + +func TestGenerateSpec(t *testing.T) { + s, err := GenerateSpec() + if err != nil { + t.Fatal(err) + } + if s == nil { + t.Fatal("GenerateSpec() returns a nil spec") + } + + // check for matching caps + defaults := defaltCaps() + for _, cl := range [][]string{ + s.Process.Capabilities.Ambient, + s.Process.Capabilities.Bounding, + s.Process.Capabilities.Permitted, + s.Process.Capabilities.Inheritable, + s.Process.Capabilities.Effective, + } { + for i := 0; i < len(defaults); i++ { + if cl[i] != defaults[i] { + t.Errorf("cap at %d does not match set %q != %q", i, defaults[i], cl[i]) + } + } + } + + // check default namespaces + defaultNS := defaultNamespaces() + for i, ns := range s.Linux.Namespaces { + if defaultNS[i] != ns { + t.Errorf("ns at %d does not match set %q != %q", i, defaultNS[i], ns) + } + } + + // test that we don't have tty set + if s.Process.Terminal { + t.Error("terminal set on default process") + } +} + +func TestSpecWithTTY(t *testing.T) { + s, err := GenerateSpec(WithTTY) + if err != nil { + t.Fatal(err) + } + if !s.Process.Terminal { + t.Error("terminal net set WithTTY()") + } + v := s.Process.Env[len(s.Process.Env)-1] + if v != "TERM=xterm" { + t.Errorf("xterm not set in env for TTY") + } +} diff --git a/spec_windows.go b/spec_windows.go new file mode 100644 index 000000000..a5110cf8e --- /dev/null +++ b/spec_windows.go @@ -0,0 +1,79 @@ +package containerd + +import ( + "context" + "encoding/json" + "fmt" + "runtime" + + "github.com/containerd/containerd/images" + "github.com/opencontainers/image-spec/specs-go/v1" + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +const pipeRoot = `\\.\pipe` + +func createDefaultSpec() (*specs.Spec, error) { + return &specs.Spec{ + Version: specs.Version, + Platform: specs.Platform{ + OS: runtime.GOOS, + Arch: runtime.GOARCH, + }, + Root: specs.Root{}, + Process: specs.Process{ + Env: config.Env, + ConsoleSize: specs.Box{ + Width: 80, + Height: 20, + }, + }, + }, nil +} + +func WithImageConfig(ctx context.Context, i Image) SpecOpts { + return func(s *specs.Spec) error { + var ( + image = i.(*image) + store = image.client.ContentStore() + ) + ic, err := image.i.Config(ctx, store) + if err != nil { + return err + } + var ( + ociimage v1.Image + config v1.ImageConfig + ) + switch ic.MediaType { + case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: + r, err := store.Reader(ctx, ic.Digest) + if err != nil { + return err + } + if err := json.NewDecoder(r).Decode(&ociimage); err != nil { + r.Close() + return err + } + r.Close() + config = ociimage.Config + default: + return fmt.Errorf("unknown image config media type %s", ic.MediaType) + } + s.Process.Env = config.Env + s.Process.Args = append(config.Entrypoint, config.Cmd...) + s.Process.User = specs.User{ + Username: config.User, + } + return nil + } +} + +func WithTTY(width, height int) SpecOpts { + func(s *specs.Spec) error { + s.Process.Terminal = true + s.Process.ConsoleSize.Width = width + s.Process.ConsoleSize.Height = height + return nil + } +} diff --git a/task.go b/task.go new file mode 100644 index 000000000..9b28281bd --- /dev/null +++ b/task.go @@ -0,0 +1,273 @@ +package containerd + +import ( + "context" + "io" + "io/ioutil" + "os" + "path/filepath" + "sync" + "syscall" + + "github.com/containerd/containerd/api/services/execution" + taskapi "github.com/containerd/containerd/api/types/task" + "github.com/containerd/fifo" +) + +const UnknownExitStatus = 255 + +type IO struct { + Terminal bool + Stdin string + Stdout string + Stderr string + + closer io.Closer +} + +func (i *IO) Close() error { + if i.closer == nil { + return nil + } + return i.closer.Close() +} + +type IOCreation func() (*IO, error) + +// Stdio returns an IO implementation to be used for a task +// that outputs the container's IO as the current processes Stdio +func Stdio() (*IO, error) { + paths, err := fifoPaths() + if err != nil { + return nil, err + } + set := &ioSet{ + in: os.Stdin, + out: os.Stdout, + err: os.Stderr, + } + closer, err := copyIO(paths, set, false) + if err != nil { + return nil, err + } + return &IO{ + Terminal: false, + Stdin: paths.in, + Stdout: paths.out, + Stderr: paths.err, + closer: closer, + }, nil +} + +func fifoPaths() (*fifoSet, error) { + root := filepath.Join(os.TempDir(), "containerd") + if err := os.MkdirAll(root, 0700); err != nil { + return nil, err + } + dir, err := ioutil.TempDir(root, "") + if err != nil { + return nil, err + } + return &fifoSet{ + dir: dir, + in: filepath.Join(dir, "stdin"), + out: filepath.Join(dir, "stdout"), + err: filepath.Join(dir, "stderr"), + }, nil +} + +type fifoSet struct { + // dir is the directory holding the task fifos + dir string + in, out, err string +} + +type ioSet struct { + in io.Reader + out, err io.Writer +} + +func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { + var ( + ctx = context.Background() + wg = &sync.WaitGroup{} + ) + + f, err := fifo.OpenFifo(ctx, fifos.in, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err + } + defer func(c io.Closer) { + if err != nil { + c.Close() + } + }(f) + go func(w io.WriteCloser) { + io.Copy(w, ioset.in) + w.Close() + }(f) + + f, err = fifo.OpenFifo(ctx, fifos.out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err + } + defer func(c io.Closer) { + if err != nil { + c.Close() + } + }(f) + wg.Add(1) + go func(r io.ReadCloser) { + io.Copy(ioset.out, r) + r.Close() + wg.Done() + }(f) + + f, err = fifo.OpenFifo(ctx, fifos.err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err + } + defer func(c io.Closer) { + if err != nil { + c.Close() + } + }(f) + + if !tty { + wg.Add(1) + go func(r io.ReadCloser) { + io.Copy(ioset.err, r) + r.Close() + wg.Done() + }(f) + } + return &wgCloser{ + wg: wg, + dir: fifos.dir, + }, nil +} + +type wgCloser struct { + wg *sync.WaitGroup + dir string +} + +func (g *wgCloser) Close() error { + g.wg.Wait() + if g.dir != "" { + return os.RemoveAll(g.dir) + } + return nil +} + +type TaskStatus string + +const ( + Running TaskStatus = "running" + Created TaskStatus = "created" + Stopped TaskStatus = "stopped" + Paused TaskStatus = "paused" + Pausing TaskStatus = "pausing" +) + +type Task interface { + Delete(context.Context) (uint32, error) + Kill(context.Context, syscall.Signal) error + Pause(context.Context) error + Resume(context.Context) error + Pid() uint32 + Start(context.Context) error + Status(context.Context) (TaskStatus, error) + Wait(context.Context) (uint32, error) +} + +var _ = (Task)(&task{}) + +type task struct { + client *Client + + io *IO + containerID string + pid uint32 +} + +// Pid returns the pid or process id for the task +func (t *task) Pid() uint32 { + return t.pid +} + +func (t *task) Start(ctx context.Context) error { + _, err := t.client.TaskService().Start(ctx, &execution.StartRequest{ + ContainerID: t.containerID, + }) + return err +} + +func (t *task) Kill(ctx context.Context, s syscall.Signal) error { + _, err := t.client.TaskService().Kill(ctx, &execution.KillRequest{ + Signal: uint32(s), + ContainerID: t.containerID, + PidOrAll: &execution.KillRequest_All{ + All: true, + }, + }) + return err +} + +func (t *task) Pause(ctx context.Context) error { + _, err := t.client.TaskService().Pause(ctx, &execution.PauseRequest{ + ContainerID: t.containerID, + }) + return err +} + +func (t *task) Resume(ctx context.Context) error { + _, err := t.client.TaskService().Resume(ctx, &execution.ResumeRequest{ + ContainerID: t.containerID, + }) + return err +} + +func (t *task) Status(ctx context.Context) (TaskStatus, error) { + r, err := t.client.TaskService().Info(ctx, &execution.InfoRequest{ + ContainerID: t.containerID, + }) + if err != nil { + return "", err + } + return TaskStatus(r.Task.Status.String()), nil +} + +// Wait is a blocking call that will wait for the task to exit and return the exit status +func (t *task) Wait(ctx context.Context) (uint32, error) { + events, err := t.client.TaskService().Events(ctx, &execution.EventsRequest{}) + if err != nil { + return UnknownExitStatus, err + } + for { + e, err := events.Recv() + if err != nil { + return UnknownExitStatus, err + } + if e.Type != taskapi.Event_EXIT { + continue + } + if e.ID == t.containerID && e.Pid == t.pid { + return e.ExitStatus, nil + } + } +} + +// Delete deletes the task and its runtime state +// it returns the exit status of the task and any errors that were encountered +// during cleanup +func (t *task) Delete(ctx context.Context) (uint32, error) { + cerr := t.io.Close() + r, err := t.client.TaskService().Delete(ctx, &execution.DeleteRequest{ + ContainerID: t.containerID, + }) + if err != nil { + return UnknownExitStatus, err + } + return r.ExitStatus, cerr +}