From d0e5732f0bae741a68c9daec01cc5f77da42ded2 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 23 May 2017 15:50:06 -0700 Subject: [PATCH 01/14] Add initial containerd *Client Signed-off-by: Michael Crosby --- .gitignore | 1 + client.go | 61 +++++++++++++ client_test.go | 18 ++++ client_unix.go | 17 ++++ client_windows.go | 16 ++++ container.go | 21 +++++ container_test.go | 23 +++++ spec.go | 44 +++++++++ spec_unix.go | 225 ++++++++++++++++++++++++++++++++++++++++++++++ spec_unix_test.go | 56 ++++++++++++ 10 files changed, 482 insertions(+) create mode 100644 client.go create mode 100644 client_test.go create mode 100644 client_unix.go create mode 100644 client_windows.go create mode 100644 container.go create mode 100644 container_test.go create mode 100644 spec.go create mode 100644 spec_unix.go create mode 100644 spec_unix_test.go diff --git a/.gitignore b/.gitignore index 45bd181cb..d3397d696 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /bin/ **/coverage.txt **/coverage.out +containerd.test diff --git a/client.go b/client.go new file mode 100644 index 000000000..5f049af60 --- /dev/null +++ b/client.go @@ -0,0 +1,61 @@ +package containerd + +import ( + "context" + "io/ioutil" + "log" + "time" + + "github.com/containerd/containerd/api/services/containers" + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" +) + +// New returns a new containerd client that is connected to the containerd +// instance provided by address +func New(address string) (*Client, error) { + // reset the grpc logger so that it does not output in the STDIO of the calling process + grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) + + opts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithTimeout(100 * time.Second), + grpc.WithDialer(dialer), + } + conn, err := grpc.Dial(dialAddress(address), opts...) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", address) + } + return &Client{ + conn: conn, + }, nil +} + +// Client is the client to interact with containerd and its various services +// using a uniform interface +type Client struct { + conn *grpc.ClientConn +} + +// Containers returns all containers created in containerd +func (c *Client) Containers(ctx context.Context) ([]*Container, error) { + r, err := c.containers().List(ctx, &containers.ListContainersRequest{}) + if err != nil { + return nil, err + } + var out []*Container + for _, container := range r.Containers { + out = append(out, containerFromProto(c, container)) + } + return out, nil +} + +// Close closes the clients connection to containerd +func (c *Client) Close() error { + return c.conn.Close() +} + +func (c *Client) containers() containers.ContainersClient { + return containers.NewContainersClient(c.conn) +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 000000000..12e29df59 --- /dev/null +++ b/client_test.go @@ -0,0 +1,18 @@ +package containerd + +import "testing" + +const defaultAddress = "/run/containerd/containerd.sock" + +func TestNewClient(t *testing.T) { + client, err := New(defaultAddress) + if err != nil { + t.Fatal(err) + } + if client == nil { + t.Fatal("New() returned nil client") + } + if err := client.Close(); err != nil { + t.Errorf("client closed returned errror %v", err) + } +} diff --git a/client_unix.go b/client_unix.go new file mode 100644 index 000000000..b2ba75270 --- /dev/null +++ b/client_unix.go @@ -0,0 +1,17 @@ +package containerd + +import ( + "fmt" + "net" + "strings" + "time" +) + +func dialer(address string, timeout time.Duration) (net.Conn, error) { + address = strings.TrimPrefix(address, "unix://") + return net.DialTimeout("unix", address, timeout) +} + +func dialAddress(address string) string { + return fmt.Sprintf("unix://%s", address) +} diff --git a/client_windows.go b/client_windows.go new file mode 100644 index 000000000..7de5677a0 --- /dev/null +++ b/client_windows.go @@ -0,0 +1,16 @@ +package containerd + +import ( + "net" + "time" + + winio "github.com/Microsoft/go-winio" +) + +func dialer(address string, timeout time.Duration) (net.Conn, error) { + return winio.DialPipe(bindAddress, &timeout) +} + +func dialAddress(address string) string { + return address +} diff --git a/container.go b/container.go new file mode 100644 index 000000000..d47b0615e --- /dev/null +++ b/container.go @@ -0,0 +1,21 @@ +package containerd + +import "github.com/containerd/containerd/api/services/containers" + +func containerFromProto(client *Client, c containers.Container) *Container { + return &Container{ + client: client, + id: c.ID, + } +} + +type Container struct { + client *Client + + id string +} + +// ID returns the container's unique id +func (c *Container) ID() string { + return c.id +} diff --git a/container_test.go b/container_test.go new file mode 100644 index 000000000..da1a9818b --- /dev/null +++ b/container_test.go @@ -0,0 +1,23 @@ +package containerd + +import ( + "context" + "testing" +) + +func TestContainerList(t *testing.T) { + client, err := New(defaultAddress) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + containers, err := client.Containers(context.Background()) + if err != nil { + t.Errorf("container list returned error %v", err) + return + } + if len(containers) != 0 { + t.Errorf("expected 0 containers but received %d", len(containers)) + } +} diff --git a/spec.go b/spec.go new file mode 100644 index 000000000..5913d29b6 --- /dev/null +++ b/spec.go @@ -0,0 +1,44 @@ +package containerd + +import specs "github.com/opencontainers/runtime-spec/specs-go" + +type SpecOpts func(s *specs.Spec) error + +func WithImageRef(ref string) SpecOpts { + return func(s *specs.Spec) error { + if s.Annotations == nil { + s.Annotations = make(map[string]string) + } + s.Annotations["image"] = ref + return nil + } +} + +func WithHostname(id string) SpecOpts { + return func(s *specs.Spec) error { + s.Hostname = id + return nil + } +} + +func WithArgs(args ...string) SpecOpts { + return func(s *specs.Spec) error { + s.Process.Args = args + return nil + } +} + +// GenerateSpec will generate a default spec from the provided image +// for use as a containerd container +func GenerateSpec(opts ...SpecOpts) (*specs.Spec, error) { + s, err := createDefaultSpec() + if err != nil { + return nil, err + } + for _, o := range opts { + if err := o(s); err != nil { + return nil, err + } + } + return s, nil +} diff --git a/spec_unix.go b/spec_unix.go new file mode 100644 index 000000000..b994125bf --- /dev/null +++ b/spec_unix.go @@ -0,0 +1,225 @@ +package containerd + +import ( + "fmt" + "runtime" + "strconv" + "strings" + + "github.com/opencontainers/image-spec/specs-go/v1" + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +const ( + rwm = "rwm" + defaultRootfsPath = "rootfs" +) + +func defaltCaps() []string { + return []string{ + "CAP_CHOWN", + "CAP_DAC_OVERRIDE", + "CAP_FSETID", + "CAP_FOWNER", + "CAP_MKNOD", + "CAP_NET_RAW", + "CAP_SETGID", + "CAP_SETUID", + "CAP_SETFCAP", + "CAP_SETPCAP", + "CAP_NET_BIND_SERVICE", + "CAP_SYS_CHROOT", + "CAP_KILL", + "CAP_AUDIT_WRITE", + } +} + +func defaultNamespaces() []specs.LinuxNamespace { + return []specs.LinuxNamespace{ + { + Type: specs.PIDNamespace, + }, + { + Type: specs.IPCNamespace, + }, + { + Type: specs.UTSNamespace, + }, + { + Type: specs.MountNamespace, + }, + { + Type: specs.NetworkNamespace, + }, + } +} + +func createDefaultSpec() (*specs.Spec, error) { + s := &specs.Spec{ + Version: specs.Version, + Platform: specs.Platform{ + OS: runtime.GOOS, + Arch: runtime.GOARCH, + }, + Root: specs.Root{ + Path: defaultRootfsPath, + }, + Process: specs.Process{ + Cwd: "/", + NoNewPrivileges: true, + User: specs.User{ + UID: 0, + GID: 0, + }, + Capabilities: &specs.LinuxCapabilities{ + Bounding: defaltCaps(), + Permitted: defaltCaps(), + Inheritable: defaltCaps(), + Effective: defaltCaps(), + Ambient: defaltCaps(), + }, + Rlimits: []specs.LinuxRlimit{ + { + Type: "RLIMIT_NOFILE", + Hard: uint64(1024), + Soft: uint64(1024), + }, + }, + }, + Mounts: []specs.Mount{ + { + Destination: "/proc", + Type: "proc", + Source: "proc", + }, + { + Destination: "/dev", + Type: "tmpfs", + Source: "tmpfs", + Options: []string{"nosuid", "strictatime", "mode=755", "size=65536k"}, + }, + { + Destination: "/dev/pts", + Type: "devpts", + Source: "devpts", + Options: []string{"nosuid", "noexec", "newinstance", "ptmxmode=0666", "mode=0620", "gid=5"}, + }, + { + Destination: "/dev/shm", + Type: "tmpfs", + Source: "shm", + Options: []string{"nosuid", "noexec", "nodev", "mode=1777", "size=65536k"}, + }, + { + Destination: "/dev/mqueue", + Type: "mqueue", + Source: "mqueue", + Options: []string{"nosuid", "noexec", "nodev"}, + }, + { + Destination: "/sys", + Type: "sysfs", + Source: "sysfs", + Options: []string{"nosuid", "noexec", "nodev", "ro"}, + }, + { + Destination: "/run", + Type: "tmpfs", + Source: "tmpfs", + Options: []string{"nosuid", "strictatime", "mode=755", "size=65536k"}, + }, + { + Destination: "/etc/resolv.conf", + Type: "bind", + Source: "/etc/resolv.conf", + Options: []string{"rbind", "ro"}, + }, + { + Destination: "/etc/hosts", + Type: "bind", + Source: "/etc/hosts", + Options: []string{"rbind", "ro"}, + }, + { + Destination: "/etc/localtime", + Type: "bind", + Source: "/etc/localtime", + Options: []string{"rbind", "ro"}, + }, + }, + Linux: &specs.Linux{ + Resources: &specs.LinuxResources{ + Devices: []specs.LinuxDeviceCgroup{ + { + Allow: false, + Access: rwm, + }, + }, + }, + Namespaces: defaultNamespaces(), + }, + } + return s, nil +} + +func WithTTY(s *specs.Spec) error { + s.Process.Terminal = true + s.Process.Env = append(s.Process.Env, "TERM=xterm") + return nil +} + +func WithHostNamespace(ns specs.LinuxNamespaceType) SpecOpts { + return func(s *specs.Spec) error { + for i, n := range s.Linux.Namespaces { + if n.Type == ns { + s.Linux.Namespaces = append(s.Linux.Namespaces[:i], s.Linux.Namespaces[i+1:]...) + return nil + } + } + return nil + } +} + +func WithImage(config *v1.ImageConfig) SpecOpts { + return func(s *specs.Spec) error { + env := []string{ + "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + } + s.Process.Env = append(env, config.Env...) + cmd := config.Cmd + var ( + uid, gid uint32 + ) + s.Process.Args = append(config.Entrypoint, cmd...) + if config.User != "" { + parts := strings.Split(config.User, ":") + switch len(parts) { + case 1: + v, err := strconv.ParseUint(parts[0], 0, 10) + if err != nil { + return err + } + uid, gid = uint32(v), uint32(v) + case 2: + v, err := strconv.ParseUint(parts[0], 0, 10) + if err != nil { + return err + } + uid = uint32(v) + if v, err = strconv.ParseUint(parts[1], 0, 10); err != nil { + return err + } + gid = uint32(v) + default: + return fmt.Errorf("invalid USER value %s", config.User) + } + } + s.Process.User.UID, s.Process.User.GID = uid, gid + cwd := config.WorkingDir + if cwd == "" { + cwd = "/" + } + s.Process.Cwd = cwd + return nil + } +} diff --git a/spec_unix_test.go b/spec_unix_test.go new file mode 100644 index 000000000..e98f9ae8b --- /dev/null +++ b/spec_unix_test.go @@ -0,0 +1,56 @@ +package containerd + +import "testing" + +func TestGenerateSpec(t *testing.T) { + s, err := GenerateSpec() + if err != nil { + t.Fatal(err) + } + if s == nil { + t.Fatal("GenerateSpec() returns a nil spec") + } + + // check for matching caps + defaults := defaltCaps() + for _, cl := range [][]string{ + s.Process.Capabilities.Ambient, + s.Process.Capabilities.Bounding, + s.Process.Capabilities.Permitted, + s.Process.Capabilities.Inheritable, + s.Process.Capabilities.Effective, + } { + for i := 0; i < len(defaults); i++ { + if cl[i] != defaults[i] { + t.Errorf("cap at %d does not match set %q != %q", i, defaults[i], cl[i]) + } + } + } + + // check default namespaces + defaultNS := defaultNamespaces() + for i, ns := range s.Linux.Namespaces { + if defaultNS[i] != ns { + t.Errorf("ns at %d does not match set %q != %q", i, defaultNS[i], ns) + } + } + + // test that we don't have tty set + if s.Process.Terminal { + t.Error("terminal set on default process") + } +} + +func TestSpecWithTTY(t *testing.T) { + s, err := GenerateSpec(WithTTY) + if err != nil { + t.Fatal(err) + } + if !s.Process.Terminal { + t.Error("terminal net set WithTTY()") + } + v := s.Process.Env[len(s.Process.Env)-1] + if v != "TERM=xterm" { + t.Errorf("xterm not set in env for TTY") + } +} From 31a3bda41b4ceb38671ea6ed6a217358330b95a1 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 24 May 2017 10:42:37 -0700 Subject: [PATCH 02/14] Add NewContainer and Delete container support Signed-off-by: Michael Crosby --- client.go | 102 +++++++++++++++++++++++++++++++++++++++++++++++++ client_test.go | 40 ++++++++++++++++++- container.go | 34 +++++++++++++++-- 3 files changed, 171 insertions(+), 5 deletions(-) diff --git a/client.go b/client.go index 5f049af60..b52d25e7c 100644 --- a/client.go +++ b/client.go @@ -2,11 +2,23 @@ package containerd import ( "context" + "encoding/json" "io/ioutil" "log" "time" "github.com/containerd/containerd/api/services/containers" + contentapi "github.com/containerd/containerd/api/services/content" + snapshotapi "github.com/containerd/containerd/api/services/snapshot" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + contentservice "github.com/containerd/containerd/services/content" + snapshotservice "github.com/containerd/containerd/services/snapshot" + "github.com/containerd/containerd/snapshot" + protobuf "github.com/gogo/protobuf/types" + "github.com/opencontainers/image-spec/identity" + "github.com/opencontainers/image-spec/specs-go/v1" + specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" @@ -51,6 +63,88 @@ func (c *Client) Containers(ctx context.Context) ([]*Container, error) { return out, nil } +type NewContainerOpts func(ctx context.Context, client *Client, c *containers.Container) error + +// NewContainerWithLables adds the provided labels to the container +func NewContainerWithLables(labels map[string]string) NewContainerOpts { + return func(_ context.Context, _ *Client, c *containers.Container) error { + c.Labels = labels + return nil + } +} + +// NewContainerWithExistingRootFS uses an existing root filesystem for the container +func NewContainerWithExistingRootFS(id string) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + // check that the snapshot exists, if not, fail on creation + if _, err := client.snapshotter().Mounts(ctx, id); err != nil { + return err + } + c.RootFS = id + return nil + } +} + +// NewContainerWithNewRootFS allocates a new snapshot to be used by the container as the +// root filesystem in read-write mode +func NewContainerWithNewRootFS(id string, image v1.Descriptor) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + diffIDs, err := images.RootFS(ctx, client.content(), image) + if err != nil { + return err + } + if _, err := client.snapshotter().Prepare(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + return err + } + c.RootFS = id + return nil + } +} + +// NewContainerWithNewReadonlyRootFS allocates a new snapshot to be used by the container as the +// root filesystem in read-only mode +func NewContainerWithNewReadonlyRootFS(id string, image v1.Descriptor) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + diffIDs, err := images.RootFS(ctx, client.content(), image) + if err != nil { + return err + } + if _, err := client.snapshotter().View(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + return err + } + c.RootFS = id + return nil + } +} + +// NewContainer will create a new container in container with the provided id +// the id must be unique within the namespace +func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (*Container, error) { + data, err := json.Marshal(spec) + if err != nil { + return nil, err + } + container := containers.Container{ + ID: id, + Spec: &protobuf.Any{ + TypeUrl: specs.Version, + Value: data, + }, + } + for _, o := range opts { + if err := o(ctx, c, &container); err != nil { + return nil, err + } + } + r, err := c.containers().Create(ctx, &containers.CreateContainerRequest{ + Container: container, + }) + if err != nil { + return nil, err + } + return containerFromProto(c, r.Container), nil +} + // Close closes the clients connection to containerd func (c *Client) Close() error { return c.conn.Close() @@ -59,3 +153,11 @@ func (c *Client) Close() error { func (c *Client) containers() containers.ContainersClient { return containers.NewContainersClient(c.conn) } + +func (c *Client) content() content.Store { + return contentservice.NewStoreFromClient(contentapi.NewContentClient(c.conn)) +} + +func (c *Client) snapshotter() snapshot.Snapshotter { + return snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(c.conn)) +} diff --git a/client_test.go b/client_test.go index 12e29df59..054c02a64 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,9 @@ package containerd -import "testing" +import ( + "context" + "testing" +) const defaultAddress = "/run/containerd/containerd.sock" @@ -16,3 +19,38 @@ func TestNewClient(t *testing.T) { t.Errorf("client closed returned errror %v", err) } } + +func TestNewContainer(t *testing.T) { + client, err := New(defaultAddress) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + id := "test" + spec, err := GenerateSpec(WithHostname(id)) + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(context.Background(), id, spec) + if err != nil { + t.Error(err) + return + } + if container.ID() != id { + t.Errorf("expected container id %q but received %q", id, container.ID()) + } + if spec, err = container.Spec(); err != nil { + t.Error(err) + return + } + if spec.Hostname != id { + t.Errorf("expected spec hostname id %q but received %q", id, container.ID()) + return + } + if err := container.Delete(context.Background()); err != nil { + t.Error(err) + return + } +} diff --git a/container.go b/container.go index d47b0615e..861a71496 100644 --- a/container.go +++ b/container.go @@ -1,21 +1,47 @@ package containerd -import "github.com/containerd/containerd/api/services/containers" +import ( + "context" + "encoding/json" + + "github.com/containerd/containerd/api/services/containers" + specs "github.com/opencontainers/runtime-spec/specs-go" +) func containerFromProto(client *Client, c containers.Container) *Container { return &Container{ client: client, - id: c.ID, + c: c, } } type Container struct { client *Client - id string + c containers.Container } // ID returns the container's unique id func (c *Container) ID() string { - return c.id + return c.c.ID +} + +// Spec returns the current OCI specification for the container +func (c *Container) Spec() (*specs.Spec, error) { + var s specs.Spec + if err := json.Unmarshal(c.c.Spec.Value, &s); err != nil { + return nil, err + } + return &s, nil +} + +// Delete deletes an existing container +// an error is returned if the container has running tasks +func (c *Container) Delete(ctx context.Context) error { + // TODO: should the client be the one removing resources attached + // to the container at the moment before we have GC? + _, err := c.client.containers().Delete(ctx, &containers.DeleteContainerRequest{ + ID: c.c.ID, + }) + return err } From 3ba06e48ed68629bf41bb844113b1e442a540480 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 24 May 2017 11:31:58 -0700 Subject: [PATCH 03/14] Add NewTask support to client Signed-off-by: Michael Crosby --- client.go | 5 ++ container.go | 45 ++++++++++++- task.go | 186 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+), 1 deletion(-) create mode 100644 task.go diff --git a/client.go b/client.go index b52d25e7c..4082c221a 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "github.com/containerd/containerd/api/services/containers" contentapi "github.com/containerd/containerd/api/services/content" + "github.com/containerd/containerd/api/services/execution" snapshotapi "github.com/containerd/containerd/api/services/snapshot" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" @@ -161,3 +162,7 @@ func (c *Client) content() content.Store { func (c *Client) snapshotter() snapshot.Snapshotter { return snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(c.conn)) } + +func (c *Client) tasks() execution.TasksClient { + return execution.NewTasksClient(c.conn) +} diff --git a/container.go b/container.go index 861a71496..62599e577 100644 --- a/container.go +++ b/container.go @@ -5,6 +5,8 @@ import ( "encoding/json" "github.com/containerd/containerd/api/services/containers" + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/mount" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -18,7 +20,8 @@ func containerFromProto(client *Client, c containers.Container) *Container { type Container struct { client *Client - c containers.Container + c containers.Container + task *Task } // ID returns the container's unique id @@ -45,3 +48,43 @@ func (c *Container) Delete(ctx context.Context) error { }) return err } + +func (c *Container) Task() *Task { + return c.task +} + +func (c *Container) NewTask(ctx context.Context, ioCreate IOCreation) (*Task, error) { + i, err := ioCreate() + if err != nil { + return nil, err + } + request := &execution.CreateRequest{ + ContainerID: c.c.ID, + Terminal: i.Terminal, + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + } + // get the rootfs from the snapshotter and add it to the request + mounts, err := c.client.snapshotter().Mounts(ctx, c.c.RootFS) + if err != nil { + return nil, err + } + for _, m := range mounts { + request.Rootfs = append(request.Rootfs, &mount.Mount{ + Type: m.Type, + Source: m.Source, + Options: m.Options, + }) + } + response, err := c.client.tasks().Create(ctx, request) + if err != nil { + return nil, err + } + return &Task{ + client: c.client, + io: i, + containerID: response.ContainerID, + pid: response.Pid, + }, nil +} diff --git a/task.go b/task.go new file mode 100644 index 000000000..def67fd7e --- /dev/null +++ b/task.go @@ -0,0 +1,186 @@ +package containerd + +import ( + "context" + "io" + "io/ioutil" + "os" + "path/filepath" + "sync" + "syscall" + + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/fifo" +) + +type IO struct { + Terminal bool + Stdin string + Stdout string + Stderr string + + closer io.Closer +} + +func (i *IO) Close() error { + if i.closer == nil { + return nil + } + return i.closer.Close() +} + +type IOCreation func() (*IO, error) + +// STDIO returns an IO implementation to be used for a task +// that outputs the container's IO as the current processes STDIO +func STDIO() (*IO, error) { + paths, err := fifoPaths() + if err != nil { + return nil, err + } + set := &ioSet{ + in: os.Stdin, + out: os.Stdout, + err: os.Stderr, + } + closer, err := copyIO(paths, set, false) + if err != nil { + return nil, err + } + return &IO{ + Terminal: false, + Stdin: paths.in, + Stdout: paths.out, + Stderr: paths.err, + closer: closer, + }, nil +} + +func fifoPaths() (*fifoSet, error) { + dir, err := ioutil.TempDir(filepath.Join(os.TempDir(), "containerd"), "") + if err != nil { + return nil, err + } + return &fifoSet{ + dir: dir, + in: filepath.Join(dir, "stdin"), + out: filepath.Join(dir, "stdout"), + err: filepath.Join(dir, "stderr"), + }, nil +} + +type fifoSet struct { + // dir is the directory holding the task fifos + dir string + in, out, err string +} + +type ioSet struct { + in io.Reader + out, err io.Writer +} + +func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { + var ( + ctx = context.Background() + wg = &sync.WaitGroup{} + ) + + f, err := fifo.OpenFifo(ctx, fifos.in, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err + } + defer func(c io.Closer) { + if err != nil { + c.Close() + } + }(f) + go func(w io.WriteCloser) { + io.Copy(w, ioset.in) + w.Close() + }(f) + + f, err = fifo.OpenFifo(ctx, fifos.out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err + } + defer func(c io.Closer) { + if err != nil { + c.Close() + } + }(f) + wg.Add(1) + go func(r io.ReadCloser) { + io.Copy(ioset.out, r) + r.Close() + wg.Done() + }(f) + + f, err = fifo.OpenFifo(ctx, fifos.err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err + } + defer func(c io.Closer) { + if err != nil { + c.Close() + } + }(f) + + if !tty { + wg.Add(1) + go func(r io.ReadCloser) { + io.Copy(ioset.err, r) + r.Close() + wg.Done() + }(f) + } + return &wgCloser{ + wg: wg, + }, nil +} + +type wgCloser struct { + wg *sync.WaitGroup +} + +func (g *wgCloser) Close() error { + g.wg.Wait() + return nil +} + +type Task struct { + client *Client + + io *IO + containerID string + pid uint32 +} + +// Pid returns the pid or process id for the task +func (t *Task) Pid() uint32 { + return t.pid +} + +func (t *Task) Kill(ctx context.Context, s os.Signal) error { + _, err := t.client.tasks().Kill(ctx, &execution.KillRequest{ + ContainerID: t.containerID, + PidOrAll: &execution.KillRequest_All{ + All: true, + }, + }) + return err +} + +// Delete deletes the task and its runtime state +// it returns the exit status of the task and any errors that were encountered +// during cleanup +func (t *Task) Delete(ctx context.Context) (uint32, error) { + cerr := t.io.Close() + r, err := t.client.tasks().Delete(ctx, &execution.DeleteRequest{ + t.containerID, + }) + if err != nil { + return 255, err + } + return r.ExitStatus, cerr +} From 923236004abf27a26069dbc1bf9603623cdfdf5c Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 24 May 2017 14:06:50 -0700 Subject: [PATCH 04/14] Add pull support to client Signed-off-by: Michael Crosby --- client.go | 129 +++++++++++++++++++++++++++++++++++++++++++++++++ client_test.go | 15 ++++++ image.go | 7 +++ 3 files changed, 151 insertions(+) create mode 100644 image.go diff --git a/client.go b/client.go index 4082c221a..5771f9e76 100644 --- a/client.go +++ b/client.go @@ -5,15 +5,24 @@ import ( "encoding/json" "io/ioutil" "log" + "net/http" "time" "github.com/containerd/containerd/api/services/containers" contentapi "github.com/containerd/containerd/api/services/content" + diffapi "github.com/containerd/containerd/api/services/diff" "github.com/containerd/containerd/api/services/execution" + imagesapi "github.com/containerd/containerd/api/services/images" snapshotapi "github.com/containerd/containerd/api/services/snapshot" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/containerd/rootfs" contentservice "github.com/containerd/containerd/services/content" + "github.com/containerd/containerd/services/diff" + diffservice "github.com/containerd/containerd/services/diff" + imagesservice "github.com/containerd/containerd/services/images" snapshotservice "github.com/containerd/containerd/services/snapshot" "github.com/containerd/containerd/snapshot" protobuf "github.com/gogo/protobuf/types" @@ -146,6 +155,118 @@ func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, return containerFromProto(c, r.Container), nil } +type PullOpts func(*Client, *PullContext) error + +type PullContext struct { + Resolver remotes.Resolver + Unpacker Unpacker +} + +func defaultPullContext() *PullContext { + return &PullContext{ + Resolver: docker.NewResolver(docker.ResolverOptions{ + Client: http.DefaultClient, + }), + } +} + +func WithPullUnpack(client *Client, c *PullContext) error { + c.Unpacker = &snapshotUnpacker{ + store: client.content(), + diff: client.diff(), + snapshotter: client.snapshotter(), + } + return nil +} + +type Unpacker interface { + Unpack(context.Context, images.Image) error +} + +type snapshotUnpacker struct { + snapshotter snapshot.Snapshotter + store content.Store + diff diff.DiffService +} + +func (s *snapshotUnpacker) Unpack(ctx context.Context, image images.Image) error { + layers, err := s.getLayers(ctx, image) + if err != nil { + return err + } + if _, err := rootfs.ApplyLayers(ctx, layers, s.snapshotter, s.diff); err != nil { + return err + } + return nil +} + +func (s *snapshotUnpacker) getLayers(ctx context.Context, image images.Image) ([]rootfs.Layer, error) { + p, err := content.ReadBlob(ctx, s.store, image.Target.Digest) + if err != nil { + return nil, errors.Wrapf(err, "failed to read manifest blob") + } + var manifest v1.Manifest + if err := json.Unmarshal(p, &manifest); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal manifest") + } + diffIDs, err := image.RootFS(ctx, s.store) + if err != nil { + return nil, errors.Wrap(err, "failed to resolve rootfs") + } + if len(diffIDs) != len(manifest.Layers) { + return nil, errors.Errorf("mismatched image rootfs and manifest layers") + } + layers := make([]rootfs.Layer, len(diffIDs)) + for i := range diffIDs { + layers[i].Diff = v1.Descriptor{ + // TODO: derive media type from compressed type + MediaType: v1.MediaTypeImageLayer, + Digest: diffIDs[i], + } + layers[i].Blob = manifest.Layers[i] + } + return layers, nil +} + +func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (*Image, error) { + pullCtx := defaultPullContext() + for _, o := range opts { + if err := o(c, pullCtx); err != nil { + return nil, err + } + } + store := c.content() + + name, desc, fetcher, err := pullCtx.Resolver.Resolve(ctx, ref) + if err != nil { + return nil, err + } + + handlers := []images.Handler{ + remotes.FetchHandler(store, fetcher), + images.ChildrenHandler(store), + } + if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil { + return nil, err + } + is := c.images() + if err := is.Put(ctx, name, desc); err != nil { + return nil, err + } + i, err := is.Get(ctx, name) + if err != nil { + return nil, err + } + if pullCtx.Unpacker != nil { + if err := pullCtx.Unpacker.Unpack(ctx, i); err != nil { + return nil, err + } + } + return &Image{ + i: i, + }, nil +} + // Close closes the clients connection to containerd func (c *Client) Close() error { return c.conn.Close() @@ -166,3 +287,11 @@ func (c *Client) snapshotter() snapshot.Snapshotter { func (c *Client) tasks() execution.TasksClient { return execution.NewTasksClient(c.conn) } + +func (c *Client) images() images.Store { + return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(c.conn)) +} + +func (c *Client) diff() diff.DiffService { + return diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn)) +} diff --git a/client_test.go b/client_test.go index 054c02a64..f80ec2b7d 100644 --- a/client_test.go +++ b/client_test.go @@ -54,3 +54,18 @@ func TestNewContainer(t *testing.T) { return } } + +func TestImagePull(t *testing.T) { + client, err := New(defaultAddress) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + const ref = "docker.io/library/alpine:latest" + _, err = client.Pull(context.Background(), ref) + if err != nil { + t.Error(err) + return + } +} diff --git a/image.go b/image.go new file mode 100644 index 000000000..2fa94b6d5 --- /dev/null +++ b/image.go @@ -0,0 +1,7 @@ +package containerd + +import "github.com/containerd/containerd/images" + +type Image struct { + i images.Image +} From bf9ad0c57f000e4541d70a07aaabae4d84f8b8ed Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 24 May 2017 15:40:40 -0700 Subject: [PATCH 05/14] Fix spec generation for task execution Signed-off-by: Michael Crosby --- client.go | 57 ++++++++++++++++++++++++++++++++++++++-------------- container.go | 8 ++++++-- image.go | 2 ++ spec.go | 11 ++-------- spec_unix.go | 34 ++++++++++++++++++++++++++++--- task.go | 52 ++++++++++++++++++++++++++++++++++++++++------- 6 files changed, 128 insertions(+), 36 deletions(-) diff --git a/client.go b/client.go index 5771f9e76..acc37e295 100644 --- a/client.go +++ b/client.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "log" "net/http" + "runtime" "time" "github.com/containerd/containerd/api/services/containers" @@ -34,30 +35,43 @@ import ( "google.golang.org/grpc/grpclog" ) -// New returns a new containerd client that is connected to the containerd -// instance provided by address -func New(address string) (*Client, error) { +func init() { // reset the grpc logger so that it does not output in the STDIO of the calling process grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) +} - opts := []grpc.DialOption{ +type NewClientOpts func(c *Client) error + +// New returns a new containerd client that is connected to the containerd +// instance provided by address +func New(address string, opts ...NewClientOpts) (*Client, error) { + gopts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second), grpc.WithDialer(dialer), } - conn, err := grpc.Dial(dialAddress(address), opts...) + conn, err := grpc.Dial(dialAddress(address), gopts...) if err != nil { return nil, errors.Wrapf(err, "failed to dial %q", address) } - return &Client{ - conn: conn, - }, nil + c := &Client{ + conn: conn, + Runtime: runtime.GOOS, + } + for _, o := range opts { + if err := o(c); err != nil { + return nil, err + } + } + return c, nil } // Client is the client to interact with containerd and its various services // using a uniform interface type Client struct { conn *grpc.ClientConn + + Runtime string } // Containers returns all containers created in containerd @@ -97,9 +111,9 @@ func NewContainerWithExistingRootFS(id string) NewContainerOpts { // NewContainerWithNewRootFS allocates a new snapshot to be used by the container as the // root filesystem in read-write mode -func NewContainerWithNewRootFS(id string, image v1.Descriptor) NewContainerOpts { +func NewContainerWithNewRootFS(id string, image *Image) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { - diffIDs, err := images.RootFS(ctx, client.content(), image) + diffIDs, err := image.i.RootFS(ctx, client.content()) if err != nil { return err } @@ -113,9 +127,9 @@ func NewContainerWithNewRootFS(id string, image v1.Descriptor) NewContainerOpts // NewContainerWithNewReadonlyRootFS allocates a new snapshot to be used by the container as the // root filesystem in read-only mode -func NewContainerWithNewReadonlyRootFS(id string, image v1.Descriptor) NewContainerOpts { +func NewContainerWithNewReadonlyRootFS(id string, image *Image) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { - diffIDs, err := images.RootFS(ctx, client.content(), image) + diffIDs, err := image.i.RootFS(ctx, client.content()) if err != nil { return err } @@ -127,6 +141,13 @@ func NewContainerWithNewReadonlyRootFS(id string, image v1.Descriptor) NewContai } } +func NewContainerWithRuntime(name string) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + c.Runtime = name + return nil + } +} + // NewContainer will create a new container in container with the provided id // the id must be unique within the namespace func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (*Container, error) { @@ -135,7 +156,8 @@ func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, return nil, err } container := containers.Container{ - ID: id, + ID: id, + Runtime: c.Runtime, Spec: &protobuf.Any{ TypeUrl: specs.Version, Value: data, @@ -237,7 +259,11 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (*Image } store := c.content() - name, desc, fetcher, err := pullCtx.Resolver.Resolve(ctx, ref) + name, desc, err := pullCtx.Resolver.Resolve(ctx, ref) + if err != nil { + return nil, err + } + fetcher, err := pullCtx.Resolver.Fetcher(ctx, name) if err != nil { return nil, err } @@ -263,7 +289,8 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (*Image } } return &Image{ - i: i, + client: c, + i: i, }, nil } diff --git a/container.go b/container.go index 62599e577..34c64c9c2 100644 --- a/container.go +++ b/container.go @@ -43,9 +43,13 @@ func (c *Container) Spec() (*specs.Spec, error) { func (c *Container) Delete(ctx context.Context) error { // TODO: should the client be the one removing resources attached // to the container at the moment before we have GC? - _, err := c.client.containers().Delete(ctx, &containers.DeleteContainerRequest{ + err := c.client.snapshotter().Remove(ctx, c.c.RootFS) + + if _, cerr := c.client.containers().Delete(ctx, &containers.DeleteContainerRequest{ ID: c.c.ID, - }) + }); err == nil { + err = cerr + } return err } diff --git a/image.go b/image.go index 2fa94b6d5..83e042ded 100644 --- a/image.go +++ b/image.go @@ -3,5 +3,7 @@ package containerd import "github.com/containerd/containerd/images" type Image struct { + client *Client + i images.Image } diff --git a/spec.go b/spec.go index 5913d29b6..835f0ad72 100644 --- a/spec.go +++ b/spec.go @@ -14,13 +14,6 @@ func WithImageRef(ref string) SpecOpts { } } -func WithHostname(id string) SpecOpts { - return func(s *specs.Spec) error { - s.Hostname = id - return nil - } -} - func WithArgs(args ...string) SpecOpts { return func(s *specs.Spec) error { s.Process.Args = args @@ -30,8 +23,8 @@ func WithArgs(args ...string) SpecOpts { // GenerateSpec will generate a default spec from the provided image // for use as a containerd container -func GenerateSpec(opts ...SpecOpts) (*specs.Spec, error) { - s, err := createDefaultSpec() +func GenerateSpec(id string, opts ...SpecOpts) (*specs.Spec, error) { + s, err := createDefaultSpec(id) if err != nil { return nil, err } diff --git a/spec_unix.go b/spec_unix.go index b994125bf..94fdce1ad 100644 --- a/spec_unix.go +++ b/spec_unix.go @@ -1,11 +1,14 @@ package containerd import ( + "context" + "encoding/json" "fmt" "runtime" "strconv" "strings" + "github.com/containerd/containerd/images" "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -54,7 +57,7 @@ func defaultNamespaces() []specs.LinuxNamespace { } } -func createDefaultSpec() (*specs.Spec, error) { +func createDefaultSpec(id string) (*specs.Spec, error) { s := &specs.Spec{ Version: specs.Version, Platform: specs.Platform{ @@ -64,6 +67,7 @@ func createDefaultSpec() (*specs.Spec, error) { Root: specs.Root{ Path: defaultRootfsPath, }, + Hostname: id, Process: specs.Process{ Cwd: "/", NoNewPrivileges: true, @@ -180,16 +184,40 @@ func WithHostNamespace(ns specs.LinuxNamespaceType) SpecOpts { } } -func WithImage(config *v1.ImageConfig) SpecOpts { +func WithImage(ctx context.Context, image *Image) SpecOpts { return func(s *specs.Spec) error { + store := image.client.content() + ic, err := image.i.Config(ctx, store) + if err != nil { + return err + } + var ( + ociimage v1.Image + config v1.ImageConfig + ) + switch ic.MediaType { + case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: + r, err := store.Reader(ctx, ic.Digest) + if err != nil { + return err + } + if err := json.NewDecoder(r).Decode(&ociimage); err != nil { + r.Close() + return err + } + r.Close() + config = ociimage.Config + default: + return fmt.Errorf("unknown image config media type %s", ic.MediaType) + } env := []string{ "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", } s.Process.Env = append(env, config.Env...) - cmd := config.Cmd var ( uid, gid uint32 ) + cmd := config.Cmd s.Process.Args = append(config.Entrypoint, cmd...) if config.User != "" { parts := strings.Split(config.User, ":") diff --git a/task.go b/task.go index def67fd7e..36f6440ea 100644 --- a/task.go +++ b/task.go @@ -10,6 +10,7 @@ import ( "syscall" "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/task" "github.com/containerd/fifo" ) @@ -31,9 +32,9 @@ func (i *IO) Close() error { type IOCreation func() (*IO, error) -// STDIO returns an IO implementation to be used for a task -// that outputs the container's IO as the current processes STDIO -func STDIO() (*IO, error) { +// Stdio returns an IO implementation to be used for a task +// that outputs the container's IO as the current processes Stdio +func Stdio() (*IO, error) { paths, err := fifoPaths() if err != nil { return nil, err @@ -57,7 +58,11 @@ func STDIO() (*IO, error) { } func fifoPaths() (*fifoSet, error) { - dir, err := ioutil.TempDir(filepath.Join(os.TempDir(), "containerd"), "") + root := filepath.Join(os.TempDir(), "containerd") + if err := os.MkdirAll(root, 0700); err != nil { + return nil, err + } + dir, err := ioutil.TempDir(root, "") if err != nil { return nil, err } @@ -135,16 +140,21 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error }(f) } return &wgCloser{ - wg: wg, + wg: wg, + dir: fifos.dir, }, nil } type wgCloser struct { - wg *sync.WaitGroup + wg *sync.WaitGroup + dir string } func (g *wgCloser) Close() error { g.wg.Wait() + if g.dir != "" { + return os.RemoveAll(g.dir) + } return nil } @@ -161,8 +171,16 @@ func (t *Task) Pid() uint32 { return t.pid } -func (t *Task) Kill(ctx context.Context, s os.Signal) error { +func (t *Task) Start(ctx context.Context) error { + _, err := t.client.tasks().Start(ctx, &execution.StartRequest{ + ContainerID: t.containerID, + }) + return err +} + +func (t *Task) Kill(ctx context.Context, s syscall.Signal) error { _, err := t.client.tasks().Kill(ctx, &execution.KillRequest{ + Signal: uint32(s), ContainerID: t.containerID, PidOrAll: &execution.KillRequest_All{ All: true, @@ -171,6 +189,26 @@ func (t *Task) Kill(ctx context.Context, s os.Signal) error { return err } +// Wait is a blocking call that will wait for the task to exit and return the exit status +func (t *Task) Wait(ctx context.Context) (uint32, error) { + events, err := t.client.tasks().Events(ctx, &execution.EventsRequest{}) + if err != nil { + return 255, err + } + for { + e, err := events.Recv() + if err != nil { + return 255, err + } + if e.Type != task.Event_EXIT { + continue + } + if e.ID == t.containerID && e.Pid == t.pid { + return e.ExitStatus, nil + } + } +} + // Delete deletes the task and its runtime state // it returns the exit status of the task and any errors that were encountered // during cleanup From 01c4c8641407b6db341f42c6650fd8f15dc51584 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 24 May 2017 15:58:48 -0700 Subject: [PATCH 06/14] Add WithNamespace for client Signed-off-by: Michael Crosby --- client.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index acc37e295..2c569e360 100644 --- a/client.go +++ b/client.go @@ -42,6 +42,13 @@ func init() { type NewClientOpts func(c *Client) error +func WithNamespace(namespace string) NewClientOpts { + return func(c *Client) error { + c.Namespace = namespace + return nil + } +} + // New returns a new containerd client that is connected to the containerd // instance provided by address func New(address string, opts ...NewClientOpts) (*Client, error) { @@ -71,7 +78,8 @@ func New(address string, opts ...NewClientOpts) (*Client, error) { type Client struct { conn *grpc.ClientConn - Runtime string + Runtime string + Namespace string } // Containers returns all containers created in containerd @@ -89,16 +97,16 @@ func (c *Client) Containers(ctx context.Context) ([]*Container, error) { type NewContainerOpts func(ctx context.Context, client *Client, c *containers.Container) error -// NewContainerWithLables adds the provided labels to the container -func NewContainerWithLables(labels map[string]string) NewContainerOpts { +// WithContainerLables adds the provided labels to the container +func WithContainerLables(labels map[string]string) NewContainerOpts { return func(_ context.Context, _ *Client, c *containers.Container) error { c.Labels = labels return nil } } -// NewContainerWithExistingRootFS uses an existing root filesystem for the container -func NewContainerWithExistingRootFS(id string) NewContainerOpts { +// WithExistingRootFS uses an existing root filesystem for the container +func WithExistingRootFS(id string) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { // check that the snapshot exists, if not, fail on creation if _, err := client.snapshotter().Mounts(ctx, id); err != nil { @@ -109,9 +117,9 @@ func NewContainerWithExistingRootFS(id string) NewContainerOpts { } } -// NewContainerWithNewRootFS allocates a new snapshot to be used by the container as the +// WithNewRootFS allocates a new snapshot to be used by the container as the // root filesystem in read-write mode -func NewContainerWithNewRootFS(id string, image *Image) NewContainerOpts { +func WithNewRootFS(id string, image *Image) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { diffIDs, err := image.i.RootFS(ctx, client.content()) if err != nil { @@ -125,9 +133,9 @@ func NewContainerWithNewRootFS(id string, image *Image) NewContainerOpts { } } -// NewContainerWithNewReadonlyRootFS allocates a new snapshot to be used by the container as the +// WithNewReadonlyRootFS allocates a new snapshot to be used by the container as the // root filesystem in read-only mode -func NewContainerWithNewReadonlyRootFS(id string, image *Image) NewContainerOpts { +func WithNewReadonlyRootFS(id string, image *Image) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { diffIDs, err := image.i.RootFS(ctx, client.content()) if err != nil { @@ -141,7 +149,7 @@ func NewContainerWithNewReadonlyRootFS(id string, image *Image) NewContainerOpts } } -func NewContainerWithRuntime(name string) NewContainerOpts { +func WithRuntime(name string) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { c.Runtime = name return nil From 8cd882c570e658fca23f3bbca48c2e6d8bf7a898 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 24 May 2017 16:05:24 -0700 Subject: [PATCH 07/14] Add test -short for non-integration tests Signed-off-by: Michael Crosby --- client_test.go | 41 ++++++----------------------------------- container_test.go | 37 +++++++++++++++++++++++++++++++++++++ spec.go | 4 ++-- spec_unix.go | 3 +-- task.go | 2 +- 5 files changed, 47 insertions(+), 40 deletions(-) diff --git a/client_test.go b/client_test.go index f80ec2b7d..5671fe600 100644 --- a/client_test.go +++ b/client_test.go @@ -8,6 +8,9 @@ import ( const defaultAddress = "/run/containerd/containerd.sock" func TestNewClient(t *testing.T) { + if testing.Short() { + return + } client, err := New(defaultAddress) if err != nil { t.Fatal(err) @@ -20,42 +23,10 @@ func TestNewClient(t *testing.T) { } } -func TestNewContainer(t *testing.T) { - client, err := New(defaultAddress) - if err != nil { - t.Fatal(err) - } - defer client.Close() - - id := "test" - spec, err := GenerateSpec(WithHostname(id)) - if err != nil { - t.Error(err) - return - } - container, err := client.NewContainer(context.Background(), id, spec) - if err != nil { - t.Error(err) - return - } - if container.ID() != id { - t.Errorf("expected container id %q but received %q", id, container.ID()) - } - if spec, err = container.Spec(); err != nil { - t.Error(err) - return - } - if spec.Hostname != id { - t.Errorf("expected spec hostname id %q but received %q", id, container.ID()) - return - } - if err := container.Delete(context.Background()); err != nil { - t.Error(err) - return - } -} - func TestImagePull(t *testing.T) { + if testing.Short() { + return + } client, err := New(defaultAddress) if err != nil { t.Fatal(err) diff --git a/container_test.go b/container_test.go index da1a9818b..bbe67b241 100644 --- a/container_test.go +++ b/container_test.go @@ -6,6 +6,9 @@ import ( ) func TestContainerList(t *testing.T) { + if testing.Short() { + return + } client, err := New(defaultAddress) if err != nil { t.Fatal(err) @@ -21,3 +24,37 @@ func TestContainerList(t *testing.T) { t.Errorf("expected 0 containers but received %d", len(containers)) } } + +func TestNewContainer(t *testing.T) { + if testing.Short() { + return + } + client, err := New(defaultAddress) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + id := "test" + spec, err := GenerateSpec() + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(context.Background(), id, spec) + if err != nil { + t.Error(err) + return + } + if container.ID() != id { + t.Errorf("expected container id %q but received %q", id, container.ID()) + } + if spec, err = container.Spec(); err != nil { + t.Error(err) + return + } + if err := container.Delete(context.Background()); err != nil { + t.Error(err) + return + } +} diff --git a/spec.go b/spec.go index 835f0ad72..5fa9425d4 100644 --- a/spec.go +++ b/spec.go @@ -23,8 +23,8 @@ func WithArgs(args ...string) SpecOpts { // GenerateSpec will generate a default spec from the provided image // for use as a containerd container -func GenerateSpec(id string, opts ...SpecOpts) (*specs.Spec, error) { - s, err := createDefaultSpec(id) +func GenerateSpec(opts ...SpecOpts) (*specs.Spec, error) { + s, err := createDefaultSpec() if err != nil { return nil, err } diff --git a/spec_unix.go b/spec_unix.go index 94fdce1ad..673f2d6ad 100644 --- a/spec_unix.go +++ b/spec_unix.go @@ -57,7 +57,7 @@ func defaultNamespaces() []specs.LinuxNamespace { } } -func createDefaultSpec(id string) (*specs.Spec, error) { +func createDefaultSpec() (*specs.Spec, error) { s := &specs.Spec{ Version: specs.Version, Platform: specs.Platform{ @@ -67,7 +67,6 @@ func createDefaultSpec(id string) (*specs.Spec, error) { Root: specs.Root{ Path: defaultRootfsPath, }, - Hostname: id, Process: specs.Process{ Cwd: "/", NoNewPrivileges: true, diff --git a/task.go b/task.go index 36f6440ea..53dd4f90c 100644 --- a/task.go +++ b/task.go @@ -215,7 +215,7 @@ func (t *Task) Wait(ctx context.Context) (uint32, error) { func (t *Task) Delete(ctx context.Context) (uint32, error) { cerr := t.io.Close() r, err := t.client.tasks().Delete(ctx, &execution.DeleteRequest{ - t.containerID, + ContainerID: t.containerID, }) if err != nil { return 255, err From e2db2892ec7071a90fb5782d41a2077425709578 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 24 May 2017 16:26:35 -0700 Subject: [PATCH 08/14] Unexport client fields Signed-off-by: Michael Crosby --- client.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/client.go b/client.go index 2c569e360..998d01c6c 100644 --- a/client.go +++ b/client.go @@ -44,7 +44,7 @@ type NewClientOpts func(c *Client) error func WithNamespace(namespace string) NewClientOpts { return func(c *Client) error { - c.Namespace = namespace + c.namespace = namespace return nil } } @@ -63,7 +63,7 @@ func New(address string, opts ...NewClientOpts) (*Client, error) { } c := &Client{ conn: conn, - Runtime: runtime.GOOS, + runtime: runtime.GOOS, } for _, o := range opts { if err := o(c); err != nil { @@ -78,8 +78,8 @@ func New(address string, opts ...NewClientOpts) (*Client, error) { type Client struct { conn *grpc.ClientConn - Runtime string - Namespace string + runtime string + namespace string } // Containers returns all containers created in containerd @@ -97,8 +97,8 @@ func (c *Client) Containers(ctx context.Context) ([]*Container, error) { type NewContainerOpts func(ctx context.Context, client *Client, c *containers.Container) error -// WithContainerLables adds the provided labels to the container -func WithContainerLables(labels map[string]string) NewContainerOpts { +// WithContainerLabels adds the provided labels to the container +func WithContainerLabels(labels map[string]string) NewContainerOpts { return func(_ context.Context, _ *Client, c *containers.Container) error { c.Labels = labels return nil @@ -165,7 +165,7 @@ func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, } container := containers.Container{ ID: id, - Runtime: c.Runtime, + Runtime: c.runtime, Spec: &protobuf.Any{ TypeUrl: specs.Version, Value: data, From 1de25c09e3c8c19b165434fe13766a8e9c266f51 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 25 May 2017 10:15:35 -0700 Subject: [PATCH 09/14] Add pause resume to task Signed-off-by: Michael Crosby --- container.go | 6 ++++-- task.go | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/container.go b/container.go index 34c64c9c2..22ba64858 100644 --- a/container.go +++ b/container.go @@ -85,10 +85,12 @@ func (c *Container) NewTask(ctx context.Context, ioCreate IOCreation) (*Task, er if err != nil { return nil, err } - return &Task{ + t := &Task{ client: c.client, io: i, containerID: response.ContainerID, pid: response.Pid, - }, nil + } + c.task = t + return t, nil } diff --git a/task.go b/task.go index 53dd4f90c..132a58b46 100644 --- a/task.go +++ b/task.go @@ -189,6 +189,30 @@ func (t *Task) Kill(ctx context.Context, s syscall.Signal) error { return err } +func (t *Task) Pause(ctx context.Context) error { + _, err := t.client.tasks().Pause(ctx, &execution.PauseRequest{ + ContainerID: t.containerID, + }) + return err +} + +func (t *Task) Resume(ctx context.Context) error { + _, err := t.client.tasks().Resume(ctx, &execution.ResumeRequest{ + ContainerID: t.containerID, + }) + return err +} + +func (t *Task) Status(ctx context.Context) (string, error) { + r, err := t.client.tasks().Info(ctx, &execution.InfoRequest{ + ContainerID: t.containerID, + }) + if err != nil { + return "", err + } + return r.Task.Status.String(), nil +} + // Wait is a blocking call that will wait for the task to exit and return the exit status func (t *Task) Wait(ctx context.Context) (uint32, error) { events, err := t.client.tasks().Events(ctx, &execution.EventsRequest{}) From 608e6daaa411b17ce3767f49c5d5a9969f07daf7 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 25 May 2017 11:05:58 -0700 Subject: [PATCH 10/14] Make Task, Container, Image interface types Signed-off-by: Michael Crosby --- client.go | 18 +++++++++--------- container.go | 30 ++++++++++++++++++++---------- image.go | 7 ++++++- spec_unix.go | 3 ++- task.go | 35 ++++++++++++++++++++++++----------- 5 files changed, 61 insertions(+), 32 deletions(-) diff --git a/client.go b/client.go index 998d01c6c..e8fbd37f3 100644 --- a/client.go +++ b/client.go @@ -83,12 +83,12 @@ type Client struct { } // Containers returns all containers created in containerd -func (c *Client) Containers(ctx context.Context) ([]*Container, error) { +func (c *Client) Containers(ctx context.Context) ([]Container, error) { r, err := c.containers().List(ctx, &containers.ListContainersRequest{}) if err != nil { return nil, err } - var out []*Container + var out []Container for _, container := range r.Containers { out = append(out, containerFromProto(c, container)) } @@ -119,9 +119,9 @@ func WithExistingRootFS(id string) NewContainerOpts { // WithNewRootFS allocates a new snapshot to be used by the container as the // root filesystem in read-write mode -func WithNewRootFS(id string, image *Image) NewContainerOpts { +func WithNewRootFS(id string, i Image) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { - diffIDs, err := image.i.RootFS(ctx, client.content()) + diffIDs, err := i.(*image).i.RootFS(ctx, client.content()) if err != nil { return err } @@ -135,9 +135,9 @@ func WithNewRootFS(id string, image *Image) NewContainerOpts { // WithNewReadonlyRootFS allocates a new snapshot to be used by the container as the // root filesystem in read-only mode -func WithNewReadonlyRootFS(id string, image *Image) NewContainerOpts { +func WithNewReadonlyRootFS(id string, i Image) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { - diffIDs, err := image.i.RootFS(ctx, client.content()) + diffIDs, err := i.(*image).i.RootFS(ctx, client.content()) if err != nil { return err } @@ -158,7 +158,7 @@ func WithRuntime(name string) NewContainerOpts { // NewContainer will create a new container in container with the provided id // the id must be unique within the namespace -func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (*Container, error) { +func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (Container, error) { data, err := json.Marshal(spec) if err != nil { return nil, err @@ -258,7 +258,7 @@ func (s *snapshotUnpacker) getLayers(ctx context.Context, image images.Image) ([ return layers, nil } -func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (*Image, error) { +func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (Image, error) { pullCtx := defaultPullContext() for _, o := range opts { if err := o(c, pullCtx); err != nil { @@ -296,7 +296,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (*Image return nil, err } } - return &Image{ + return &image{ client: c, i: i, }, nil diff --git a/container.go b/container.go index 22ba64858..328f74489 100644 --- a/container.go +++ b/container.go @@ -10,27 +10,37 @@ import ( specs "github.com/opencontainers/runtime-spec/specs-go" ) -func containerFromProto(client *Client, c containers.Container) *Container { - return &Container{ +type Container interface { + ID() string + Delete(context.Context) error + NewTask(context.Context, IOCreation) (Task, error) + Spec() (*specs.Spec, error) + Task() Task +} + +func containerFromProto(client *Client, c containers.Container) *container { + return &container{ client: client, c: c, } } -type Container struct { +var _ = (Container)(&container{}) + +type container struct { client *Client c containers.Container - task *Task + task *task } // ID returns the container's unique id -func (c *Container) ID() string { +func (c *container) ID() string { return c.c.ID } // Spec returns the current OCI specification for the container -func (c *Container) Spec() (*specs.Spec, error) { +func (c *container) Spec() (*specs.Spec, error) { var s specs.Spec if err := json.Unmarshal(c.c.Spec.Value, &s); err != nil { return nil, err @@ -40,7 +50,7 @@ func (c *Container) Spec() (*specs.Spec, error) { // Delete deletes an existing container // an error is returned if the container has running tasks -func (c *Container) Delete(ctx context.Context) error { +func (c *container) Delete(ctx context.Context) error { // TODO: should the client be the one removing resources attached // to the container at the moment before we have GC? err := c.client.snapshotter().Remove(ctx, c.c.RootFS) @@ -53,11 +63,11 @@ func (c *Container) Delete(ctx context.Context) error { return err } -func (c *Container) Task() *Task { +func (c *container) Task() Task { return c.task } -func (c *Container) NewTask(ctx context.Context, ioCreate IOCreation) (*Task, error) { +func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, error) { i, err := ioCreate() if err != nil { return nil, err @@ -85,7 +95,7 @@ func (c *Container) NewTask(ctx context.Context, ioCreate IOCreation) (*Task, er if err != nil { return nil, err } - t := &Task{ + t := &task{ client: c.client, io: i, containerID: response.ContainerID, diff --git a/image.go b/image.go index 83e042ded..6e48a52f3 100644 --- a/image.go +++ b/image.go @@ -2,7 +2,12 @@ package containerd import "github.com/containerd/containerd/images" -type Image struct { +type Image interface { +} + +var _ = (Image)(&image{}) + +type image struct { client *Client i images.Image diff --git a/spec_unix.go b/spec_unix.go index 673f2d6ad..81404c99a 100644 --- a/spec_unix.go +++ b/spec_unix.go @@ -183,8 +183,9 @@ func WithHostNamespace(ns specs.LinuxNamespaceType) SpecOpts { } } -func WithImage(ctx context.Context, image *Image) SpecOpts { +func WithImage(ctx context.Context, i Image) SpecOpts { return func(s *specs.Spec) error { + image := i.(*image) store := image.client.content() ic, err := image.i.Config(ctx, store) if err != nil { diff --git a/task.go b/task.go index 132a58b46..62e283a91 100644 --- a/task.go +++ b/task.go @@ -10,7 +10,7 @@ import ( "syscall" "github.com/containerd/containerd/api/services/execution" - "github.com/containerd/containerd/api/types/task" + taskapi "github.com/containerd/containerd/api/types/task" "github.com/containerd/fifo" ) @@ -158,7 +158,20 @@ func (g *wgCloser) Close() error { return nil } -type Task struct { +type Task interface { + Delete(context.Context) (uint32, error) + Kill(context.Context, syscall.Signal) error + Pause(context.Context) error + Resume(context.Context) error + Pid() uint32 + Start(context.Context) error + Status(context.Context) (string, error) + Wait(context.Context) (uint32, error) +} + +var _ = (Task)(&task{}) + +type task struct { client *Client io *IO @@ -167,18 +180,18 @@ type Task struct { } // Pid returns the pid or process id for the task -func (t *Task) Pid() uint32 { +func (t *task) Pid() uint32 { return t.pid } -func (t *Task) Start(ctx context.Context) error { +func (t *task) Start(ctx context.Context) error { _, err := t.client.tasks().Start(ctx, &execution.StartRequest{ ContainerID: t.containerID, }) return err } -func (t *Task) Kill(ctx context.Context, s syscall.Signal) error { +func (t *task) Kill(ctx context.Context, s syscall.Signal) error { _, err := t.client.tasks().Kill(ctx, &execution.KillRequest{ Signal: uint32(s), ContainerID: t.containerID, @@ -189,21 +202,21 @@ func (t *Task) Kill(ctx context.Context, s syscall.Signal) error { return err } -func (t *Task) Pause(ctx context.Context) error { +func (t *task) Pause(ctx context.Context) error { _, err := t.client.tasks().Pause(ctx, &execution.PauseRequest{ ContainerID: t.containerID, }) return err } -func (t *Task) Resume(ctx context.Context) error { +func (t *task) Resume(ctx context.Context) error { _, err := t.client.tasks().Resume(ctx, &execution.ResumeRequest{ ContainerID: t.containerID, }) return err } -func (t *Task) Status(ctx context.Context) (string, error) { +func (t *task) Status(ctx context.Context) (string, error) { r, err := t.client.tasks().Info(ctx, &execution.InfoRequest{ ContainerID: t.containerID, }) @@ -214,7 +227,7 @@ func (t *Task) Status(ctx context.Context) (string, error) { } // Wait is a blocking call that will wait for the task to exit and return the exit status -func (t *Task) Wait(ctx context.Context) (uint32, error) { +func (t *task) Wait(ctx context.Context) (uint32, error) { events, err := t.client.tasks().Events(ctx, &execution.EventsRequest{}) if err != nil { return 255, err @@ -224,7 +237,7 @@ func (t *Task) Wait(ctx context.Context) (uint32, error) { if err != nil { return 255, err } - if e.Type != task.Event_EXIT { + if e.Type != taskapi.Event_EXIT { continue } if e.ID == t.containerID && e.Pid == t.pid { @@ -236,7 +249,7 @@ func (t *Task) Wait(ctx context.Context) (uint32, error) { // Delete deletes the task and its runtime state // it returns the exit status of the task and any errors that were encountered // during cleanup -func (t *Task) Delete(ctx context.Context) (uint32, error) { +func (t *task) Delete(ctx context.Context) (uint32, error) { cerr := t.io.Close() r, err := t.client.tasks().Delete(ctx, &execution.DeleteRequest{ ContainerID: t.containerID, From a2b08247207f4506e775f05ae3530e812e55fa5a Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 25 May 2017 11:10:34 -0700 Subject: [PATCH 11/14] Export GRPC services from client Signed-off-by: Michael Crosby --- client.go | 36 ++++++++++++++++++------------------ container.go | 8 ++++---- spec_unix.go | 2 +- task.go | 14 +++++++------- 4 files changed, 30 insertions(+), 30 deletions(-) diff --git a/client.go b/client.go index e8fbd37f3..9db4a75f7 100644 --- a/client.go +++ b/client.go @@ -84,7 +84,7 @@ type Client struct { // Containers returns all containers created in containerd func (c *Client) Containers(ctx context.Context) ([]Container, error) { - r, err := c.containers().List(ctx, &containers.ListContainersRequest{}) + r, err := c.ContainerService().List(ctx, &containers.ListContainersRequest{}) if err != nil { return nil, err } @@ -109,7 +109,7 @@ func WithContainerLabels(labels map[string]string) NewContainerOpts { func WithExistingRootFS(id string) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { // check that the snapshot exists, if not, fail on creation - if _, err := client.snapshotter().Mounts(ctx, id); err != nil { + if _, err := client.SnapshotService().Mounts(ctx, id); err != nil { return err } c.RootFS = id @@ -121,11 +121,11 @@ func WithExistingRootFS(id string) NewContainerOpts { // root filesystem in read-write mode func WithNewRootFS(id string, i Image) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { - diffIDs, err := i.(*image).i.RootFS(ctx, client.content()) + diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore()) if err != nil { return err } - if _, err := client.snapshotter().Prepare(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + if _, err := client.SnapshotService().Prepare(ctx, id, identity.ChainID(diffIDs).String()); err != nil { return err } c.RootFS = id @@ -137,11 +137,11 @@ func WithNewRootFS(id string, i Image) NewContainerOpts { // root filesystem in read-only mode func WithNewReadonlyRootFS(id string, i Image) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { - diffIDs, err := i.(*image).i.RootFS(ctx, client.content()) + diffIDs, err := i.(*image).i.RootFS(ctx, client.ContentStore()) if err != nil { return err } - if _, err := client.snapshotter().View(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + if _, err := client.SnapshotService().View(ctx, id, identity.ChainID(diffIDs).String()); err != nil { return err } c.RootFS = id @@ -176,7 +176,7 @@ func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, return nil, err } } - r, err := c.containers().Create(ctx, &containers.CreateContainerRequest{ + r, err := c.ContainerService().Create(ctx, &containers.CreateContainerRequest{ Container: container, }) if err != nil { @@ -202,9 +202,9 @@ func defaultPullContext() *PullContext { func WithPullUnpack(client *Client, c *PullContext) error { c.Unpacker = &snapshotUnpacker{ - store: client.content(), - diff: client.diff(), - snapshotter: client.snapshotter(), + store: client.ContentStore(), + diff: client.DiffService(), + snapshotter: client.SnapshotService(), } return nil } @@ -265,7 +265,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (Image, return nil, err } } - store := c.content() + store := c.ContentStore() name, desc, err := pullCtx.Resolver.Resolve(ctx, ref) if err != nil { @@ -283,7 +283,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (Image, if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil { return nil, err } - is := c.images() + is := c.ImageService() if err := is.Put(ctx, name, desc); err != nil { return nil, err } @@ -307,26 +307,26 @@ func (c *Client) Close() error { return c.conn.Close() } -func (c *Client) containers() containers.ContainersClient { +func (c *Client) ContainerService() containers.ContainersClient { return containers.NewContainersClient(c.conn) } -func (c *Client) content() content.Store { +func (c *Client) ContentStore() content.Store { return contentservice.NewStoreFromClient(contentapi.NewContentClient(c.conn)) } -func (c *Client) snapshotter() snapshot.Snapshotter { +func (c *Client) SnapshotService() snapshot.Snapshotter { return snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(c.conn)) } -func (c *Client) tasks() execution.TasksClient { +func (c *Client) TaskService() execution.TasksClient { return execution.NewTasksClient(c.conn) } -func (c *Client) images() images.Store { +func (c *Client) ImageService() images.Store { return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(c.conn)) } -func (c *Client) diff() diff.DiffService { +func (c *Client) DiffService() diff.DiffService { return diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn)) } diff --git a/container.go b/container.go index 328f74489..1886f227a 100644 --- a/container.go +++ b/container.go @@ -53,9 +53,9 @@ func (c *container) Spec() (*specs.Spec, error) { func (c *container) Delete(ctx context.Context) error { // TODO: should the client be the one removing resources attached // to the container at the moment before we have GC? - err := c.client.snapshotter().Remove(ctx, c.c.RootFS) + err := c.client.SnapshotService().Remove(ctx, c.c.RootFS) - if _, cerr := c.client.containers().Delete(ctx, &containers.DeleteContainerRequest{ + if _, cerr := c.client.ContainerService().Delete(ctx, &containers.DeleteContainerRequest{ ID: c.c.ID, }); err == nil { err = cerr @@ -80,7 +80,7 @@ func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, err Stderr: i.Stderr, } // get the rootfs from the snapshotter and add it to the request - mounts, err := c.client.snapshotter().Mounts(ctx, c.c.RootFS) + mounts, err := c.client.SnapshotService().Mounts(ctx, c.c.RootFS) if err != nil { return nil, err } @@ -91,7 +91,7 @@ func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, err Options: m.Options, }) } - response, err := c.client.tasks().Create(ctx, request) + response, err := c.client.TaskService().Create(ctx, request) if err != nil { return nil, err } diff --git a/spec_unix.go b/spec_unix.go index 81404c99a..f3ed8438c 100644 --- a/spec_unix.go +++ b/spec_unix.go @@ -186,7 +186,7 @@ func WithHostNamespace(ns specs.LinuxNamespaceType) SpecOpts { func WithImage(ctx context.Context, i Image) SpecOpts { return func(s *specs.Spec) error { image := i.(*image) - store := image.client.content() + store := image.client.ContentStore() ic, err := image.i.Config(ctx, store) if err != nil { return err diff --git a/task.go b/task.go index 62e283a91..8acf1d300 100644 --- a/task.go +++ b/task.go @@ -185,14 +185,14 @@ func (t *task) Pid() uint32 { } func (t *task) Start(ctx context.Context) error { - _, err := t.client.tasks().Start(ctx, &execution.StartRequest{ + _, err := t.client.TaskService().Start(ctx, &execution.StartRequest{ ContainerID: t.containerID, }) return err } func (t *task) Kill(ctx context.Context, s syscall.Signal) error { - _, err := t.client.tasks().Kill(ctx, &execution.KillRequest{ + _, err := t.client.TaskService().Kill(ctx, &execution.KillRequest{ Signal: uint32(s), ContainerID: t.containerID, PidOrAll: &execution.KillRequest_All{ @@ -203,21 +203,21 @@ func (t *task) Kill(ctx context.Context, s syscall.Signal) error { } func (t *task) Pause(ctx context.Context) error { - _, err := t.client.tasks().Pause(ctx, &execution.PauseRequest{ + _, err := t.client.TaskService().Pause(ctx, &execution.PauseRequest{ ContainerID: t.containerID, }) return err } func (t *task) Resume(ctx context.Context) error { - _, err := t.client.tasks().Resume(ctx, &execution.ResumeRequest{ + _, err := t.client.TaskService().Resume(ctx, &execution.ResumeRequest{ ContainerID: t.containerID, }) return err } func (t *task) Status(ctx context.Context) (string, error) { - r, err := t.client.tasks().Info(ctx, &execution.InfoRequest{ + r, err := t.client.TaskService().Info(ctx, &execution.InfoRequest{ ContainerID: t.containerID, }) if err != nil { @@ -228,7 +228,7 @@ func (t *task) Status(ctx context.Context) (string, error) { // Wait is a blocking call that will wait for the task to exit and return the exit status func (t *task) Wait(ctx context.Context) (uint32, error) { - events, err := t.client.tasks().Events(ctx, &execution.EventsRequest{}) + events, err := t.client.TaskService().Events(ctx, &execution.EventsRequest{}) if err != nil { return 255, err } @@ -251,7 +251,7 @@ func (t *task) Wait(ctx context.Context) (uint32, error) { // during cleanup func (t *task) Delete(ctx context.Context) (uint32, error) { cerr := t.io.Close() - r, err := t.client.tasks().Delete(ctx, &execution.DeleteRequest{ + r, err := t.client.TaskService().Delete(ctx, &execution.DeleteRequest{ ContainerID: t.containerID, }) if err != nil { From d0b22290ecc45d17cd040c3bba1ce6535f3d31e2 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 25 May 2017 11:17:36 -0700 Subject: [PATCH 12/14] Don't require rootfs if not set on container Signed-off-by: Michael Crosby --- client.go | 3 ++- client_test.go | 2 ++ client_windows.go | 2 +- container.go | 30 +++++++++++++++++------------- container_test.go | 2 ++ image.go | 5 +++++ spec.go | 12 +----------- task.go | 8 +++++--- 8 files changed, 35 insertions(+), 29 deletions(-) diff --git a/client.go b/client.go index 9db4a75f7..00cb3d68c 100644 --- a/client.go +++ b/client.go @@ -158,7 +158,7 @@ func WithRuntime(name string) NewContainerOpts { // NewContainer will create a new container in container with the provided id // the id must be unique within the namespace -func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (Container, error) { +func (c *Client) NewContainer(ctx context.Context, id string, image Image, spec *specs.Spec, opts ...NewContainerOpts) (Container, error) { data, err := json.Marshal(spec) if err != nil { return nil, err @@ -170,6 +170,7 @@ func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, TypeUrl: specs.Version, Value: data, }, + Image: image.Name(), } for _, o := range opts { if err := o(ctx, c, &container); err != nil { diff --git a/client_test.go b/client_test.go index 5671fe600..2cdcebdff 100644 --- a/client_test.go +++ b/client_test.go @@ -9,6 +9,7 @@ const defaultAddress = "/run/containerd/containerd.sock" func TestNewClient(t *testing.T) { if testing.Short() { + t.Skip() return } client, err := New(defaultAddress) @@ -25,6 +26,7 @@ func TestNewClient(t *testing.T) { func TestImagePull(t *testing.T) { if testing.Short() { + t.Skip() return } client, err := New(defaultAddress) diff --git a/client_windows.go b/client_windows.go index 7de5677a0..548024e5b 100644 --- a/client_windows.go +++ b/client_windows.go @@ -8,7 +8,7 @@ import ( ) func dialer(address string, timeout time.Duration) (net.Conn, error) { - return winio.DialPipe(bindAddress, &timeout) + return winio.DialPipe(address, &timeout) } func dialAddress(address string) string { diff --git a/container.go b/container.go index 1886f227a..7b9bffd8d 100644 --- a/container.go +++ b/container.go @@ -50,10 +50,12 @@ func (c *container) Spec() (*specs.Spec, error) { // Delete deletes an existing container // an error is returned if the container has running tasks -func (c *container) Delete(ctx context.Context) error { +func (c *container) Delete(ctx context.Context) (err error) { // TODO: should the client be the one removing resources attached // to the container at the moment before we have GC? - err := c.client.SnapshotService().Remove(ctx, c.c.RootFS) + if c.c.RootFS != "" { + err = c.client.SnapshotService().Remove(ctx, c.c.RootFS) + } if _, cerr := c.client.ContainerService().Delete(ctx, &containers.DeleteContainerRequest{ ID: c.c.ID, @@ -79,17 +81,19 @@ func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, err Stdout: i.Stdout, Stderr: i.Stderr, } - // get the rootfs from the snapshotter and add it to the request - mounts, err := c.client.SnapshotService().Mounts(ctx, c.c.RootFS) - if err != nil { - return nil, err - } - for _, m := range mounts { - request.Rootfs = append(request.Rootfs, &mount.Mount{ - Type: m.Type, - Source: m.Source, - Options: m.Options, - }) + if c.c.RootFS != "" { + // get the rootfs from the snapshotter and add it to the request + mounts, err := c.client.SnapshotService().Mounts(ctx, c.c.RootFS) + if err != nil { + return nil, err + } + for _, m := range mounts { + request.Rootfs = append(request.Rootfs, &mount.Mount{ + Type: m.Type, + Source: m.Source, + Options: m.Options, + }) + } } response, err := c.client.TaskService().Create(ctx, request) if err != nil { diff --git a/container_test.go b/container_test.go index bbe67b241..309e8f4fa 100644 --- a/container_test.go +++ b/container_test.go @@ -7,6 +7,7 @@ import ( func TestContainerList(t *testing.T) { if testing.Short() { + t.Skip() return } client, err := New(defaultAddress) @@ -27,6 +28,7 @@ func TestContainerList(t *testing.T) { func TestNewContainer(t *testing.T) { if testing.Short() { + t.Skip() return } client, err := New(defaultAddress) diff --git a/image.go b/image.go index 6e48a52f3..972778daf 100644 --- a/image.go +++ b/image.go @@ -3,6 +3,7 @@ package containerd import "github.com/containerd/containerd/images" type Image interface { + Name() string } var _ = (Image)(&image{}) @@ -12,3 +13,7 @@ type image struct { i images.Image } + +func (i *image) Name() string { + return i.i.Name +} diff --git a/spec.go b/spec.go index 5fa9425d4..53e1fd5f5 100644 --- a/spec.go +++ b/spec.go @@ -4,17 +4,7 @@ import specs "github.com/opencontainers/runtime-spec/specs-go" type SpecOpts func(s *specs.Spec) error -func WithImageRef(ref string) SpecOpts { - return func(s *specs.Spec) error { - if s.Annotations == nil { - s.Annotations = make(map[string]string) - } - s.Annotations["image"] = ref - return nil - } -} - -func WithArgs(args ...string) SpecOpts { +func WithProcessArgs(args ...string) SpecOpts { return func(s *specs.Spec) error { s.Process.Args = args return nil diff --git a/task.go b/task.go index 8acf1d300..9b185ab7e 100644 --- a/task.go +++ b/task.go @@ -14,6 +14,8 @@ import ( "github.com/containerd/fifo" ) +const UnknownExitStatus = 255 + type IO struct { Terminal bool Stdin string @@ -230,12 +232,12 @@ func (t *task) Status(ctx context.Context) (string, error) { func (t *task) Wait(ctx context.Context) (uint32, error) { events, err := t.client.TaskService().Events(ctx, &execution.EventsRequest{}) if err != nil { - return 255, err + return UnknownExitStatus, err } for { e, err := events.Recv() if err != nil { - return 255, err + return UnknownExitStatus, err } if e.Type != taskapi.Event_EXIT { continue @@ -255,7 +257,7 @@ func (t *task) Delete(ctx context.Context) (uint32, error) { ContainerID: t.containerID, }) if err != nil { - return 255, err + return UnknownExitStatus, err } return r.ExitStatus, cerr } From 89037568d3c23f78fd2fc8283d8ac8bf27adeb82 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 25 May 2017 11:30:16 -0700 Subject: [PATCH 13/14] Add windows spec generation Signed-off-by: Michael Crosby --- spec_unix.go | 6 ++-- spec_windows.go | 79 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 spec_windows.go diff --git a/spec_unix.go b/spec_unix.go index f3ed8438c..e3a24eb00 100644 --- a/spec_unix.go +++ b/spec_unix.go @@ -185,8 +185,10 @@ func WithHostNamespace(ns specs.LinuxNamespaceType) SpecOpts { func WithImage(ctx context.Context, i Image) SpecOpts { return func(s *specs.Spec) error { - image := i.(*image) - store := image.client.ContentStore() + var ( + image = i.(*image) + store = image.client.ContentStore() + ) ic, err := image.i.Config(ctx, store) if err != nil { return err diff --git a/spec_windows.go b/spec_windows.go new file mode 100644 index 000000000..806d2f547 --- /dev/null +++ b/spec_windows.go @@ -0,0 +1,79 @@ +package containerd + +import ( + "context" + "encoding/json" + "fmt" + "runtime" + + "github.com/containerd/containerd/images" + "github.com/opencontainers/image-spec/specs-go/v1" + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +const pipeRoot = `\\.\pipe` + +func createDefaultSpec() (*specs.Spec, error) { + return &specs.Spec{ + Version: specs.Version, + Platform: specs.Platform{ + OS: runtime.GOOS, + Arch: runtime.GOARCH, + }, + Root: specs.Root{}, + Process: specs.Process{ + Env: config.Env, + ConsoleSize: specs.Box{ + Width: 80, + Height: 20, + }, + }, + }, nil +} + +func WithImage(ctx context.Context, i Image) SpecOpts { + return func(s *specs.Spec) error { + var ( + image = i.(*image) + store = image.client.ContentStore() + ) + ic, err := image.i.Config(ctx, store) + if err != nil { + return err + } + var ( + ociimage v1.Image + config v1.ImageConfig + ) + switch ic.MediaType { + case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: + r, err := store.Reader(ctx, ic.Digest) + if err != nil { + return err + } + if err := json.NewDecoder(r).Decode(&ociimage); err != nil { + r.Close() + return err + } + r.Close() + config = ociimage.Config + default: + return fmt.Errorf("unknown image config media type %s", ic.MediaType) + } + s.Process.Env = config.Env + s.Process.Args = append(config.Entrypoint, config.Cmd...) + s.Process.User = specs.User{ + Username: config.User, + } + return nil + } +} + +func WithTTY(width, height int) SpecOpts { + func(s *specs.Spec) error { + s.Process.Terminal = true + s.Process.ConsoleSize.Width = width + s.Process.ConsoleSize.Height = height + return nil + } +} From cebe09935893fba7e0ce1f87b8e140a7a0eec249 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 25 May 2017 11:40:35 -0700 Subject: [PATCH 14/14] Add test flag for setting containerd address Signed-off-by: Michael Crosby --- client.go | 10 ++++++++-- client_test.go | 14 +++++++++----- container_test.go | 6 ++---- spec_unix.go | 2 +- spec_windows.go | 2 +- task.go | 16 +++++++++++++--- 6 files changed, 34 insertions(+), 16 deletions(-) diff --git a/client.go b/client.go index 00cb3d68c..f9160ab82 100644 --- a/client.go +++ b/client.go @@ -156,9 +156,16 @@ func WithRuntime(name string) NewContainerOpts { } } +func WithImage(i Image) NewContainerOpts { + return func(ctx context.Context, client *Client, c *containers.Container) error { + c.Image = i.Name() + return nil + } +} + // NewContainer will create a new container in container with the provided id // the id must be unique within the namespace -func (c *Client) NewContainer(ctx context.Context, id string, image Image, spec *specs.Spec, opts ...NewContainerOpts) (Container, error) { +func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (Container, error) { data, err := json.Marshal(spec) if err != nil { return nil, err @@ -170,7 +177,6 @@ func (c *Client) NewContainer(ctx context.Context, id string, image Image, spec TypeUrl: specs.Version, Value: data, }, - Image: image.Name(), } for _, o := range opts { if err := o(ctx, c, &container); err != nil { diff --git a/client_test.go b/client_test.go index 2cdcebdff..871b3167f 100644 --- a/client_test.go +++ b/client_test.go @@ -2,17 +2,22 @@ package containerd import ( "context" + "flag" "testing" ) -const defaultAddress = "/run/containerd/containerd.sock" +func init() { + flag.StringVar(&address, "address", "/run/containerd/containerd.sock", "The address to the containerd socket for use in the tests") + flag.Parse() +} + +var address string func TestNewClient(t *testing.T) { if testing.Short() { t.Skip() - return } - client, err := New(defaultAddress) + client, err := New(address) if err != nil { t.Fatal(err) } @@ -27,9 +32,8 @@ func TestNewClient(t *testing.T) { func TestImagePull(t *testing.T) { if testing.Short() { t.Skip() - return } - client, err := New(defaultAddress) + client, err := New(address) if err != nil { t.Fatal(err) } diff --git a/container_test.go b/container_test.go index 309e8f4fa..7f55a94c1 100644 --- a/container_test.go +++ b/container_test.go @@ -8,9 +8,8 @@ import ( func TestContainerList(t *testing.T) { if testing.Short() { t.Skip() - return } - client, err := New(defaultAddress) + client, err := New(address) if err != nil { t.Fatal(err) } @@ -29,9 +28,8 @@ func TestContainerList(t *testing.T) { func TestNewContainer(t *testing.T) { if testing.Short() { t.Skip() - return } - client, err := New(defaultAddress) + client, err := New(address) if err != nil { t.Fatal(err) } diff --git a/spec_unix.go b/spec_unix.go index e3a24eb00..ee9c26b1c 100644 --- a/spec_unix.go +++ b/spec_unix.go @@ -183,7 +183,7 @@ func WithHostNamespace(ns specs.LinuxNamespaceType) SpecOpts { } } -func WithImage(ctx context.Context, i Image) SpecOpts { +func WithImageConfig(ctx context.Context, i Image) SpecOpts { return func(s *specs.Spec) error { var ( image = i.(*image) diff --git a/spec_windows.go b/spec_windows.go index 806d2f547..a5110cf8e 100644 --- a/spec_windows.go +++ b/spec_windows.go @@ -31,7 +31,7 @@ func createDefaultSpec() (*specs.Spec, error) { }, nil } -func WithImage(ctx context.Context, i Image) SpecOpts { +func WithImageConfig(ctx context.Context, i Image) SpecOpts { return func(s *specs.Spec) error { var ( image = i.(*image) diff --git a/task.go b/task.go index 9b185ab7e..9b28281bd 100644 --- a/task.go +++ b/task.go @@ -160,6 +160,16 @@ func (g *wgCloser) Close() error { return nil } +type TaskStatus string + +const ( + Running TaskStatus = "running" + Created TaskStatus = "created" + Stopped TaskStatus = "stopped" + Paused TaskStatus = "paused" + Pausing TaskStatus = "pausing" +) + type Task interface { Delete(context.Context) (uint32, error) Kill(context.Context, syscall.Signal) error @@ -167,7 +177,7 @@ type Task interface { Resume(context.Context) error Pid() uint32 Start(context.Context) error - Status(context.Context) (string, error) + Status(context.Context) (TaskStatus, error) Wait(context.Context) (uint32, error) } @@ -218,14 +228,14 @@ func (t *task) Resume(ctx context.Context) error { return err } -func (t *task) Status(ctx context.Context) (string, error) { +func (t *task) Status(ctx context.Context) (TaskStatus, error) { r, err := t.client.TaskService().Info(ctx, &execution.InfoRequest{ ContainerID: t.containerID, }) if err != nil { return "", err } - return r.Task.Status.String(), nil + return TaskStatus(r.Task.Status.String()), nil } // Wait is a blocking call that will wait for the task to exit and return the exit status